Skip to content

Instantly share code, notes, and snippets.

@MarkRBM
Last active April 24, 2019 19:23
Show Gist options
  • Save MarkRBM/69149a20dd62e97c13bb3ebbe453ccb0 to your computer and use it in GitHub Desktop.
Save MarkRBM/69149a20dd62e97c13bb3ebbe453ccb0 to your computer and use it in GitHub Desktop.
http4s websockets with fs2
final class EntityEventsBroker(config: ServerConfig)(
implicit ec: ExecutionContext
) extends EntityEventsStream[IO] {
def consume: Stream[IO, (Key, Value)] =
consumeProcessAndCommit[IO](
TopicSubscription(Set(config.kafkaConfig.topicName)),
KeySerde.deserializer(),
ValueSerde.deserializer(),
config.kafkaConfig.getConsumerSettings
)(processRecord)
private def processRecord(
r: ConsumerRecord[Key, Value]
): IO[(Key, Value)] = IO.pure {
(r.key(), r.value())
}
}
final class EntityEvents[F[_]](eeStream: EntityEventsStream[F])(
implicit F: Monad[F]
) {
def unfilteredStream: Stream[F, (IOfficeAppKeyAvro, EntityEventsValueAvro)] =
eeStream.consume
def filteredStream(
forAppcode: String,
forEntities: List[EntityType],
forEvents: List[EventType]
): Stream[F, EntityEventsValueAvro] = unfilteredStream.filter{ case (key, value) => {
val correctAppCode = key.getAppCode == forAppcode
val correctEntity = forEntities.foldLeft(true)((acc, entityType) => entityType == value.getEntity && acc)
val correctEvent = forEvents.foldLeft(true)((acc, entityEvent) => entityEvent == value.getEventType && acc)
println(s"forAppCode: $forAppcode, forEntities: $forEntities, forEventTypes: $forEvents}")
println(s"appCodeReceived: ${key.getAppCode}, entityReceived: ${value.getEntity}, eventTypeReceived: ${value.getEventType}")
println(s"correctAppCode: $correctAppCode, correctEntity: $correctEntity, correctEvent: $correctEvent")
correctAppCode && correctEntity && correctEvent
}}.map(_._2)
}
object HttpService[F[_]] extends Http4sDsl[F] {
import ParamMatchers._
def getService(scheduler: Scheduler, eeBroker: EntityEvents[F])(
implicit F: Effect[F]
): HttpService[F] = HttpService[F] {
case GET -> Root / "entity-events" :? AppCodeQueryParameterMatch(appCode) +& OptionalEntityTypesParameterMatcher(parsedEntities) +& OptionalEventTypesParameterMatcher(parsedEvents) =>
(parsedEntities.map(_.toEither), parsedEvents.map(_.toEither)) match {
case (None, None) => createFilteredEntityEventStream(eeBroker, scheduler, appCode, List.empty, List.empty)
case (None, Some(Right(events))) => createFilteredEntityEventStream(eeBroker, scheduler, appCode, List.empty, events)
case (Some(Right(entities)), None) => createFilteredEntityEventStream(eeBroker,scheduler, appCode, entities, List.empty)
case (Some(Left(err)), _) => BadRequest(s"$err")
case (_, Some(Left(err))) => BadRequest(s"$err")
case (Some(Right(entities)), Some(Right(events))) => createFilteredEntityEventStream(eeBroker, scheduler, appCode, entities, events)
}
}
private def createFilteredEntityEventStream[F[_]](eeBroker: EntityEvents[F], scheduler: Scheduler, appCode: String, entities: List[EntityType], events: List[EventType])(implicit F:Effect[F]) = {
case class EntityEvent(`type`: String, entity: String, entityId: Long)
val avroClassToEntityEvent: Pipe[F, EntityEventsValueAvro, EntityEvent] =
_.collect {
case eeva: EntityEventsValueAvro => {
EntityEvent(
eeva.getEventType.toString,
eeva.getEntity.toString,
eeva.getId
)
}
}
val entityEventToJsonText: Pipe[F, EntityEvent, Text] = _.collect {
case ee: EntityEvent => Text(ee.asJson.toString)
}
val keepAlive: Stream[F, Ping] =
scheduler.awakeEvery[F](5.seconds).map(d => Ping(Array.empty))
val fromKafka: Stream[F, Text] = eeBroker.filteredStream(appCode, entities, events)
.through(avroClassToEntityEvent)
.through(entityEventToJsonText)
val processClientMessage: Pipe[F, WebSocketFrame, WebSocketFrame] =
_.collect {
case Text(msg, _) => Text("You sent the server: " + msg)
case p: Ping => p
case _ => Text("Something new")
}
val queue = async.unboundedQueue[F, WebSocketFrame]
queue.flatMap { q =>
val fromClient = q.dequeue
val fromClientMergedWithPings =
fromClient.merge(keepAlive.merge(fromKafka))
val toClient = fromClientMergedWithPings
WebSocketBuilder[F].build(toClient, q.enqueue)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment