Skip to content

Instantly share code, notes, and snippets.

@colindean
Last active September 25, 2018 23:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save colindean/d1191daf17c494edd3f7e032b0e1fb61 to your computer and use it in GitHub Desktop.
Save colindean/d1191daf17c494edd3f7e032b0e1fb61 to your computer and use it in GitHub Desktop.
An attempt at an ExecuteGroovyScript to fill in for ListDatabaseTables until NIFI-5519 is implemented
/********
* ListDatabaseTablesWithLookup
*
* by Colin Dean <colin.dean@arcadia.io>
*
* It is a cobbled-together attempt to work around
* ListDatabaseTables' inability to accept incoming FlowFiles, which prevents
* that processor from using DBCPConnectionPoolLookup as its controller service
* instead of DBCPConnectionPool. This affects NiFi 1.7.0+.
*
* https://issues.apache.org/jira/browse/NIFI-5519 is tracking a proposal to
* change the behavior of ListDatabaseTables.
*
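* Required dynamic properties:
*
* * databaseConnectionPoolName - the name of the controller service that is a DBCPConnectionPoolLookup
*
* Incoming FlowFiles must carry a `database.name` attribute; its value is passed
* to the lookup service to select which registered connection pool to use.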
*
* This script borrows heavily from ListDatabaseTables, a part of Apache NiFi 1.7.1
*
* https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListDatabaseTables.java
*
* /*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* */
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSession
import org.apache.nifi.processor.Relationship
import java.sql.Connection
import java.sql.DatabaseMetaData
import java.util.stream.Stream
import org.apache.nifi.util.StringUtils
import java.util.stream.Collectors
import java.sql.SQLException
import java.sql.ResultSet
// Give the ExecuteGroovyScript bindings typed local names; besides documenting
// their types, this lets editors highlight and complete them.
ProcessSession thisSession = session
ProcessContext thisContext = context
ComponentLog Log = log
Relationship SUCCESS = REL_SUCCESS
Relationship FAILURE = REL_FAILURE

// Key under which the lookup value is handed to the DBCPConnectionPoolLookup service.
final String dbcpLookupServiceAttribute = "database.name"

// Attribute names written onto each outgoing per-table FlowFile
// (same names ListDatabaseTables uses).
final String DB_TABLE_NAME = "db.table.name"
final String DB_TABLE_CATALOG = "db.table.catalog"
final String DB_TABLE_SCHEMA = "db.table.schema"
final String DB_TABLE_FULLNAME = "db.table.fullname"
final String DB_TABLE_TYPE = "db.table.type"
final String DB_TABLE_REMARKS = "db.table.remarks"
// NOTE(review): not referenced anywhere in this script as written.
final String DB_TABLE_COUNT = "db.table.count"

// Take one incoming FlowFile; when the queue is empty there is nothing to do.
FlowFile flowFileIn = thisSession.get()
if (flowFileIn == null) {
    return
}

// Flipped to false when the incoming FlowFile is routed to failure, so the
// cleanup step knows not to drop it.
boolean flowFileInIsRemovable = true
/**
 * Obtains a JDBC Connection from the DBCPConnectionPoolLookup controller service whose
 * display name is given by the `databaseConnectionPoolName` dynamic property.
 *
 * @param dbcpLookupServiceKey value handed to the lookup service under the
 *        `dbcpLookupServiceAttribute` key; it selects which registered pool is used
 * @return an open Connection; the caller must close it
 * @throws IllegalStateException when the property is unset or no matching service exists
 * @throws IllegalArgumentException when the lookup key is null or empty
 */
Closure<Connection> getConnection = { String dbcpLookupServiceKey ->
    // Expected dynamic property: the *name* (not the id) of the lookup service.
    def dbcpLookupServiceName = databaseConnectionPoolName.value
    if (dbcpLookupServiceName == null) {
        throw new IllegalStateException("databaseConnectionPoolName property unset")
    }
    Log.info("Controller service to lookup by name: $dbcpLookupServiceName")

    // Fail with a clear message instead of passing a null/empty key to the lookup service.
    if (!dbcpLookupServiceKey) {
        throw new IllegalArgumentException("FlowFile attribute $dbcpLookupServiceAttribute is missing or empty")
    }

    def serviceLookup = thisContext.controllerServiceLookup
    // NOTE(review): services are matched by display name; if several share that name,
    // whichever one find() reaches first is used.
    def dbcpServiceId = serviceLookup.getControllerServiceIdentifiers(ControllerService).find { String id ->
        serviceLookup.getControllerServiceName(id) == dbcpLookupServiceName
    }
    if (dbcpServiceId == null) {
        throw new IllegalStateException("Unable to get controller service id for $dbcpLookupServiceName")
    }
    Log.info("$dbcpLookupServiceName is ID $dbcpServiceId")

    def connectionConfig = [(dbcpLookupServiceAttribute): dbcpLookupServiceKey]
    Log.info("Connection configuration: $connectionConfig")

    def dbcpService = serviceLookup.getControllerService(dbcpServiceId)
    if (dbcpService == null) {
        throw new IllegalStateException("Unable to get controller service for ${dbcpServiceId}")
    }
    return dbcpService.getConnection(connectionConfig)
}
// List every table visible through the looked-up connection and emit one child
// FlowFile per table. The incoming FlowFile is dropped on success and routed to
// failure if connecting or enumerating fails.
Connection conn
ResultSet rs
try {
    // Read the lookup key from the same attribute name the lookup service expects.
    String dbcpLookupServiceKey = flowFileIn.getAttribute(dbcpLookupServiceAttribute)
    conn = getConnection(dbcpLookupServiceKey)
    Log.debug("Connection open")
    DatabaseMetaData dbMetaData = conn.getMetaData()
    // All-null filters: every catalog, schema, table name and table type the driver reports.
    // https://docs.oracle.com/javase/8/docs/api/java/sql/DatabaseMetaData.html#getTables(java.lang.String,%20java.lang.String,%20java.lang.String,%20java.lang.String[])
    rs = dbMetaData.getTables(null, null, null, null)
    while (rs.next()) {
        // create(parent) yields a new child FlowFile rather than null, so it is not
        // null-checked; an early return here would silently skip the remaining tables.
        FlowFile flowFileOut = thisSession.create(flowFileIn)
        try {
            // getTables columns: 1 TABLE_CAT, 2 TABLE_SCHEM, 3 TABLE_NAME, 4 TABLE_TYPE, 5 REMARKS
            final String tableCatalog = rs.getString(1)
            final String tableSchema = rs.getString(2)
            final String tableName = rs.getString(3)
            final String tableType = rs.getString(4)
            final String tableRemarks = rs.getString(5)
            Log.info("Found table $tableName for database key $dbcpLookupServiceKey")

            // Fully-qualified name: the non-empty parts of catalog.schema.table.
            String fqn = [tableCatalog, tableSchema, tableName]
                    .findAll { String segment -> !StringUtils.isEmpty(segment) }
                    .join('.')

            if (tableCatalog != null) {
                flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_CATALOG, tableCatalog)
            }
            if (tableSchema != null) {
                flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_SCHEMA, tableSchema)
            }
            flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_NAME, tableName)
            flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_FULLNAME, fqn)
            flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_TYPE, tableType)
            if (tableRemarks != null) {
                flowFileOut = thisSession.putAttribute(flowFileOut, DB_TABLE_REMARKS, tableRemarks)
            }
            thisSession.transfer(flowFileOut, SUCCESS)
        } catch (Exception e) {
            Log.error('Scripting error', e)
            // Exception messages may be null; fall back to the exception type.
            flowFileOut = thisSession.putAttribute(flowFileOut, 'script.error', e.message ?: e.class.name)
            thisSession.transfer(flowFileOut, FAILURE)
        }
    }
} catch (Exception e) {
    // Keep the incoming FlowFile so it can be inspected or retried from the failure queue.
    flowFileInIsRemovable = false
    Log.error('Scripting error while establishing querying database', e)
    flowFileIn = thisSession.putAttribute(flowFileIn, 'script.error', e.message ?: e.class.name)
    thisSession.transfer(flowFileIn, FAILURE)
} finally {
    if (flowFileInIsRemovable) {
        thisSession.remove(flowFileIn)
    }
    // Close the metadata ResultSet, then the Connection. Failures are logged rather than
    // thrown so they do not abort the trigger after the FlowFiles have been routed.
    if (rs != null) {
        try {
            rs.close()
        } catch (SQLException e) {
            Log.warn('Unable to close table metadata ResultSet', e)
        }
    }
    if (conn != null) {
        try {
            conn.close()
        } catch (SQLException e) {
            Log.warn('Unable to close database connection', e)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment