Skip to content

Instantly share code, notes, and snippets.

@kciesielski
Last active September 23, 2019 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kciesielski/4598ec80204882ff82f822576b845412 to your computer and use it in GitHub Desktop.
Save kciesielski/4598ec80204882ff82f822576b845412 to your computer and use it in GitHub Desktop.
How to stream from Cassandra and pass the context around using FlowWithContext
/**
 * Per-event context that travels alongside an event through a `FlowWithContext`.
 *
 * In this file it is built from the fields of an `EventEnvelope`
 * (offset, persistence id and sequence number) when events are read from the journal.
 *
 * @param offset        journal offset of the event; this is the value later handed to
 *                      `EventsJournalOffsetDao.saveOffset`
 * @param persistenceId persistence id taken from the event's envelope
 * @param seqNum        sequence number taken from the event's envelope
 */
case class MsgMetadata(
  offset: query.Offset,
  persistenceId: String,
  seqNum: Long
)
/**
 * Storage abstraction for the journal offset of a projection.
 *
 * `EventsStreamFactory` reads the offset through this trait to decide where to start
 * streaming from, and writes it back after each event has passed through the projection flow.
 */
trait EventsJournalOffsetDao {

  /**
   * Looks up the offset stored for `projection`.
   *
   * @param projection identifier of the projection whose offset is requested
   * @return a future holding the offset, or `None` when there is none
   *         (callers in this file then fall back to the journal's first offset)
   */
  def offsetFor(projection: ProjectionId): Future[Option[query.Offset]]

  /**
   * Stores `currentOffset` as the offset of `projection`.
   *
   * @param projection    identifier of the projection the offset belongs to
   * @param currentOffset offset to store
   * @return a future that completes when the save has finished
   */
  def saveOffset(projection: ProjectionId, currentOffset: query.Offset): Future[Unit]
}
/**
 * Builds a restartable stream that reads tagged events from the Cassandra read journal,
 * runs them through a projection flow and saves the journal offset after each event.
 *
 * @param config           stream configuration (not referenced by the code in this class)
 * @param projectionId     identifier under which the offset is loaded and saved
 * @param projectionFlow   flow applied to every event; the [[MsgMetadata]] context is carried
 *                         alongside each element so the offset is still available afterwards
 * @param journalOffsetDao storage used to load the starting offset and to save progress
 */
class EventsStreamFactory(
  config: EventsStreamConfig,
  val projectionId: ProjectionId,
  projectionFlow: FlowWithContext[AppEvent, MsgMetadata, AppEvent, MsgMetadata, NotUsed],
  journalOffsetDao: EventsJournalOffsetDao
) {

  /**
   * Creates the projection stream.
   *
   * The stream is wrapped in `RestartSource.withBackoff` (200 ms to 3 s, random factor 0.2).
   * The offset lookup lives inside the source factory, so each (re)started stream asks
   * `journalOffsetDao` for the offset again instead of reusing a stale one.
   *
   * Events are emitted downstream only as `Unit`, one per successfully saved offset.
   * Because the offset is saved after `projectionFlow` has emitted the element, a failure
   * between projecting and saving can lead to the same event being projected again after
   * a restart.
   *
   * @param eventJournal read journal to stream events from
   * @return a source emitting one `Unit` for every saved offset
   */
  def createStream(eventJournal: CassandraReadJournal): Source[Unit, NotUsed] = {
    RestartSource
      .withBackoff(minBackoff = 200.millis, maxBackoff = 3.seconds, randomFactor = 0.2) { () =>
        SourceWithContext
          .fromTuples(
            Source
              .fromFuture(journalOffsetDao.offsetFor(projectionId))
              // No stored offset yet: start from the journal's first offset.
              .map(_.getOrElse(query.Offset.timeBasedUUID(eventJournal.firstOffset)))
              .flatMapConcat { currentOffset =>
                eventJournal
                  .eventsByTag("MyTag", currentOffset)
              }
              // Envelopes whose payload is not an AppEvent are dropped here,
              // so their offsets are never saved on their own.
              .collect {
                case EventEnvelope(offset, persistenceId, sequenceNr, event: AppEvent) =>
                  (event, MsgMetadata(offset, persistenceId, sequenceNr))
              }
          )
          .via(projectionFlow)
          .asSource
          // Parallelism 1: at most one saveOffset call is in flight at a time.
          .mapAsync(1) {
            case (_, context) => journalOffsetDao.saveOffset(projectionId, context.offset)
          }
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment