Created
March 7, 2012 15:19
-
-
Save johndemic/1993756 to your computer and use it in GitHub Desktop.
Complete Async Aggregation Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.mule.example.cep; | |
import org.mule.api.MuleMessage; | |
import org.mule.api.MuleMessageCollection; | |
import org.mule.api.client.MuleClient; | |
import org.mule.tck.FunctionalTestCase; | |
import org.mule.util.CollectionUtils; | |
import java.util.*; | |
public class AggregationTestCase extends FunctionalTestCase { | |
@Override | |
protected String getConfigResources() { | |
return "src/main/resources/mule-async-config.xml"; | |
} | |
public void testCanAggregate() throws Exception { | |
MuleClient client = muleContext.getClient(); | |
Map<String, Object> properties = new HashMap<String, Object>(); | |
properties.put("requestId", UUID.randomUUID().toString()); | |
client.dispatch("vm://vm.in", "foo", properties); | |
MuleMessage response = client.request("vm://vm.out", 15000); | |
assertNotNull(response); | |
assertTrue(response instanceof MuleMessageCollection); | |
MuleMessageCollection aggregatedResponse = (MuleMessageCollection) response; | |
assertEquals(3, aggregatedResponse.size()); | |
Set<String> payloads = new HashSet<String>(); | |
CollectionUtils.addAll(payloads, aggregatedResponse.getPayloadsAsArray()); | |
assertTrue(payloads.contains("foo1")); | |
assertTrue(payloads.contains("foo2")); | |
assertTrue(payloads.contains("foo3")); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<mule xmlns="http://www.mulesoft.org/schema/mule/core" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xmlns:spring="http://www.springframework.org/schema/beans" | |
xmlns:vm="http://www.mulesoft.org/schema/mule/vm" | |
xsi:schemaLocation=" | |
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd | |
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/3.2/mule.xsd | |
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/3.2/mule-vm.xsd | |
"> | |
<flow name="Async Dispatch"> | |
<vm:inbound-endpoint path="vm.in"> | |
<message-properties-transformer scope="session"> | |
<add-message-property key="requestId" value="#[header:INBOUND:requestId]"/> | |
</message-properties-transformer> | |
</vm:inbound-endpoint> | |
<all> | |
<vm:outbound-endpoint path="vm.flow.1"/> | |
<vm:outbound-endpoint path="vm.flow.2"/> | |
<vm:outbound-endpoint path="vm.flow.3"/> | |
</all> | |
</flow> | |
<flow name="Async Aggregate"> | |
<vm:inbound-endpoint path="vm.aggregate"/> | |
<collection-aggregator /> | |
<vm:outbound-endpoint path="vm.out"/> | |
</flow> | |
<flow name="vmFlow1"> | |
<vm:inbound-endpoint path="vm.flow.1"/> | |
<logger level="INFO" message="FOO: #[header:SESSION:requestId]"/> | |
<expression-transformer evaluator="groovy" expression="return 'foo1'"/> | |
<vm:outbound-endpoint path="vm.aggregate"> | |
<message-properties-transformer> | |
<add-message-property key="MULE_CORRELATION_ID" value="#[header:SESSION:requestId]"/> | |
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3"/> | |
</message-properties-transformer> | |
</vm:outbound-endpoint> | |
</flow> | |
<flow name="vmFlow2"> | |
<vm:inbound-endpoint path="vm.flow.2"/> | |
<expression-transformer evaluator="groovy" expression="return 'foo2'"/> | |
<vm:outbound-endpoint path="vm.aggregate"> | |
<message-properties-transformer> | |
<add-message-property key="MULE_CORRELATION_ID" value="#[header:SESSION:requestId]"/> | |
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3"/> | |
</message-properties-transformer> | |
</vm:outbound-endpoint> | |
</flow> | |
<flow name="vmFlow3"> | |
<vm:inbound-endpoint path="vm.flow.3"/> | |
<expression-transformer evaluator="groovy" expression="return 'foo3'"/> | |
<vm:outbound-endpoint path="vm.aggregate"> | |
<message-properties-transformer> | |
<add-message-property key="MULE_CORRELATION_ID" value="#[header:SESSION:requestId]"/> | |
<add-message-property key="MULE_CORRELATION_GROUP_SIZE" value="3"/> | |
</message-properties-transformer> | |
</vm:outbound-endpoint> | |
</flow> | |
</mule> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment