Created
January 23, 2012 17:13
-
-
Save piotrga/1664313 to your computer and use it in GitHub Desktop.
Producer - use-case
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ ActorRef, Terminated, AllForOneStrategy, Actor, Props, ActorSystem } | |
import org.apache.camel.builder.RouteBuilder | |
import org.apache.camel.impl.DefaultMessage | |
import org.apache.camel.{ Exchange, Processor} | |
import org.apache.camel.spi.Synchronization | |
import akka.camel.{Message, Failure, CamelExtension} | |
import akka.dispatch.Await | |
/**
 * Demo entry point: registers a few in-memory Camel routes on an Akka actor system and
 * pushes sample messages through two [[Producer]] actors.
 *
 * Routes registered:
 *  - `direct:a` forwards to `direct:out`, wraps the resulting body in `TRANSFORMED<...>`,
 *    then dispatches on the `respondTo` header to `direct:c`, `direct:d`, or (otherwise)
 *    hands the body to the `other` actor.
 *  - `direct:c` / `direct:d` print the body with a `C:` / `D:` prefix.
 *  - `direct:out` prints the body and replies with `received: '<body>'`; unless the body
 *    contains "boomerang" it also sets `respondTo` to "c" (even body length) or "d" (odd).
 *
 * The actor system is shut down in a `finally` block so that a failure while adding the
 * routes, or an exception thrown by `Await.result` (for example a timeout), cannot skip
 * the shutdown call.
 */
object CamelRoutes extends App {
  val system = ActorSystem("Test")
  // Fallback recipient used by the "otherwise" branch of the choice in the direct:a route.
  val other = system.actorOf(Props(ctx => { case m => println("Other: " + m) }))
  val camel = CamelExtension(system)

  // Created before the routes are registered; Producer looks up its Camel extension lazily,
  // and no message is sent to either actor until the routes below are in place.
  val producer = system.actorOf(Props(new Producer("direct:a")))
  // NOTE(review): this URI is presumably not a resolvable endpoint, making this the failure path - confirm.
  val producer2 = system.actorOf(Props(new Producer("lalal alla lalal")))

  try {
    camel.context.addRoutes(
      new RouteBuilder() {
        def configure(): Unit = {
          // Builds a processor that prints the incoming body behind the given prefix.
          def printing(prefix: String): Processor = new Processor {
            def process(exchange: Exchange): Unit = println(prefix + exchange.getIn.getBody)
          }

          from("direct:a").to("direct:out").transform(body().prepend("TRANSFORMED<").append(">"))
            .choice()
              .when(header("respondTo").isEqualTo("c")).to("direct:c")
              .when(header("respondTo").isEqualTo("d")).to("direct:d")
              .otherwise().process(new Processor {
                def process(exchange: Exchange): Unit = other ! exchange.getIn.getBody
              })

          from("direct:c").process(printing("C:"))
          from("direct:d").process(printing("D:"))

          from("direct:out").process(new Processor {
            def process(exchange: Exchange): Unit = {
              val text = exchange.getIn.getBody.toString
              println("out:" + text)
              val response = new DefaultMessage
              response.setBody("received: '%s'" format text)
              // No respondTo header for "boomerang" bodies, so they take the "otherwise" branch.
              if (!text.contains("boomerang"))
                response.setHeader("respondTo", if (text.length() % 2 == 0) "c" else "d")
              exchange.setOut(response)
            }
          })
        }
      })

    import akka.util.duration._

    producer ! "some message 1"
    producer ! "some message 12"
    (producer ? ("some message 123 boomerang", 1 second)).onSuccess {
      case msg: Message => println("RETURNED:" + msg.bodyAs[String])
    }
    println("RETURNED: " + Await.result((producer2 ? ("this wont go anywhere", 1 second)), 1 second))

    // Give the asynchronous sends above time to flow through the routes before shutting down.
    Thread.sleep(5000)
  } finally {
    // Always runs, even if route setup or Await.result threw.
    system.shutdown()
  }
}
/**
 * Actor that forwards every message it receives to a Camel endpoint.
 *
 * Each incoming message is sent as the body to `endpointUri` via the Camel producer
 * template's `asyncCallbackSendBody`; a [[Callback]] bound to the message's sender is
 * passed along as the callback.
 *
 * @param endpointUri URI of the Camel endpoint that messages are sent to
 */
class Producer(endpointUri: String) extends Actor {
  // Looked up on first use rather than when the actor is constructed.
  private lazy val camel = CamelExtension(context.system)

  final def receive: Receive = {
    case payload =>
      val replyTo = sender
      camel.template.asyncCallbackSendBody(endpointUri, payload, new Callback(replyTo))
  }

  /** Sends a `Failure` carrying the restart cause (and no headers) to the current sender. */
  final override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    sender ! Failure(reason, Map())
  }
}
/**
 * Camel `Synchronization` that relays the outcome of an exchange to an actor.
 *
 * @param sender actor that receives the failure or response message built from the exchange
 */
class Callback(sender: ActorRef) extends Synchronization {
  import akka.camel.CamelExchangeAdapter

  // Wraps the raw Camel exchange so it can be converted into an actor message.
  private def adapt(exchange: Exchange) = new CamelExchangeAdapter(exchange)

  /** Sends the exchange, converted to a failure message, to `sender`. */
  def onFailure(exchange: Exchange): Unit = sender ! adapt(exchange).toFailureMessage

  /** Sends the exchange, converted to a response message, to `sender`. */
  def onComplete(exchange: Exchange): Unit = sender ! adapt(exchange).toResponseMessage
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment