Last active
March 17, 2021 14:15
-
-
Save hanishi/6805743f9f93784654d050fb13e01a52 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.{ActorRef, Behavior} | |
import scala.collection.immutable | |
import scala.concurrent.duration.{DurationLong, FiniteDuration} | |
import scala.reflect.ClassTag | |
/** Buffers incoming messages into fixed-width time windows and forwards each
  * window to a destination actor as one aggregate message when the window's
  * timer fires.
  *
  * Implementations supply the concrete message type, the timer message factory
  * and the function that turns a window's messages into an `Aggregate`.
  *
  * @tparam Aggregate type of the batch message sent to the destination actor
  */
trait MessageCollector[Aggregate <: Timestamp] {

  /** Type of the individual messages being collected. */
  type Message <: Timestamp

  /** Builds the timer message for a window; it is called with the window's group key. */
  val receiveTimeout: Long => Message with MessageCollector.Timeout

  /** Builds the aggregate from `id`, `name` and the window's messages in arrival order. */
  val aggregate: (String, Option[String], List[Message]) => Aggregate

  /** Creates the collecting behavior.
    *
    * @param id       passed through unchanged to `aggregate`
    * @param name     passed through unchanged to `aggregate`
    * @param dst      actor that receives one aggregate per released window
    * @param interval window width, in the same unit as `Timestamp.epochSecond`
    *                 (differences are scheduled with `.seconds`); must be positive
    * @param ct       class tag used for the runtime type test on `Message`
    * @throws IllegalArgumentException if `interval` is not positive
    */
  def apply(
      id: String,
      name: Option[String],
      dst: ActorRef[Aggregate],
      interval: Long
  )(implicit ct: ClassTag[Message]): Behavior[Message] = {
    // A zero interval would divide by zero in `key`; a negative one would yield a
    // negative timer delay. Fail at construction instead of inside the actor.
    require(interval > 0, s"interval must be a positive number of seconds, got $interval")

    Behaviors.setup[Message] { context =>
      Behaviors.withTimers { timers =>
        def collecting(
            messageGroups: Map[Long, immutable.IndexedSeq[Message]]
        ): Behavior[Message] =
          Behaviors.receiveMessage {
            // Must come first: a timeout is declared as `Message with Timeout`,
            // so it could also be matched by the `Message` case below.
            case expired: MessageCollector.Timeout =>
              // The timeout carries the group key of the window it closes.
              messageGroups.get(expired.epochSecond) match {
                case Some(messages) =>
                  dst ! aggregate(id, name, messages.toList)
                  collecting(messageGroups - expired.epochSecond)
                case None =>
                  // Nothing buffered for that window: keep the current state.
                  Behaviors.same
              }

            case message: Message =>
              val timestamp = message.epochSecond
              val groupKey = key(message, interval)
              val windowEnd = groupKey + interval
              // NOTE(review): the delay is measured from the message's own timestamp,
              // not from the wall clock; this presumably assumes messages are stamped
              // close to their arrival time — confirm.
              val delay = (windowEnd - timestamp).seconds
              context.log.debug(
                s"""
                   |### GROUP KEY ### $groupKey
                   |### DELAY ### $windowEnd
                   |### SCHEDULED ### $delay""".stripMargin
              )
              // The timer is keyed by the message itself, so starting it again for the
              // same window replaces the previous timer as long as `receiveTimeout`
              // returns equal values for equal keys (e.g. a case class).
              timers.startSingleTimer(receiveTimeout(groupKey), delay)
              collecting(
                messageGroups.updated(
                  groupKey,
                  messageGroups.getOrElse(groupKey, Vector.empty[Message]) :+ message
                )
              )
          }

        collecting(Map.empty)
      }
    }
  }

  /** Returns the start of the `interval`-wide window that contains `timestamp`,
    * i.e. `epochSecond` rounded down to a multiple of `interval` (for non-negative
    * timestamps).
    *
    * `interval` must be non-zero, otherwise the remainder operation throws
    * `ArithmeticException`.
    */
  def key(timestamp: Timestamp, interval: Long): Long =
    timestamp.epochSecond - (timestamp.epochSecond % interval)
}
/** Companion holding the timeout marker, the `Aux` type alias, the implicit
  * collector instances for each level of the hierarchy, and a factory that picks
  * the right instance from the destination actor's message type.
  */
object MessageCollector {

  /** Marker for the timer message a collector sends to itself to close a window. */
  trait Timeout extends Timestamp

  /** A collector producing aggregates of type `A` from messages of type `B`. */
  type Aux[A <: Timestamp, B <: Timestamp] = MessageCollector[A] {
    type Message = B
  }

  /** Collects `AdSet` commands into `Campaign.Messages`. */
  implicit object messageCollectorForAdSetCommand
      extends MessageCollector[Campaign.Command with Timestamp] {
    type Message = AdSet.Command with Timestamp

    val receiveTimeout: Long => Message with MessageCollector.Timeout =
      timeout => AdSet.ReceiveTimeout(timeout)

    val aggregate: (
        String,
        Option[String],
        List[AdSet.Command with Timestamp]
    ) => Campaign.Command with Timestamp =
      (id, name, messages) => Campaign.Messages(id, name, messages)
  }

  /** Collects `Campaign` commands into `AdAccount.Messages`. */
  implicit object messageCollectorForCampaignCommand
      extends MessageCollector[AdAccount.Command with Timestamp] {
    type Message = Campaign.Command with Timestamp

    val receiveTimeout: Long => Message with MessageCollector.Timeout =
      timeout => Campaign.ReceiveTimeout(timeout)

    val aggregate: (
        String,
        Option[String],
        List[Campaign.Command with Timestamp]
    ) => AdAccount.Command with Timestamp =
      (id, name, messages) => AdAccount.Messages(id, name, messages)
  }

  /** Collects `AdAccount` commands into `Leaderboard.Messages`.
    *
    * NOTE(review): unlike the two instances above, the type argument here is
    * `Leaderboard.Command` while `aggregate` is declared to return
    * `Leaderboard.Command with Timestamp`. It is left untouched because implicit
    * resolution in `apply` depends on the exact aggregate type — confirm which
    * one callers need.
    */
  implicit object messageCollectorForAdAccountCommand
      extends MessageCollector[Leaderboard.Command] {
    type Message = AdAccount.Command with Timestamp

    val receiveTimeout: Long => Message with MessageCollector.Timeout =
      timeout => AdAccount.ReceiveTimeout(timeout)

    val aggregate: (
        String,
        Option[String],
        List[AdAccount.Command with Timestamp]
    ) => Leaderboard.Command with Timestamp =
      (id, name, messages) => Leaderboard.Messages(id, name, messages)
  }

  /** Builds the collecting behavior using the implicit collector for `A` and `B`.
    *
    * @param id       passed through to the aggregate
    * @param name     passed through to the aggregate
    * @param dst      actor that receives the aggregates
    * @param interval window width; only whole seconds are used
    * @throws IllegalArgumentException if `interval` is shorter than one second
    */
  def apply[A <: Timestamp, B <: Timestamp](
      id: String,
      name: Option[String],
      dst: ActorRef[A],
      interval: FiniteDuration
  )(implicit
      ct: ClassTag[B],
      collector: MessageCollector.Aux[A, B]
  ): Behavior[B] = {
    // `toSeconds` truncates, so e.g. 500.millis would become 0 and later cause a
    // division by zero when the collector computes its group key.
    val seconds = interval.toSeconds
    require(seconds > 0, s"interval must be at least one second, got $interval")
    collector(id, name, dst, seconds)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import actors.MessageCollectorSpec.{A, B} | |
import akka.actor.testkit.typed.scaladsl.{ActorTestKit, LogCapturing} | |
import akka.actor.typed.Behavior | |
import akka.actor.typed.scaladsl.Behaviors | |
import models.{MessageCollector, Timestamp} | |
import org.scalatest.BeforeAndAfterAll | |
import org.scalatest.matchers.should.Matchers | |
import org.scalatest.wordspec.AnyWordSpec | |
import java.time.OffsetDateTime | |
import scala.concurrent.duration.DurationInt | |
/** Fixtures for [[MessageCollectorSpec]]: a downstream protocol `A`, an upstream
  * protocol `B`, and the implicit collector that batches `B` commands into
  * `A.Messages`.
  *
  * NOTE(review): several case classes below mix in `Timestamp` without defining
  * `epochSecond`; this presumably relies on `models.Timestamp` providing it —
  * confirm against the real trait.
  */
object MessageCollectorSpec {

  /** Downstream protocol: receives the batches produced by the collector. */
  object A {
    trait Command

    case class Messages(
        id: String,
        name: Option[String],
        message: List[B.Command]
    ) extends Command
        with Timestamp

    /** Prints every batch it receives; other commands are left unhandled. */
    def apply(): Behavior[A.Command] =
      Behaviors.receiveMessagePartial[A.Command] {
        case message: Messages =>
          println(message)
          Behaviors.same
      }
  }

  /** Collector instance that groups `B` commands into `A.Messages`. */
  implicit object messageCollectorForB
      extends MessageCollector[A.Command with Timestamp] {
    type Message = B.Command with Timestamp

    val receiveTimeout: Long => Message with MessageCollector.Timeout =
      timeout => B.ReceiveTimeout(timeout)

    val aggregate: (
        String,
        Option[String],
        List[B.Command with Timestamp]
    ) => A.Command with Timestamp =
      (id, name, messages) => A.Messages(id, name, messages)
  }

  /** Upstream protocol: the messages fed into the collector. */
  object B {
    trait Command

    /** Timer message; its timestamp is the group key it was created with. */
    case class ReceiveTimeout(timeout: Long)
        extends Command
        with MessageCollector.Timeout {
      override val epochSecond: Long = timeout
    }

    case class Message(message: String) extends Command with Timestamp

    case class Messages(
        id: String,
        name: Option[String],
        message: List[B.Command]
    ) extends Command
        with Timestamp

    // NOTE(review): typed as Behavior[A.Command] but matching on B.Messages, which
    // is not an A.Command — looks like a copy of A.apply. The signature is kept
    // as-is; unmatched messages are now unhandled instead of raising MatchError.
    def apply(): Behavior[A.Command] =
      Behaviors.receiveMessagePartial[A.Command] {
        case message: Messages =>
          println(message)
          Behaviors.same
      }
  }

  /** Third protocol; not referenced by the spec visible in this file. */
  object C {
    trait Command

    /** Timer message; its timestamp is the group key it was created with. */
    case class ReceiveTimeout(timeout: Long)
        extends Command
        with MessageCollector.Timeout {
      override val epochSecond: Long = timeout
    }

    case class Messages(
        id: String,
        name: Option[String],
        message: List[B.Command]
    ) extends Command
        with Timestamp

    // NOTE(review): same shape as B.apply — typed on A.Command while matching on
    // C.Messages; confirm the intended protocol.
    def apply(): Behavior[A.Command] =
      Behaviors.receiveMessagePartial[A.Command] {
        case message: Messages =>
          println(message)
          Behaviors.same
      }
  }
}
/** Wall-clock integration test for `MessageCollector` with a one-minute window.
  *
  * The test is deliberately slow: it waits for the next minute boundary and then
  * for two windows to be released, so it runs for several minutes.
  */
class MessageCollectorSpec
    extends AnyWordSpec
    with BeforeAndAfterAll
    with LogCapturing
    with Matchers {

  val testKit = ActorTestKit()

  "A MessageCollector" must {
    "release 2 batches" in {
      val a = testKit.createTestProbe[A.Command]()
      val collector = testKit.spawn(
        MessageCollector("1234567890", Some("foo"), a.ref, 1.minute)
      )

      val minuteMillis = 60000L
      val shortGapMillis = 10000L
      val longGapMillis = 45000L

      // Sends B1..B3 within the first 20 seconds after a minute boundary and B4
      // 65 seconds after it. Named and marked as daemon so that a failed
      // assertion below does not leave an anonymous thread keeping the JVM alive.
      val sender = new Thread("message-collector-spec-sender") {
        override def run(): Unit = {
          val current = System.currentTimeMillis()
          // Align with the next minute boundary of the wall clock.
          Thread.sleep(minuteMillis - (current % minuteMillis))
          println(OffsetDateTime.now().toEpochSecond)
          collector ! B.Message("Hello I am B1!")
          Thread.sleep(shortGapMillis)
          collector ! B.Message("Hello I am B2!")
          Thread.sleep(shortGapMillis)
          collector ! B.Message("Hello I am B3!")
          Thread.sleep(longGapMillis)
          collector ! B.Message(
            "Hello I am B4!"
          ) //<- this will be in the second group
        }
      }
      sender.setDaemon(true)
      sender.start()

      // Give the sender time to reach the minute boundary before the probe's
      // own two-minute timeout starts counting.
      Thread.sleep(minuteMillis)

      a.expectMessage(
        2.minutes,
        A.Messages(
          "1234567890",
          Some("foo"),
          List(
            B.Message("Hello I am B1!"),
            B.Message("Hello I am B2!"),
            B.Message("Hello I am B3!")
          )
        )
      )
      a.expectMessage(
        2.minutes,
        A.Messages(
          "1234567890",
          Some("foo"),
          List(
            B.Message("Hello I am B4!")
          )
        )
      )
      println(OffsetDateTime.now().toEpochSecond)
    }
  }

  override def afterAll(): Unit = testKit.shutdownTestKit()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
... | |
/** Mixed into messages that carry a time value.
  *
  * `MessageCollector` groups messages by this value and schedules timers from
  * differences of it using `.seconds`, so it is treated as a count of seconds.
  */
trait Timestamp {
  val epochSecond: Long
}
... | |
... | |
/** Envelope that carries an `AdSet` command inside this actor's own `Command`
  * protocol.
  */
private final case class WrappedAdSetCommand(command: AdSet.Command with Timestamp)
    extends Command
private class Actor( | |
context: ActorContext[Command], | |
parent: ActorRef[Campaign.Command with Timestamp] | |
){ | |
val adSetCommandMapper = | |
context.messageAdapter(rsp => WrappedAdSetCommand(rsp)) | |
val collector = | |
context.spawnAnonymous(MessageCollector(id, name, parent, 5.minutes)) | |
... | |
... | |
/** Envelope that carries a `Campaign` command inside this actor's own `Command`
  * protocol.
  */
private final case class WrappedCampaignCommand(
    command: Campaign.Command with Timestamp
) extends Command
private class Actor( | |
context: ActorContext[Command], | |
parent: ActorRef[AdAccount.Command with Timestamp] | |
){ | |
val campaignCommandMapper: ActorRef[Campaign.Command with Timestamp] = | |
context.messageAdapter(rsp => WrappedCampaignCommand(rsp)) | |
val collector = | |
context.spawnAnonymous(MessageCollector(id, name, parent, 5.minutes)) | |
... | |
... | |
case wrapped: WrappedAdSetCommand => | |
collector ! wrapped.command | |
Behaviors.same | |
... | |
... | |
case wrapped: WrappedCampaignCommand => | |
collector ! wrapped.command | |
Behaviors.same | |
... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment