Skip to content

Instantly share code, notes, and snippets.

@hanishi
Created November 19, 2020 15:25
Show Gist options
  • Save hanishi/a0b720363855699c3b8b33a223b8023d to your computer and use it in GitHub Desktop.
Save hanishi/a0b720363855699c3b8b33a223b8023d to your computer and use it in GitHub Desktop.
package models
import java.time.temporal.ChronoUnit
import java.time.{Instant, OffsetDateTime, ZoneId}
import akka.actor.typed.scaladsl.{
ActorContext,
Behaviors,
StashBuffer,
TimerScheduler
}
import akka.actor.typed.{ActorRef, Behavior}
import akka.pattern.StatusReply
import akka.util.Timeout
import play.api.Configuration
import play.api.fb.GraphApi.Edge.Path
import play.api.fb.GraphApi.{Node, PagedResults, Results, UnpagedResults}
import play.api.fb.WSClientExtension.{Edge, Node, id}
import play.api.fb.{GraphApi, Id}
import play.api.libs.functional.syntax.{toFunctionalBuilderOps, unlift}
import play.api.libs.json.{JsPath, Reads, Writes}
import scala.concurrent.duration.{DurationLong, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters.{MapHasAsJava, SeqHasAsJava}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Success}
/** An ad set as read from / written to JSON by the codecs in the companion object.
  *
  * Every attribute except `id` is optional and defaults to `None`, so callers
  * may construct partial instances (e.g. `AdSet(id)`).
  *
  * @param id              ad set identifier
  * @param name            display name, when known
  * @param status          configured status; this file compares it against "PAUSED"
  * @param effectiveStatus effective status; this file compares it against "PAUSED"
  * @param startTime       scheduled start, when known
  * @param endTime         scheduled end, when known
  * @param campaign        owning campaign, when known
  */
case class AdSet(
  id: String,
  name: Option[String] = None,
  status: Option[String] = None,
  // Defaulted like its neighbours: a required parameter in the middle of
  // defaulted ones made the earlier defaults unusable in positional calls.
  effectiveStatus: Option[String] = None,
  startTime: Option[OffsetDateTime] = None,
  endTime: Option[OffsetDateTime] = None,
  campaign: Option[Campaign] = None
) extends Entity
object AdSet
extends EdgeNodes[
AdSet,
(AdSet, Option[(Option[Criteria], Boolean, Boolean)])
]
with EntityEdge[AdSet] {
// Matches names of the form "<digits>:<digits>" and captures the second group.
// scheduleBout interprets that captured group as epoch seconds.
val rTimestamp = raw"""^\d+:(\d+)$$""".r
// Default post-processing step: pairs the ad set with `None` (no criteria/flags)
// in an already-completed Future.
implicit val noop: AdSet => Future[
(AdSet, Option[(Option[Criteria], Boolean, Boolean)])
] = adSets => Future.successful((adSets, None))
// Edge path segment for ad sets.
protected val path = Path[AdSet]("adsets")
/** Reads the `insights` edge of `objectId` at ad level.
  *
  * The caller's `params` are extended with `level`, `time_range` and
  * `filtering`; on a key clash the values built here win.
  *
  * @param graphApi  client used to issue the read
  * @param objectId  node whose `insights` edge is read
  * @param dateStart value placed in `time_range.since`
  * @param dateEnd   value placed in `time_range.until`
  * @param adlabel   value placed in the `ad.adlabels` filter
  * @param params    additional query parameters (e.g. paging cursors)
  * @param fields    fields to request
  * @return the results produced by `graphApi.read`
  */
def insights(
  graphApi: GraphApi,
  objectId: String,
  dateStart: String,
  dateEnd: String,
  adlabel: String,
  params: Map[String, String],
  fields: String*
)(implicit
  ec: ExecutionContext,
  rd: Reads[Insights]
): Future[Results[Insights]] = {
  // NOTE(review): values are spliced into JSON without escaping — callers
  // must not pass strings containing double quotes.
  val timeRange = s"""{"since":"$dateStart","until":"$dateEnd"}"""
  val labelFilter =
    s"""[{"field":"ad.adlabels","operator":"ANY","value":["$adlabel"]}]"""
  val fixedParams = Map(
    "level" -> "ad",
    "time_range" -> timeRange,
    "filtering" -> labelFilter
  )
  graphApi.read(
    objectId,
    Path[Insights]("insights"),
    params ++ fixedParams,
    fields: _*
  )
}
/** Delay to wait before the first refresh, derived from the criteria's start time.
  *
  * The result is used as a timer delay, so it must be the time remaining
  * until `startDateTime`, not the absolute epoch value (which would schedule
  * the timer decades into the future). A missing or past start yields zero.
  *
  * @param criteria criteria whose optional `startDateTime` is consulted
  * @return non-negative delay, in whole seconds
  */
private def startDurationFromCriteria(criteria: Criteria): FiniteDuration = {
  val nowSeconds = OffsetDateTime.now().toEpochSecond
  val startSeconds = criteria.startDateTime.fold(nowSeconds)(_.toEpochSecond)
  math.max(0L, startSeconds - nowSeconds).seconds
}
// Message protocol of the ad set actor. Public messages may be sent by other
// components; the private ones are only produced by the actor itself (mostly
// through pipeToSelf) to report the outcome of asynchronous calls.
sealed trait Command
// Contest settings extracted from the merged Criteria once all four values are present.
case class ContestCriteria(
numberOfActiveAds: Int,
numberOfAdsToCarryOver: Int,
daysOfInterContestInterval: Int,
daysToRunContest: Int
)
// Replaces the root criteria and restarts the refresh timer.
case class UpdateRoot(rootCriteria: Criteria) extends Command
// Asks for the current ad set together with its merged criteria.
case class Get(replyTo: ActorRef[StatusReply[(AdSet, Criteria)]])
extends Command
// Replaces schedule and criteria, then replies with the updated view.
case class Update(
startTime: Option[OffsetDateTime],
endTime: Option[OffsetDateTime],
rootCriteria: Criteria,
criteria: Seq[Option[Criteria]],
replyTo: ActorRef[StatusReply[(AdSet, Criteria)]]
) extends Command
// Carries the current contest label, if any.
case class Contest(
current: Option[AdLabel]
) extends Command
// Outcome of creating an ad label.
case class AdLabelCreated(adlabel: AdLabel) extends Command
case class CreateAdLabelFailed(throwable: Throwable) extends Command
// Outcome of swapping an expired label for the "ineligible" label on a paused ad.
case class LabeledIneligible(ad: Ad) extends Command
case class FailedLabelAdPaused(throwable: Throwable) extends Command
// Failure while looking up the current contest before nominating ads.
case class ErrorStartNominating(throwable: Throwable) extends Command
// Failure while looking up the current contest before starting a bout.
case class ErrorStartingBout(throwable: Throwable) extends Command
// Outcome of attaching the contest label to an ad.
case class AdLabelAssociated(ad: Ad, adlabel: AdLabel) extends Command
case class FailedAssociateAdLabel(
ad: Ad,
adlabel: AdLabel,
throwable: Throwable
) extends Command
// NOTE(review): not produced anywhere in the visible code.
private case class InitializationFailed(
throwable: Throwable
) extends Command
// One page of ads; non-empty nextPageParameters means another page must be fetched.
private case class AdsFetched(
results: Seq[Ad],
nextPageParameters: Map[String, String] = Map.empty
) extends Command
private case class FetchAdsFailed(throwable: Throwable) extends Command
// Outcome of fetching the ad set itself.
private case class AdSetFetched(
adSet: AdSet
) extends Command
// One page of insights; non-empty nextPageParameters means another page must be fetched.
private case class InsightsFetched(
results: Seq[Insights],
nextPageParameters: Map[String, String] = Map.empty
) extends Command
private case class FetchAdSetFailed(throwable: Throwable) extends Command
private case class FetchInsightsFailed(throwable: Throwable) extends Command
// Outcome of labelling and pausing a single ad.
private case class AdPaused(ad: Ad) extends Command
private case class PauseAdFailed(ad: Ad, throwable: Throwable) extends Command
private class Actor(
context: ActorContext[Command],
buffer: StashBuffer[Command],
timers: TimerScheduler[Command],
graphApi: GraphApi,
configuration: Configuration,
id: String,
name: Option[String],
campaign: Option[Campaign]
)(implicit
timeout: Timeout,
response: Node[Id],
rds: Reads[Id],
contestRepository: ContestRepository,
ec: ExecutionContext
) {
/** Fetches the ad set from the Graph API and decides what to do next.
  *
  * While the request is in flight every other message is stashed. When the
  * fetched ad set reports status and effective status "PAUSED" the ads are
  * fetched next (using the fetched start/end times); in every other case the
  * bout timer is cancelled and the actor goes back to idle.
  *
  * An ad set that arrives without `status`/`effectiveStatus` is treated like
  * a non-paused one. Previously such a reply matched no case, was stashed,
  * and left the actor stashing forever in this state.
  *
  * NOTE(review): continuing only when the ad set is PAUSED looks inverted —
  * confirm the intended status.
  */
def fetchAdSet(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel
): Behavior[Command] = {
  AdSet
    .fetch(
      graphApi,
      id,
      "id,name,status,effective_status,start_time,end_time"
    )
    .map { case (adSet, _) => adSet }
    .pipe(context.pipeToSelf) {
      case Success(adSet)     => AdSetFetched(adSet)
      case Failure(throwable) => FetchAdSetFailed(throwable)
    }

  // Stops any scheduled bout and resumes normal processing.
  def backToIdle(
    start: Option[OffsetDateTime],
    end: Option[OffsetDateTime]
  ): Behavior[Command] = {
    if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer)
    buffer.unstashAll(
      idle(start, end, rootCriteria, criteria, labelIneligible)
    )
  }

  Behaviors.receiveMessage {
    case AdSetFetched(fetched) =>
      (fetched.status, fetched.effectiveStatus) match {
        case (Some("PAUSED"), Some("PAUSED")) =>
          fetchAds(
            fetched.startTime,
            fetched.endTime,
            rootCriteria,
            criteria,
            labelIneligible
          )
        case _ =>
          backToIdle(fetched.startTime, fetched.endTime)
      }
    case FetchAdSetFailed(throwable) =>
      context.log.error("fetch ad set failed", throwable)
      backToIdle(startTime, endTime)
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Tells whether a timestamped label has passed its expiration.
  *
  * `name` is expected to look like `"<labelName>:<epochSeconds>"`.
  *
  * @param labelName label name that prefixes the timestamp (matched literally)
  * @param name      full label name to inspect
  * @return `true` when `name` has the expected shape and its timestamp lies in
  *         the past; `false` otherwise, including when `name` does not match
  */
def isExpired(labelName: String, name: String): Boolean = {
  // Quote the label so regex metacharacters in it are matched literally.
  val rTimestampedLabel =
    raw"""^${java.util.regex.Pattern.quote(labelName)}:(\d+)$$""".r
  name match {
    case rTimestampedLabel(expiration) =>
      // Compare instants directly: OffsetDateTime.from(Instant) always throws
      // because an Instant carries no zone offset.
      expiration.toLongOption.exists { epochSeconds =>
        Instant.now().isAfter(Instant.ofEpochSecond(epochSeconds))
      }
    case _ => false
  }
}
/** Tells whether `name` has the shape `"<labelName>:<digits>"`.
  *
  * @param labelName label name expected as prefix (matched literally)
  * @param name      full label name to inspect
  */
def hasLabelWithExpiration(labelName: String, name: String): Boolean = {
  // Quote the label so regex metacharacters in it are matched literally.
  val rTimestampedLabel =
    raw"""^${java.util.regex.Pattern.quote(labelName)}:\d+$$""".r
  rTimestampedLabel.findFirstIn(name).isDefined
}
/** Waits for the current contest and sorts the fetched ads accordingly.
  *
  * With a current contest label, ads are bucketed by status and by their
  * relation to that label:
  *  - ACTIVE ads carrying / not carrying the contest label;
  *  - PAUSED ads carrying / not carrying the contest label (ads already
  *    carrying `labelIneligible` are ignored);
  *  - PAUSED ads carrying a timestamped variant of the contest label whose
  *    timestamp has expired (unexpired ones are ignored).
  * A new round of nominations starts when there is an unlabeled active ad, a
  * labeled paused ad, or an expired timestamped label; otherwise the actor
  * returns to idle.
  *
  * Without a current contest, all ACTIVE and PAUSED ads are nominated.
  *
  * Ads with any other (or missing) status are skipped; previously they
  * raised a `MatchError`. `ErrorStartNominating` — the failure message sent
  * while looking up the contest — is handled like `ErrorStartingBout`;
  * previously it was stashed and the actor never left this state.
  *
  * Lists are built by prepending, so they are in reverse order of `ads`.
  */
def inspect(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  ads: List[Ad]
): Behavior[Command] = {
  // Ads grouped by status and relation to the current contest label.
  case class Buckets(
    labeledActive: List[Ad] = Nil,
    unlabeledActive: List[Ad] = Nil,
    labeledPaused: List[Ad] = Nil,
    unlabeledPaused: List[Ad] = Nil,
    pausedWithExpiration: List[(AdLabel, Ad)] = Nil
  )

  def backToIdle(): Behavior[Command] =
    buffer.unstashAll(
      idle(startTime, endTime, rootCriteria, criteria, labelIneligible)
    )

  Behaviors.receiveMessage {
    case current @ Contest(Some(AdLabel(labelId, labelName))) =>
      val buckets = ads.foldLeft(Buckets()) {
        case (acc, adActive @ Ad(_, _, _, _, Some("ACTIVE"), _, adlabels)) =>
          if (adlabels.exists(adlabel => adlabel.id == labelId))
            acc.copy(labeledActive = adActive :: acc.labeledActive)
          else
            acc.copy(unlabeledActive = adActive :: acc.unlabeledActive)
        case (acc, adPaused @ Ad(_, _, _, _, Some("PAUSED"), _, adlabels)) =>
          adlabels
            .find(adlabel => hasLabelWithExpiration(labelName, adlabel.name))
            .fold {
              if (adlabels.contains(labelIneligible)) acc
              else if (adlabels.exists(adlabel => adlabel.id == labelId))
                acc.copy(labeledPaused = adPaused :: acc.labeledPaused)
              else
                acc.copy(unlabeledPaused = adPaused :: acc.unlabeledPaused)
            } { adlabel =>
              if (isExpired(labelName, adlabel.name))
                acc.copy(pausedWithExpiration =
                  (adlabel -> adPaused) :: acc.pausedWithExpiration
                )
              else acc
            }
        // Any other status (or none at all) takes no part in the contest.
        case (acc, _) => acc
      }
      val needsNomination =
        buckets.unlabeledActive.nonEmpty ||
          buckets.labeledPaused.nonEmpty ||
          buckets.pausedWithExpiration.nonEmpty
      if (needsNomination) {
        if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer)
        // With no expired labels this overload delegates straight to the
        // nine-argument one, so a single call covers both situations.
        nominateAdsForBout(
          startTime,
          endTime,
          rootCriteria,
          criteria,
          contestCriteria,
          labelIneligible,
          current,
          buckets.labeledActive ::: buckets.unlabeledActive,
          buckets.unlabeledPaused.sortBy(_.createdTime),
          buckets.pausedWithExpiration
        )
      } else backToIdle()

    case current @ Contest(None) =>
      val (activeAds, pausedAds) =
        ads.foldLeft((List.empty[Ad], List.empty[Ad])) {
          case ((actives, pauseds), ad @ Ad(_, _, _, _, Some("ACTIVE"), _, _)) =>
            (ad :: actives, pauseds)
          case ((actives, pauseds), ad @ Ad(_, _, _, _, Some("PAUSED"), _, _)) =>
            (actives, ad :: pauseds)
          case (acc, _) => acc
        }
      nominateAdsForBout(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        contestCriteria,
        labelIneligible,
        current,
        activeAds,
        pausedAds
      )

    case ErrorStartNominating(throwable) =>
      context.log.error("error while", throwable)
      backToIdle()
    case ErrorStartingBout(throwable) =>
      context.log.error("error while", throwable)
      backToIdle()
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Works through the expired timestamped labels before nominating ads.
  *
  * For the first `(label, ad)` pair the expired label is deleted and the ad
  * is tagged with `labelIneligible`; the outcome is piped back to the actor
  * and the remaining pairs are handled once it arrives. When no pair is
  * left, control passes to the nine-argument overload.
  */
def nominateAdsForBout(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  activeAds: List[Ad],
  pausedAds: List[Ad],
  expired: List[(AdLabel, Ad)]
): Behavior[Command] =
  expired match {
    case Nil =>
      nominateAdsForBout(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        contestCriteria,
        labelIneligible,
        contest,
        activeAds,
        pausedAds
      )
    case (staleLabel, pausedAd) :: remaining =>
      val relabelled =
        AdLabel.delete(graphApi, staleLabel.id).flatMap { _ =>
          AdLabel.label(graphApi, pausedAd, labelIneligible).map(_ => pausedAd)
        }
      context.pipeToSelf(relabelled) {
        case Success(ad)        => LabeledIneligible(ad)
        case Failure(throwable) => FailedLabelAdPaused(throwable)
      }
      nominatingAdsForBout(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        contestCriteria,
        labelIneligible,
        contest,
        activeAds,
        pausedAds,
        remaining
      )
  }
/** Waits for the outcome of relabelling one expired ad.
  *
  * Success and failure are both logged and followed by processing the
  * remaining `expired` pairs; everything else is stashed.
  *
  * The result type is declared explicitly: without an expected type the
  * compiler has nothing to infer `receiveMessage`'s message type from for a
  * pattern-matching function literal.
  */
def nominatingAdsForBout(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  activeAds: List[Ad],
  pausedAds: List[Ad],
  expired: List[(AdLabel, Ad)]
): Behavior[Command] = {
  // Either outcome moves on to the next expired label.
  def continue(): Behavior[Command] =
    nominateAdsForBout(
      startTime,
      endTime,
      rootCriteria,
      criteria,
      contestCriteria,
      labelIneligible,
      contest,
      activeAds,
      pausedAds,
      expired
    )

  Behaviors.receiveMessage {
    case LabeledIneligible(ad) =>
      context.log.info(s"$ad labeled as paused")
      continue()
    case FailedLabelAdPaused(throwable) =>
      context.log.error("failed to label ad as paused", throwable)
      continue()
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Creates a fresh contest label and waits for the result.
  *
  * When a current contest label exists it is deleted first. The account id
  * is taken from the campaign; without one the request fails with
  * `IllegalStateException("no accountId")`. On success the active ads are
  * nominated for the new contest; on failure the actor returns to idle.
  */
def nominateAdsForBout(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  activeAds: List[Ad],
  pausedAds: List[Ad]
): Behavior[Command] = {
  val freshLabel: Future[AdLabel] =
    campaign.flatMap(_.accountId) match {
      case None =>
        Future.failed(new IllegalStateException("no accountId"))
      case Some(accountId) =>
        contest.current match {
          case None =>
            AdLabel.label(graphApi, accountId, id)
          case Some(AdLabel(labelId, _)) =>
            AdLabel
              .delete(graphApi, labelId)
              .flatMap(_ => AdLabel.label(graphApi, accountId, id))
        }
    }
  context.pipeToSelf(freshLabel) {
    case Success(adLabel)   => Contest(Some(adLabel))
    case Failure(throwable) => CreateAdLabelFailed(throwable)
  }

  Behaviors.receiveMessage {
    case created: Contest =>
      nominateActiveAdsForContest(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        contestCriteria,
        labelIneligible,
        created,
        activeAds,
        pausedAds,
        List.empty
      )
    case CreateAdLabelFailed(throwable) =>
      context.log.error("error creating adlabel", throwable)
      buffer.unstashAll(
        idle(startTime, endTime, rootCriteria, criteria, labelIneligible)
      )
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Attaches the contest label to the active ads, one at a time.
  *
  * For the first remaining ad the label request is issued and its outcome
  * awaited. Once all active ads are processed, paused ads fill the gap up
  * to `contestCriteria.numberOfActiveAds`; when no gap remains the bout is
  * scheduled directly.
  *
  * A contest without a label cannot be nominated for: it is logged and the
  * actor returns to idle (previously this raised a `MatchError`).
  */
def nominateActiveAdsForContest(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  activeAds: List[Ad],
  pausedAds: List[Ad],
  nominated: List[Ad]
): Behavior[Command] =
  contest match {
    case Contest(Some(current)) =>
      activeAds match {
        case head :: tail =>
          AdLabel.label(graphApi, head, current).pipe(context.pipeToSelf) {
            case Success(ad) => AdLabelAssociated(ad, current)
            case Failure(throwable) =>
              FailedAssociateAdLabel(head, current, throwable)
          }
          nominatingActiveAdForContest(
            startTime,
            endTime,
            rootCriteria,
            criteria,
            contestCriteria,
            labelIneligible,
            contest,
            tail,
            pausedAds,
            nominated
          )
        case Nil =>
          val missing = contestCriteria.numberOfActiveAds - nominated.size
          if (missing > 0)
            nominatePausedAdsForContest(
              startTime,
              endTime,
              rootCriteria,
              criteria,
              contestCriteria,
              labelIneligible,
              contest,
              pausedAds.take(missing),
              nominated
            )
          else
            scheduleBout(
              startTime,
              endTime,
              rootCriteria,
              criteria,
              contestCriteria,
              current,
              labelIneligible
            )
      }
    case Contest(None) =>
      context.log.error("no contest label to nominate active ads for")
      buffer.unstashAll(
        idle(startTime, endTime, rootCriteria, criteria, labelIneligible)
      )
  }
/** Schedules the `Bout` message for the end of the contest and returns to idle.
  *
  * The contest label name is expected to match `rTimestamp`; its captured
  * group is read as epoch seconds, truncated to the start of that day in
  * the system zone, and `daysToRunContest` days are added. When that moment
  * is still ahead a single timer is started, otherwise `Bout` is sent
  * immediately.
  *
  * A label name without a timestamp is logged and no bout is scheduled
  * (previously this raised a `MatchError`).
  */
def scheduleBout(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  contest: AdLabel,
  labelIneligible: AdLabel
): Behavior[Command] = {
  contest.name match {
    case rTimestamp(timestamp) =>
      val contestDay = OffsetDateTime
        .ofInstant(
          Instant.ofEpochSecond(timestamp.toLong),
          ZoneId.systemDefault()
        )
        .truncatedTo(ChronoUnit.DAYS)
      // plusDays takes a Long, so the former Int multiplication
      // (days * 24 * 60 * 60) can no longer overflow.
      val target = contestDay.plusDays(contestCriteria.daysToRunContest.toLong)
      val secondsToAdd =
        target.toEpochSecond - OffsetDateTime.now().toEpochSecond
      if (secondsToAdd > 0)
        timers.startSingleTimer(BoutTimer, Bout, secondsToAdd.seconds)
      else
        context.self ! Bout
    case unexpected =>
      context.log.error(
        s"contest label '$unexpected' carries no timestamp; bout not scheduled"
      )
  }
  buffer.unstashAll(
    idle(startTime, endTime, rootCriteria, criteria, labelIneligible)
  )
}
/** Waits for the outcome of labelling one active ad.
  *
  * A successfully labelled ad is added to `nominated`; a failure is logged
  * and the ad left out. Either way the remaining active ads are processed
  * next. Other messages are stashed.
  */
def nominatingActiveAdForContest(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  active: List[Ad],
  paused: List[Ad],
  nominated: List[Ad]
): Behavior[Command] = {
  // Continues with the remaining active ads and the given nominee list.
  def continueWith(nominees: List[Ad]): Behavior[Command] =
    nominateActiveAdsForContest(
      startTime,
      endTime,
      rootCriteria,
      criteria,
      contestCriteria,
      labelIneligible,
      contest,
      active,
      paused,
      nominees
    )

  Behaviors.receiveMessage {
    case AdLabelAssociated(ad, _) =>
      continueWith(ad :: nominated)
    case FailedAssociateAdLabel(_, _, throwable) =>
      context.log.error("failed to nominate ad", throwable)
      continueWith(nominated)
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Attaches the contest label to the given paused ads, one at a time.
  *
  * For the first remaining ad the label request is issued and its outcome
  * awaited; once the list is exhausted the bout is scheduled.
  *
  * A contest without a label cannot be nominated for: it is logged and the
  * actor returns to idle (previously this raised a `MatchError`).
  */
def nominatePausedAdsForContest(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  pausedAds: List[Ad],
  nominated: List[Ad]
): Behavior[Command] =
  contest match {
    case Contest(Some(current)) =>
      pausedAds match {
        case head :: tail =>
          AdLabel.label(graphApi, head, current).pipe(context.pipeToSelf) {
            case Success(ad) => AdLabelAssociated(ad, current)
            case Failure(throwable) =>
              FailedAssociateAdLabel(head, current, throwable)
          }
          nominatingPausedAdForContest(
            startTime,
            endTime,
            rootCriteria,
            criteria,
            contestCriteria,
            labelIneligible,
            contest,
            tail,
            nominated
          )
        case Nil =>
          scheduleBout(
            startTime,
            endTime,
            rootCriteria,
            criteria,
            contestCriteria,
            current,
            labelIneligible
          )
      }
    case Contest(None) =>
      context.log.error("no contest label to nominate paused ads for")
      buffer.unstashAll(
        idle(startTime, endTime, rootCriteria, criteria, labelIneligible)
      )
  }
/** Waits for the outcome of labelling one paused ad.
  *
  * A successfully labelled ad is added to `nominated`; a failure is logged
  * and the ad left out. Either way the remaining paused ads are processed
  * next. Other messages are stashed.
  */
def nominatingPausedAdForContest(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  labelIneligible: AdLabel,
  contest: Contest,
  paused: List[Ad],
  nominated: List[Ad]
): Behavior[Command] = {
  // Continues with the remaining paused ads and the given nominee list.
  def continueWith(nominees: List[Ad]): Behavior[Command] =
    nominatePausedAdsForContest(
      startTime,
      endTime,
      rootCriteria,
      criteria,
      contestCriteria,
      labelIneligible,
      contest,
      paused,
      nominees
    )

  Behaviors.receiveMessage {
    case AdLabelAssociated(ad, _) =>
      continueWith(ad :: nominated)
    case FailedAssociateAdLabel(_, _, throwable) =>
      context.log.error("failed to nominate ad", throwable)
      continueWith(nominated)
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** (Re)arms the refresh timer and switches to the `active` behavior.
  *
  * @param duration delay before `RefreshAds` is delivered; defaults to the
  *                 configured `APP_ADSET_ADS_REFRESH` value in seconds
  */
def idle(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  duration: FiniteDuration =
    configuration.get[Long](APP_ADSET_ADS_REFRESH).seconds
): Behavior[Command] = {
  // Timers are addressed by key: cancel with RefreshAdsKey, not with the
  // RefreshAds message that the timer delivers.
  if (timers.isTimerActive(RefreshAdsKey)) timers.cancel(RefreshAdsKey)
  timers.startSingleTimer(RefreshAdsKey, RefreshAds, duration)
  active(startTime, endTime, rootCriteria, criteria, labelIneligible)
}
/** Requests one page of the ad set's ads and waits for it in `fetchingAds`.
  *
  * @param ads    ads collected from previous pages
  * @param params query parameters for this page (paging cursor on follow-ups)
  */
def fetchAds(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  ads: List[Ad] = List.empty,
  params: Map[String, String] = Map.empty
): Behavior[Command] = {
  val request = Ad.fetchAll(
    graphApi,
    id,
    params,
    "id",
    "name",
    "created_time",
    "updated_time",
    "status",
    "creative{id,name}",
    "adlabels"
  )
  context.pipeToSelf(request) {
    case Success(PagedResults(page, _, next, _)) =>
      // A "next" link means more pages: forward its cursor parameters.
      next.fold(AdsFetched(page)) { cursor =>
        AdsFetched(page, cursor.toMap("after", "limit"))
      }
    case Success(UnpagedResults(page, _)) => AdsFetched(page)
    case Failure(throwable)               => FetchAdsFailed(throwable)
  }
  fetchingAds(
    startTime,
    endTime,
    rootCriteria,
    criteria,
    labelIneligible,
    ads
  )
}
/** Collects pages of ads and, once complete, starts inspecting them.
  *
  * Each page is prepended to `ads`. While more pages exist the next one is
  * requested. After the last page the contest criteria are derived from the
  * merged criteria: when all four values are present the current contest is
  * looked up and the ads are handed to `inspect`; otherwise the actor
  * returns to idle.
  *
  * The contest lookup is only issued when `inspect` will actually run, so
  * its reply can no longer arrive in a state that does not expect it.
  */
def fetchingAds(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  ads: List[Ad] = List.empty
): Behavior[Command] = {
  Behaviors.receiveMessage {
    case AdsFetched(results, nextPageParameters) =>
      val list = results.foldLeft(ads) { (acc, ad) => ad :: acc }
      if (nextPageParameters.nonEmpty)
        fetchAds(
          startTime,
          endTime,
          rootCriteria,
          criteria,
          labelIneligible,
          list,
          nextPageParameters
        )
      else {
        val merged = rootCriteria.update(criteria: _*)
        val contestCriteria = for {
          numberOfActiveAds <- merged.numberOfActiveAds
          numberOfAdsToCarryOver <- merged.numberOfAdsToCarryOver
          daysOfInterContestInterval <- merged.daysOfInterContestInterval
          daysToRunContest <- merged.daysToRunContest
        } yield ContestCriteria(
          numberOfActiveAds,
          numberOfAdsToCarryOver,
          daysOfInterContestInterval,
          daysToRunContest
        )
        contestCriteria.fold(
          buffer.unstashAll(
            idle(
              startTime,
              endTime,
              rootCriteria,
              criteria,
              labelIneligible
            )
          )
        ) { contestCriteria =>
          contestRepository.current().pipe(context.pipeToSelf) {
            case Success(current)   => Contest(current)
            case Failure(throwable) => ErrorStartNominating(throwable)
          }
          inspect(
            startTime,
            endTime,
            rootCriteria,
            criteria,
            contestCriteria,
            labelIneligible,
            list
          )
        }
      }
    case FetchAdsFailed(throwable) =>
      context.log.error("fetch ads failed", throwable)
      if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer)
      buffer.unstashAll(
        idle(startTime, endTime, rootCriteria, criteria, labelIneligible)
      )
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Steady-state behavior: serves queries and updates and reacts to timers.
  *
  *  - `Get` replies with the ad set and its merged criteria.
  *  - `Update` patches the leading criteria entries, replies with the
  *    updated view and restarts the refresh timer.
  *  - `UpdateRoot` swaps the root criteria and restarts the refresh timer.
  *  - `RefreshAds` starts a refresh by fetching the ad set.
  *  - `Bout` starts a bout.
  *
  * Any other command is logged and reported as unhandled rather than
  * crashing the actor with a `MatchError`.
  */
def active(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel
): Behavior[Command] =
  Behaviors.receiveMessage {
    case Get(replyTo) =>
      replyTo ! StatusReply.Success(
        (
          AdSet(id, name, None, None, startTime, endTime, campaign),
          rootCriteria.update(criteria: _*)
        )
      )
      Behaviors.same
    case Update(startTime, endTime, rootCriteria, baseCriteria, replyTo) =>
      // Overwrite the first baseCriteria.size entries, keep the rest.
      val updated = criteria.patch(0, baseCriteria, baseCriteria.size)
      replyTo ! StatusReply.Success(
        (
          AdSet(id, name, None, None, startTime, endTime, campaign),
          rootCriteria.update(updated: _*)
        )
      )
      idle(
        startTime,
        endTime,
        rootCriteria,
        updated,
        labelIneligible,
        startDurationFromCriteria(rootCriteria.update(baseCriteria: _*))
      )
    case UpdateRoot(rootCriteria) =>
      idle(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible,
        startDurationFromCriteria(rootCriteria.update(criteria: _*))
      )
    case RefreshAds =>
      fetchAdSet(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible
      )
    case Bout =>
      startBout(startTime, endTime, rootCriteria, criteria, labelIneligible)
    case other =>
      // Late replies of earlier asynchronous calls can end up here.
      context.log.warn(s"unexpected message in active state: $other")
      Behaviors.unhandled
  }
/** Initial behavior: creates the label for ineligible ads, then goes idle.
  *
  * Messages arriving before the label exists are stashed and replayed.
  * When the label cannot be created the error is logged and rethrown.
  *
  * @param duration delay before the first refresh
  */
def init(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  duration: FiniteDuration
): Behavior[Command] = {
  context.pipeToSelf(labelForIneligibleAds(id, campaign)) {
    case Success(label)     => AdLabelCreated(label)
    case Failure(throwable) => CreateAdLabelFailed(throwable)
  }
  Behaviors.receiveMessage {
    case AdLabelCreated(labelIneligible) =>
      val ready = idle(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible,
        duration
      )
      buffer.unstashAll(ready)
    case CreateAdLabelFailed(throwable) =>
      context.log.error("Unable to create label", throwable)
      throw throwable
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Looks up the current contest and waits for it in `startingBout`. */
def startBout(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel
): Behavior[Command] = {
  val lookup = contestRepository.current()
  context.pipeToSelf(lookup) {
    case Success(current)   => Contest(current)
    case Failure(throwable) => ErrorStartingBout(throwable)
  }
  startingBout(startTime, endTime, rootCriteria, criteria, labelIneligible)
}
/** Waits for the current contest before running a bout.
  *
  * With a contest label the ads' insights are fetched next; without one, or
  * when the lookup failed, the problem is logged and the actor returns to
  * `active`. Other messages are stashed.
  *
  * The behavior chosen for a `Contest` message is returned as the next
  * behavior. Previously it was computed and then discarded in favour of
  * `Behaviors.same`, which left the actor in this state stashing everything.
  */
def startingBout(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel
): Behavior[Command] = {
  def backToActive(): Behavior[Command] =
    buffer.unstashAll(
      active(startTime, endTime, rootCriteria, criteria, labelIneligible)
    )

  Behaviors.receiveMessage {
    case Contest(Some(adlabel)) =>
      fetchAdsInsights(
        id,
        adlabel,
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible
      )
    case Contest(None) =>
      context.log.error("no label to match")
      backToActive()
    case ErrorStartingBout(throwable) =>
      context.log.error("retrieve contest failed", throwable)
      backToActive()
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Requests one page of ad insights and waits for it in `fetchingAdsInsights`.
  *
  * NOTE(review): the date range and the ad-label filter are passed as empty
  * strings, so `label` and the contest period do not reach the query —
  * this looks unfinished; confirm the intended values.
  *
  * @param insights rows collected from previous pages
  * @param params   query parameters for this page (paging cursor on follow-ups)
  */
def fetchAdsInsights(
  id: String,
  label: AdLabel,
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  insights: List[Insights] = List.empty,
  params: Map[String, String] = Map.empty
): Behavior[Command] = {
  val request = AdSet.insights(
    graphApi,
    id,
    "",
    "",
    "",
    params,
    "ad_id",
    "impressions"
  )
  context.pipeToSelf(request) {
    case Success(PagedResults(page, _, next, _)) =>
      // A "next" link means more pages: forward its cursor parameters.
      next.fold(InsightsFetched(page)) { cursor =>
        InsightsFetched(page, cursor.toMap("after", "limit"))
      }
    case Success(UnpagedResults(page, _)) => InsightsFetched(page)
    case Failure(throwable)               => FetchInsightsFailed(throwable)
  }
  fetchingAdsInsights(
    id,
    label,
    startTime,
    endTime,
    rootCriteria,
    criteria,
    labelIneligible,
    insights
  )
}
/** Picks the ads to pause from the insights and timestamps the contest label.
  *
  * Insights rows with an impressions value are sorted ascending by it; the
  * first `list.size - numberOfAdsToCarryOver` entries are dropped and the
  * rest become the pause list. Once the timestamped label exists the ads
  * are paused one by one; when it cannot be created the actor returns to
  * `active`.
  *
  * NOTE(review): after an ascending sort, `drop` keeps the entries with the
  * HIGHEST impressions, and the offset is based on the unfiltered
  * `list.size` — confirm this is the intended selection.
  */
def runContest(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  contestCriteria: ContestCriteria,
  current: AdLabel,
  labelIneligible: AdLabel,
  list: List[Insights]
): Behavior[Command] = {
  val ranked = list
    .collect {
      case Insights(adId, adName, Some(impressions)) =>
        (adId, adName, impressions)
    }
    .sortBy(_._3)
  val pauseList = ranked
    .drop(list.size - contestCriteria.numberOfAdsToCarryOver)
    .map { case (adId, adName, _) => Ad(adId, adName) }

  val timestamped = AdLabel.timestamp(
    graphApi,
    current,
    contestCriteria.daysOfInterContestInterval
  )
  context.pipeToSelf(timestamped) {
    case Success(adlabel)   => AdLabelCreated(adlabel)
    case Failure(throwable) => CreateAdLabelFailed(throwable)
  }

  Behaviors.receiveMessage {
    case AdLabelCreated(adlabel) =>
      pauseAd(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible,
        adlabel,
        pauseList
      )
    case CreateAdLabelFailed(throwable) =>
      context.log.error("failed to create label timestamped", throwable)
      buffer.unstashAll(
        active(startTime, endTime, rootCriteria, criteria, labelIneligible)
      )
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Labels and pauses the next ad of `ads`, or finishes when none is left.
  *
  * For the first ad the label is attached and then the ad is paused; the
  * outcome is awaited in `pausingAd`. With an empty list the paused ads are
  * logged and the actor returns to `active`.
  *
  * @param ads    ads still to be paused
  * @param paused ads paused so far
  */
def pauseAd(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  adlabel: AdLabel,
  ads: List[Ad],
  paused: List[Ad] = List.empty
): Behavior[Command] =
  ads match {
    case Nil =>
      context.log.info(s"paused $paused")
      buffer.unstashAll(
        active(startTime, endTime, rootCriteria, criteria, labelIneligible)
      )
    case next :: remaining =>
      val pausing =
        AdLabel.label(graphApi, next, adlabel).flatMap(_ => Ad.pause(next))
      context.pipeToSelf(pausing) {
        case Success(_)         => AdPaused(next)
        case Failure(throwable) => PauseAdFailed(next, throwable)
      }
      pausingAd(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible,
        adlabel,
        remaining,
        paused
      )
  }
/** Waits for the outcome of pausing one ad.
  *
  * On success the ad is recorded and the remaining ads are processed. On
  * failure the error is logged, the remaining ads are abandoned and the
  * actor returns to `active`. Other messages are stashed.
  */
def pausingAd(
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  adlabel: AdLabel,
  ads: List[Ad],
  paused: List[Ad] = List.empty
): Behavior[Command] =
  Behaviors.receiveMessage {
    case AdPaused(done) =>
      val pausedSoFar = done :: paused
      pauseAd(
        startTime,
        endTime,
        rootCriteria,
        criteria,
        labelIneligible,
        adlabel,
        ads,
        pausedSoFar
      )
    case PauseAdFailed(failed, throwable) =>
      context.log.error(s"failed to pause $failed", throwable)
      val resumed =
        active(startTime, endTime, rootCriteria, criteria, labelIneligible)
      buffer.unstashAll(resumed)
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
/** Collects pages of insights and, once complete, runs the contest.
  *
  * Each page is prepended to `insights`. While more pages exist the next
  * one is requested. After the last page the contest criteria are derived
  * from the merged criteria: when all four values are present the contest
  * runs, otherwise the actor returns to `active`. A failed fetch is logged
  * and also leads back to `active`.
  */
def fetchingAdsInsights(
  id: String,
  label: AdLabel,
  startTime: Option[OffsetDateTime],
  endTime: Option[OffsetDateTime],
  rootCriteria: Criteria,
  criteria: Seq[Option[Criteria]],
  labelIneligible: AdLabel,
  insights: List[Insights] = List.empty
): Behavior[Command] = {
  def backToActive(): Behavior[Command] =
    buffer.unstashAll(
      active(startTime, endTime, rootCriteria, criteria, labelIneligible)
    )

  Behaviors.receiveMessage {
    case InsightsFetched(results, nextPageParameters) =>
      val collected = results.foldLeft(insights)((acc, row) => row :: acc)
      if (nextPageParameters.isEmpty) {
        val merged = rootCriteria.update(criteria: _*)
        val maybeContestCriteria = for {
          numberOfActiveAds <- merged.numberOfActiveAds
          numberOfAdsToCarryOver <- merged.numberOfAdsToCarryOver
          daysOfInterContestInterval <- merged.daysOfInterContestInterval
          daysToRunContest <- merged.daysToRunContest
        } yield ContestCriteria(
          numberOfActiveAds,
          numberOfAdsToCarryOver,
          daysOfInterContestInterval,
          daysToRunContest
        )
        maybeContestCriteria match {
          case Some(contestCriteria) =>
            runContest(
              startTime,
              endTime,
              rootCriteria,
              criteria,
              contestCriteria,
              label,
              labelIneligible,
              collected
            )
          case None =>
            backToActive()
        }
      } else
        fetchAdsInsights(
          id,
          label,
          startTime,
          endTime,
          rootCriteria,
          criteria,
          labelIneligible,
          collected,
          nextPageParameters
        )
    case FetchInsightsFailed(throwable) =>
      context.log.error("fetch insights failed", throwable)
      backToActive()
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
}
/** Publishes the ad label named `"<id>:paused"` under the campaign's account.
  *
  * @param id       ad set id used as label-name prefix
  * @param campaign campaign providing the account id
  * @return the created label, or a failed future with
  *         `IllegalStateException("no accountId")` when the campaign or its
  *         account id is missing
  */
private def labelForIneligibleAds(
  id: String,
  campaign: Option[Campaign]
)(implicit
  response: Node[Id],
  rds: Reads[Id],
  ec: ExecutionContext
): Future[AdLabel] =
  campaign.flatMap(_.accountId) match {
    case None =>
      Future.failed(new IllegalStateException("no accountId"))
    case Some(accountId) =>
      val name = s"$id:paused"
      graphApi
        .publish(
          accountId,
          Path("adlabels"),
          Map("name" -> Seq(name))
        )
        .map(labelId => AdLabel(labelId, name))
  }
}
/** Factory for the ad set actor's behavior. */
object Actor {

  /** Builds the behavior with a stash buffer, actor context and timers.
    *
    * The stash capacity is read from `STASH_BUFFER_SIZE`. When the client
    * has no Graph API instance an `IllegalStateException("no access token")`
    * is thrown during setup. The first refresh is delayed according to the
    * merged criteria.
    */
  def apply(
    client: GraphApiClient,
    configuration: Configuration,
    id: String,
    name: Option[String],
    startTime: Option[OffsetDateTime],
    endTime: Option[OffsetDateTime],
    campaign: Option[Campaign],
    rootCriteria: Criteria,
    criteria: Seq[Option[Criteria]]
  )(implicit
    timeout: Timeout,
    response: Node[Id],
    rds: Reads[Id],
    contestRepository: ContestRepository,
    ec: ExecutionContext
  ): Behavior[Command] = {
    val stashCapacity = configuration.get[Int](STASH_BUFFER_SIZE)
    Behaviors.withStash(stashCapacity) { stash =>
      Behaviors.setup { ctx =>
        Behaviors.withTimers { scheduler =>
          client.graphApi.fold[Behavior[Command]](
            throw new IllegalStateException("no access token")
          ) { api =>
            val actor = new Actor(
              ctx,
              stash,
              scheduler,
              api,
              configuration,
              id,
              name,
              campaign
            )
            val initialDelay =
              startDurationFromCriteria(rootCriteria.update(criteria: _*))
            actor.init(
              startTime,
              endTime,
              rootCriteria,
              criteria,
              initialDelay
            )
          }
        }
      }
    }
  }
}
// Timer key under which the single `Bout` timer is registered (see scheduleBout).
private case object BoutTimer
// Timer message that starts a bout.
private case object Bout extends Command
// Timer message that triggers a refresh of the ad set and its ads.
private case object RefreshAds extends Command
// Timer key under which the single `RefreshAds` timer is registered (see idle).
private case object RefreshAdsKey
// Reads used for the OffsetDateTime fields of the JSON codecs below.
// NOTE(review): facebookOffsetDateTimeReads is defined outside this view; the
// implicit has no explicit type annotation — presumably Reads[OffsetDateTime], confirm.
implicit val offsetDateTimeReads = facebookOffsetDateTimeReads
/** JSON decoder for [[AdSet]].
  *
  * Only `id` is mandatory. Field names follow the snake_case keys that are
  * requested from the API and emitted by `jsonWrites`. In particular the
  * effective status is read from `effective_status`; it was previously read
  * from `effectiveStatus`, a key that neither side produces, so it always
  * decoded to `None`.
  */
implicit val jsonReads: Reads[AdSet] = (
  (JsPath \ "id").read[String] and
    (JsPath \ "name").readNullable[String] and
    (JsPath \ "status").readNullable[String] and
    (JsPath \ "effective_status").readNullable[String] and
    (JsPath \ "start_time").readNullable[OffsetDateTime] and
    (JsPath \ "end_time").readNullable[OffsetDateTime] and
    (JsPath \ "campaign").readNullable[Campaign]
)(
  AdSet.apply(
    _: String,
    _: Option[String],
    _: Option[String],
    _: Option[String],
    _: Option[OffsetDateTime],
    _: Option[OffsetDateTime],
    _: Option[Campaign]
  )
)
// Graph API edge and node descriptors for AdSet, overriding the inherited members.
override implicit val edge: GraphApi.Edge[AdSet] = Edge[AdSet]
override implicit val node: GraphApi.Node[AdSet] = Node[AdSet]
// JSON encoder for AdSet: `id` is always written, the optional fields only
// when present, using snake_case keys.
implicit val jsonWrites: Writes[AdSet] =
((JsPath \ "id").write[String] and
(JsPath \ "name").writeNullable[String] and
(JsPath \ "status").writeNullable[String] and
(JsPath \ "effective_status").writeNullable[String] and
(JsPath \ "start_time").writeNullable[OffsetDateTime] and
(JsPath \ "end_time").writeNullable[OffsetDateTime] and
(JsPath \ "campaign").writeNullable[Campaign])(unlift(AdSet.unapply))
implicit class adSet2AttributeValue(adSet: AdSet) {
import software.amazon.awssdk.services.dynamodb.model.{
Update => DynamoUpdate,
_
}
def asPutItemRequest(
tableName: String,
criteria: Option[Criteria],
status: String
): TransactWriteItemsRequest =
TransactWriteItemsRequest
.builder()
.transactItems(
List(
TransactWriteItem
.builder()
.put(
Put
.builder()
.tableName(tableName)
.item(
Map
.empty[String, AttributeValue]
.pipe(_ + (PARTITION_KEY -> S(s"ADSET#${adSet.id}")))
.pipe(_ + (SORT_KEY -> S(s"ADSET#${adSet.id}")))
.pipe(_ + (ID -> S(adSet.id)))
.pipe(map =>
adSet.name.fold(map) { x => map + (NAME -> S(x)) }
)
.pipe(_ + (STATUS -> S(status)))
.pipe(map =>
criteria.fold(map) { x =>
map + (CRITERIA -> x.asAttributeValue)
}
)
.pipe(map =>
(for {
campaign <- adSet.campaign
accountId <- campaign.accountId
} yield S(s"ACCOUNT#act_$accountId")).fold(map) { x =>
map + (GSI1PK -> x)
}
)
.pipe(map =>
(for {
campaign <- adSet.campaign
accountId <- campaign.accountId
} yield S(
s"ACCOUNT#act_$accountId#CAMPAIGN#${campaign.id}#ADSET#${adSet.id}"
)).fold(map) { x => map + (GSI1SK -> x) }
)
.pipe(map =>
(for {
campaign <- adSet.campaign
accountId <- campaign.accountId
} yield S(s"ACCOUNT#act_$accountId#ADSETS"))
.fold(map) { x =>
map + (GSI2PK -> x)
}
)
.pipe(_ + (GSI2SK -> S(s"ADSET#${adSet.id}")))
.asJava
)
.conditionExpression("attribute_not_exists(#pk)")
.expressionAttributeNames(
Map("#pk" -> PARTITION_KEY).asJava
)
.build()
)
.build(),
addAdSet(tableName)
).asJava
)
.build()
/** Builds the transaction item that registers this ad set on its campaign item.
  *
  * The update targets the item keyed `CAMPAIGN#<campaign id>` (partition and
  * sort key) and its expression has two parts:
  *   - a `set` clause that initialises campaign attributes through
  *     `if_not_exists`, i.e. only when they are not already present;
  *   - an `add` action that adds this ad set's id to the `AdSets` attribute.
  *
  * The id and GSI2 sort key are initialised whenever a campaign is present; the
  * account-scoped keys (GSI1PK, GSI1SK, GSI2PK) additionally require the
  * campaign's account id. `Status` is always initialised to `"Paused"`.
  *
  * NOTE(review): when `adSet.campaign` is `None` the key map is empty, which
  * DynamoDB presumably rejects — confirm callers always supply a campaign.
  *
  * @param tableName table holding the campaign item
  * @return the `Update` wrapped in a [[TransactWriteItem]]
  */
def addAdSet(tableName: String): TransactWriteItem = {
  // One entry per `set` assignment: (clause, name placeholder, value placeholder).
  type Assignment = (String, (String, String), (String, AttributeValue))

  val membershipValue = SS(s"${adSet.id}")
  val campaignOpt = adSet.campaign
  val campaignWithAccount =
    for {
      campaign <- campaignOpt
      accountId <- campaign.accountId
    } yield (campaign, accountId)

  // Listed in the order the attributes are registered.
  val assignments: List[Assignment] =
    List[Option[Assignment]](
      campaignOpt.map { campaign =>
        (
          "#id = if_not_exists(#id, :id)",
          "#id" -> ID,
          ":id" -> S(campaign.id)
        )
      },
      campaignWithAccount.map {
        case (_, accountId) =>
          (
            "#gsi1pk = if_not_exists(#gsi1pk, :gsi1pk)",
            "#gsi1pk" -> GSI1PK,
            ":gsi1pk" -> S(s"ACCOUNT#act_$accountId")
          )
      },
      campaignWithAccount.map {
        case (campaign, accountId) =>
          (
            "#gsi1sk = if_not_exists(#gsi1sk, :gsi1sk)",
            "#gsi1sk" -> GSI1SK,
            ":gsi1sk" -> S(s"ACCOUNT#act_${accountId}#CAMPAIGN#${campaign.id}")
          )
      },
      campaignWithAccount.map {
        case (_, accountId) =>
          (
            "#gsi2pk = if_not_exists(#gsi2pk, :gsi2pk)",
            "#gsi2pk" -> GSI2PK,
            ":gsi2pk" -> S(s"ACCOUNT#act_${accountId}#CAMPAIGNS")
          )
      },
      campaignOpt.map { campaign =>
        (
          "#gsi2sk = if_not_exists(#gsi2sk, :gsi2sk)",
          "#gsi2sk" -> GSI2SK,
          ":gsi2sk" -> S(s"CAMPAIGN#${campaign.id}")
        )
      },
      Some(
        (
          "#status = if_not_exists(#status, :status)",
          "#status" -> "Status",
          ":status" -> S("Paused")
        )
      )
    ).flatten

  // Clauses are emitted last-registered-first, so the status clause leads.
  // Because the status assignment is unconditional, the `set` clause is never empty.
  val setClause = assignments.reverse.map(_._1).mkString(",")
  val updateExpression = s"set $setClause add #adsets :adsets"

  val expressionNames: Map[String, String] =
    Map("#adsets" -> "AdSets") ++ assignments.map(_._2)
  val expressionValues: Map[String, AttributeValue] =
    Map[String, AttributeValue](":adsets" -> membershipValue) ++
      assignments.map(_._3)

  val campaignKey: Map[String, AttributeValue] =
    campaignOpt.fold(Map.empty[String, AttributeValue]) { campaign =>
      Map[String, AttributeValue](
        PARTITION_KEY -> S(s"CAMPAIGN#${campaign.id}"),
        SORT_KEY -> S(s"CAMPAIGN#${campaign.id}")
      )
    }

  TransactWriteItem
    .builder()
    .update(
      DynamoUpdate
        .builder()
        .tableName(tableName)
        .key(campaignKey.asJava)
        .updateExpression(updateExpression)
        .expressionAttributeNames(expressionNames.asJava)
        .expressionAttributeValues(expressionValues.asJava)
        .build()
    )
    .build()
}
/** Builds the request that reads this ad set's item.
  *
  * The item is addressed by `ADSET#<id>` for both the partition and the sort
  * key, and only the id, name, status, criteria and current-contest attributes
  * are requested.
  *
  * NOTE(review): `attributesToGet` is described as a legacy parameter in the
  * DynamoDB API reference; consider a projection expression — confirm before
  * changing, since attribute names may need placeholders there.
  *
  * @param tableName table holding the ad set item
  */
def asGetItemRequest(tableName: String): GetItemRequest = {
  val selfKey = s"ADSET#${adSet.id}"
  val primaryKey = Map(
    PARTITION_KEY -> S(selfKey),
    SORT_KEY -> S(selfKey)
  )

  GetItemRequest
    .builder()
    .tableName(tableName)
    .key(primaryKey.asJava)
    .attributesToGet(ID, NAME, STATUS, CRITERIA, CURRENT_CONTEST)
    .build()
}
/** Builds the request that overwrites this ad set's mutable attributes.
  *
  * For the name (taken from `adSet.name`) and the criteria: a defined value is
  * assigned in the `set` clause, an undefined one is listed in the `remove`
  * clause. The status is always assigned.
  *
  * @param tableName table holding the ad set item
  * @param criteria  new criteria; `None` removes the attribute
  * @param status    new status value
  * @return the assembled update request for the item keyed `ADSET#<id>`
  */
def asUpdateItemRequest(
    tableName: String,
    criteria: Option[Criteria],
    status: String
): UpdateItemRequest = {
  val selfKey = s"ADSET#${adSet.id}"
  val primaryKey = Map(
    PARTITION_KEY -> S(selfKey),
    SORT_KEY -> S(selfKey)
  )

  val newName: Option[AttributeValue] = adSet.name.map(name => S(name))
  val newCriteria: Option[AttributeValue] =
    criteria.map(c => c.asAttributeValue)

  // Clause order matches the historical expression: status, criteria, name.
  val setClauses: List[String] =
    "#status = :status" :: List(
      newCriteria.map(_ => "#criteria = :criteria"),
      newName.map(_ => "#name = :name")
    ).flatten
  val removeClauses: List[String] =
    List(
      Option.when(newCriteria.isEmpty)("#criteria"),
      Option.when(newName.isEmpty)("#name")
    ).flatten

  // The status assignment is unconditional, so a `set` clause is always present;
  // the `remove` clause is appended only when something has to be removed.
  val setExpression = setClauses.mkString("set ", ",", "")
  val updateExpression =
    if (removeClauses.isEmpty) setExpression
    else s"$setExpression remove ${removeClauses.mkString(",")}"

  // Every placeholder name is registered whether it ends up set or removed.
  val expressionNames = Map(
    "#name" -> "Name",
    "#criteria" -> "Criteria",
    "#status" -> "Status"
  )
  val expressionValues: Map[String, AttributeValue] =
    Map
      .empty[String, AttributeValue]
      .concat(newName.map(value => ":name" -> value))
      .concat(newCriteria.map(value => ":criteria" -> value))
      .updated(":status", S(status))

  UpdateItemRequest
    .builder()
    .tableName(tableName)
    .key(primaryKey.asJava)
    .updateExpression(updateExpression)
    .expressionAttributeNames(expressionNames.asJava)
    .expressionAttributeValues(expressionValues.asJava)
    .build()
}
//TODO allow delete item only if there is no associated contest
/** Builds the transaction that deletes this ad set's item and removes the ad
  * set's id from the `AdSets` attribute of its campaign item.
  *
  * The transaction contains two items, in this order:
  *   1. a `Delete` of the item keyed `ADSET#<id>`;
  *   2. an `Update` of the item keyed `CAMPAIGN#<campaign id>` with the
  *      expression `delete #adsets :adsets`.
  *
  * NOTE(review): when `adSet.campaign` is `None` the campaign key map is empty,
  * which DynamoDB presumably rejects — confirm callers always supply a campaign.
  *
  * @param tableName table holding both items
  */
def asDeleteItemRequest(tableName: String): TransactWriteItemsRequest = {
  val selfKey = s"ADSET#${adSet.id}"
  val adSetKey = Map[String, AttributeValue](
    PARTITION_KEY -> S(selfKey),
    SORT_KEY -> S(selfKey)
  )

  val deleteAdSet: TransactWriteItem =
    TransactWriteItem
      .builder()
      .delete(
        Delete
          .builder()
          .tableName(tableName)
          .key(adSetKey.asJava)
          .build()
      )
      .build()

  val campaignKey: Map[String, AttributeValue] =
    adSet.campaign.fold(Map.empty[String, AttributeValue]) { campaign =>
      Map[String, AttributeValue](
        PARTITION_KEY -> S(s"CAMPAIGN#${campaign.id}"),
        SORT_KEY -> S(s"CAMPAIGN#${campaign.id}")
      )
    }

  val detachFromCampaign: TransactWriteItem =
    TransactWriteItem
      .builder()
      .update(
        DynamoUpdate
          .builder()
          .tableName(tableName)
          .key(campaignKey.asJava)
          .updateExpression("delete #adsets :adsets")
          .expressionAttributeNames(Map("#adsets" -> "AdSets").asJava)
          .expressionAttributeValues(
            Map(":adsets" -> SS(s"${adSet.id}")).asJava
          )
          .build()
      )
      .build()

  TransactWriteItemsRequest
    .builder()
    .transactItems(List(deleteAdSet, detachFromCampaign).asJava)
    .build()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment