autoscale: true
@suzu_v VOYAGE GROUP 2016/04/22 at AWS Tokyo Office
- すずけん, https://github.com/suzuken, @suzu_v
- Gopherです / 今日はJavaの話をします
- http://fluct.jp で広告配信 / 解析基盤のソフトウェアエンジニアをしています
- 広告配信システムの文章解析基盤でLambdaがどう使われているのかを説明します
- API Gateway + Lambdaではなく、Kinesis Stream + Lambdaの事例です
発表中にも気軽に質問してください!
- 広告配信のためにページ内文章を見てその内容を分析・分類し、配信に活用したい
- ページはクロールして取得、なるべくクロールしてから早く分類したい
- 1日に取得・分析したいページは100万ページほど
開発からリリースまでは3ヶ月程度
- 運用に手間かけたくないのでなるべくマネージドサービスをつかう
- 分析・分類・文書検索あたりは今後色々な手法を試せるように作る
- 各コンポーネントはそれぞれ独立して動作し、1つが落ちても全体に影響がないようにする
- Webクローラ (EC2 / Go): URLを指定してコンテンツを取得するdaemon
- 本文抽出機 (Lambda / Java8): 本文であると推定される部分を抜き出す
- Lucene / Kuromojiをつかいたいので
- 分類器 (EC2 / Go): 本文やページから得られる情報を元に文章をカテゴリ分けなどを担当する
- ドキュメントストア (EC2 / Elasticsearch): クロールして分類済みのコンテンツを格納し、検索可能にする
- API (EC2 + ELB / Go): 分類結果を返すinternalなHTTP API
Kinesis Streamを重宝しています
- ピークで~100MB/sでクローラがコンテンツをfetch
- それを直接Kinesis StreamにPutRecordsで挿入
- クローラはGo製 (with aws-sdk-go)、書き込みのリトライやバッファリングもしている
- 冪等性の担保はElasticsearchで
- PutRecords / GetRecords が安定している
- リアルタイムにクロール結果を解析するためのデータのバッファとして重宝している
- Lambdaと連携することでストリーム処理用アプリケーションも簡単に書ける
- Kinesis Streamとの連携が簡単
- 検証にもぽちっと新しいLambda Function作ればいいので手軽
- Kinesis Streamのデータはshardにデータがあるので同じデータでのテストも手軽
- Testing in Production (Data)
- Kinesis Applicationを自前で書くとシャードのやりくりが面倒
- そのあたりをLambda側のwrapperがいい感じにしてくれる
- デプロイが楽
- ビルドして実行可能バイナリをs3におけばそれを利用できる
- daemon管理などを考えなくていい
Kinesisのレコード形式と対となるPOJOオブジェクトを作成
public class KinesisMessageModel implements Serializable{
public String id;
public String url;
public String body;
public String title;
public String description;
// ...
}
see: 例: ハンドラーの入出力に POJO を使用する (Java) - AWS Lambda
データを加工して次のKinesis Streamへ
public class Boiler {
// Kinesis Streamからのデータをうけとるハンドラ
public void recordHandler(KinesisEvent event) throws IOException {
PutRecordsRequest putRecordsRequest
= getPutRecordsRequest(this.kinesisOutputStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList
= new ArrayList<>();
// 1つのeventには複数のレコードが入っている batch sizeで設定可能。
for(KinesisEventRecord rec : event.getRecords()) {
KinesisMessageModel record = toClass(rec);
PutRecordsRequestEntry putRecordsRequestEntry
= new PutRecordsRequestEntry();
// レコードの加工(実際にはここで本文抽出をしています)
ByteBuffer data = ByteBuffer.wrap(new ObjectMapper().writeValueAsString(record));
putRecordsRequestEntry.setData(data);
putRecordsRequestEntry.setPartitionKey(record.getSomeKey());
putRecordsRequestEntryList.add(putRecordsRequestEntry);
}
// 次のKinesis StreamへのPutRecordsの組み立てている
putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult
= this.kinesis.putRecords(putRecordsRequest);
}
}
- さくっと書くならnode.jsのほうが楽
- Javaの場合はblueprintがない & Lambda Consoleからさくっと試すことはできない
- パッケージングはMavenでやっていて、Maven Shade Pluginでuber jarをつくっておいています。
- uber jar: 依存ライブラリなどを全部1つのjarにいれたjarのこと
- 形態素解析用の辞書もjarにいれています
Lambda 関数ハンドラー (Java) - AWS Lambda Apache Maven Shade Plugin – Introduction
- エラーハンドリング
- 1つでも変なレコードがくるとKinesis Stream側のレコードがexpireするまでLambdaがretryしつづける
- failさせると停止してしまうので、skipするように実装すること
aws lambda create-function --region ap-northeast-1
--function-name my-lambda-function
--code S3Bucket=mybucket,S3Key=path/to/my.jar
--role arn:aws:iam::999999999999:role/lambda_kinesis_rw
--runtime java8
--handler com.your.app.Handler::recordHandler
--description "my kinesis stream!"
--timeout 15 --memory-size 512
aws lambda create-event-source-mapping
--event-source-arn arn:aws:kinesis:ap-northeast-1:999999999999:stream/your-stream
--function-name my-lambda-function
--enable --batch-size 100 --starting-position TRIM_HORIZON
-
Pull Request -> merge -> build (on Travis CI) -> S3
-
Travis CIでuber jarをつくっています
-
あとは
update-function-code
で反映aws lambda update-function-code --function-name my-lambda-function --s3-bucket mybucket --s3-key path/to/my.jar
- Log4jをつかっています
- エラーログなどはCloudWatch Logsから見ることができる
- 手元では再現しない不具合などがある場合にはCloudWatch Logsから見ること
AWS Lambda の Amazon CloudWatch ログへのアクセス - AWS Lambda ロギング (Java) - AWS Lambda
- Lambda + Kinesis Streamで文章をリアルタイム分類することができるようになりました
- Lambda, さくっとつかえておすすめです
画像などはこちらで
https://speakerdeck.com/suzuken/how-to-use-aws-lambda-in-document-processing-pipeline