Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package com.trlln.bbb.stream
import java.text.SimpleDateFormat
import java.util.Date
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider
import com.amazonaws.services.lambda.runtime.Context
import com.amazonaws.services.lambda.runtime.events.KinesisEvent
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord
import com.google.protobuf.InvalidProtocolBufferException
import com.trlln.bbb.proto.Proto.{MissionHistory, MissionHistoryStat}
import com.trlln.bbb.repository.Repository
import com.trlln.bbb.repository.aws.AwsConfig
import com.twitter.util.{Await, Future}
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
 * AWS Lambda handler that consumes `MissionHistory` protobuf messages from a Kinesis
 * event and folds them into `MissionHistoryStat` records stored through the repository.
 *
 * One stat record exists per mission and per day; its id has the form
 * `mission-history-stat:<missionId>:<yyyy-MM-dd>`.
 */
object MissionHistoryAnalyzer {

  /**
   * Formats a timestamp into its day bucket. `SimpleDateFormat` is not thread-safe and
   * formats in the JVM default time zone; within this object it is only used
   * synchronously while grouping the incoming records.
   */
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd")

  /** First segment of every stat id. */
  val idPrefix = "mission-history-stat"

  val credentials = new EnvironmentVariableCredentialsProvider().getCredentials

  val config = new AwsConfig()
    .withDynamoEndpoint("https://dynamodb.ap-northeast-1.amazonaws.com")
    .withAccessKey(credentials.getAWSAccessKeyId)
    .withSecretKey(credentials.getAWSSecretKey)

  // TODO: credentials seem wrong and dynamodb returns errors
  // NOTE(review): only the access key and the secret key are forwarded to AwsConfig. If the
  // environment supplies temporary (session) credentials, the session token is dropped here,
  // which could explain the errors -- confirm whether AwsConfig can accept a session token.
  val repository = Repository(config)

  /**
   * Lambda entry point: decodes the batch, groups the histories per day and per mission,
   * updates one stat record per group and blocks until every update has completed.
   *
   * @param event   Kinesis batch whose records carry serialized `MissionHistory` messages
   * @param context Lambda invocation context (currently unused)
   */
  def run(event: KinesisEvent, context: Context): Unit = {
    val updates: Seq[Future[Option[MissionHistoryStat]]] =
      groupHistoriesByDate(event.getRecords).toSeq.flatMap {
        case (date, sameDay) =>
          // The stat id contains the mission id, so every mission needs its own update.
          // Aggregating a whole day at once would credit all missions of that day to a
          // single, arbitrarily chosen mission.
          sameDay.groupBy(_.getMissionId).values.map(updateStat(date, _))
      }
    Await.result(Future.join(updates))
  }

  /**
   * Loads (or creates) the stat for the given day, adds the given histories to it and
   * stores the result.
   *
   * @param date      day bucket as produced by `dateFormat`
   * @param histories non-empty histories that all belong to the same mission
   */
  private def updateStat(date: String, histories: Iterable[MissionHistory]): Future[Option[MissionHistoryStat]] = {
    getOrInitializeStatBuilder(date, histories)
      .map(aggregateStat(histories))
      .flatMap(putStat)
  }

  /**
   * Fetches the stored stat for the mission/day pair and wraps it in a builder, or starts
   * a fresh builder carrying only the id when nothing is stored yet.
   *
   * @param date      day bucket as produced by `dateFormat`
   * @param histories non-empty histories of a single mission; the mission id is read from
   *                  the first element
   */
  private def getOrInitializeStatBuilder(date: String, histories: Iterable[MissionHistory]): Future[MissionHistoryStat.Builder] = {
    val id = s"${idPrefix}:${histories.head.getMissionId}:${date}"
    repository.missionHistoryStat.get(id).map {
      case Some(stat) =>
        MissionHistoryStat.newBuilder(stat)
      case _ =>
        MissionHistoryStat.newBuilder().setId(id)
    }
  }

  /**
   * Adds the outcome of every history to the counters of `builder` and returns the builder.
   *
   * Finished histories increment `lost`, `draw` or `won`; histories without a finish time
   * increment `unfinished`.
   */
  private def aggregateStat(histories: Iterable[MissionHistory])(builder: MissionHistoryStat.Builder): MissionHistoryStat.Builder = {
    val accountIds = mutable.Set[String]()
    for (history <- histories) {
      if (history.hasFinishedAt) {
        // NOTE(review): there is no default case, so any result other than LOST, DRAW or
        // WON raises a MatchError and fails this update -- confirm the enum has no other
        // values.
        history.getResult match {
          case MissionHistory.Result.LOST =>
            builder.setLost(builder.getLost + 1)
          case MissionHistory.Result.DRAW =>
            builder.setDraw(builder.getDraw + 1)
          case MissionHistory.Result.WON =>
            builder.setWon(builder.getWon + 1)
        }
      } else {
        builder.setUnfinished(builder.getUnfinished + 1)
      }
      accountIds += history.getAccountId
    }
    // NOTE(review): unlike the counters above, this replaces the stored value with the
    // number of distinct accounts in the current batch only -- confirm this is intended.
    builder.setAccounts(accountIds.size)
  }

  /** Stores the stat held by `builder` through the repository. */
  private def putStat(builder: MissionHistoryStat.Builder): Future[Option[MissionHistoryStat]] = {
    repository.missionHistoryStat.put(builder)
  }

  /**
   * Decodes the records, keeps one history per history id and groups the survivors by day.
   *
   * When several records share a history id, a finished history always replaces whatever
   * was kept before, while an unfinished one is only kept if nothing is stored for that id
   * yet. Records that are not valid `MissionHistory` messages are skipped.
   *
   * The day is taken from the finish time when present, otherwise from the start time.
   */
  private def groupHistoriesByDate(records: java.util.List[KinesisEventRecord]): Map[String, Iterable[MissionHistory]] = {
    val latestById = mutable.Map[String, MissionHistory]()
    for (record <- records) {
      try {
        // Copy exactly the readable bytes through a read-only view: `array` would expose the
        // whole backing array regardless of position, limit and offset, and is unavailable
        // for read-only or direct buffers. The view leaves the record's own position alone.
        val data = record.getKinesis.getData.asReadOnlyBuffer()
        val bytes = new Array[Byte](data.remaining())
        data.get(bytes)
        val history = MissionHistory.parseFrom(bytes)
        if (history.hasFinishedAt || !latestById.contains(history.getId)) {
          latestById(history.getId) = history
        }
      } catch {
        // Deliberate best effort: a malformed record must not fail the whole batch.
        case _: InvalidProtocolBufferException =>
      }
    }
    latestById.values.groupBy { history =>
      val timestamp =
        if (history.hasFinishedAt) new Date(history.getFinishedAt)
        else new Date(history.getStartedAt)
      dateFormat.format(timestamp)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment