Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Complete Async Aggregation Example
package org.mule.example.cep;
import org.mule.api.MuleMessage;
import org.mule.api.MuleMessageCollection;
import org.mule.api.client.MuleClient;
import org.mule.tck.FunctionalTestCase;
import org.mule.util.CollectionUtils;
import java.util.*;
/**
 * Functional test for the asynchronous aggregation example.
 *
 * <p>The test sends a single message into the configuration returned by
 * {@link #getConfigResources()} and checks that one aggregated message
 * collection holding three payloads comes back.
 */
public class AggregationTestCase extends FunctionalTestCase {

    /** Endpoint the test message is dispatched to. */
    private static final String INBOUND_URI = "vm://vm.in";

    /** Endpoint the aggregated result is read from. */
    private static final String RESULT_URI = "vm://vm.out";

    /** Timeout handed to the client request call (presumably milliseconds; confirm against MuleClient). */
    private static final int REQUEST_TIMEOUT = 15000;

    /**
     * Names the Mule configuration this test case runs against.
     *
     * @return the location of the async aggregation configuration file
     */
    @Override
    protected String getConfigResources() {
        return "src/main/resources/mule-async-config.xml";
    }

    /**
     * Dispatches the payload {@code "foo"} tagged with a random
     * {@code requestId} property, then asserts that the reply is a
     * {@link MuleMessageCollection} of size three whose payloads include
     * {@code "foo1"}, {@code "foo2"} and {@code "foo3"}.
     *
     * @throws Exception if dispatching or requesting through the client fails
     */
    public void testCanAggregate() throws Exception {
        MuleClient muleClient = muleContext.getClient();

        // A fresh id per run, passed along as a message property.
        String requestId = UUID.randomUUID().toString();
        Map<String, Object> messageProperties = new HashMap<String, Object>();
        messageProperties.put("requestId", requestId);
        muleClient.dispatch(INBOUND_URI, "foo", messageProperties);

        MuleMessage reply = muleClient.request(RESULT_URI, REQUEST_TIMEOUT);
        assertNotNull(reply);
        assertTrue(reply instanceof MuleMessageCollection);

        MuleMessageCollection collected = (MuleMessageCollection) reply;
        assertEquals(3, collected.size());

        // Gather payloads into a set so the check does not depend on arrival order.
        Set<String> seenPayloads = new HashSet<String>();
        CollectionUtils.addAll(seenPayloads, collected.getPayloadsAsArray());
        for (String expectedPayload : new String[] {"foo1", "foo2", "foo3"}) {
            assertTrue(seenPayloads.contains(expectedPayload));
        }
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd
">
<!--
  Entry flow: receives a message on the VM path "vm.in" and hands it to the
  three VM paths vm.flow.1, vm.flow.2 and vm.flow.3.
-->
<flow name="Async Dispatch">
    <vm:inbound-endpoint path="vm.in">
        <!--
          Store the inbound "requestId" header as a session-scoped property so
          that later flows can read it via #[header:SESSION:requestId].
        -->
        <message-properties-transformer scope="session">
            <add-message-property key="requestId" value="#[header:INBOUND:requestId]"/>
        </message-properties-transformer>
    </vm:inbound-endpoint>

    <!-- One outbound endpoint per worker flow. -->
    <all>
        <vm:outbound-endpoint path="vm.flow.1"/>
        <vm:outbound-endpoint path="vm.flow.2"/>
        <vm:outbound-endpoint path="vm.flow.3"/>
    </all>
</flow>
<!--
  Collects the messages arriving on "vm.aggregate" and publishes the result on
  "vm.out". The senders in this file set MULE_CORRELATION_ID and
  MULE_CORRELATION_GROUP_SIZE (3) on every message they send here; presumably
  the aggregator groups on those properties (confirm against the Mule docs).
-->
<flow name="Async Aggregate">
    <vm:inbound-endpoint path="vm.aggregate"/>
    <collection-aggregator/>
    <vm:outbound-endpoint path="vm.out"/>
</flow>
<!--
  Worker 1: listens on "vm.flow.1", logs the session requestId, replaces the
  payload with 'foo1' and forwards the message to "vm.aggregate".
-->
<flow name="vmFlow1">
    <vm:inbound-endpoint path="vm.flow.1"/>
    <logger level="INFO" message="FOO: #[header:SESSION:requestId]"/>
    <expression-transformer evaluator="groovy" expression="return 'foo1'"/>
    <vm:outbound-endpoint path="vm.aggregate">
        <!-- Tag the outgoing message with the request's id and a group size of 3. -->
        <message-properties-transformer>
            <add-message-property key="MULE_CORRELATION_ID" value="#[header:SESSION:requestId]"/>
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3"/>
        </message-properties-transformer>
    </vm:outbound-endpoint>
</flow>
<!--
  Worker 2: listens on "vm.flow.2", replaces the payload with 'foo2' and
  forwards the message to "vm.aggregate".
-->
<flow name="vmFlow2">
    <vm:inbound-endpoint path="vm.flow.2"/>
    <expression-transformer evaluator="groovy" expression="return 'foo2'"/>
    <vm:outbound-endpoint path="vm.aggregate">
        <!-- Tag the outgoing message with the request's id and a group size of 3. -->
        <message-properties-transformer>
            <add-message-property key="MULE_CORRELATION_ID" value="#[header:SESSION:requestId]"/>
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3"/>
        </message-properties-transformer>
    </vm:outbound-endpoint>
</flow>
<!--
  Worker 3: listens on "vm.flow.3", replaces the payload with 'foo3' and
  forwards the message to "vm.aggregate".
-->
<flow name="vmFlow3">
    <vm:inbound-endpoint path="vm.flow.3"/>
    <expression-transformer evaluator="groovy" expression="return 'foo3'"/>
    <vm:outbound-endpoint path="vm.aggregate">
        <!-- Tag the outgoing message with the request's id and a group size of 3. -->
        <message-properties-transformer>
            <add-message-property key="MULE_CORRELATION_ID" value="#[header:SESSION:requestId]"/>
            <add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3"/>
        </message-properties-transformer>
    </vm:outbound-endpoint>
</flow>
</mule>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.