Created
November 19, 2020 15:25
-
-
Save hanishi/a0b720363855699c3b8b33a223b8023d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package models | |
import java.time.temporal.ChronoUnit | |
import java.time.{Instant, OffsetDateTime, ZoneId} | |
import akka.actor.typed.scaladsl.{ | |
ActorContext, | |
Behaviors, | |
StashBuffer, | |
TimerScheduler | |
} | |
import akka.actor.typed.{ActorRef, Behavior} | |
import akka.pattern.StatusReply | |
import akka.util.Timeout | |
import play.api.Configuration | |
import play.api.fb.GraphApi.Edge.Path | |
import play.api.fb.GraphApi.{Node, PagedResults, Results, UnpagedResults} | |
import play.api.fb.WSClientExtension.{Edge, Node, id} | |
import play.api.fb.{GraphApi, Id} | |
import play.api.libs.functional.syntax.{toFunctionalBuilderOps, unlift} | |
import play.api.libs.json.{JsPath, Reads, Writes} | |
import scala.concurrent.duration.{DurationLong, FiniteDuration} | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.jdk.CollectionConverters.{MapHasAsJava, SeqHasAsJava} | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{Failure, Success} | |
case class AdSet( | |
id: String, | |
name: Option[String] = None, | |
status: Option[String] = None, | |
effectiveStatus: Option[String], | |
startTime: Option[OffsetDateTime] = None, | |
endTime: Option[OffsetDateTime] = None, | |
campaign: Option[Campaign] = None | |
) extends Entity | |
object AdSet | |
extends EdgeNodes[ | |
AdSet, | |
(AdSet, Option[(Option[Criteria], Boolean, Boolean)]) | |
] | |
with EntityEdge[AdSet] { | |
val rTimestamp = raw"""^\d+:(\d+)$$""".r | |
implicit val noop: AdSet => Future[ | |
(AdSet, Option[(Option[Criteria], Boolean, Boolean)]) | |
] = adSets => Future.successful((adSets, None)) | |
protected val path = Path[AdSet]("adsets") | |
def insights( | |
graphApi: GraphApi, | |
objectId: String, | |
dateStart: String, | |
dateEnd: String, | |
adlabel: String, | |
params: Map[String, String], | |
fields: String* | |
)(implicit | |
ec: ExecutionContext, | |
rd: Reads[Insights] | |
): Future[Results[Insights]] = | |
graphApi.read( | |
objectId, | |
Path[Insights]("insights"), | |
params ++ Map( | |
"level" -> "ad", | |
"time_range" -> s"""{"since":"$dateStart","until":"$dateEnd"}""", | |
"filtering" -> s"""[{"field":"ad.adlabels","operator":"ANY","value":["$adlabel"]}]""" | |
), | |
fields: _* | |
) | |
private def startDurationFromCriteria(criteria: Criteria) = | |
criteria.startDateTime.fold(OffsetDateTime.now().toEpochSecond.seconds)( | |
_.toEpochSecond.seconds | |
) | |
sealed trait Command | |
case class ContestCriteria( | |
numberOfActiveAds: Int, | |
numberOfAdsToCarryOver: Int, | |
daysOfInterContestInterval: Int, | |
daysToRunContest: Int | |
) | |
case class UpdateRoot(rootCriteria: Criteria) extends Command | |
case class Get(replyTo: ActorRef[StatusReply[(AdSet, Criteria)]]) | |
extends Command | |
case class Update( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
replyTo: ActorRef[StatusReply[(AdSet, Criteria)]] | |
) extends Command | |
case class Contest( | |
current: Option[AdLabel] | |
) extends Command | |
case class AdLabelCreated(adlabel: AdLabel) extends Command | |
case class CreateAdLabelFailed(throwable: Throwable) extends Command | |
case class LabeledIneligible(ad: Ad) extends Command | |
case class FailedLabelAdPaused(throwable: Throwable) extends Command | |
case class ErrorStartNominating(throwable: Throwable) extends Command | |
case class ErrorStartingBout(throwable: Throwable) extends Command | |
case class AdLabelAssociated(ad: Ad, adlabel: AdLabel) extends Command | |
case class FailedAssociateAdLabel( | |
ad: Ad, | |
adlabel: AdLabel, | |
throwable: Throwable | |
) extends Command | |
private case class InitializationFailed( | |
throwable: Throwable | |
) extends Command | |
private case class AdsFetched( | |
results: Seq[Ad], | |
nextPageParameters: Map[String, String] = Map.empty | |
) extends Command | |
private case class FetchAdsFailed(throwable: Throwable) extends Command | |
private case class AdSetFetched( | |
adSet: AdSet | |
) extends Command | |
private case class InsightsFetched( | |
results: Seq[Insights], | |
nextPageParameters: Map[String, String] = Map.empty | |
) extends Command | |
private case class FetchAdSetFailed(throwable: Throwable) extends Command | |
private case class FetchInsightsFailed(throwable: Throwable) extends Command | |
private case class AdPaused(ad: Ad) extends Command | |
private case class PauseAdFailed(ad: Ad, throwable: Throwable) extends Command | |
private class Actor( | |
context: ActorContext[Command], | |
buffer: StashBuffer[Command], | |
timers: TimerScheduler[Command], | |
graphApi: GraphApi, | |
configuration: Configuration, | |
id: String, | |
name: Option[String], | |
campaign: Option[Campaign] | |
)(implicit | |
timeout: Timeout, | |
response: Node[Id], | |
rds: Reads[Id], | |
contestRepository: ContestRepository, | |
ec: ExecutionContext | |
) { | |
def fetchAdSet( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel | |
): Behavior[Command] = { | |
AdSet | |
.fetch( | |
graphApi, | |
id, | |
"id,name,status,effective_status,start_time,end_time" | |
) | |
.map { case (adSet, _) => adSet } | |
.pipe(context.pipeToSelf) { | |
case Success(adSet) => | |
AdSetFetched(adSet) | |
case Failure(throwable) => FetchAdSetFailed(throwable) | |
} | |
Behaviors.receiveMessage { | |
case AdSetFetched( | |
AdSet( | |
id, | |
name, | |
Some(status), | |
Some(effectiveStatus), | |
startTime, | |
endTime, | |
campaign | |
) | |
) => | |
if (status == "PAUSED" && effectiveStatus == "PAUSED") { | |
fetchAds( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
} else { | |
if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer) | |
buffer.unstashAll( | |
idle(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
} | |
case FetchAdSetFailed(throwable) => | |
//error log | |
if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer) | |
buffer.unstashAll( | |
idle(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
def isExpired(labelName: String, name: String): Boolean = { | |
val rTimestampedLabel = raw"""^$labelName:(\d+)$$""".r | |
name match { | |
case rTimestampedLabel(expiration) => | |
OffsetDateTime | |
.now() | |
.isAfter( | |
OffsetDateTime.from(Instant.ofEpochSecond(expiration.toLong)) | |
) | |
} | |
} | |
def hasLabelWithExpiration(labelName: String, name: String): Boolean = { | |
val rTimestampedLabel = raw"""^$labelName:\d+$$""".r | |
rTimestampedLabel.findFirstIn(name).fold(false)(_ => true) | |
} | |
def inspect( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
ads: List[Ad] | |
): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case current @ Contest(adlabel) => | |
adlabel match { | |
case Some(AdLabel(labelId, labelName)) => | |
val ( | |
labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) = | |
ads.foldLeft( | |
( | |
List.empty[Ad], | |
List.empty[Ad], | |
List.empty[Ad], | |
List.empty[Ad], | |
List.empty[(AdLabel, Ad)] | |
) | |
) { (acc, ad) => | |
acc match { | |
case ( | |
labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) => | |
ad match { | |
case adActive @ Ad( | |
id, | |
name, | |
createdTime, | |
updatedTime, | |
Some("ACTIVE"), | |
creative, | |
adlabels | |
) => | |
if ( | |
adlabels | |
.exists(adlabel => adlabel.id == labelId) | |
) | |
( | |
adActive :: labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) | |
else | |
( | |
labeledActive, | |
adActive :: unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) | |
case adPaused @ Ad( | |
id, | |
name, | |
createdTime, | |
updatedTime, | |
Some("PAUSED"), | |
creative, | |
adlabels | |
) => | |
adlabels | |
.find(adlabel => | |
hasLabelWithExpiration(labelName, adlabel.name) | |
) | |
.fold { | |
if (adlabels.contains(labelIneligible)) | |
( | |
labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) | |
else if ( | |
adlabels | |
.exists(adlabel => adlabel.id == labelId) | |
) | |
( | |
labeledActive, | |
unlabeledActive, | |
adPaused :: labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) | |
else | |
( | |
labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
adPaused :: unlabeledPaused, | |
pausedWithExpiration | |
) | |
} { adlabel => | |
if (isExpired(labelName, adlabel.name)) | |
( | |
labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
(adlabel -> adPaused) :: pausedWithExpiration | |
) | |
else | |
( | |
labeledActive, | |
unlabeledActive, | |
labeledPaused, | |
unlabeledPaused, | |
pausedWithExpiration | |
) | |
} | |
} | |
} | |
} | |
if ( | |
unlabeledActive.nonEmpty || labeledPaused.nonEmpty || pausedWithExpiration.nonEmpty | |
) { | |
if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer) | |
if (pausedWithExpiration.nonEmpty) | |
nominateAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
current, | |
labeledActive ::: unlabeledActive, | |
unlabeledPaused.sortBy(_.createdTime), | |
pausedWithExpiration | |
) | |
else | |
nominateAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
current, | |
labeledActive ::: unlabeledActive, | |
unlabeledPaused.sortBy(_.createdTime) | |
) | |
} else | |
buffer.unstashAll( | |
idle( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
) | |
case None => | |
val ( | |
activeAds, | |
pausedAds | |
) = | |
ads.foldLeft( | |
( | |
List.empty[Ad], | |
List.empty[Ad] | |
) | |
) { (acc, ad) => | |
acc match { | |
case (activeAds, pausedAds) => | |
ad match { | |
case ad @ Ad( | |
id, | |
name, | |
createdTime, | |
updatedTime, | |
Some(status), | |
creative, | |
adlabels | |
) => | |
if (status == "ACTIVE") (ad :: activeAds, pausedAds) | |
else if (status == "PAUSED") | |
(activeAds, ad :: pausedAds) | |
else (activeAds, pausedAds) | |
} | |
} | |
} | |
nominateAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
current, | |
activeAds, | |
pausedAds | |
) | |
} | |
case ErrorStartingBout(throwable) => | |
context.log.error("error while", throwable) | |
buffer.unstashAll( | |
idle(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def nominateAdsForBout( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
activeAds: List[Ad], | |
pausedAds: List[Ad], | |
expired: List[(AdLabel, Ad)] | |
): Behavior[Command] = { | |
expired match { | |
case (adlabel, ad) :: tail => | |
(for { | |
_ <- AdLabel.delete(graphApi, adlabel.id) | |
_ <- AdLabel.label(graphApi, ad, labelIneligible) | |
} yield ad).pipe(context.pipeToSelf) { | |
case Success(ad) => LabeledIneligible(ad) | |
case Failure(throwable) => FailedLabelAdPaused(throwable) | |
} | |
nominatingAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
activeAds, | |
pausedAds, | |
tail | |
) | |
case Nil => | |
nominateAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
activeAds, | |
pausedAds | |
) | |
} | |
} | |
def nominatingAdsForBout( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
activeAds: List[Ad], | |
pausedAds: List[Ad], | |
expired: List[(AdLabel, Ad)] | |
) = { | |
Behaviors.receiveMessage { | |
case LabeledIneligible(ad) => | |
context.log.info(s"$ad labeled as paused") | |
nominateAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
activeAds, | |
pausedAds, | |
expired | |
) | |
case FailedLabelAdPaused(throwable) => | |
context.log.error("failed to label ad as paused", throwable) | |
nominateAdsForBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
activeAds, | |
pausedAds, | |
expired | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
def nominateAdsForBout( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
activeAds: List[Ad], | |
pausedAds: List[Ad] | |
): Behavior[Command] = { | |
contest match { | |
case Contest(current) => | |
(for { | |
c <- campaign | |
accountId <- c.accountId | |
} yield accountId) | |
.fold[Future[AdLabel]]( | |
Future.failed(new IllegalStateException("no accountId")) | |
) { accountId => | |
current | |
.fold(AdLabel.label(graphApi, accountId, id)) { | |
case AdLabel(labelId, name) => | |
for { | |
_ <- AdLabel.delete(graphApi, labelId) | |
label <- AdLabel.label(graphApi, accountId, id) | |
} yield label | |
} | |
} | |
.pipe(context.pipeToSelf) { | |
case Success(adLabel) => Contest(Some(adLabel)) | |
case Failure(throwable) => CreateAdLabelFailed(throwable) | |
} | |
} | |
Behaviors.receiveMessage { | |
case contest: Contest => | |
nominateActiveAdsForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
activeAds, | |
pausedAds, | |
List.empty | |
) | |
case CreateAdLabelFailed(throwable) => | |
context.log.error("error creating adlabel", throwable) | |
buffer.unstashAll( | |
idle(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
def nominateActiveAdsForContest( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
activeAds: List[Ad], | |
pausedAds: List[Ad], | |
nominated: List[Ad] | |
) = { | |
contest match { | |
case Contest(Some(current)) => | |
activeAds match { | |
case head :: tail => | |
AdLabel.label(graphApi, head, current).pipe(context.pipeToSelf) { | |
case Success(ad) => AdLabelAssociated(ad, current) | |
case Failure(throwable) => | |
FailedAssociateAdLabel(head, current, throwable) | |
} | |
nominatingActiveAdForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
tail, | |
pausedAds, | |
nominated | |
) | |
case Nil => | |
if (nominated.size < contestCriteria.numberOfActiveAds) { | |
nominatePausedAdsForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
pausedAds.take( | |
contestCriteria.numberOfActiveAds - nominated.size | |
), | |
nominated | |
) | |
} else | |
scheduleBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
current, | |
labelIneligible | |
) | |
} | |
} | |
} | |
def scheduleBout( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
contest: AdLabel, | |
labelIneligible: AdLabel | |
): Behavior[Command] = { | |
contest match { | |
case AdLabel(_, name) => | |
name match { | |
case rTimestamp(timestamp) => | |
val date0 = OffsetDateTime | |
.ofInstant( | |
Instant | |
.ofEpochSecond(timestamp.toLong), | |
ZoneId.systemDefault() | |
) | |
.truncatedTo(ChronoUnit.DAYS) | |
val target = date0.plusSeconds( | |
contestCriteria.daysToRunContest * 24 * 60 * 60 | |
) | |
val date1 = OffsetDateTime.now() | |
val secondsToAdd = target.toEpochSecond - date1.toEpochSecond | |
if (secondsToAdd > 0) | |
timers.startSingleTimer( | |
BoutTimer, | |
Bout, | |
secondsToAdd.seconds | |
) | |
else | |
context.self ! Bout | |
} | |
} | |
buffer.unstashAll( | |
idle(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
} | |
def nominatingActiveAdForContest( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
active: List[Ad], | |
paused: List[Ad], | |
nominated: List[Ad] | |
): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case AdLabelAssociated(ad, label) => | |
nominateActiveAdsForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
active, | |
paused, | |
ad :: nominated | |
) | |
case FailedAssociateAdLabel(ad, adlabel, throwable) => | |
context.log.error("failed to nominate ad", throwable) | |
nominateActiveAdsForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
active, | |
paused, | |
nominated | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def nominatePausedAdsForContest( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
pausedAds: List[Ad], | |
nominated: List[Ad] | |
): Behavior[Command] = | |
contest match { | |
case Contest(Some(current)) => | |
pausedAds match { | |
case head :: tail => | |
AdLabel.label(graphApi, head, current).pipe(context.pipeToSelf) { | |
case Success(ad) => AdLabelAssociated(ad, current) | |
case Failure(throwable) => | |
FailedAssociateAdLabel(head, current, throwable) | |
} | |
nominatingPausedAdForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
tail, | |
nominated | |
) | |
case Nil => | |
scheduleBout( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
current, | |
labelIneligible | |
) | |
} | |
} | |
def nominatingPausedAdForContest( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
labelIneligible: AdLabel, | |
contest: Contest, | |
paused: List[Ad], | |
nominated: List[Ad] | |
): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case AdLabelAssociated(ad, label) => | |
nominatePausedAdsForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
paused, | |
ad :: nominated | |
) | |
case FailedAssociateAdLabel(ad, adlabel, throwable) => | |
context.log.error("failed to nominate ad", throwable) | |
nominatePausedAdsForContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
contest, | |
paused, | |
nominated | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def idle( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
duration: FiniteDuration = | |
configuration.get[Long](APP_ADSET_ADS_REFRESH).seconds | |
): Behavior[Command] = { | |
if (timers.isTimerActive(RefreshAdsKey)) timers.cancel(RefreshAds) | |
timers.startSingleTimer( | |
RefreshAdsKey, | |
RefreshAds, | |
duration | |
) | |
active(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
} | |
def fetchAds( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
ads: List[Ad] = List.empty, | |
params: Map[String, String] = Map.empty | |
): Behavior[Command] = { | |
Ad.fetchAll( | |
graphApi, | |
id, | |
params, | |
"id", | |
"name", | |
"created_time", | |
"updated_time", | |
"status", | |
"creative{id,name}", | |
"adlabels" | |
) | |
.pipe(context.pipeToSelf) { | |
case Success(results) => | |
results match { | |
case PagedResults( | |
ads, | |
_, | |
next, | |
_ | |
) => | |
next.fold(AdsFetched(ads)) { next => | |
AdsFetched(ads, next.toMap("after", "limit")) | |
} | |
case UnpagedResults(ads, _) => AdsFetched(ads) | |
} | |
case Failure(throwable) => FetchAdsFailed(throwable) | |
} | |
fetchingAds( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
ads | |
) | |
} | |
def fetchingAds( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
ads: List[Ad] = List.empty | |
): Behavior[Command] = { | |
Behaviors.receiveMessage { | |
case AdsFetched(results, nextPageParameters) => | |
val list = results.foldLeft(ads) { (acc, ad) => ad :: acc } | |
if (nextPageParameters.nonEmpty) | |
fetchAds( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
list, | |
nextPageParameters | |
) | |
else { | |
contestRepository.current().pipe(context.pipeToSelf) { | |
case Success(current) => | |
Contest(current) | |
case Failure(throwable) => ErrorStartNominating(throwable) | |
} | |
val merged = rootCriteria.update(criteria: _*) | |
val contestCriteria = for { | |
numberOfActiveAds <- merged.numberOfActiveAds | |
numberOfAdsToCarryOver <- merged.numberOfAdsToCarryOver | |
daysOfInterContestInterval <- merged.daysOfInterContestInterval | |
daysToRunContest <- merged.daysToRunContest | |
} yield ContestCriteria( | |
numberOfActiveAds, | |
numberOfAdsToCarryOver, | |
daysOfInterContestInterval, | |
daysToRunContest | |
) | |
contestCriteria.fold( | |
buffer.unstashAll( | |
idle( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
) | |
) { contestCriteria => | |
inspect( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
labelIneligible, | |
list | |
) | |
} | |
} | |
case FetchAdsFailed(throwable) => | |
context.log.error("fetch ads failed", throwable) | |
if (timers.isTimerActive(BoutTimer)) timers.cancel(BoutTimer) | |
buffer.unstashAll( | |
idle(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
def active( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel | |
): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case Get(replyTo) => | |
replyTo ! StatusReply.Success( | |
AdSet(id, name, None, None, startTime, endTime, campaign), | |
rootCriteria.update(criteria: _*) | |
) | |
Behaviors.same | |
case Update(startTime, endTime, rootCriteria, baseCriteria, replyTo) => | |
val updated = criteria.patch(0, baseCriteria, baseCriteria.size) | |
replyTo ! StatusReply.Success( | |
AdSet(id, name, None, None, startTime, endTime, campaign), | |
rootCriteria.update(updated: _*) | |
) | |
idle( | |
startTime, | |
endTime, | |
rootCriteria, | |
updated, | |
labelIneligible, | |
startDurationFromCriteria(rootCriteria.update(baseCriteria: _*)) | |
) | |
case UpdateRoot(rootCriteria) => | |
idle( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
startDurationFromCriteria(rootCriteria.update(criteria: _*)) | |
) | |
case RefreshAds => | |
fetchAdSet( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
case Bout => | |
startBout(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
} | |
def init( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
duration: FiniteDuration | |
): Behavior[Command] = { | |
labelForIneligibleAds(id, campaign).pipe( | |
context.pipeToSelf | |
) { | |
case Success(value) => | |
AdLabelCreated(value) | |
case Failure(throwable) => | |
CreateAdLabelFailed(throwable) | |
} | |
Behaviors.receiveMessage { | |
case AdLabelCreated(labelIneligible) => | |
buffer.unstashAll( | |
idle( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
duration | |
) | |
) | |
case CreateAdLabelFailed(throwable) => | |
context.log.error("Unable to create label", throwable) | |
throw throwable | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
def startBout( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel | |
): Behavior[Command] = { | |
contestRepository | |
.current() | |
.pipe(context.pipeToSelf) { | |
case Success(current) => | |
Contest(current) | |
case Failure(throwable) => ErrorStartingBout(throwable) | |
} | |
startingBout(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
} | |
def startingBout( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel | |
): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case Contest(current) => | |
current match { | |
case Some(adlabel) => | |
fetchAdsInsights( | |
id, | |
adlabel, | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
case None => | |
context.log.error("no label to match") | |
buffer.unstashAll( | |
active( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
) | |
} | |
Behaviors.same | |
case ErrorStartingBout(throwable) => | |
context.log.error("retrieve contest failed", throwable) | |
buffer.unstashAll( | |
active(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def fetchAdsInsights( | |
id: String, | |
label: AdLabel, | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
insights: List[Insights] = List.empty, | |
params: Map[String, String] = Map.empty | |
): Behavior[Command] = { | |
AdSet | |
.insights( | |
graphApi, | |
id, | |
"", | |
"", | |
"", | |
params, | |
"ad_id", | |
"impressions" | |
) | |
.pipe(context.pipeToSelf) { | |
case Success(results) => | |
results match { | |
case PagedResults(insights, _, next, _) => | |
next.fold(InsightsFetched(insights)) { next => | |
InsightsFetched(insights, next.toMap("after", "limit")) | |
} | |
case UnpagedResults(insights, _) => InsightsFetched(insights) | |
} | |
case Failure(throwable) => FetchInsightsFailed(throwable) | |
} | |
fetchingAdsInsights( | |
id, | |
label, | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
insights | |
) | |
} | |
def runContest( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
contestCriteria: ContestCriteria, | |
current: AdLabel, | |
labelIneligible: AdLabel, | |
list: List[Insights] | |
): Behavior[Command] = { | |
val pauseList = list | |
.collect { | |
case Insights(adId, name, Some(impressions)) => | |
(adId, name, impressions) | |
} | |
.sortBy(_._3) | |
.drop(list.size - contestCriteria.numberOfAdsToCarryOver) | |
.map { | |
case (id, name, _) => Ad(id, name) | |
} | |
AdLabel | |
.timestamp( | |
graphApi, | |
current, | |
contestCriteria.daysOfInterContestInterval | |
) | |
.pipe(context.pipeToSelf) { | |
case Success(adlabel) => AdLabelCreated(adlabel) | |
case Failure(throwable) => CreateAdLabelFailed(throwable) | |
} | |
Behaviors.receiveMessage { | |
case AdLabelCreated(adlabel) => | |
pauseAd( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
adlabel, | |
pauseList | |
) | |
case CreateAdLabelFailed(throwable) => | |
context.log.error("failed to create label timestamped", throwable) | |
buffer.unstashAll( | |
active(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
def pauseAd( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
adlabel: AdLabel, | |
ads: List[Ad], | |
paused: List[Ad] = List.empty | |
): Behavior[Command] = { | |
ads match { | |
case head :: tail => | |
(for { | |
_ <- AdLabel.label(graphApi, head, adlabel) | |
result <- Ad.pause(head) | |
} yield result) | |
.pipe(context.pipeToSelf) { | |
case Success(result) => | |
AdPaused(head) | |
case Failure(throwable) => | |
PauseAdFailed(head, throwable) | |
} | |
pausingAd( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
adlabel, | |
tail, | |
paused | |
) | |
case Nil => | |
context.log.info(s"paused $paused") | |
buffer.unstashAll( | |
active(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
} | |
} | |
def pausingAd( | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
adlabel: AdLabel, | |
ads: List[Ad], | |
paused: List[Ad] = List.empty | |
): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case AdPaused(ad) => | |
pauseAd( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
adlabel, | |
ads, | |
ad :: paused | |
) | |
case PauseAdFailed(ad, throwable) => | |
context.log.error(s"failed to pause $ad", throwable) | |
buffer.unstashAll( | |
active( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def fetchingAdsInsights( | |
id: String, | |
label: AdLabel, | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]], | |
labelIneligible: AdLabel, | |
insights: List[Insights] = List.empty | |
): Behavior[Command] = { | |
Behaviors.receiveMessage { | |
case InsightsFetched(results, nextPageParameters) => | |
val list = results.foldLeft(insights) { (acc, insights) => | |
insights :: acc | |
} | |
if (nextPageParameters.nonEmpty) | |
fetchAdsInsights( | |
id, | |
label, | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible, | |
list, | |
nextPageParameters | |
) | |
else { | |
val merged = rootCriteria.update(criteria: _*) | |
val contestCriteria = for { | |
numberOfActiveAds <- merged.numberOfActiveAds | |
numberOfAdsToCarryOver <- merged.numberOfAdsToCarryOver | |
daysOfInterContestInterval <- merged.daysOfInterContestInterval | |
daysToRunContest <- merged.daysToRunContest | |
} yield ContestCriteria( | |
numberOfActiveAds, | |
numberOfAdsToCarryOver, | |
daysOfInterContestInterval, | |
daysToRunContest | |
) | |
contestCriteria.fold( | |
buffer.unstashAll( | |
active( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
labelIneligible | |
) | |
) | |
) { contestCriteria => | |
runContest( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
contestCriteria, | |
label, | |
labelIneligible, | |
list | |
) | |
} | |
} | |
case FetchInsightsFailed(throwable) => | |
context.log.error("fetch insights failed", throwable) | |
buffer.unstashAll( | |
active(startTime, endTime, rootCriteria, criteria, labelIneligible) | |
) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
} | |
private def labelForIneligibleAds( | |
id: String, | |
campaign: Option[Campaign] | |
)(implicit | |
response: Node[Id], | |
rds: Reads[Id], | |
ec: ExecutionContext | |
): Future[AdLabel] = | |
(for { | |
c <- campaign | |
accountId <- c.accountId | |
} yield accountId) | |
.fold(Future.failed(new IllegalStateException("no accountId"))) { | |
accountId => | |
val name = s"$id:paused" | |
for { | |
id <- graphApi.publish( | |
accountId, | |
Path("adlabels"), | |
Map("name" -> Seq(name)) | |
) | |
} yield AdLabel(id, name) | |
} | |
} | |
object Actor { | |
def apply( | |
client: GraphApiClient, | |
configuration: Configuration, | |
id: String, | |
name: Option[String], | |
startTime: Option[OffsetDateTime], | |
endTime: Option[OffsetDateTime], | |
campaign: Option[Campaign], | |
rootCriteria: Criteria, | |
criteria: Seq[Option[Criteria]] | |
)(implicit | |
timeout: Timeout, | |
response: Node[Id], | |
rds: Reads[Id], | |
contestRepository: ContestRepository, | |
ec: ExecutionContext | |
): Behavior[Command] = { | |
Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) { buffer => | |
Behaviors.setup { context => | |
Behaviors.withTimers { timers => | |
client.graphApi.fold[Behavior[Command]]( | |
throw new IllegalStateException("no access token") | |
) { graphApi => | |
new Actor( | |
context, | |
buffer, | |
timers, | |
graphApi, | |
configuration, | |
id, | |
name, | |
campaign | |
).init( | |
startTime, | |
endTime, | |
rootCriteria, | |
criteria, | |
startDurationFromCriteria(rootCriteria.update(criteria: _*)) | |
) | |
} | |
} | |
} | |
} | |
} | |
} | |
private case object BoutTimer | |
private case object Bout extends Command | |
private case object RefreshAds extends Command | |
private case object RefreshAdsKey | |
implicit val offsetDateTimeReads = facebookOffsetDateTimeReads | |
implicit val jsonReads: Reads[AdSet] = ( | |
(JsPath \ "id").read[String] and | |
(JsPath \ "name").readNullable[String] and | |
(JsPath \ "status").readNullable[String] and | |
(JsPath \ "effectiveStatus").readNullable[String] and | |
(JsPath \ "start_time").readNullable[OffsetDateTime] and | |
(JsPath \ "end_time").readNullable[OffsetDateTime] and | |
(JsPath \ "campaign").readNullable[Campaign] | |
)( | |
AdSet.apply( | |
_: String, | |
_: Option[String], | |
_: Option[String], | |
_: Option[String], | |
_: Option[OffsetDateTime], | |
_: Option[OffsetDateTime], | |
_: Option[Campaign] | |
) | |
) | |
override implicit val edge: GraphApi.Edge[AdSet] = Edge[AdSet] | |
override implicit val node: GraphApi.Node[AdSet] = Node[AdSet] | |
implicit val jsonWrites: Writes[AdSet] = | |
((JsPath \ "id").write[String] and | |
(JsPath \ "name").writeNullable[String] and | |
(JsPath \ "status").writeNullable[String] and | |
(JsPath \ "effective_status").writeNullable[String] and | |
(JsPath \ "start_time").writeNullable[OffsetDateTime] and | |
(JsPath \ "end_time").writeNullable[OffsetDateTime] and | |
(JsPath \ "campaign").writeNullable[Campaign])(unlift(AdSet.unapply)) | |
implicit class adSet2AttributeValue(adSet: AdSet) { | |
import software.amazon.awssdk.services.dynamodb.model.{ | |
Update => DynamoUpdate, | |
_ | |
} | |
def asPutItemRequest( | |
tableName: String, | |
criteria: Option[Criteria], | |
status: String | |
): TransactWriteItemsRequest = | |
TransactWriteItemsRequest | |
.builder() | |
.transactItems( | |
List( | |
TransactWriteItem | |
.builder() | |
.put( | |
Put | |
.builder() | |
.tableName(tableName) | |
.item( | |
Map | |
.empty[String, AttributeValue] | |
.pipe(_ + (PARTITION_KEY -> S(s"ADSET#${adSet.id}"))) | |
.pipe(_ + (SORT_KEY -> S(s"ADSET#${adSet.id}"))) | |
.pipe(_ + (ID -> S(adSet.id))) | |
.pipe(map => | |
adSet.name.fold(map) { x => map + (NAME -> S(x)) } | |
) | |
.pipe(_ + (STATUS -> S(status))) | |
.pipe(map => | |
criteria.fold(map) { x => | |
map + (CRITERIA -> x.asAttributeValue) | |
} | |
) | |
.pipe(map => | |
(for { | |
campaign <- adSet.campaign | |
accountId <- campaign.accountId | |
} yield S(s"ACCOUNT#act_$accountId")).fold(map) { x => | |
map + (GSI1PK -> x) | |
} | |
) | |
.pipe(map => | |
(for { | |
campaign <- adSet.campaign | |
accountId <- campaign.accountId | |
} yield S( | |
s"ACCOUNT#act_$accountId#CAMPAIGN#${campaign.id}#ADSET#${adSet.id}" | |
)).fold(map) { x => map + (GSI1SK -> x) } | |
) | |
.pipe(map => | |
(for { | |
campaign <- adSet.campaign | |
accountId <- campaign.accountId | |
} yield S(s"ACCOUNT#act_$accountId#ADSETS")) | |
.fold(map) { x => | |
map + (GSI2PK -> x) | |
} | |
) | |
.pipe(_ + (GSI2SK -> S(s"ADSET#${adSet.id}"))) | |
.asJava | |
) | |
.conditionExpression("attribute_not_exists(#pk)") | |
.expressionAttributeNames( | |
Map("#pk" -> PARTITION_KEY).asJava | |
) | |
.build() | |
) | |
.build(), | |
addAdSet(tableName) | |
).asJava | |
) | |
.build() | |
def addAdSet(tableName: String): TransactWriteItem = | |
TransactWriteItem | |
.builder() | |
.update( | |
( | |
List.empty[String], | |
Map("#adsets" -> "AdSets"), | |
Map(":adsets" -> SS(s"${adSet.id}")) | |
).pipe { | |
case (set, expressionNames, expressionValues) => | |
adSet.campaign.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { campaign => | |
( | |
"#id = if_not_exists(#id, :id)" :: set, | |
expressionNames + ("#id" -> ID), | |
expressionValues + (":id" -> S(campaign.id)) | |
) | |
} | |
} | |
.pipe { | |
case (set, expressionNames, expressionValues) => | |
adSet.campaign.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { campaign => | |
campaign.accountId.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { accountId => | |
( | |
"#gsi1pk = if_not_exists(#gsi1pk, :gsi1pk)" :: set, | |
expressionNames + ("#gsi1pk" -> GSI1PK), | |
expressionValues + (":gsi1pk" -> S( | |
s"ACCOUNT#act_$accountId" | |
)) | |
) | |
} | |
} | |
} | |
.pipe { | |
case (set, expressionNames, expressionValues) => | |
adSet.campaign.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { | |
campaign => | |
campaign.accountId.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { accountId => | |
( | |
"#gsi1sk = if_not_exists(#gsi1sk, :gsi1sk)" :: set, | |
expressionNames + ("#gsi1sk" -> GSI1SK), | |
expressionValues + (":gsi1sk" -> S( | |
s"ACCOUNT#act_${accountId}#CAMPAIGN#${campaign.id}" | |
)) | |
) | |
} | |
} | |
} | |
.pipe { | |
case (set, expressionNames, expressionValues) => | |
adSet.campaign.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { | |
campaign => | |
campaign.accountId.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { accountId => | |
( | |
"#gsi2pk = if_not_exists(#gsi2pk, :gsi2pk)" :: set, | |
expressionNames + ("#gsi2pk" -> GSI2PK), | |
expressionValues + (":gsi2pk" -> S( | |
s"ACCOUNT#act_${accountId}#CAMPAIGNS" | |
)) | |
) | |
} | |
} | |
} | |
.pipe { | |
case (set, expressionNames, expressionValues) => | |
adSet.campaign.fold( | |
( | |
set, | |
expressionNames, | |
expressionValues | |
) | |
) { campaign => | |
( | |
"#gsi2sk = if_not_exists(#gsi2sk, :gsi2sk)" :: set, | |
expressionNames + ("#gsi2sk" -> GSI2SK), | |
expressionValues + (":gsi2sk" -> S( | |
s"CAMPAIGN#${campaign.id}" | |
)) | |
) | |
} | |
} | |
.pipe { | |
case (set, expressionNames, expressionValues) => | |
( | |
"#status = if_not_exists(#status, :status)" :: set, | |
expressionNames + ("#status" -> "Status"), | |
expressionValues + (":status" -> S("Paused")) | |
) | |
} match { | |
case (set, expressionNames, expressionValues) => | |
val updateExpression = | |
if (set.nonEmpty) s"set ${set.foldLeft("") { (a, b) => | |
if (a.nonEmpty) s"$a,$b" else b | |
}}" | |
else "" | |
DynamoUpdate | |
.builder() | |
.tableName(tableName) | |
.key( | |
Map | |
.empty[String, AttributeValue] | |
.pipe(map => | |
adSet.campaign | |
.fold(map) { x => | |
map + (PARTITION_KEY -> S(s"CAMPAIGN#${x.id}")) | |
} | |
) | |
.pipe(map => | |
adSet.campaign | |
.fold(map) { x => | |
map + (SORT_KEY -> S(s"CAMPAIGN#${x.id}")) | |
} | |
) | |
.asJava | |
) | |
.updateExpression( | |
updateExpression + (if (updateExpression.isBlank) | |
"add #adsets :adsets" | |
else " add #adsets :adsets") | |
) | |
.expressionAttributeNames(expressionNames.asJava) | |
.expressionAttributeValues(expressionValues.asJava) | |
.build() | |
} | |
) | |
.build() | |
def asGetItemRequest(tableName: String): GetItemRequest = | |
GetItemRequest | |
.builder() | |
.tableName(tableName) | |
.key( | |
Map( | |
PARTITION_KEY -> S(s"ADSET#${adSet.id}"), | |
SORT_KEY -> S(s"ADSET#${adSet.id}") | |
).asJava | |
) | |
.attributesToGet(ID, NAME, STATUS, CRITERIA, CURRENT_CONTEST) | |
.build() | |
def asUpdateItemRequest( | |
tableName: String, | |
criteria: Option[Criteria], | |
status: String | |
): UpdateItemRequest = { | |
val builder = UpdateItemRequest | |
.builder() | |
.tableName(tableName) | |
.key( | |
Map | |
.empty[String, AttributeValue] | |
.pipe(_ + (PARTITION_KEY -> S(s"ADSET#${adSet.id}"))) | |
.pipe(_ + (SORT_KEY -> S(s"ADSET#${adSet.id}"))) | |
.asJava | |
) | |
( | |
List.empty[String], | |
List.empty[String], | |
Map.empty[String, String], | |
Map.empty[String, AttributeValue] | |
).pipe { | |
case (remove, set, expressionNames, expressionValues) => | |
adSet.name.fold( | |
( | |
"#name" :: remove, | |
set, | |
expressionNames + ("#name" -> "Name"), | |
expressionValues | |
) | |
) { x => | |
( | |
remove, | |
"#name = :name" :: set, | |
expressionNames + ("#name" -> "Name"), | |
expressionValues + (":name" -> S(x)) | |
) | |
} | |
} | |
.pipe { | |
case (remove, set, expressionNames, expressionValues) => | |
criteria.fold( | |
( | |
"#criteria" :: remove, | |
set, | |
expressionNames + ("#criteria" -> "Criteria"), | |
expressionValues | |
) | |
) { x => | |
( | |
remove, | |
"#criteria = :criteria" :: set, | |
expressionNames + ("#criteria" -> "Criteria"), | |
expressionValues + (":criteria" -> x.asAttributeValue) | |
) | |
} | |
} | |
.pipe { | |
case (remove, set, expressionNames, expressionValues) => | |
( | |
remove, | |
"#status = :status" :: set, | |
expressionNames + ("#status" -> "Status"), | |
expressionValues + (":status" -> S(status)) | |
) | |
} match { | |
case (remove, set, expressionNames, expressionValues) => | |
val updateExpression = | |
if (set.nonEmpty) s"set ${set.foldLeft("") { (a, b) => | |
if (a.nonEmpty) s"$a,$b" else b | |
}}" | |
else "" | |
val deleteExpression = { | |
if (remove.nonEmpty) s"remove ${remove | |
.foldLeft("") { (a, b) => if (a.nonEmpty) s"$a,$b" else b }}" | |
else "" | |
} | |
builder.updateExpression( | |
updateExpression + (if (deleteExpression.nonEmpty) | |
if (updateExpression.isBlank) | |
deleteExpression | |
else " " + deleteExpression | |
else "") | |
) | |
builder.expressionAttributeNames(expressionNames.asJava) | |
if (expressionValues.nonEmpty) | |
builder.expressionAttributeValues(expressionValues.asJava) | |
builder.build() | |
} | |
} | |
//TODO allow delete item only if there is no associated contest | |
def asDeleteItemRequest(tableName: String): TransactWriteItemsRequest = | |
TransactWriteItemsRequest | |
.builder() | |
.transactItems( | |
List( | |
TransactWriteItem | |
.builder() | |
.delete( | |
Delete | |
.builder() | |
.tableName(tableName) | |
.key( | |
Map | |
.empty[String, AttributeValue] | |
.pipe(_ + (PARTITION_KEY -> S(s"ADSET#${adSet.id}"))) | |
.pipe(_ + (SORT_KEY -> S(s"ADSET#${adSet.id}"))) | |
.asJava | |
) | |
.build() | |
) | |
.build(), | |
TransactWriteItem | |
.builder() | |
.update( | |
software.amazon.awssdk.services.dynamodb.model.Update | |
.builder() | |
.tableName(tableName) | |
.key( | |
Map | |
.empty[String, AttributeValue] | |
.pipe(map => | |
adSet.campaign | |
.fold(map) { x => | |
map + (PARTITION_KEY -> S(s"CAMPAIGN#${x.id}")) | |
} | |
) | |
.pipe(map => | |
adSet.campaign | |
.fold(map) { x => | |
map + (SORT_KEY -> S(s"CAMPAIGN#${x.id}")) | |
} | |
) | |
.asJava | |
) | |
.updateExpression("delete #adsets :adsets") | |
.expressionAttributeNames(Map("#adsets" -> "AdSets").asJava) | |
.expressionAttributeValues( | |
Map(":adsets" -> SS(s"${adSet.id}")).asJava | |
) | |
.build() | |
) | |
.build() | |
).asJava | |
) | |
.build() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment