Skip to content

Instantly share code, notes, and snippets.

@ricston-git
Last active March 6, 2018 14:33
Show Gist options
  • Save ricston-git/a19fc3e2e3f1b5c0fe9a4a790d483e2e to your computer and use it in GitHub Desktop.
Save ricston-git/a19fc3e2e3f1b5c0fe9a4a790d483e2e to your computer and use it in GitHub Desktop.
public class QuerySingleFileProcessor implements Callable, Startable {
private SftpConnector sftpConnector;
private ImmutableEndpoint immutableEndpoint;
private SftpReceiverRequesterUtil util;
@Inject
private MuleContext muleContext;
@Override
public void start() throws MuleException {
// Lookup the SFTPConnector from the Mule Registry
sftpConnector = (SftpConnector) this.muleContext.getRegistry().lookupConnector("SftpConnector");
final EndpointBuilder endpointBuilder = this.muleContext.getRegistry().lookupObject("QuerySingleFileEndpoint");
// Build an inbound endpoint based on the properties supplied in the global endpoint.
this.immutableEndpoint = endpointBuilder.buildInboundEndpoint();
this.util = new SftpReceiverRequesterUtil(immutableEndpoint);
}
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
// Create a notifier which will notify notification subscribers of this SFTP event.
final SftpNotifier notifier = new SftpNotifier(sftpConnector, eventContext.getMessage(), immutableEndpoint, eventContext.getFlowConstruct().getName());
// Obtain the name of the file we want to download from a flow variable.
final String fileName = eventContext.getMessage().getProperty("fileName", PropertyScope.INVOCATION);
if (StringUtils.isBlank(fileName)) {
throw new Exception("The flow variable 'fileName' cannot be null since it indicates which file to retrieve from the SFTP server.");
}
try {
// Download the file
final InputStream inputStream = util.retrieveFile(fileName, notifier);
// We only do this because org.mule.transport.sftp.SftpStream is package-private,
// but the subclasses are public.
if (inputStream instanceof SftpFileArchiveInputStream) {
final SftpFileArchiveInputStream sftpArchiveInputStream = (SftpFileArchiveInputStream) inputStream;
sftpArchiveInputStream.performPostProcessingOnClose(this.shouldDeleteFile());
}
if (inputStream instanceof SftpInputStream) {
final SftpInputStream sftpInputStream = (SftpInputStream) inputStream;
sftpInputStream.performPostProcessingOnClose(this.shouldDeleteFile());
}
return inputStream;
}
catch (IOException e) {
throw new DefaultMuleException(MessageFactory.createStaticMessage("An IOException was thrown whilst attempting to download '%s'", fileName), e);
}
}
/**
* A method that simply checks whether a file should be deleted or not based on the 'autoDelete' property
* defined on the connector or global endpoint. If you want to dynamically resolve this based on the Mule message,
* you can add it as a parameter.
*/
protected boolean shouldDeleteFile() {
final String endpointAutoDelete = (String) immutableEndpoint.getProperty("autoDelete");
final boolean connectorAutoDelete = sftpConnector.isAutoDelete();
return connectorAutoDelete || Boolean.valueOf(endpointAutoDelete);
}
}
<sftp:connector name="SftpConnector" archiveDir="${sftp.archiveDir}" doc:name="SFTP"/>
<sftp:endpoint name="QuerySingleFileEndpoint" host="${sftp.host}" port="${sftp.port}" autoDelete="true"
connector-ref="SftpConnector" user="${sftp.username}" password="${sftp.password}"
path="${sftp.path}" doc:name="SFTP"/>
<flow name="read-sftp-files-flow" initialState="started">
<sftp:inbound-endpoint host="${sftp.host}" port="${sftp.port}" doc:name="Read INSTRUCT files"
connector-ref="SftpConnector" user="${sftp.username}" password="${sftp.password}"
path="${sftp.path}" pollingFrequency="${sftp.pollingFrequency:10000}">
<file:filename-wildcard-filter pattern="*.instruct" caseSensitive="true"/>
</sftp:inbound-endpoint>
<object-to-string-transformer doc:name="Transform INSTRUCT file to String" />
<splitter expression="#[payload.split('\n')]" doc:name="Split file line by line"/>
<logger level="INFO" message="File to read from SFTP is: #[payload]" doc:name="Log files to read" />
<set-variable variableName="fileName" value="#[payload]" doc:name="Set fileName flowVar" />
<component doc:name="Query Single File">
<singleton-object class="com.ricston.blogs.QuerySingleFileProcessor" />
</component>
<object-to-string-transformer doc:name="Transform read file to String" />
<logger level="INFO" message="File contents: #[payload]" doc:name="Log file contents" />
</flow>
<component doc:name="Query Single File">
<singleton-object class="com.ricston.blogs.QuerySingleFileProcessor" />
</component>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment