Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?

autoscale: true

How to use AWS Lambda in Document Processing Pipeline

@suzu_v VOYAGE GROUP 2016/04/22 at AWS Tokyo Office


私について


アジェンダ

  • 広告配信システムの文章解析基盤でLambdaがどう使われているのかを説明します
  • API Gateway + Lambdaではなく、Kinesis Stream + Lambdaの事例です

発表中にも気軽に質問してください!


用途・要件・背景

  • 広告配信のためにページ内文章を見てその内容を分析・分類し、配信に活用したい
  • ページはクロールして取得、なるべくクロールしてから早く分類したい
  • 1日に取得・分析したいページは100万ページほど

開発からリリースまでは3ヶ月程度


方針

  • 運用に手間かけたくないのでなるべくマネージドサービスをつかう
  • 分析・分類・文書検索あたりは今後色々な手法を試せるように作る
  • 各コンポーネントはそれぞれ独立して動作し、1つが落ちても全体に影響がないようにする

構成要素

  • Webクローラ (EC2 / Go): URLを指定してコンテンツを取得するdaemon
  • 本文抽出機 (Lambda / Java8): 本文であると推定される部分を抜き出す
    • Lucene / Kuromojiをつかいたいので
  • 分類器 (EC2 / Go): 本文やページから得られる情報を元に文章をカテゴリ分けなどを担当する
  • ドキュメントストア (EC2 / Elasticsearch): クロールして分類済みのコンテンツを格納し、検索可能にする
  • API (EC2 + ELB / Go): 分類結果を返すinternalなHTTP API


アーキテクチャ

Kinesis Streamを重宝しています

  • ピークで~100MB/sでクローラがコンテンツをfetch
  • それを直接Kinesis StreamにPutRecordsで挿入
  • クローラはGo製 (with aws-sdk-go)、書き込みのリトライやバッファリングもしている
  • 冪等性の担保はElasticsearchで

なぜKinesis Streamか

  • PutRecords / GetRecords が安定している
  • リアルタイムにクロール結果を解析するためのデータのバッファとして重宝している
  • Lambdaと連携することでストリーム処理用アプリケーションも簡単に書ける

なぜLambdaか

  • Kinesis Streamとの連携が簡単
  • 検証にもぽちっと新しいLambda Function作ればいいので手軽
  • Kinesis Streamのデータはshardにデータがあるので同じデータでのテストも手軽
    • Testing in Production (Data)

Lambdaの良い点

  • Kinesis Applicationを自前で書くとシャードのやりくりが面倒
    • そのあたりをLambda側のwrapperがいい感じにしてくれる
  • デプロイが楽
    • ビルドして実行可能バイナリをs3におけばそれを利用できる
    • daemon管理などを考えなくていい

Javaでの実装例


実装例 in Java

Kinesisのレコード形式と対となるPOJOオブジェクトを作成

public class KinesisMessageModel implements Serializable{
    public String id;
    public String url;
    public String body;
    public String title;
    public String description;
	// ...
}

see: 例: ハンドラーの入出力に POJO を使用する (Java) - AWS Lambda


データを加工して次のKinesis Streamへ

public class Boiler {
	// Kinesis Streamからのデータをうけとるハンドラ
    public void recordHandler(KinesisEvent event) throws IOException {
        PutRecordsRequest putRecordsRequest
			= getPutRecordsRequest(this.kinesisOutputStreamName);
        List<PutRecordsRequestEntry> putRecordsRequestEntryList
			= new ArrayList<>();
		// 1つのeventには複数のレコードが入っている batch sizeで設定可能。
        for(KinesisEventRecord rec : event.getRecords()) {
            KinesisMessageModel record = toClass(rec);
            PutRecordsRequestEntry putRecordsRequestEntry
				= new PutRecordsRequestEntry();
			// レコードの加工(実際にはここで本文抽出をしています)
            ByteBuffer data = ByteBuffer.wrap(new ObjectMapper().writeValueAsString(record));
            putRecordsRequestEntry.setData(data);
            putRecordsRequestEntry.setPartitionKey(record.getSomeKey());
            putRecordsRequestEntryList.add(putRecordsRequestEntry);
		}
		// 次のKinesis StreamへのPutRecordsの組み立てている
        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult
			= this.kinesis.putRecords(putRecordsRequest);
	}
}

Java実装の所感

  • さくっと書くならnode.jsのほうが楽
    • Javaの場合はblueprintがない & Lambda Consoleからさくっと試すことはできない
  • パッケージングはMavenでやっていて、Maven Shade Pluginでuber jarをつくっておいています。
    • uber jar: 依存ライブラリなどを全部1つのjarにいれたjarのこと
    • 形態素解析用の辞書もjarにいれています

Lambda 関数ハンドラー (Java) - AWS Lambda Apache Maven Shade Plugin – Introduction


[fit] 実装にあたって気をつけること

  • エラーハンドリング
    • 1つでも変なレコードがくるとKinesis Stream側のレコードがexpireするまでLambdaがretryしつづける
    • failさせると停止してしまうので、skipするように実装すること

Lambda作成: aws-cli

aws lambda create-function --region ap-northeast-1
--function-name my-lambda-function
--code S3Bucket=mybucket,S3Key=path/to/my.jar
--role arn:aws:iam::999999999999:role/lambda_kinesis_rw
--runtime java8
--handler com.your.app.Handler::recordHandler
--description "my kinesis stream!"
--timeout 15 --memory-size 512

aws lambda create-event-source-mapping
--event-source-arn arn:aws:kinesis:ap-northeast-1:999999999999:stream/your-stream
--function-name my-lambda-function
--enable --batch-size 100 --starting-position TRIM_HORIZON

デプロイ方法: aws-cli

  • Pull Request -> merge -> build (on Travis CI) -> S3

  • Travis CIでuber jarをつくっています

  • あとは update-function-code で反映

    aws lambda update-function-code --function-name my-lambda-function --s3-bucket mybucket --s3-key path/to/my.jar


Lambdaでのロギング

  • Log4jをつかっています
  • エラーログなどはCloudWatch Logsから見ることができる
  • 手元では再現しない不具合などがある場合にはCloudWatch Logsから見ること

AWS Lambda の Amazon CloudWatch ログへのアクセス - AWS Lambda ロギング (Java) - AWS Lambda


まとめ

  • Lambda + Kinesis Streamで文章をリアルタイム分類することができるようになりました
  • Lambda, さくっとつかえておすすめです
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment