Created
February 7, 2017 21:10
-
-
Save mcarolan/4a2cd0c03cd261c498fac45d9e1c2cb3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.Socket | |
import com.itv.lifecycle.Lifecycle | |
import itv.bucky._ | |
import itv.bucky.decl.{DeclarationLifecycle, Exchange, Queue} | |
import itv.bucky.pattern.requeue._ | |
import PublishCommandBuilder._ | |
import itv.bucky.PayloadMarshaller.StringPayloadMarshaller | |
import itv.bucky.Unmarshaller.StringPayloadUnmarshaller | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.{Await, Future} | |
object Shared { | |
val makeNoiseRoutingKey = RoutingKey("make-noise") | |
val catNoise = Queue(QueueName("cat.noise")) | |
val catExchange = Exchange(ExchangeName("cat")).binding(makeNoiseRoutingKey -> catNoise.name) | |
val config = AmqpClientConfig("33.33.33.11", 5672, "guest", "guest") | |
case class CatNoise(volume: Int, pitch: Int) | |
val catNoiseMarshaller: PayloadMarshaller[CatNoise] = | |
StringPayloadMarshaller contramap { catNoise => | |
s"${catNoise.volume},${catNoise.pitch}" | |
} | |
val catNoiseUnmarshaller: PayloadUnmarshaller[CatNoise] = | |
StringPayloadUnmarshaller flatMap Unmarshaller.liftResult { catNoise => | |
catNoise.split(",") match { | |
case Array(volumeString, pitchString) if volumeString.forall(_.isDigit) && pitchString.forall(_.isDigit) => | |
UnmarshalResult.Success(CatNoise(volumeString.toInt, pitchString.toInt)) | |
case _ => | |
UnmarshalResult.Failure(catNoise + " wasn't a valid cat noise") | |
} | |
} | |
} | |
object Publisher extends App { | |
import Shared._ | |
val noisePublisherConfig = | |
publishCommandBuilder(catNoiseMarshaller) using catExchange.name using makeNoiseRoutingKey | |
val publisherLifecycle = | |
for { | |
client <- AmqpClientLifecycle(config) | |
_ <- DeclarationLifecycle(Seq(catExchange), client) | |
noisePublisher <- client.publisherOf(noisePublisherConfig) | |
} | |
yield noisePublisher | |
Lifecycle.using(publisherLifecycle) { noisePublisher => | |
Await.result(noisePublisher(CatNoise(1000000, 99)), 1.second) | |
println("Starting publisher") | |
} | |
} | |
object Consumer extends App { | |
import Shared._ | |
val catNoiseHandler: RequeueHandler[CatNoise] = new RequeueHandler[CatNoise] { | |
override def apply(message: CatNoise): Future[RequeueConsumeAction] = | |
Future { | |
val socket = new Socket("localhost", 12345) | |
socket.getOutputStream.write(s"make a noise at pitch ${message.pitch}, it should be ${message.volume} loud".getBytes) | |
socket.close() | |
Ack | |
} | |
} | |
val consumerLifecycle = | |
for { | |
client <- AmqpClientLifecycle(config) | |
_ <- DeclarationLifecycle(requeueDeclarations(catNoise.name, makeNoiseRoutingKey), client) | |
_ <- client.requeueHandlerOf(catNoise.name, catNoiseHandler, RequeuePolicy(maximumProcessAttempts = 3, 5.seconds), catNoiseUnmarshaller) | |
} | |
yield () | |
println("starting consumer...") | |
consumerLifecycle.runUntilJvmShutdown() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment