Skip to content

Instantly share code, notes, and snippets.

@gvolpe
Created May 11, 2018 05:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gvolpe/ecce1ba7ca5481774366ad4c23c7a381 to your computer and use it in GitHub Desktop.
Save gvolpe/ecce1ba7ca5481774366ad4c23c7a381 to your computer and use it in GitHub Desktop.
package com.github.gvolpe.fs2rabbit.examples
import cats.effect.Effect
import com.github.gvolpe.fs2rabbit.config.QueueConfig
import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit
import com.github.gvolpe.fs2rabbit.json.Fs2JsonEncoder
import com.github.gvolpe.fs2rabbit.model._
import com.github.gvolpe.fs2rabbit.typeclasses.StreamEval
import fs2.{Pipe, Stream}
import scala.concurrent.ExecutionContext
/**
  * Example wiring of two auto-acknowledging consumers and one publisher on a
  * single connection channel.
  *
  * `program` declares the queues `testQ` and `testQ2`, declares the topic
  * exchange `testEX`, binds both queues to it with the routing key `testRK`,
  * and then hands the consumers, the logging pipe and the publisher over to
  * [[AutoAckFlow]].
  */
class AutoAckConsumerDemo[F[_]: Effect](implicit F: Fs2Rabbit[F], EC: ExecutionContext, SE: StreamEval[F]) {

  private val exchange    = ExchangeName("testEX")
  private val routing     = RoutingKey("testRK")
  private val firstQueue  = QueueName("testQ")
  private val secondQueue = QueueName("testQ2")

  /**
    * Prints every consumed envelope to stdout and emits an `Ack` carrying the
    * envelope's delivery tag.
    */
  def logPipe: Pipe[F, AmqpEnvelope, AckResult] = { envelopes =>
    envelopes.flatMap { amqpMsg =>
      SE.evalF[Unit](println(s"Consumed: $amqpMsg")).map(_ => Ack(amqpMsg.deliveryTag))
    }
  }

  /**
    * The whole demo as a single stream: broker set-up on a freshly created
    * channel, followed by the `flow` of an [[AutoAckFlow]].
    */
  val program: Stream[F, Unit] = F.createConnectionChannel.flatMap { implicit channel =>
    for {
      // Broker topology: first queue, exchange and binding, then the second queue and its binding.
      _              <- F.declareQueue(QueueConfig.default(firstQueue))
      _              <- F.declareExchange(exchange, ExchangeType.Topic)
      _              <- F.bindQueue(firstQueue, exchange, routing)
      _              <- F.declareQueue(QueueConfig.default(secondQueue))
      _              <- F.bindQueue(secondQueue, exchange, routing)
      // One auto-ack consumer per queue, plus a publisher on the shared exchange / routing key.
      firstConsumer  <- F.createAutoAckConsumer(firstQueue)
      secondConsumer <- F.createAutoAckConsumer(secondQueue)
      publisher      <- F.createPublisher(exchange, routing)
      outcome        <- new AutoAckFlow(firstConsumer, secondConsumer, logPipe, publisher).flow
    } yield outcome
  }

}
/**
  * Publishes two sample messages and drains two consumers, with all four
  * streams combined into one via `join(4)`.
  *
  * @param c1        first consumer stream; its acks are printed with the prefix `consumer #1`
  * @param c2        second consumer stream; its acks are printed with the prefix `consumer #2`
  * @param logger    pipe applied to every consumed envelope, producing an `AckResult`
  * @param publisher sink that both sample messages are sent to
  */
class AutoAckFlow[F[_]](
    c1: StreamConsumer[F],
    c2: StreamConsumer[F],
    logger: Pipe[F, AmqpEnvelope, AckResult],
    publisher: StreamPublisher[F]
)(implicit ec: ExecutionContext, SE: StreamEval[F], F: Effect[F]) {

  // Automatic circe derivation for the sample payload types declared below.
  import io.circe.generic.auto._

  case class Address(number: Int, streetName: String)
  case class Person(id: Long, name: String, address: Address)

  private val jsonEncoder = new Fs2JsonEncoder[F]
  import jsonEncoder.jsonEncode

  /** Plain string message carrying two custom headers. */
  val simpleMessage =
    AmqpMessage(
      "Hey!",
      AmqpProperties(None, None, Map("demoId" -> LongVal(123), "app" -> StringVal("fs2RabbitDemo")))
    )

  /** Message whose payload is a `Person`; it goes through `jsonEncode` before being published. */
  val classMessage =
    AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty)

  /** Both publications and both consumer drains, joined into a single stream. */
  val flow: Stream[F, Unit] = {
    val publishSimple = Stream(simpleMessage).covary[F].to(publisher)
    val publishPerson = Stream(classMessage).covary[F].through(jsonEncode[Person]).to(publisher)

    val drainFirst  = c1.through(logger).to(SE.liftSink(ack => F.delay(println(s"consumer #1 => $ack"))))
    val drainSecond = c2.through(logger).to(SE.liftSink(ack => F.delay(println(s"consumer #2 => $ack"))))

    Stream(publishSimple, publishPerson, drainFirst, drainSecond).join(4)
  }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment