Instantly share code, notes, and snippets.

Embed
What would you like to do?

autoscale: true

How to use AWS Lambda in Document Processing Pipeline

@suzu_v VOYAGE GROUP 2016/04/22 at AWS Tokyo Office


私について


アジェンダ

  • 広告配信システムの文章解析基盤でLambdaがどう使われているのかを説明します
  • API Gateway + Lambdaではなく、Kinesis Stream + Lambdaの事例です

発表中にも気軽に質問してください!


用途・要件・背景

  • 広告配信のためにページ内文章を見てその内容を分析・分類し、配信に活用したい
  • ページはクロールして取得、なるべくクロールしてから早く分類したい
  • 1日に取得・分析したいページは100万ページほど

開発からリリースまでは3ヶ月程度


方針

  • 運用に手間かけたくないのでなるべくマネージドサービスをつかう
  • 分析・分類・文書検索あたりは今後色々な手法を試せるように作る
  • 各コンポーネントはそれぞれ独立して動作し、1つが落ちても全体に影響がないようにする

構成要素

  • Webクローラ (EC2 / Go): URLを指定してコンテンツを取得するdaemon
  • 本文抽出機 (Lambda / Java8): 本文であると推定される部分を抜き出す
    • Lucene / Kuromojiをつかいたいので
  • 分類器 (EC2 / Go): 本文やページから得られる情報を元に文章をカテゴリ分けなどを担当する
  • ドキュメントストア (EC2 / Elasticsearch): クロールして分類済みのコンテンツを格納し、検索可能にする
  • API (EC2 + ELB / Go): 分類結果を返すinternalなHTTP API


アーキテクチャ

Kinesis Streamを重宝しています

  • ピークで~100MB/sでクローラがコンテンツをfetch
  • それを直接Kinesis StreamにPutRecordsで挿入
  • クローラはGo製 (with aws-sdk-go)、書き込みのリトライやバッファリングもしている
  • 冪等性の担保はElasticsearchで

なぜKinesis Streamか

  • PutRecords / GetRecords が安定している
  • リアルタイムにクロール結果を解析するためのデータのバッファとして重宝している
  • Lambdaと連携することでストリーム処理用アプリケーションも簡単に書ける

なぜLambdaか

  • Kinesis Streamとの連携が簡単
  • 検証にもぽちっと新しいLambda Function作ればいいので手軽
  • Kinesis Streamのデータはshardにデータがあるので同じデータでのテストも手軽
    • Testing in Production (Data)

Lambdaの良い点

  • Kinesis Applicationを自前で書くとシャードのやりくりが面倒
    • そのあたりをLambda側のwrapperがいい感じにしてくれる
  • デプロイが楽
    • ビルドして実行可能バイナリをs3におけばそれを利用できる
    • daemon管理などを考えなくていい

Javaでの実装例


実装例 in Java

Kinesisのレコード形式と対となるPOJOオブジェクトを作成

public class KinesisMessageModel implements Serializable{
    public String id;
    public String url;
    public String body;
    public String title;
    public String description;
	// ...
}

see: 例: ハンドラーの入出力に POJO を使用する (Java) - AWS Lambda


データを加工して次のKinesis Streamへ

public class Boiler {
	// Kinesis Streamからのデータをうけとるハンドラ
    public void recordHandler(KinesisEvent event) throws IOException {
        PutRecordsRequest putRecordsRequest
			= getPutRecordsRequest(this.kinesisOutputStreamName);
        List<PutRecordsRequestEntry> putRecordsRequestEntryList
			= new ArrayList<>();
		// 1つのeventには複数のレコードが入っている batch sizeで設定可能。
        for(KinesisEventRecord rec : event.getRecords()) {
            KinesisMessageModel record = toClass(rec);
            PutRecordsRequestEntry putRecordsRequestEntry
				= new PutRecordsRequestEntry();
			// レコードの加工(実際にはここで本文抽出をしています)
            ByteBuffer data = ByteBuffer.wrap(new ObjectMapper().writeValueAsString(record));
            putRecordsRequestEntry.setData(data);
            putRecordsRequestEntry.setPartitionKey(record.getSomeKey());
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
		}
		// 次のKinesis StreamへのPutRecordsの組み立てている
        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult
			= this.kinesis.putRecords(putRecordsRequest);
	}
}

Java実装の所感

  • さくっと書くならnode.jsのほうが楽
    • Javaの場合はblueprintがない & Lambda Consoleからさくっと試すことはできない
  • パッケージングはMavenでやっていて、Maven Shade Pluginでuber jarをつくっておいています。
    • uber jar: 依存ライブラリなどを全部1つのjarにいれたjarのこと
    • 形態素解析用の辞書もjarにいれています

Lambda 関数ハンドラー (Java) - AWS Lambda Apache Maven Shade Plugin – Introduction


[fit] 実装にあたって気をつけること

  • エラーハンドリング
    • 1つでも変なレコードがくるとKinesis Stream側のレコードがexpireするまでLambdaがretryしつづける
    • failさせると停止してしまうので、skipするように実装すること

Lambda作成: aws-cli

aws lambda create-function --region ap-northeast-1
--function-name my-lambda-function
--code S3Bucket=mybucket,S3Key=path/to/my.jar
--role arn:aws:iam::999999999999:role/lambda_kinesis_rw
--runtime java8
--handler com.your.app.Handler::recordHandler
--description "my kinesis stream!"
--timeout 15 --memory-size 512

aws lambda create-event-source-mapping
--event-source-arn arn:aws:kinesis:ap-northeast-1:999999999999:stream/your-stream
--function-name my-lambda-function
--enable --batch-size 100 --starting-position TRIM_HORIZON

デプロイ方法: aws-cli

  • Pull Request -> merge -> build (on Travis CI) -> S3

  • Travis CIでuber jarをつくっています

  • あとは update-function-code で反映

    aws lambda update-function-code --function-name my-lambda-function --s3-bucket mybucket --s3-key path/to/my.jar


Lambdaでのロギング

  • Log4jをつかっています
  • エラーログなどはCloudWatch Logsから見ることができる
  • 手元では再現しない不具合などがある場合にはCloudWatch Logsから見ること

AWS Lambda の Amazon CloudWatch ログへのアクセス - AWS Lambda ロギング (Java) - AWS Lambda


まとめ

  • Lambda + Kinesis Streamで文章をリアルタイム分類することができるようになりました
  • Lambda, さくっとつかえておすすめです
@suzuken

This comment has been minimized.

Show comment
Hide comment
Owner

suzuken commented Apr 22, 2016

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment