Skip to content

Instantly share code, notes, and snippets.

@prassee
Created August 1, 2016 09:03
Show Gist options
  • Save prassee/65e6792a20417e1dbddf549168b5920b to your computer and use it in GitHub Desktop.
Save prassee/65e6792a20417e1dbddf549168b5920b to your computer and use it in GitHub Desktop.
package service
import _root_.util.Configurations
import _root_.util.futures.SafeFutures._
import akka.actor.{Actor, ActorSystem, Props}
import com.google.inject.{Inject, Singleton}
import com.typesafe.config.ConfigRenderOptions
import org.joda.time.DateTime
import play.api.Logger
import play.api.libs.json._
import service.aggregation_records.AggregationRecordsService
import service.aggregation_records.AggregationRecordsService.{CurrentRuleSetVersionType, RecordKey}
import service.formula.{FormulaModel, QueryDurationWindow}
import service.popularity_extraction.PopularityExtractionService
import service.popularity_scores.ProductPopularityScoresService
import service.popularity_scores.ProductPopularityScoresService.RuleSetVersion
import service.sync_statuses.ProductSyncStatusesService
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import service.ProductPopularityAggregators.ProductPopularityRule
/**
 * Bootstraps one `ProductPopularityAggregator` actor per formula declared in the
 * `rules.product_popularity_rules` configuration object.
 *
 * The injected services are not used here directly (apart from `productSyncStatusesService`);
 * they are handed to every aggregator actor that gets created.
 */
@Singleton
class ProductPopularityAggregators @Inject()(actorSystem: ActorSystem, aggregationRecordsService: AggregationRecordsService,
    popularityExtractionService: PopularityExtractionService, productSyncStatusesService: ProductSyncStatusesService,
    productPopularityScoresService: ProductPopularityScoresService) {

  /**
   * Builds the query window (product views / order counts) for one formula.
   *
   * Each window spans from "now minus N days" up to "now", where N is the formula's own
   * override when present and the configured default otherwise. The configured default is
   * only looked up when the formula does not override it.
   *
   * @param formula the formula whose window overrides should be honoured
   * @return the window, or `None` when a needed default is neither overridden nor configured
   */
  private def calcQueryWindow(formula: ProductPopularityRule): Option[QueryDurationWindow] = {
    val now = new DateTime()
    for {
      viewDays <- formula.productViewWindow
        .orElse(Configurations.getIntOpt("rules.product_popularity_rules.productViewWindow"))
      orderDays <- formula.orderCountWindow
        .orElse(Configurations.getIntOpt("rules.product_popularity_rules.orderCountWindow"))
    } yield QueryDurationWindow((now.minusDays(viewDays), now), (now.minusDays(orderDays), now))
  }

  /**
   * Parses the popularity rule set from configuration, records its version through
   * `productSyncStatusesService`, and spawns one aggregator actor per formula
   * (named `product-view-aggregator-<formula version>`).
   *
   * Failures are logged instead of being dropped or thrown:
   *  - if the rule set cannot be parsed, nothing is started and the `JsError` is logged;
   *  - if a formula's query window cannot be resolved, that formula is skipped and logged,
   *    while the remaining formulas are still started.
   *
   * @return the parse outcome: `JsSuccess(())` when the rule set was read, the `JsError` otherwise
   */
  def onStart(): JsResult[Unit] = {
    import service.ProductPopularityAggregators._

    val rulesJson = Json.parse("rules.product_popularity_rules".configAsJson)
    val startup = Json.fromJson[ProductPopularityRules](rulesJson).map { ruleSet =>
      productSyncStatusesService.checkAndUpdateCurrentRuleSetVersion(
        Option(RecordKey.fromString(s"${CurrentRuleSetVersionType.prefix}-${ruleSet.version}")))

      ruleSet.formulas.foreach { formula =>
        // The window is resolved here, once, rather than inside the by-name Props creator, so a
        // missing window is reported at start-up instead of failing the actor's initialisation.
        calcQueryWindow(formula) match {
          case Some(queryWindow) =>
            val formulaModel = FormulaModel(formula.expression, formula.variables.toSet)
            // create actors for each formula version
            actorSystem.actorOf(
              Props(new ProductPopularityAggregator(RuleSetVersion(formula.version), queryWindow, formulaModel,
                aggregationRecordsService, popularityExtractionService, productPopularityScoresService)),
              s"product-view-aggregator-${formula.version}")
          case None =>
            Logger.error(s"No query window could be resolved for formula version ${formula.version}; " +
              "its aggregator was not started")
        }
      }
    }

    startup match {
      case JsError(errors) =>
        Logger.error(s"Could not parse rules.product_popularity_rules; no aggregators were started: $errors")
      case _ => ()
    }
    startup
  }
}
/**
 * JSON models for the popularity rule set, plus a helper that renders a configuration
 * path as a JSON string.
 */
object ProductPopularityAggregators {

  /**
   * One popularity formula.
   *
   * Note: `variables` is an `Array`, so the generated case-class equality compares it by
   * reference, not by content.
   *
   * @param expression        the formula expression
   * @param variables         names of the variables used by the expression
   * @param version           version of this formula
   * @param productViewWindow optional per-formula override, in days, of the product-view window
   * @param orderCountWindow  optional per-formula override, in days, of the order-count window
   */
  case class ProductPopularityRule(
      expression: String,
      variables: Array[String],
      version: String,
      productViewWindow: Option[Int],
      orderCountWindow: Option[Int])

  object ProductPopularityRule {
    /** JSON (de)serialiser derived from the case-class fields. */
    implicit val popScoreRuleFormat: Format[ProductPopularityRule] = Json.format[ProductPopularityRule]
  }

  /**
   * The whole rule set: its identity and version, the two window values, and the formulas.
   */
  case class ProductPopularityRules(
      id: String,
      version: String,
      productViewWindow: Int,
      orderCountWindow: Int,
      formulas: List[ProductPopularityRule])

  object ProductPopularityRules {
    /** JSON (de)serialiser derived from the case-class fields. */
    implicit val popScoreRulesFormat: Format[ProductPopularityRules] = Json.format[ProductPopularityRules]
  }

  /** Adds `configAsJson` to a configuration path given as a `String`. */
  implicit class PathToJson(path: String) {

    /** Renders the configuration object found at `path` using the concise render options. */
    def configAsJson = {
      val configAtPath = Configurations.getConfigObject(path).toConfig
      val renderOptions = ConfigRenderOptions.concise()
      configAtPath.root().render(renderOptions)
    }
  }
}
/**
 * Actor that periodically recomputes the product popularity scores of one rule-set version.
 *
 * It sends itself an `Aggregate` message on start. Every `Aggregate` run extracts popularity
 * scores with `popExtSvc`, stores them through `productPopularityScoresService`, and then -
 * whether the run succeeded or failed - schedules the next `Aggregate` after
 * `popularity.aggregator.schedule.frequency` minutes.
 *
 * NOTE(review): `queryDurationWindow` is supplied once at construction and reused for every
 * run, so the queried window does not move forward over the actor's lifetime - confirm this
 * is intended.
 *
 * @param ruleSet                        rule-set version this actor aggregates for
 * @param queryDurationWindow            window passed to every popularity extraction
 * @param formulaModel                   formula passed to every popularity extraction
 * @param aggregationRecordsService      queried for the last read time at the start of each run
 * @param popExtSvc                      performs the popularity extraction
 * @param productPopularityScoresService receives the extracted scores
 */
private class ProductPopularityAggregator(ruleSet: RuleSetVersion, queryDurationWindow: QueryDurationWindow,
    formulaModel: FormulaModel, aggregationRecordsService: AggregationRecordsService, popExtSvc: PopularityExtractionService,
    productPopularityScoresService: ProductPopularityScoresService)
  extends Actor with LogMessageTagger {

  /** Delay between the end of one aggregation run and the start of the next. */
  private val interval = Configurations.getInt("popularity.aggregator.schedule.frequency").minutes

  /** Logs the creation of the actor and triggers the first aggregation run. */
  @throws[Exception](classOf[Exception])
  override def preStart(): Unit = {
    Logger.trace(s"Aggregator actor for rule set - ${ruleSet.version} created ".tag(this.getClass.getName))
    super.preStart()
    self ! Aggregate
  }

  override def receive: Receive = {
    case Aggregate =>
      // Read everything that belongs to the actor now, while still processing the message.
      // The completion callback below runs on an execution-context thread, where touching
      // `context` is not safe.
      val scheduler = context.system.scheduler
      val aggregator = self

      // NOTE(review): this value is subtracted from a millisecond timestamp as-is, so the
      // setting is effectively treated as milliseconds - confirm that this matches how
      // `popularity.aggregator.batchSize` is configured.
      val traceBack = Configurations.getLong("popularity.aggregator.batchSize")
      val readUpTo = new DateTime(new DateTime().getMillis - traceBack)

      val aggregate = for {
        // The last read time itself is not used; the call is kept so that a failure here
        // still aborts the run before any extraction happens.
        _ <- aggregationRecordsService.lastReadTime(ruleSet)
        popularityScoresEither <- Try(popExtSvc.extractPopularity(formulaModel, queryDurationWindow).safely).flattenedEither
        _ <- popularityScoresEither match {
          case Left(t) =>
            // Logged as an error so that a failed extraction is visible without trace logging.
            Logger.error(s"Extracting popularity scores had failed for version ${ruleSet.version}".tag(this.getClass.getName), t)
            Future.successful(())
          case Right(popularityScores) =>
            Logger.trace(s"Updating popularity scores for version ${ruleSet.version}".tag(this.getClass.getName))
            productPopularityScoresService.updatePopularityScores(ruleSet, readUpTo, popularityScores)
        }
      } yield ()

      aggregate.onComplete { outcome =>
        // A failed run must not go unnoticed, but it must not stop the schedule either.
        outcome.failed.foreach { t =>
          Logger.error(s"Aggregation run failed for version ${ruleSet.version}".tag(this.getClass.getName), t)
        }
        scheduler.scheduleOnce(interval, aggregator, Aggregate)
      }
  }

  /** Self-addressed trigger for one aggregation run. */
  private case object Aggregate
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment