Skip to content

Instantly share code, notes, and snippets.

@slavaschmidt
Created January 9, 2021 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slavaschmidt/8b8f8a1ed550c5a2032992ff6d012afa to your computer and use it in GitHub Desktop.
Save slavaschmidt/8b8f8a1ed550c5a2032992ff6d012afa to your computer and use it in GitHub Desktop.
object main {
implicit val sharding: ClusterSharding = ClusterSharding(system)
val cluster: Cluster = Cluster(system)
val selfMember = cluster.selfMember
if (selfMember.hasRole("intake")) {
val messageExtractor = new HashCodeNoEnvelopeMessageExtractor[PollingCommand](numberOfShards = 1) {
override def entityId(message: PollingCommand): String = message.getClass.getSimpleName
}
sharding.init(Entity(DbPolling.InsertsEntityKey) { _ =>
DbPolling.behavior(config, DbPolling.InsertsEntityKey)
}.withSettings(ClusterShardingSettings(system)).withMessageExtractor(messageExtractor))
sharding.init(Entity(DbPolling.UpdatesEntityKey) { _ =>
DbPolling.behavior(config, DbPolling.UpdatesEntityKey)
}.withSettings(ClusterShardingSettings(system)).withMessageExtractor(messageExtractor))
sharding.entityRefFor(DbPolling.InsertsEntityKey, DbPolling.EntityId).tell(PollFirstInsertBatch)
sharding.entityRefFor(DbPolling.UpdatesEntityKey, DbPolling.EntityId).tell(PollFirstUpdateBatch)
}
}
object DbPolling {
val InsertsEntityKey: EntityTypeKey[PollingCommand] = EntityTypeKey[PollingCommand]("InsertsPollingCommand")
val UpdatesEntityKey: EntityTypeKey[PollingCommand] = EntityTypeKey[PollingCommand]("UpdatesPollingCommand")
val EntityId = "DB_POLLING"
def behavior(config: Config, entityKey: EntityTypeKey[PollingCommand])(implicit sharding: ClusterSharding): Behavior[PollingCommand] =
Behaviors
.supervise(behaviorForConfig(config, entityKey))
.onFailure[Exception](SupervisorStrategy.restartWithBackoff(onReconnectMinBackoff, onReconnectMaxBackoff, onReconnectRandomFactor))
private def behaviorForConfig(config: Config, entityKey: EntityTypeKey[PollingCommand])(implicit sharding: ClusterSharding): Behavior[PollingCommand] =
Behaviors.withTimers { timers =>
Behaviors.setup { ctx: ActorContext[PollingCommand] =>
val query = connect(config)
EventSourcedBehavior
.withEnforcedReplies[PollingCommand, PollingEvent, PollingState](
persistenceId = PersistenceId(entityKey.name, EntityId),
emptyState = EmptyPollingState("self"),
commandHandler = commandHandler(ctx, timers, query),
eventHandler = eventHandler)
.withRetention(RetentionCriteria.snapshotEvery(60, 5).withDeleteEventsOnSnapshot)
.onPersistFailure(SupervisorStrategy.restartWithBackoff(onReconnectMinBackoff, onReconnectMaxBackoff, onReconnectRandomFactor))
.receiveSignal {
case (_, signal) if signal == PreRestart =>
timers.cancelAll()
query.connection.close()
ctx.log.info(s"DbQuery $signal: {}", ctx.self.path)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment