Skip to content

Instantly share code, notes, and snippets.

@piotrga
Created January 23, 2012 17:13
Show Gist options
  • Save piotrga/1664313 to your computer and use it in GitHub Desktop.
Save piotrga/1664313 to your computer and use it in GitHub Desktop.
Producer - use-case
import akka.actor.{ ActorRef, Terminated, AllForOneStrategy, Actor, Props, ActorSystem }
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultMessage
import org.apache.camel.{ Exchange, Processor}
import org.apache.camel.spi.Synchronization
import akka.camel.{Message, Failure, CamelExtension}
import akka.dispatch.Await
/**
 * Demo application wiring Akka actors to Apache Camel `direct:` endpoints.
 *
 * Routes registered on the Camel context:
 *  - `direct:a` forwards to `direct:out`, wraps the resulting body in `TRANSFORMED<` ... `>` and
 *    then dispatches on the `respondTo` header: "c" goes to `direct:c`, "d" goes to `direct:d`,
 *    anything else is sent to the `other` actor.
 *  - `direct:c` and `direct:d` print the body with a "C:" / "D:" prefix.
 *  - `direct:out` prints the incoming body and replies with "received: '<body>'"; unless the body
 *    contains "boomerang" it also sets `respondTo` to "c" (even body length) or "d" (odd length).
 *
 * After the routes are installed, a few messages are pushed through two `Producer` actors and the
 * actor system is shut down.
 */
object CamelRoutes extends App {
  val system = ActorSystem("Test")
  // Fallback receiver for exchanges that carry no recognised `respondTo` header.
  val other = system.actorOf(Props(ctx => { case m => println("Other: " + m) }))
  val camel = CamelExtension(system)

  camel.context.addRoutes(new RouteBuilder() {
    /** Builds a processor that prints the exchange's in-body prefixed with `prefix`. */
    private def printWithPrefix(prefix: String): Processor = new Processor {
      def process(exchange: Exchange): Unit = println(prefix + exchange.getIn.getBody)
    }

    def configure(): Unit = {
      from("direct:a").to("direct:out").transform(body().prepend("TRANSFORMED<").append(">"))
        .choice()
        .when(header("respondTo").isEqualTo("c")).to("direct:c")
        .when(header("respondTo").isEqualTo("d")).to("direct:d")
        .otherwise().process(new Processor {
          def process(exchange: Exchange): Unit = other ! exchange.getIn.getBody
        })

      from("direct:c").process(printWithPrefix("C:"))
      from("direct:d").process(printWithPrefix("D:"))

      from("direct:out").process(new Processor {
        def process(exchange: Exchange): Unit = {
          val text = exchange.getIn.getBody.toString
          println("out:" + text)
          val response = new DefaultMessage
          response.setBody("received: '%s'".format(text))
          // "boomerang" messages get no routing header, so `direct:a` falls through to `otherwise()`.
          if (!text.contains("boomerang")) {
            response.setHeader("respondTo", if (text.length % 2 == 0) "c" else "d")
          }
          exchange.setOut(response)
        }
      })
    }
  })

  val producer = system.actorOf(Props(new Producer("direct:a")))
  // Deliberately odd-looking endpoint URI; presumably demonstrates the failure path.
  val producer2 = system.actorOf(Props(new Producer("lalal alla lalal")))

  import akka.util.duration._

  // `Await.result` throws when no reply arrives in time; the `finally` guarantees the actor
  // system is still shut down in that case instead of being left running.
  try {
    producer ! "some message 1"
    producer ! "some message 12"
    (producer ? ("some message 123 boomerang", 1 second)).onSuccess {
      case msg: Message => println("RETURNED:" + msg.bodyAs[String])
    }
    println("RETURNED: " + Await.result((producer2 ? ("this wont go anywhere", 1 second)), 1 second))
    // Give the asynchronous exchanges time to finish printing before shutting down.
    Thread.sleep(5000)
  } finally {
    system.shutdown()
  }
}
/**
 * Actor that pushes every message it receives to a Camel endpoint.
 *
 * Each message is sent asynchronously through the Camel producer template; a [[Callback]] bound
 * to the message's sender is passed along so the outcome of the exchange is delivered to that
 * sender.
 *
 * @param endpointUri URI of the Camel endpoint the messages are sent to
 */
class Producer(endpointUri: String) extends Actor {
  // Looked up on first use rather than at construction time.
  private lazy val camelExtension = CamelExtension(context.system)

  final def receive = {
    case payload =>
      val template = camelExtension.template
      template.asyncCallbackSendBody(endpointUri, payload, new Callback(sender))
  }

  /** Before a restart, tells the current sender about the failure that caused it. */
  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    sender ! Failure(reason, Map())
  }
}
/**
 * Camel `Synchronization` that relays the outcome of an exchange to an actor.
 *
 * @param sender actor that receives the message derived from the finished exchange
 */
class Callback(sender: ActorRef) extends Synchronization {
  import akka.camel.CamelExchangeAdapter

  /** Converts the failed exchange to a failure message and sends it to `sender`. */
  def onFailure(exchange: Exchange): Unit = {
    val adapter = new CamelExchangeAdapter(exchange)
    sender ! adapter.toFailureMessage
  }

  /** Converts the completed exchange to a response message and sends it to `sender`. */
  def onComplete(exchange: Exchange): Unit = {
    val adapter = new CamelExchangeAdapter(exchange)
    sender ! adapter.toResponseMessage
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment