Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
A template for Apache NiFi that uses ExecuteScript with Groovy to issue a SQL query and produce a flowfile containing a CSV representation of the results
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
  Apache NiFi template "SQL-to-CSV_ExecuteScript".

  Contains a single ExecuteScript processor named "SQL-to-CSV", configured with the
  Groovy script engine and an inline Script Body. The script is driven by two dynamic
  properties set on the processor:
    databaseConnectionPoolName  name of the controller service that supplies the JDBC
                                connection (set to PostgresConnectionPool here)
    filename                    value written to the outgoing flow file's "filename"
                                attribute (set to sql_results.csv here)
  The processor is timer driven with a 5 sec scheduling period, is exported in the
  STOPPED state, and auto-terminates its "failure" relationship.
-->
<template><description>A template that uses ExecuteScript with Groovy to issue a SQL query and produce a flowfile containing a CSV representation of the results</description><name>SQL-to-CSV_ExecuteScript</name><snippet><processors><id>26925087-23d2-48c9-86d2-6bdf7e350167</id><parentGroupId>3b5fc5a3-8ae5-4e68-9911-a46c8f9c59c3</parentGroupId><position><x>1398.5380462507696</x><y>72.79047858547945</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Script Engine</key><value><allowableValues><displayName>ECMAScript</displayName><value>ECMAScript</value></allowableValues><allowableValues><displayName>Groovy</displayName><value>Groovy</value></allowableValues><allowableValues><displayName>lua</displayName><value>lua</value></allowableValues><allowableValues><displayName>python</displayName><value>python</value></allowableValues><allowableValues><displayName>ruby</displayName><value>ruby</value></allowableValues><defaultValue>ECMAScript</defaultValue><description>The engine to execute scripts</description><displayName>Script Engine</displayName><dynamic>false</dynamic><name>Script Engine</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Script File</key><value><description>Path to script file to execute. 
Only one of Script File or Script Body may be used</description><displayName>Script File</displayName><dynamic>false</dynamic><name>Script File</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Script Body</key><value><description>Body of script to execute. Only one of Script File or Script Body may be used</description><displayName>Script Body</displayName><dynamic>false</dynamic><name>Script Body</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Module Directory</key><value><description>Comma-separated list of paths to files and/or directories which contain modules required by the script.</description><displayName>Module Directory</displayName><dynamic>false</dynamic><name>Module Directory</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>databaseConnectionPoolName</key><value><description></description><displayName>databaseConnectionPoolName</displayName><dynamic>true</dynamic><name>databaseConnectionPoolName</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>filename</key><value><description></description><displayName>filename</displayName><dynamic>true</dynamic><name>filename</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Script Engine</key><value>Groovy</value></entry><entry><key>Script File</key></entry><entry><key>Script Body</key><value>// SQL-to-CSV: runs a fixed SQL query over a connection obtained from a controller
// service and writes the rows, preceded by a header line, as CSV into a new flow file.
// Reads the processor properties databaseConnectionPoolName and filename as variables.
// On success the flow file goes to REL_SUCCESS; on any exception the error is logged
// and the flow file (if one was created) goes to REL_FAILURE.
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql

// Renders one CSV field following RFC 4180: a field containing a comma, a double
// quote, CR or LF is wrapped in double quotes with embedded quotes doubled.
// A null value becomes an empty field rather than the text 'null'.
def csvField = { value -&gt;
    def text = (value == null) ? '' : value.toString()
    def mustQuote = [',', '&quot;', '\n', '\r'].any { text.contains(it) }
    return mustQuote ? '&quot;' + text.replace('&quot;', '&quot;&quot;') + '&quot;' : text
}

// Renders one CSV record as bytes. The charset is explicit so the output does not
// depend on the JVM default encoding.
def csvLine = { fields -&gt;
    return (fields.collect(csvField).join(',') + '\n').getBytes('UTF-8')
}

def lookup = context.controllerServiceLookup
def dbServiceName = databaseConnectionPoolName.value
// Services are fetched by identifier, so first find the identifier whose name matches.
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -&gt; lookup.getControllerServiceName(cs) == dbServiceName
}
def conn = lookup.getControllerService(dbcpServiceId)?.getConnection()
// Declared up front so the catch block can tell whether a flow file exists yet.
def flowFile = null
try {
    flowFile = session.create()
    if (conn == null) {
        // Fail with a clear message instead of an obscure error inside the query call.
        throw new IllegalStateException('No database connection available from controller service: ' + dbServiceName)
    }
    flowFile = session.write(flowFile, { out -&gt;
        def sql = new Sql(conn)
        sql.rows('select * from users').eachWithIndex { row, idx -&gt;
            // Column names come from the first row, so an empty result yields no header.
            if (idx == 0) { out.write(csvLine(row.keySet())) }
            out.write(csvLine(row.values()))
        }
    } as OutputStreamCallback)
    flowFile = session.putAttribute(flowFile, 'filename', filename.value)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Scripting error', e)
    // session.create() itself may have failed, leaving nothing to route.
    if (flowFile != null) { session.transfer(flowFile, REL_FAILURE) }
} finally {
    // Always hand the connection back, even when the catch block throws.
    conn?.close()
}</value></entry><entry><key>Module Directory</key></entry><entry><key>databaseConnectionPoolName</key><value>PostgresConnectionPool</value></entry><entry><key>filename</key><value>sql_results.csv</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>5 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>SQL-to-CSV</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that failed to be processed</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that were successfully processed</description><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>false</supportsParallelProcessing><type>org.apache.nifi.processors.script.ExecuteScript</type></processors></snippet><timestamp>04/08/2016 14:46:36 EDT</timestamp></template>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment