Skip to content

Instantly share code, notes, and snippets.

@piotrga
Created January 23, 2012 00:35
Show Gist options
  • Save piotrga/1659645 to your computer and use it in GitHub Desktop.
Save piotrga/1659645 to your computer and use it in GitHub Desktop.
ProducerRegistry
/**
 * Message sent to the producer registry actor to obtain the Camel objects for a producer.
 * The registry answers with the <code>(Endpoint, SendProcessor)</code> pair already stored for
 * <code>actorRef</code>, or tries to create and store one for <code>endpointUri</code> if there is none.
 *
 * @param actorRef the actorRef the Camel objects are associated with
 * @param endpointUri the Camel endpoint uri used when the objects have to be created
 */
private[camel] case class RegisterProducer(actorRef: ActorRef, endpointUri: String)
/**
 * Watches the end of life of <code>Producer</code>s.
 * Removes a <code>Producer</code> from the <code>ProducerRegistry</code> when it is <code>Terminated</code>,
 * which in turn stops the <code>SendProcessor</code>.
 *
 * Registration is idempotent: a repeated <code>RegisterProducer</code> for an actorRef that is already
 * registered is answered with the stored <code>Endpoint</code>/<code>SendProcessor</code> pair instead of
 * creating new Camel objects.
 *
 * @param camelContext the context used to resolve endpoint uris into <code>Endpoint</code>s
 */
private[camel] class CamelProducerIdempotentRegistry(camelContext: CamelContext) extends Actor {
  // Camel objects per registered producer; private to this actor.
  private val camelObjects = new HashMap[ActorRef, (Endpoint, SendProcessor)]()

  override def receive = {
    case RegisterProducer(actorRef, endpointUri) ⇒
      camelObjects.get(actorRef) match {
        // Already registered: hand back the very same objects.
        case Some((endpoint, processor)) => sender ! (endpoint, processor)
        case None                        => register(endpointUri, actorRef)
      }

    case Terminated(actorRef) ⇒
      // Nothing to do when the terminated actor was never (or is no longer) registered.
      camelObjects.remove(actorRef).foreach {
        case (endpoint, processor) =>
          try {
            processor.stop()
            // endpoint.stop() ?
            context.system.eventStream.publish(EndpointDeActivated(actorRef))
          } catch {
            // Only ordinary exceptions are reported as events; Errors and control
            // throwables are left to propagate instead of being swallowed here.
            case e: Exception => context.system.eventStream.publish(EndpointFailedToDeActivate(actorRef, e))
          }
      }
  }

  /**
   * Creates the <code>Endpoint</code> and a started <code>SendProcessor</code> for the uri, stores them
   * for the actorRef, starts watching the actorRef and replies to the sender with the pair.
   * Publishes <code>EndpointActivated</code> on success and <code>EndpointFailedToActivate</code>
   * when an exception is thrown along the way.
   *
   * NOTE(review): on failure nothing is sent back to the sender, so a caller using <code>ask</code>
   * only observes a timeout. Consider replying with a failure status; left unchanged here because it
   * would alter what the caller sees.
   *
   * @param endpointUri the Camel endpoint uri to resolve
   * @param actorRef the actorRef to associate the created objects with
   */
  private def register(endpointUri: String, actorRef: ActorRef): Unit = {
    try {
      val endpoint = camelContext.getEndpoint(endpointUri)
      val processor = createSendProcessor(endpoint)
      camelObjects.put(actorRef, (endpoint, processor))
      // Watching delivers the Terminated message that triggers the cleanup in receive.
      context.watch(actorRef)
      sender ! (endpoint, processor)
      context.system.eventStream.publish(EndpointActivated(actorRef))
    } catch {
      // See the Terminated handler: do not intercept Errors or control throwables.
      case e: Exception => context.system.eventStream.publish(EndpointFailedToActivate(actorRef, e))
    }
  }

  /**
   * Creates a <code>SendProcessor</code> for the endpoint and starts it.
   *
   * @param endpoint the endpoint the processor sends to
   * @return the started processor
   */
  private def createSendProcessor(endpoint: Endpoint): SendProcessor = {
    val processor = new SendProcessor(endpoint)
    processor.start()
    processor
  }
}
/**
 * Gives a <code>Camel</code> instance the means to hand out the Camel objects a <code>Producer</code>
 * works with: an <code>Endpoint</code> and a <code>SendProcessor</code>, which together are needed
 * to produce messages over an <code>Exchange</code>.
 * The bookkeeping itself is delegated to a <code>CamelProducerIdempotentRegistry</code> actor.
 */
private[camel] trait ProducerRegistry {
  camel: Camel ⇒

  // Actor holding the actorRef -> (Endpoint, SendProcessor) associations.
  private val registryActor = system.actorOf(Props(new CamelProducerIdempotentRegistry(camel.context)))

  /**
   * Asks the registry actor for the <code>Endpoint</code> and <code>SendProcessor</code> associated
   * with the actorRef and waits for the answer, blocking the calling thread.
   *
   * @param actorRef the actorRef of the <code>Producer</code> actor.
   * @param endpointUri the endpoint Uri of the producer
   * @param timeout used both as the timeout of the request and as the longest time to wait for the reply
   * @return <code>Endpoint</code> and <code>SendProcessor</code> registered for the actorRef
   */
  private[camel] def registerProducer(actorRef: ActorRef, endpointUri: String, timeout: Duration): (Endpoint, SendProcessor) = {
    val request = RegisterProducer(actorRef, endpointUri)
    val pendingReply = (registryActor ? (request, timeout)).mapTo[(Endpoint, SendProcessor)]
    Await.result(pendingReply, timeout)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment