Skip to content

Instantly share code, notes, and snippets.

@hanishi
Last active March 17, 2021 14:15
Show Gist options
  • Save hanishi/6805743f9f93784654d050fb13e01a52 to your computer and use it in GitHub Desktop.
Save hanishi/6805743f9f93784654d050fb13e01a52 to your computer and use it in GitHub Desktop.
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import scala.collection.immutable
import scala.concurrent.duration.{DurationLong, FiniteDuration}
import scala.reflect.ClassTag
trait MessageCollector[Aggregate <: Timestamp] {
type Message <: Timestamp
val receiveTimeout: Long => Message with MessageCollector.Timeout
val aggregate: (String, Option[String], List[Message]) => Aggregate
def apply(
id: String,
name: Option[String],
dst: ActorRef[Aggregate],
interval: Long
)(implicit ct: ClassTag[Message]): Behavior[Message] =
Behaviors.setup[Message] { context =>
Behaviors.withTimers { timers =>
def collecting(
messageGroups: Map[Long, immutable.IndexedSeq[
Message
]]
): Behavior[Message] =
Behaviors.receiveMessage {
case receiveTimeout: MessageCollector.Timeout =>
if (messageGroups.nonEmpty) {
val (x, y) = messageGroups.partition {
case (groupKey, _) => groupKey == receiveTimeout.epochSecond
}
for {
(_, messages) <- x
} dst ! aggregate(id, name, messages.toList)
collecting(y)
} else Behaviors.same
case message: Message =>
val timestamp = message.epochSecond
val groupKey = key(message, interval)
val delay = groupKey + interval
context.log.debug(
s"""
|### GROUP KEY ### $groupKey
|### DELAY ### $delay
|### SCHEDULED ### ${(delay - timestamp).seconds}""".stripMargin
)
timers.startSingleTimer(
receiveTimeout(groupKey),
(delay - timestamp).seconds
)
collecting(
messageGroups + messageGroups
.get(groupKey)
.fold { groupKey -> IndexedSeq(message) } { messages =>
groupKey -> (messages :+ message)
}
)
}
collecting(Map.empty)
}
}
def key(timestamp: Timestamp, interval: Long): Long =
timestamp.epochSecond - (timestamp.epochSecond % interval)
}
object MessageCollector {
trait Timeout extends Timestamp
type Aux[A <: Timestamp, B <: Timestamp] = MessageCollector[A] {
type Message = B
}
implicit object messageCollectorForAdSetCommand
extends MessageCollector[Campaign.Command with Timestamp] {
type Message = AdSet.Command with Timestamp
val receiveTimeout = timeout => AdSet.ReceiveTimeout(timeout)
val aggregate: (
String,
Option[String],
List[AdSet.Command with Timestamp]
) => Campaign.Command with Timestamp =
(id, name, messages) => Campaign.Messages(id, name, messages)
}
implicit object messageCollectorForCampaignCommand
extends MessageCollector[AdAccount.Command with Timestamp] {
type Message = Campaign.Command with Timestamp
val receiveTimeout = timeout => Campaign.ReceiveTimeout(timeout)
val aggregate: (
String,
Option[String],
List[Campaign.Command with Timestamp]
) => AdAccount.Command with Timestamp =
(id, name, messages) => AdAccount.Messages(id, name, messages)
}
implicit object messageCollectorForAdAccountCommand
extends MessageCollector[Leaderboard.Command] {
type Message = AdAccount.Command with Timestamp
val receiveTimeout = timeout => AdAccount.ReceiveTimeout(timeout)
val aggregate: (
String,
Option[String],
List[AdAccount.Command with Timestamp]
) => Leaderboard.Command with Timestamp =
(id, name, messages) => Leaderboard.Messages(id, name, messages)
}
def apply[A <: Timestamp, B <: Timestamp](
id: String,
name: Option[String],
dst: ActorRef[A],
interval: FiniteDuration
)(implicit
ct: ClassTag[B],
collector: MessageCollector.Aux[A, B]
): Behavior[B] = collector(id, name, dst, interval.toSeconds.longValue())
}
import actors.MessageCollectorSpec.{A, B}
import akka.actor.testkit.typed.scaladsl.{ActorTestKit, LogCapturing}
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import models.{MessageCollector, Timestamp}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import java.time.OffsetDateTime
import scala.concurrent.duration.DurationInt
object MessageCollectorSpec {
object A {
trait Command
case class Messages(
id: String,
name: Option[String],
message: List[B.Command]
) extends Command
with Timestamp
def apply(): Behavior[A.Command] =
Behaviors.receiveMessage {
case message: Messages =>
println(message)
Behaviors.same
}
}
implicit object messageCollectorForB
extends MessageCollector[A.Command with Timestamp] {
type Message = B.Command with Timestamp
val receiveTimeout = timeout => B.ReceiveTimeout(timeout)
val aggregate: (
String,
Option[String],
List[B.Command with Timestamp]
) => A.Command with Timestamp =
(id, name, messages) => A.Messages(id, name, messages)
}
object B {
trait Command
case class ReceiveTimeout(timeout: Long)
extends Command
with MessageCollector.Timeout {
override val epochSecond: Long = timeout
}
case class Message(message: String) extends Command with Timestamp
case class Messages(
id: String,
name: Option[String],
message: List[B.Command]
) extends Command
with Timestamp
def apply(): Behavior[A.Command] =
Behaviors.receiveMessage {
case message: Messages =>
println(message)
Behaviors.same
}
}
object C {
trait Command
case class ReceiveTimeout(timeout: Long)
extends Command
with MessageCollector.Timeout {
override val epochSecond: Long = timeout
}
case class Messages(
id: String,
name: Option[String],
message: List[B.Command]
) extends Command
with Timestamp
def apply(): Behavior[A.Command] =
Behaviors.receiveMessage {
case message: Messages =>
println(message)
Behaviors.same
}
}
}
class MessageCollectorSpec
extends AnyWordSpec
with BeforeAndAfterAll
with LogCapturing
with Matchers {
val testKit = ActorTestKit()
"A MessageCollector" must {
"release 2 batches" in {
val a = testKit.createTestProbe[A.Command]()
val collector = testKit.spawn(
MessageCollector("1234567890", Some("foo"), a.ref, 1.minutes)
)
val t = new Thread {
override def run(): Unit = {
val current = System.currentTimeMillis()
Thread.sleep(60000 - (current % 60000))
println(OffsetDateTime.now().toEpochSecond)
collector ! B.Message("Hello I am B1!")
Thread.sleep(10000)
collector ! B.Message("Hello I am B2!")
Thread.sleep(10000)
collector ! B.Message("Hello I am B3!")
Thread.sleep(45000)
collector ! B.Message(
"Hello I am B4!"
) //<- this will be in the second group
}
}
t.start()
Thread.sleep(60000)
a.expectMessage(
2.minutes,
A.Messages(
"1234567890",
Some("foo"),
List(
B.Message("Hello I am B1!"),
B.Message("Hello I am B2!"),
B.Message("Hello I am B3!")
)
)
)
a.expectMessage(
2.minutes,
A.Messages(
"1234567890",
Some("foo"),
List(
B.Message("Hello I am B4!")
)
)
)
println(OffsetDateTime.now().toEpochSecond)
}
}
override def afterAll(): Unit = testKit.shutdownTestKit()
}
...
trait Timestamp { val epochSecond: Long }
...
...
private case class WrappedAdSetCommand(command: AdSet.Command with Timestamp)
extends Command
private class Actor(
context: ActorContext[Command],
parent: ActorRef[Campaign.Command with Timestamp]
){
val adSetCommandMapper =
context.messageAdapter(rsp => WrappedAdSetCommand(rsp))
val collector =
context.spawnAnonymous(MessageCollector(id, name, parent, 5.minutes))
...
...
private case class WrappedCampaignCommand(
command: Campaign.Command with Timestamp
) extends Command
private class Actor(
context: ActorContext[Command],
parent: ActorRef[AdAccount.Command with Timestamp]
){
val campaignCommandMapper: ActorRef[Campaign.Command with Timestamp] =
context.messageAdapter(rsp => WrappedCampaignCommand(rsp))
val collector =
context.spawnAnonymous(MessageCollector(id, name, parent, 5.minutes))
...
...
case wrapped: WrappedAdSetCommand =>
collector ! wrapped.command
Behaviors.same
...
...
case wrapped: WrappedCampaignCommand =>
collector ! wrapped.command
Behaviors.same
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment