Skip to content

Instantly share code, notes, and snippets.

@ricston-git
Last active December 29, 2015 02:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save ricston-git/7603052 to your computer and use it in GitHub Desktop.
Save ricston-git/7603052 to your computer and use it in GitHub Desktop.
<flow name="parallelWsLookupFlow" doc:name="parallelWsLookupFlow">
<enricher doc:name="Parallel Lookups">
<processor-chain>
<custom-processor class="com.ricston.processor.ParallelMessageProcessor" doc:name="ParallelMessageProcessor">
<spring:property name="processors">
<spring:list>
<spring:ref bean="lookupWs1"/>
<spring:ref bean="lookupWs2"/>
</spring:list>
</spring:property>
<spring:property name="maxActive" value="100"/>
</custom-processor>
<combine-collections-transformer />
</processor-chain>
<enrich target="#[flowVars.lookup0]" source="#[message.payload[0]]" />
<enrich target="#[flowVars.lookup1]" source="#[message.payload[1]]" />
</enricher>
</flow>
<flow name="lookupWs1" doc:name="lookupWs1">
<cxf:jaxws-client
clientClass="${lookupWs1.clientClass}"
operation="${lookupWs1.operation}" port="${lookupWs1.port}" >
</cxf:jaxws-client>
<http:outbound-endpoint exchange-pattern="request-response"
host="${lookupWs1.host}" port="${lookupWs1.port}" path="${lookupWs1.path}"
method="POST" responseTimeout="${lookupWs1.responsetimeout}" doc:name="HTTP" />
</flow>
<flow name="lookupWs2" doc:name="lookupWs2">
<cxf:jaxws-client
clientClass="${lookupWs2.clientClass}"
operation="${lookupWs2.operation}" port="${lookupWs2.port}" >
</cxf:jaxws-client>
<http:outbound-endpoint exchange-pattern="request-response"
host="${lookupWs2.host}" port="${lookupWs2.port}" path="${lookupWs2.path}"
method="POST" responseTimeout="${lookupWs2.responsetimeout}" doc:name="HTTP" />
</flow>
package com.ricston.processor;
/**
* imports
* /
/**
* ParallelMessageProcessor is a message processor that will execute the inner message
* processors in parallel. Before returning, it will wait for all the processors to finish
* execution. This implements the fork-join pattern.
*
* The result will be a MuleMessageCollection with an entry for each inner
* message processor, in the same order they were listed in <code>processors</code>.
* In simple terms, this is very similar to a what a parallel ALL could look like.
*
* The message that comes in is replicated and routed to each message processor. However
* all the inner message processors are executed using a thread pool. The thread pool is
* configurable.
*
* Usages: need to do multiple lookups in parallel, like web service calls, jdbc calls...
*/
public class ParallelMessageProcessor implements MessageProcessor, MuleContextAware, Initialisable
{
private Log logger = LogFactory.getLog(getClass());
/**
* List of MessageProcessors to be executed in parallel
*/
private List<MessageProcessor> processors;
/**
* MuleContext used to create events, messages ...
*/
private MuleContext muleContext;
/**
* ThreadPoolExecutor to be used to run the procesors in parallel
*/
protected ThreadPoolExecutor threadPool;
/**
* Max threads active in the pool
*/
private int maxActive = 100;
/**
* The length of the queue used to queue the work for the thread pool
*/
private int queueLength = 1000000;
public ParallelMessageProcessor()
{
super();
}
/**
* Initialse the thread pool
*/
@Override
public void initialise() throws InitialisationException
{
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueLength);
threadPool = new ThreadPoolExecutor(maxActive, maxActive, 10000, TimeUnit.MILLISECONDS, queue);
}
/**
* Send the message to all processors to be processed in parallel using the pool. Also
* wait for all processors to finish processing, and return the result in a
* MuleMessageCollection, in the same order as the processors were configured.
*/
@Override
public MuleEvent process(MuleEvent event) throws MuleException
{
//create list of ProcessorRunner, each one will execute a message processor
int noOfProcessors = processors.size();
List<ProcessorRunner> threads = new ArrayList<ProcessorRunner>(noOfProcessors);
//create a MuleMessageCollection, to be used to return the results
MuleMessageCollection resultMessages = new DefaultMessageCollection(muleContext);
try
{
//create a ProcessorRunner for each message processor, initialising it
//with the message processor to execute, and the current MuleEvent
for (MessageProcessor mp : processors)
{
ProcessorRunner t = new ProcessorRunner(mp, event);
threads.add(t);
}
logDebugStart();
//invoke the message processors using the thread pool
List<Future<MuleEvent>> future = threadPool.invokeAll(threads);
//collect the results into a MuleMessageCollection, wait if necessary
for (Future<MuleEvent> f : future)
{
resultMessages.addMessage(f.get().getMessage());
}
logDebugEnd(resultMessages);
}
catch (InterruptedException e)
{
throw new MessagingException(event, e);
}
catch (Exception e)
{
throw new MessagingException(event, e);
}
//return the MuleMessageCollection as a result
return new DefaultMuleEvent(resultMessages, event);
}
protected void logDebugStart()
{
if (logger.isDebugEnabled())
{
logger.debug("Firing parallel requests");
}
}
protected void logDebugEnd(MuleMessageCollection resultMessages)
{
if (logger.isDebugEnabled())
{
logger.debug("Collected " + resultMessages.getMessagesAsArray().length + " messages");
}
}
@Override
public void setMuleContext(MuleContext context)
{
this.muleContext = context;
}
/**
* getters and setters here
* /
}
package com.ricston.processor;
/**
* imports
* /
/**
* A simple class implementing Callable which will execute a MessageProcessor
* given a MuleEvent as input. the Callable and MuleEvent are configured on
* creation of this class, or usings the setter methods.
*/
public class ProcessorRunner implements Callable<MuleEvent>
{
/**
* The MessageProcessor to be executed
*/
private MessageProcessor processor;
/**
* The MuleEvent to be passed on to the MessageProcessor
*/
private MuleEvent event;
public ProcessorRunner()
{
super();
}
/**
* Initialise the class with the given parameters.
*
* @param processor The MessageProcessor to be executed
* @param event The MuleEvent to be passed to the processor
*/
public ProcessorRunner(MessageProcessor processor, MuleEvent event)
{
this();
this.processor = processor;
this.event = event;
}
/**
* Create a clone of the MuleEvent passed in (no event can be shared with
* multiple threads) and execute the processor using the cloned event.
*/
@Override
public MuleEvent call() throws Exception
{
try
{
MuleEvent clonedEvent = DefaultMuleEvent.copy(event);
MuleEvent result = this.processor.process(clonedEvent);
return result;
}
catch (MuleException e)
{
throw new RuntimeException(e);
}
}
/**
* getters and setters
* /
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment