Skip to content

Instantly share code, notes, and snippets.

@alopresto
Created August 27, 2019 20:15
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alopresto/b651d34d7d38bf05abbb2286ada85199 to your computer and use it in GitHub Desktop.
Save alopresto/b651d34d7d38bf05abbb2286ada85199 to your computer and use it in GitHub Desktop.
Sample Apache NiFi custom processor written in Groovy.
/**
 * Sample scripted NiFi processor.
 *
 * For each FlowFile it reads the content as UTF-8 comma-separated text, treats the
 * first line as a header, and replaces the content with one line per data row made of
 * the capitalized values of columns 2 and 3 (zero-based) separated by a space.
 * The header's columns 1 and 2 are recorded in the "selected.columns" attribute and the
 * "filename" attribute is set to "split_cols_invoke.txt". Processed FlowFiles are routed
 * to the "success" relationship.
 */
class GroovyProcessor implements Processor {

    /** The only relationship this processor exposes. */
    def REL_SUCCESS = new Relationship.Builder().name("success")
            .description("FlowFiles that were successfully processed").build()

    // Logger handed over by the framework in initialize().
    // NOTE(review): intentionally left without a static type so the script does not depend on
    // a particular logger class (the original used ProcessorLog) — confirm against the target NiFi version.
    def log

    /**
     * Stores the logger supplied by the initialization context.
     *
     * @param context initialization context provided by the framework
     */
    @Override
    void initialize(ProcessorInitializationContext context) {
        log = context.getLogger()
    }

    /**
     * @return a set containing only {@code REL_SUCCESS}
     */
    @Override
    Set<Relationship> getRelationships() {
        return [REL_SUCCESS] as Set
    }

    /**
     * Pulls one FlowFile, rewrites its content as described on the class, sets the
     * "selected.columns" and "filename" attributes, transfers it to success and commits.
     * Returns without doing anything when no FlowFile is available.
     *
     * @param context        process context (not used by this implementation)
     * @param sessionFactory factory used to create the session for this invocation
     * @throws ProcessException on any failure; the session is rolled back (with penalization) first
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        // Declared outside the try block so the catch block can roll it back.
        def session = null
        try {
            session = sessionFactory.createSession()
            def flowFile = session.get()
            if (!flowFile) return

            def selectedColumns = ''
            flowFile = session.write(flowFile,
                    { inputStream, outputStream ->
                        final BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream, 'UTF-8'))

                        // First line is the header; empty content yields null here.
                        String[] header = inReader.readLine()?.split(',')
                        if (header == null || header.length < 3) {
                            throw new IOException('Expected a header line with at least 3 comma-separated columns')
                        }
                        // NOTE(review): the header records columns 1 and 2 while the rows below emit
                        // columns 2 and 3 — kept as in the original; confirm which pair is intended.
                        selectedColumns = "${header[1]},${header[2]}"

                        String line
                        // Compare against null explicitly: under Groovy truth an empty line is
                        // falsy and would otherwise end the loop before the input is exhausted.
                        while ((line = inReader.readLine()) != null) {
                            if (!line.trim()) continue   // skip blank lines instead of stopping at them
                            String[] cols = line.split(',')
                            if (cols.length < 4) {
                                throw new IOException("Expected at least 4 comma-separated columns but found ${cols.length}".toString())
                            }
                            outputStream.write("${cols[2].capitalize()} ${cols[3].capitalize()}\n".getBytes('UTF-8'))
                        }
                    } as StreamCallback)

            flowFile = session.putAttribute(flowFile, "selected.columns", selectedColumns)
            flowFile = session.putAttribute(flowFile, "filename", "split_cols_invoke.txt")

            // transfer
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        }
        catch (e) {
            // Undo the partial work so the FlowFile is not left in an abandoned session.
            session?.rollback(true)
            // Avoid wrapping a ProcessException inside another ProcessException.
            if (e instanceof ProcessException) throw e
            throw new ProcessException(e)
        }
    }

    /**
     * This processor has no properties to validate.
     *
     * @return an empty collection
     */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { return [] }

    /**
     * @return always {@code null}; this processor defines no property descriptors
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) { return null }

    /** No-op: this processor defines no properties. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    /**
     * @return an empty list; this processor defines no property descriptors
     */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() { return [] }

    /**
     * @return always {@code null}; this implementation does not supply an identifier
     */
    @Override
    String getIdentifier() { return null }
}
// Create the processor instance and assign it to the undeclared script variable `processor`.
// NOTE(review): presumably the hosting scripted-processor component looks the instance up
// under this exact name — confirm before renaming or adding `def`/a type here.
processor = new GroovyProcessor()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment