Skip to content

Instantly share code, notes, and snippets.

@samspeed
Last active February 5, 2018 21:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samspeed/f5f880e18ea43a8e5687ff3119f216d2 to your computer and use it in GitHub Desktop.
Save samspeed/f5f880e18ea43a8e5687ff3119f216d2 to your computer and use it in GitHub Desktop.
Creating Dynamic Flows in Mule using DataWeave and Spring
%dw 1.0
%output application/xml encoding="UTF-8"
---
// Renders a Mule XML configuration containing a single flow.
// Values read from the input payload: flowName, inboundEndpointType,
// inboundEndpoint.path, logger1.message and logger1.level.
mule @(
    'xmlns': "http://www.mulesoft.org/schema/mule/core",
    'xmlns:doc': "http://www.mulesoft.org/schema/mule/documentation",
    'xmlns:file': "http://www.mulesoft.org/schema/mule/file",
    'xmlns:spring': "http://www.springframework.org/schema/beans",
    'xmlns:xsi': "http://www.w3.org/2001/XMLSchema-instance",
    'xsi:schemaLocation': "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd"
): {
    flow @(name: payload.flowName): {
        // Message source, variant 1: emitted only when inboundEndpointType is "Poll".
        (poll @('doc:name': "Poll"): {
            'fixed-frequency-scheduler' @(
                'frequency': "5",
                'timeUnit': "SECONDS"
            ): {},
            set-payload @(
                value: "Poll message",
                'doc:name': "Set Payload"
            ): {}
        }) when payload.inboundEndpointType == "Poll",

        // Message source, variant 2: emitted only when inboundEndpointType is "File".
        ('file:inbound-endpoint' @(
            'path': payload.inboundEndpoint.path,
            'doc:name': "File",
            responseTimeout: "10000"
        ): {}) when payload.inboundEndpointType == "File",

        // First logger: message and level come from the input payload.
        logger @(
            'message': payload.logger1.message,
            'doc:name': "logger",
            level: payload.logger1.level
        ): {},

        // Second logger: fixed message; the #[...] expression is written out as literal text.
        logger @(
            'message': "Message payload: #[message.payloadAs(String)]",
            'doc:name': "logger",
            level: "INFO"
        ): {}
    }
}
%dw 1.0
%output application/xml encoding="UTF-8"
---
// Renders a Mule XML configuration containing a single flow that ends by
// handing the message to the flow named "outboundFlow".
// Values read from the input payload: flowName, inboundEndpointType,
// inboundEndpoint.path, logger1.message and logger1.level.
mule @(
    'xmlns': "http://www.mulesoft.org/schema/mule/core",
    'xmlns:doc': "http://www.mulesoft.org/schema/mule/documentation",
    'xmlns:file': "http://www.mulesoft.org/schema/mule/file",
    'xmlns:spring': "http://www.springframework.org/schema/beans",
    'xmlns:xsi': "http://www.w3.org/2001/XMLSchema-instance",
    'xsi:schemaLocation': "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd"
): {
    flow @(name: payload.flowName): {
        // Message source, variant 1: emitted only when inboundEndpointType is "Poll".
        (poll @('doc:name': "Poll"): {
            'fixed-frequency-scheduler' @(
                'frequency': "5",
                'timeUnit': "SECONDS"
            ): {},
            set-payload @(
                value: "Poll message",
                'doc:name': "Set Payload"
            ): {}
        }) when payload.inboundEndpointType == "Poll",

        // Message source, variant 2: emitted only when inboundEndpointType is "File".
        ('file:inbound-endpoint' @(
            'path': payload.inboundEndpoint.path,
            'doc:name': "File",
            responseTimeout: "10000"
        ): {}) when payload.inboundEndpointType == "File",

        // First logger: message and level come from the input payload.
        logger @(
            'message': payload.logger1.message,
            'doc:name': "logger",
            level: payload.logger1.level
        ): {},

        // Second logger: fixed message; the #[...] expression is written out as literal text.
        logger @(
            'message': "Message payload: #[message.payloadAs(String)]",
            'doc:name': "logger",
            level: "INFO"
        ): {},

        // Final step: reference to the flow named "outboundFlow", which is not defined in this template.
        flow-ref @(
            name: "outboundFlow",
            'doc:name': "outboundFlow"
        ): {}
    }
}
<!-- Flow "outboundFlow": logs a fixed line, then writes the message to a file
     named after the message id in the outbound directory.
     NOTE(review): presumably part of common-flows.xml; confirm against the project. -->
<flow name="outboundFlow">
  <logger message="Common outbound flow"
          level="INFO"
          doc:name="Logger"/>
  <file:outbound-endpoint path="/Users/sam.speed/outbound"
                          outputPattern="#[message.id].txt"
                          responseTimeout="10000"
                          doc:name="File"/>
</flow>
<?xml version="1.0" encoding="UTF-8"?>
<!-- HTTP front end for the DynamicFlowController bean: three flows on one
     listener configuration expose /create, /list and /delete. -->
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:quartz="http://www.mulesoft.org/schema/mule/quartz"
      xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
      xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
                          http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
                          http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
                          http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
                          http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
                          http://www.mulesoft.org/schema/mule/quartz http://www.mulesoft.org/schema/mule/quartz/current/mule-quartz.xsd">

  <!-- Singleton bean that the invoke elements below call by reference. -->
  <spring:beans>
    <spring:bean id="DynamicFlowController"
                 name="DynamicFlowController"
                 class="com.ricston.dynamicFlows.DynamicFlowController"
                 scope="singleton"/>
  </spring:beans>

  <http:listener-config name="HTTP_Listener_Configuration"
                        host="0.0.0.0"
                        port="8081"
                        doc:name="HTTP Listener Configuration"/>

  <!-- /create: stores contextName from the JSON request in a flow variable,
       transforms the request with the classpath DataWeave script and passes
       the result to createContext. -->
  <flow name="createContextFlow">
    <http:listener config-ref="HTTP_Listener_Configuration"
                   path="/create"
                   doc:name="HTTP"/>
    <set-variable variableName="contextName"
                  value="#[json:contextName]"
                  doc:name="Set contextName"/>
    <dw:transform-message doc:name="Transform Message">
      <dw:input-payload doc:sample="sample_data/json.json"/>
      <dw:set-payload resource="classpath:change_endpoints_with_outbound.dwl"/>
    </dw:transform-message>
    <!-- Second argument falls back to the message id when no contextName was supplied. -->
    <invoke method="createContext"
            name="CreateContext"
            object-ref="DynamicFlowController"
            methodArguments="#[payload],#[flowVars.contextName != null ? flowVars.contextName : message.id]"/>
    <set-payload value="#['ok']" doc:name="Set Payload"/>
  </flow>

  <!-- /list: calls listContexts and wraps the result in a JSON object under "contexts". -->
  <flow name="listContextFlow">
    <http:listener config-ref="HTTP_Listener_Configuration"
                   path="/list"
                   doc:name="HTTP"/>
    <invoke method="listContexts"
            name="ListContexts"
            object-ref="DynamicFlowController"/>
    <dw:transform-message doc:name="Transform Message">
      <dw:input-payload mimeType="application/java"/>
      <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
contexts: payload
}]]></dw:set-payload>
    </dw:transform-message>
  </flow>

  <!-- /delete: passes the contextName query parameter to stopContext. -->
  <flow name="stopContextFlow">
    <http:listener config-ref="HTTP_Listener_Configuration"
                   path="/delete"
                   doc:name="HTTP"/>
    <invoke method="stopContext"
            name="stopContext"
            object-ref="DynamicFlowController"
            methodArguments="#[message.inboundProperties.'http.query.params'.contextName]"/>
    <set-payload value="#['ok']" doc:name="Set Payload"/>
  </flow>
</mule>
package com.ricston.dynamicFlows;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.mule.context.DefaultMuleContextFactory;
import com.ricston.dynamicFlows.exception.ContextNotExistsExeception;
import org.mule.api.MuleContext;
import org.mule.config.ConfigResource;
import org.mule.config.spring.SpringXmlConfigurationBuilder;
import org.mule.api.lifecycle.Disposable;
import org.mule.api.lifecycle.Initialisable;
/**
 * Creates, tracks and tears down additional {@link MuleContext} instances at runtime.
 *
 * <p>On {@link #initialise()} the classpath resource {@code common-flows.xml} is loaded into a
 * context registered under the name {@code common-flows}. Every context created afterwards via
 * {@link #createContext(String, String)} receives that context as its domain context.
 *
 * <p>Failures while building or shutting down a context are logged and not rethrown; the only
 * exception that reaches callers is {@link ContextNotExistsExeception} from
 * {@link #stopContext(String)}.
 *
 * <p>NOTE(review): the registry is a plain {@link HashMap} with no synchronisation; confirm
 * whether the calling flows can invoke this bean concurrently.
 */
public class DynamicFlowController implements Initialisable, Disposable {

    private static final Logger logger = LogManager.getLogger(DynamicFlowController.class);

    /** Registry key of the shared context that is used as domain context for all others. */
    private static final String COMMON_FLOWS = "common-flows";

    /** Classpath resource holding the configuration of the shared context. */
    private static final String COMMON_FLOWS_FILENAME = "common-flows.xml";

    /** Started contexts, keyed by context name. */
    private final HashMap<String, MuleContext> contextList;

    public DynamicFlowController() {
        contextList = new HashMap<String, MuleContext>();
    }

    /**
     * Builds and starts a new context from a Mule XML configuration.
     *
     * @param payload     the Mule configuration XML as text
     * @param contextName requested name; {@code "+1"} is appended until the name is unused
     * @return {@code payload}, unchanged, including when creation failed (failures are only logged)
     */
    public Object createContext(String payload, String contextName) {
        logger.debug("Running DynamicFlowController.createContext(String payload,String contextName)");
        // ensure unique context name
        while (contextList.containsKey(contextName)) {
            contextName += "+1";
        }
        ArrayList<String> muleConfigFiles = new ArrayList<String>();
        muleConfigFiles.add(payload);
        // The helper logs and swallows every Exception itself, so no extra try/catch is needed here.
        createMuleContextFromConfigFiles(contextName, muleConfigFiles);
        return payload;
    }

    /**
     * Creates a context from the given configuration texts, starts it and registers it.
     * A context that fails to configure or start is disposed and is not registered.
     *
     * @param contextName registry key, also used as prefix of the config resource names
     * @param configFiles Mule configuration XML documents as text
     */
    private void createMuleContextFromConfigFiles(String contextName, ArrayList<String> configFiles) {
        MuleContext muleContext = null;
        try {
            // Create new MuleContext with the mule config xml files provided
            muleContext = new DefaultMuleContextFactory().createMuleContext();
            ConfigResource[] configResources = new ConfigResource[configFiles.size()];
            for (int i = 0; i < configFiles.size(); i++) {
                // Encode explicitly as UTF-8: the documents declare UTF-8, and the platform
                // default charset used by getBytes() may differ.
                byte[] xmlBytes = configFiles.get(i).getBytes(StandardCharsets.UTF_8);
                configResources[i] = new ConfigResource(contextName + "-flows-" + i, new ByteArrayInputStream(xmlBytes));
            }
            SpringXmlConfigurationBuilder configBuilder = new SpringXmlConfigurationBuilder(configResources);
            // Add common flows context as parent
            if (!COMMON_FLOWS.equals(contextName)) {
                configBuilder.setDomainContext(contextList.get(COMMON_FLOWS));
            }
            configBuilder.configure(muleContext);
            muleContext.start();
            contextList.put(contextName, muleContext);
        } catch (Exception e) {
            logger.error("Exception: ", e);
            // Release a half-built context instead of leaking it; it was never registered.
            if (muleContext != null) {
                try {
                    muleContext.dispose();
                } catch (Exception disposeFailure) {
                    logger.error("Exception: ", disposeFailure);
                }
            }
        }
    }

    /**
     * Loads {@code common-flows.xml} from the classpath and starts it as the shared context.
     * Any failure is logged and not rethrown.
     */
    public void initialise() {
        logger.debug("Running DynamicFlowController.initialise()");
        try {
            String payload = IOUtils.toString(
                    DynamicFlowController.class.getClassLoader().getResource(COMMON_FLOWS_FILENAME),
                    StandardCharsets.UTF_8);
            ArrayList<String> muleConfigFiles = new ArrayList<String>();
            muleConfigFiles.add(payload);
            createMuleContextFromConfigFiles(COMMON_FLOWS, muleConfigFiles);
        } catch (Exception e) {
            logger.error("Exception: ", e);
        }
    }

    /**
     * @return a new list holding the names of all registered contexts
     */
    public List<String> listContexts() {
        logger.debug("Running DynamicFlowController.listContexts()");
        return new ArrayList<String>(contextList.keySet());
    }

    /**
     * Stops and disposes the named context and removes it from the registry.
     * The context is disposed and removed even when stopping it fails.
     *
     * @param contextName name of the context to stop
     * @throws ContextNotExistsExeception if no context is registered under that name
     */
    public void stopContext(String contextName) throws ContextNotExistsExeception {
        logger.debug("Running DynamicFlowController.stopContext(String contextName)");
        if (!contextList.containsKey(contextName)) {
            throw new ContextNotExistsExeception("Cannot find context with name " + contextName);
        }
        logger.debug("Stopping context " + contextName);
        shutdownContext(contextList.remove(contextName));
    }

    /**
     * Shuts down every registered context, the shared common-flows context last,
     * and leaves the registry empty.
     */
    @Override
    public void dispose() {
        // Take the shared context out first so the loop below only sees dependent contexts.
        MuleContext commonFlows = contextList.remove(COMMON_FLOWS);
        // Dispose of each context that was created
        for (String contextName : new ArrayList<String>(contextList.keySet())) {
            shutdownContext(contextList.remove(contextName));
        }
        // Dispose of common flows last
        shutdownContext(commonFlows);
    }

    /** Stops then disposes one context; logs failures; does nothing for {@code null}. */
    private void shutdownContext(MuleContext muleContext) {
        if (muleContext == null) {
            return;
        }
        try {
            muleContext.stop();
        } catch (Exception e) {
            logger.error("Exception: ", e);
        } finally {
            // Always attempt dispose, even when stop() failed.
            try {
                muleContext.dispose();
            } catch (Exception e) {
                logger.error("Exception: ", e);
            }
        }
    }
}
<?xml version='1.0' encoding='UTF-8'?>
<!-- Mule configuration with one flow, "file-flow": a file inbound endpoint
     followed by two loggers.
     NOTE(review): appears to be sample output of the DataWeave template for a
     "File" request; confirm. -->
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd">
  <flow name="file-flow">
    <file:inbound-endpoint path="/Users/sam.speed/inbound"
                           doc:name="File"
                           responseTimeout="10000"/>
    <logger message="Starting file flow with name: #[flow.name]"
            doc:name="logger"
            level="INFO"/>
    <logger message="Message payload: #[message.payloadAs(String)]"
            doc:name="logger"
            level="INFO"/>
  </flow>
</mule>
{
"contextName":"File-endpoint-context",
"flowName":"file-flow",
"inboundEndpointType":"File",
"inboundEndpoint":{
"path":"/Users/sam.speed/inbound"
},
"logger1":{
"level":"INFO",
"message":"Starting file flow with name: #[flow.name]"
}
}
// Fragment: emits a flow-ref element that references the flow named "outboundFlow".
flow-ref @(
    name: "outboundFlow",
    'doc:name': "outboundFlow"
): {}
<?xml version='1.0' encoding='UTF-8'?>
<!-- Mule configuration with one flow, "poll-flow": a poll source firing every
     5 seconds, two loggers, then a reference to "outboundFlow".
     NOTE(review): appears to be sample output of the DataWeave template for a
     "Poll" request; confirm. -->
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:file="http://www.mulesoft.org/schema/mule/file"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd">
  <flow name="poll-flow">
    <poll doc:name="Poll">
      <fixed-frequency-scheduler frequency="5" timeUnit="SECONDS"/>
      <set-payload value="Poll message" doc:name="Set Payload"/>
    </poll>
    <logger message="Starting poll flow with name: #[flow.name]"
            doc:name="logger"
            level="INFO"/>
    <logger message="Message payload: #[message.payloadAs(String)]"
            doc:name="logger"
            level="INFO"/>
    <flow-ref name="outboundFlow" doc:name="outboundFlow"/>
  </flow>
</mule>
{
"contextName":"Poll-endpoint-context",
"flowName":"poll-flow",
"inboundEndpointType":"Poll",
"logger1":{
"level":"INFO",
"message":"Starting poll flow with name: #[flow.name]"
}
}
18:31:07 [connector.file.mule.default.receiver.01] INFO org.mule.transport.file.FileMessageReceiver - Lock obtained on file: /Users/sam.speed/inbound/example.txt
18:31:07 [file-flow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Starting file flow with name: file-flow
18:31:07 [file-flow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Message payload: file-based-message
18:31:07 [outboundFlow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Common outbound flow
18:31:07 [connector.file.mule.default.dispatcher.01] INFO org.mule.transport.file.FileConnector - Writing file to: /Users/sam.speed/outbound/62e66cb0-6983-11e7-b7ae-186590cf1ef3.txt
18:31:11 [poll-flow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Starting poll flow with name: poll-flow
18:31:11 [poll-flow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Message payload: Poll message
18:31:11 [outboundFlow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Common outbound flow
18:31:11 [connector.file.mule.default.dispatcher.01] INFO org.mule.transport.file.FileConnector - Writing file to: /Users/sam.speed/outbound/64f130d0-6983-11e7-b7ae-186590cf1ef3.txt
18:31:16 [poll-flow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Starting poll flow with name: poll-flow
18:31:16 [poll-flow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Message payload: Poll message
18:31:16 [outboundFlow.stage1.02] INFO org.mule.api.processor.LoggerMessageProcessor - Common outbound flow
18:31:16 [connector.file.mule.default.dispatcher.01] INFO org.mule.transport.file.FileConnector - Writing file to: /Users/sam.speed/outbound/67ebfa40-6983-11e7-b7ae-186590cf1ef3.txt
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment