Created
May 11, 2018 05:26
-
-
Save gvolpe/ecce1ba7ca5481774366ad4c23c7a381 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.gvolpe.fs2rabbit.examples | |
import cats.effect.Effect | |
import com.github.gvolpe.fs2rabbit.config.QueueConfig | |
import com.github.gvolpe.fs2rabbit.interpreter.Fs2Rabbit | |
import com.github.gvolpe.fs2rabbit.json.Fs2JsonEncoder | |
import com.github.gvolpe.fs2rabbit.model._ | |
import com.github.gvolpe.fs2rabbit.typeclasses.StreamEval | |
import fs2.{Pipe, Stream} | |
import scala.concurrent.ExecutionContext | |
class AutoAckConsumerDemo[F[_]: Effect](implicit F: Fs2Rabbit[F], EC: ExecutionContext, SE: StreamEval[F]) { | |
private val queueName = QueueName("testQ") | |
private val exchangeName = ExchangeName("testEX") | |
private val routingKey = RoutingKey("testRK") | |
private val queueName2 = QueueName("testQ2") | |
def logPipe: Pipe[F, AmqpEnvelope, AckResult] = { streamMsg => | |
for { | |
amqpMsg <- streamMsg | |
_ <- SE.evalF[Unit](println(s"Consumed: $amqpMsg")) | |
} yield Ack(amqpMsg.deliveryTag) | |
} | |
val program: Stream[F, Unit] = F.createConnectionChannel flatMap { implicit channel => | |
for { | |
_ <- F.declareQueue(QueueConfig.default(queueName)) | |
_ <- F.declareExchange(exchangeName, ExchangeType.Topic) | |
_ <- F.bindQueue(queueName, exchangeName, routingKey) | |
_ <- F.declareQueue(QueueConfig.default(queueName2)) | |
_ <- F.bindQueue(queueName2, exchangeName, routingKey) | |
consumer <- F.createAutoAckConsumer(queueName) | |
consumer2 <- F.createAutoAckConsumer(queueName2) | |
publisher <- F.createPublisher(exchangeName, routingKey) | |
result <- new AutoAckFlow(consumer, consumer2, logPipe, publisher).flow | |
} yield result | |
} | |
} | |
class AutoAckFlow[F[_]](c1: StreamConsumer[F], | |
c2: StreamConsumer[F], | |
logger: Pipe[F, AmqpEnvelope, AckResult], | |
publisher: StreamPublisher[F])(implicit ec: ExecutionContext, SE: StreamEval[F], F: Effect[F]) { | |
import io.circe.generic.auto._ | |
case class Address(number: Int, streetName: String) | |
case class Person(id: Long, name: String, address: Address) | |
private val jsonEncoder = new Fs2JsonEncoder[F] | |
import jsonEncoder.jsonEncode | |
val simpleMessage = | |
AmqpMessage("Hey!", AmqpProperties(None, None, Map("demoId" -> LongVal(123), "app" -> StringVal("fs2RabbitDemo")))) | |
val classMessage = AmqpMessage(Person(1L, "Sherlock", Address(212, "Baker St")), AmqpProperties.empty) | |
val flow: Stream[F, Unit] = | |
Stream( | |
Stream(simpleMessage).covary[F] to publisher, | |
Stream(classMessage).covary[F] through jsonEncode[Person] to publisher, | |
c1 through logger to SE.liftSink(ack => F.delay(println(s"consumer #1 => $ack"))), | |
c2 through logger to SE.liftSink(ack => F.delay(println(s"consumer #2 => $ack"))) | |
).join(4) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment