Skip to content

Instantly share code, notes, and snippets.

@mcarolan
Created February 7, 2017 21:10
Show Gist options
  • Save mcarolan/4a2cd0c03cd261c498fac45d9e1c2cb3 to your computer and use it in GitHub Desktop.
Save mcarolan/4a2cd0c03cd261c498fac45d9e1c2cb3 to your computer and use it in GitHub Desktop.
import java.net.Socket
import com.itv.lifecycle.Lifecycle
import itv.bucky._
import itv.bucky.decl.{DeclarationLifecycle, Exchange, Queue}
import itv.bucky.pattern.requeue._
import PublishCommandBuilder._
import itv.bucky.PayloadMarshaller.StringPayloadMarshaller
import itv.bucky.Unmarshaller.StringPayloadUnmarshaller
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
/** Declarations, connection settings and payload (un)marshallers used by both the
  * publisher and the consumer in this file.
  */
object Shared {
  import scala.util.Try

  val makeNoiseRoutingKey = RoutingKey("make-noise")
  val catNoise = Queue(QueueName("cat.noise"))
  val catExchange = Exchange(ExchangeName("cat")).binding(makeNoiseRoutingKey -> catNoise.name)
  val config = AmqpClientConfig("33.33.33.11", 5672, "guest", "guest")

  /** Message body exchanged between publisher and consumer. */
  case class CatNoise(volume: Int, pitch: Int)

  /** Serialises a [[CatNoise]] as the text `volume,pitch`. */
  val catNoiseMarshaller: PayloadMarshaller[CatNoise] =
    StringPayloadMarshaller contramap { noise =>
      s"${noise.volume},${noise.pitch}"
    }

  /** Parses a non-empty, digits-only string into an `Int`.
    *
    * Returns `None` for an empty string, for any non-digit character (so a leading
    * sign is rejected) and for digit runs that do not fit in an `Int`.
    */
  private def parseNonNegativeInt(text: String): Option[Int] =
    // `forall` is vacuously true on "", and an all-digit string can still overflow Int,
    // so guard both cases instead of letting `toInt` throw.
    if (text.nonEmpty && text.forall(_.isDigit)) Try(text.toInt).toOption
    else None

  /** Parses the text `volume,pitch` back into a [[CatNoise]].
    *
    * Any payload that does not split into exactly two parseable integer fields yields
    * an `UnmarshalResult.Failure` carrying the offending text, rather than an exception.
    */
  val catNoiseUnmarshaller: PayloadUnmarshaller[CatNoise] =
    StringPayloadUnmarshaller flatMap Unmarshaller.liftResult { raw =>
      val parsed = raw.split(",") match {
        case Array(volumeString, pitchString) =>
          for {
            volume <- parseNonNegativeInt(volumeString)
            pitch <- parseNonNegativeInt(pitchString)
          } yield CatNoise(volume, pitch)
        case _ =>
          None
      }
      parsed match {
        case Some(noise) =>
          UnmarshalResult.Success(noise)
        case None =>
          UnmarshalResult.Failure(raw + " wasn't a valid cat noise")
      }
    }
}
/** Entry point that publishes a single `CatNoise(1000000, 99)` to the cat exchange
  * using the `make-noise` routing key, then exits.
  */
object Publisher extends App {
  import Shared._

  // Publish command template: marshaller + target exchange + routing key.
  val noisePublisherConfig =
    publishCommandBuilder(catNoiseMarshaller) using catExchange.name using makeNoiseRoutingKey

  // Connects, declares the exchange (with its binding), then exposes a publisher function.
  val publisherLifecycle =
    for {
      client <- AmqpClientLifecycle(config)
      _ <- DeclarationLifecycle(Seq(catExchange), client)
      noisePublisher <- client.publisherOf(noisePublisherConfig)
    } yield noisePublisher

  // Log before any work starts so the message is visible even when connecting or
  // publishing fails (and to match the consumer, which also logs before starting).
  println("Starting publisher")

  Lifecycle.using(publisherLifecycle) { noisePublisher =>
    // Block until the publish future completes; fails if it takes longer than 1 second.
    Await.result(noisePublisher(CatNoise(1000000, 99)), 1.second)
  }
}
/** Entry point that consumes `CatNoise` messages from the `cat.noise` queue and forwards
  * each one as a line of text to a TCP listener on `localhost:12345`.
  */
object Consumer extends App {
  import Shared._

  /** Handler that opens a socket per message, writes a textual instruction and acks.
    *
    * If connecting or writing throws, the returned future fails; how that failure is
    * treated is decided by the requeue machinery the handler is registered with.
    */
  val catNoiseHandler: RequeueHandler[CatNoise] = new RequeueHandler[CatNoise] {
    override def apply(message: CatNoise): Future[RequeueConsumeAction] =
      Future {
        val socket = new Socket("localhost", 12345)
        try {
          socket.getOutputStream.write(
            s"make a noise at pitch ${message.pitch}, it should be ${message.volume} loud".getBytes)
        } finally {
          // Always release the socket, even when the write fails, so failed
          // attempts do not leak file descriptors.
          socket.close()
        }
        Ack
      }
  }

  // Connects, declares the requeue topology for the queue, then registers the handler
  // with a policy of at most 3 processing attempts and a 5 second interval.
  val consumerLifecycle =
    for {
      client <- AmqpClientLifecycle(config)
      _ <- DeclarationLifecycle(requeueDeclarations(catNoise.name, makeNoiseRoutingKey), client)
      _ <- client.requeueHandlerOf(
        catNoise.name,
        catNoiseHandler,
        RequeuePolicy(maximumProcessAttempts = 3, 5.seconds),
        catNoiseUnmarshaller)
    } yield ()

  println("starting consumer...")
  consumerLifecycle.runUntilJvmShutdown()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment