Created
January 23, 2012 00:35
-
-
Save piotrga/1659645 to your computer and use it in GitHub Desktop.
ProducerRegistry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private[camel] case class RegisterProducer(actorRef: ActorRef, endpointUri: String) | |
/** | |
* Watches the end of life of <code>Producer</code>s. | |
* Removes a <code>Producer</code> from the <code>ProducerRegistry</code> when it is <code>Terminated</code>, | |
* which in turn stops the <code>SendProcessor</code>. | |
*/ | |
private[camel] class CamelProducerIdempotentRegistry(camelContext: CamelContext) extends Actor { | |
private val camelObjects = new HashMap[ActorRef, (Endpoint,SendProcessor)]() | |
override def receive = { | |
case RegisterProducer(actorRef, endpointUri) ⇒ { | |
camelObjects.get(actorRef) match{ | |
case Some((endpoint, processor)) => sender ! (endpoint,processor) | |
case None => register(endpointUri, actorRef) | |
} | |
} | |
case Terminated(actorRef) ⇒ { | |
camelObjects.remove(actorRef).foreach { case (endpoint, processor) => | |
try { | |
processor.stop() | |
// endpoint.stop() ? | |
context.system.eventStream.publish(EndpointDeActivated(actorRef)) | |
} | |
catch { | |
case e => context.system.eventStream.publish(EndpointFailedToDeActivate(actorRef, e )) | |
} | |
} | |
} | |
} | |
private def register(endpointUri: String, actorRef: ActorRef): Any = { | |
try { | |
val endpoint = camelContext.getEndpoint(endpointUri) | |
val processor = createSendProcessor(endpoint) | |
camelObjects.put(actorRef, (endpoint, processor)) | |
context.watch(actorRef) | |
sender ! (endpoint, processor) | |
context.system.eventStream.publish(EndpointActivated(actorRef)) | |
} | |
catch { | |
case e => context.system.eventStream.publish(EndpointFailedToActivate(actorRef, e )) | |
} | |
} | |
private def createSendProcessor(endpoint: Endpoint): SendProcessor = { | |
val processor = new SendProcessor(endpoint) | |
processor.start() | |
processor | |
} | |
} | |
/** | |
* Manages the Camel objects for <code>Producer</code>s. | |
* Every <code>Producer</code> needs an <code>Endpoint</code> and a <code>SendProcessor</code> | |
* to produce messages over an <code>Exchange</code>. | |
*/ | |
private[camel] trait ProducerRegistry { | |
camel: Camel ⇒ | |
private val registry = system.actorOf(Props(new CamelProducerIdempotentRegistry(camel.context))) | |
/** | |
* Creates <code>Endpoint</code> and <code>SendProcessor</code> and associates the actorRef to these. | |
* @param actorRef the actorRef of the <code>Producer</code> actor. | |
* @param endpointUri the endpoint Uri of the producer | |
* @return <code>Endpoint</code> and <code>SendProcessor</code> registered for the actorRef | |
*/ | |
private[camel] def registerProducer(actorRef: ActorRef, endpointUri: String, timeout: Duration): (Endpoint, SendProcessor) = { | |
Await.result((registry ? (RegisterProducer(actorRef, endpointUri), timeout)).mapTo[(Endpoint, SendProcessor)], timeout) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment