Last active
December 29, 2015 02:49
-
-
Save ricston-git/7603052 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<flow name="parallelWsLookupFlow" doc:name="parallelWsLookupFlow"> | |
<enricher doc:name="Parallel Lookups"> | |
<processor-chain> | |
<custom-processor class="com.ricston.processor.ParallelMessageProcessor" doc:name="ParallelMessageProcessor"> | |
<spring:property name="processors"> | |
<spring:list> | |
<spring:ref bean="lookupWs1"/> | |
<spring:ref bean="lookupWs2"/> | |
</spring:list> | |
</spring:property> | |
<spring:property name="maxActive" value="100"/> | |
</custom-processor> | |
<combine-collections-transformer /> | |
</processor-chain> | |
<enrich target="#[flowVars.lookup0]" source="#[message.payload[0]]" /> | |
<enrich target="#[flowVars.lookup1]" source="#[message.payload[1]]" /> | |
</enricher> | |
</flow> | |
<flow name="lookupWs1" doc:name="lookupWs1"> | |
<cxf:jaxws-client | |
clientClass="${lookupWs1.clientClass}" | |
operation="${lookupWs1.operation}" port="${lookupWs1.port}" > | |
</cxf:jaxws-client> | |
<http:outbound-endpoint exchange-pattern="request-response" | |
host="${lookupWs1.host}" port="${lookupWs1.port}" path="${lookupWs1.path}" | |
method="POST" responseTimeout="${lookupWs1.responsetimeout}" doc:name="HTTP" /> | |
</flow> | |
<flow name="lookupWs2" doc:name="lookupWs2"> | |
<cxf:jaxws-client | |
clientClass="${lookupWs2.clientClass}" | |
operation="${lookupWs2.operation}" port="${lookupWs2.port}" > | |
</cxf:jaxws-client> | |
<http:outbound-endpoint exchange-pattern="request-response" | |
host="${lookupWs2.host}" port="${lookupWs2.port}" path="${lookupWs2.path}" | |
method="POST" responseTimeout="${lookupWs2.responsetimeout}" doc:name="HTTP" /> | |
</flow> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ricston.processor; | |
/** | |
* imports | |
* / | |
/** | |
* ParallelMessageProcessor is a message processor that will execute the inner message | |
* processors in parallel. Before returning, it will wait for all the processors to finish | |
* execution. This implements the fork-join pattern. | |
* | |
* The result will be a MuleMessageCollection with an entry for each inner | |
* message processor, in the same order they were listed in <code>processors</code>. | |
* In simple terms, this is very similar to a what a parallel ALL could look like. | |
* | |
* The message that comes in is replicated and routed to each message processor. However | |
* all the inner message processors are executed using a thread pool. The thread pool is | |
* configurable. | |
* | |
* Usages: need to do multiple lookups in parallel, like web service calls, jdbc calls... | |
*/ | |
public class ParallelMessageProcessor implements MessageProcessor, MuleContextAware, Initialisable | |
{ | |
private Log logger = LogFactory.getLog(getClass()); | |
/** | |
* List of MessageProcessors to be executed in parallel | |
*/ | |
private List<MessageProcessor> processors; | |
/** | |
* MuleContext used to create events, messages ... | |
*/ | |
private MuleContext muleContext; | |
/** | |
* ThreadPoolExecutor to be used to run the procesors in parallel | |
*/ | |
protected ThreadPoolExecutor threadPool; | |
/** | |
* Max threads active in the pool | |
*/ | |
private int maxActive = 100; | |
/** | |
* The length of the queue used to queue the work for the thread pool | |
*/ | |
private int queueLength = 1000000; | |
public ParallelMessageProcessor() | |
{ | |
super(); | |
} | |
/** | |
* Initialse the thread pool | |
*/ | |
@Override | |
public void initialise() throws InitialisationException | |
{ | |
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueLength); | |
threadPool = new ThreadPoolExecutor(maxActive, maxActive, 10000, TimeUnit.MILLISECONDS, queue); | |
} | |
/** | |
* Send the message to all processors to be processed in parallel using the pool. Also | |
* wait for all processors to finish processing, and return the result in a | |
* MuleMessageCollection, in the same order as the processors were configured. | |
*/ | |
@Override | |
public MuleEvent process(MuleEvent event) throws MuleException | |
{ | |
//create list of ProcessorRunner, each one will execute a message processor | |
int noOfProcessors = processors.size(); | |
List<ProcessorRunner> threads = new ArrayList<ProcessorRunner>(noOfProcessors); | |
//create a MuleMessageCollection, to be used to return the results | |
MuleMessageCollection resultMessages = new DefaultMessageCollection(muleContext); | |
try | |
{ | |
//create a ProcessorRunner for each message processor, initialising it | |
//with the message processor to execute, and the current MuleEvent | |
for (MessageProcessor mp : processors) | |
{ | |
ProcessorRunner t = new ProcessorRunner(mp, event); | |
threads.add(t); | |
} | |
logDebugStart(); | |
//invoke the message processors using the thread pool | |
List<Future<MuleEvent>> future = threadPool.invokeAll(threads); | |
//collect the results into a MuleMessageCollection, wait if necessary | |
for (Future<MuleEvent> f : future) | |
{ | |
resultMessages.addMessage(f.get().getMessage()); | |
} | |
logDebugEnd(resultMessages); | |
} | |
catch (InterruptedException e) | |
{ | |
throw new MessagingException(event, e); | |
} | |
catch (Exception e) | |
{ | |
throw new MessagingException(event, e); | |
} | |
//return the MuleMessageCollection as a result | |
return new DefaultMuleEvent(resultMessages, event); | |
} | |
protected void logDebugStart() | |
{ | |
if (logger.isDebugEnabled()) | |
{ | |
logger.debug("Firing parallel requests"); | |
} | |
} | |
protected void logDebugEnd(MuleMessageCollection resultMessages) | |
{ | |
if (logger.isDebugEnabled()) | |
{ | |
logger.debug("Collected " + resultMessages.getMessagesAsArray().length + " messages"); | |
} | |
} | |
@Override | |
public void setMuleContext(MuleContext context) | |
{ | |
this.muleContext = context; | |
} | |
/** | |
* getters and setters here | |
* / | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ricston.processor; | |
/** | |
* imports | |
* / | |
/** | |
* A simple class implementing Callable which will execute a MessageProcessor | |
* given a MuleEvent as input. the Callable and MuleEvent are configured on | |
* creation of this class, or usings the setter methods. | |
*/ | |
public class ProcessorRunner implements Callable<MuleEvent> | |
{ | |
/** | |
* The MessageProcessor to be executed | |
*/ | |
private MessageProcessor processor; | |
/** | |
* The MuleEvent to be passed on to the MessageProcessor | |
*/ | |
private MuleEvent event; | |
public ProcessorRunner() | |
{ | |
super(); | |
} | |
/** | |
* Initialise the class with the given parameters. | |
* | |
* @param processor The MessageProcessor to be executed | |
* @param event The MuleEvent to be passed to the processor | |
*/ | |
public ProcessorRunner(MessageProcessor processor, MuleEvent event) | |
{ | |
this(); | |
this.processor = processor; | |
this.event = event; | |
} | |
/** | |
* Create a clone of the MuleEvent passed in (no event can be shared with | |
* multiple threads) and execute the processor using the cloned event. | |
*/ | |
@Override | |
public MuleEvent call() throws Exception | |
{ | |
try | |
{ | |
MuleEvent clonedEvent = DefaultMuleEvent.copy(event); | |
MuleEvent result = this.processor.process(clonedEvent); | |
return result; | |
} | |
catch (MuleException e) | |
{ | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* getters and setters | |
* / | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment