Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tomchavakis/eac02217c3658cbbe749ef39c19eaa69 to your computer and use it in GitHub Desktop.
Save tomchavakis/eac02217c3658cbbe749ef39c19eaa69 to your computer and use it in GitHub Desktop.
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import groovy.sql.Sql
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.annotation.documentation.CapabilityDescription
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.StreamCallback
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.expression.AttributeExpression;
/**
 * Skeleton NiFi scripted processor that declares an HTTP method and a remote URL
 * property and, on every trigger, emits one new FlowFile tagged with
 * {@code execution.status = 200} on the {@code success} relationship.
 *
 * NOTE: no HTTP request is performed by this class yet; the declared properties and
 * the {@code response} relationship are placeholders for that future work.
 */
@CapabilityDescription("An HTTP Client")
class CustomInvokeProcessor implements Processor {

    /*
     * Define Relationships (https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#documenting-relationships)
     */
    final static Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed and had any data enriched are routed here')
            .build()

    final static Relationship REL_RESPONSE = new Relationship.Builder()
            .name("response")
            .description('A Response FlowFile will be routed upon success (2xx status codes)')
            .build()

    final static Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    /*
     * Define Properties
     */
    final static PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .displayName("HTTP request method")
            .description("HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS)")
            .required(true)
            .defaultValue("GET")
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
            .build()

    final static PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
            .name("Remote URL")
            .displayName("Remote URL")
            .description("Remote URL which will be connected to, including scheme, host, port, path.")
            .required(true)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build()

    /** Attribute key reserved for the request URL; not written by the current implementation. */
    final static String REQUEST_URL = "invokehttp.request.url";

    /** Logger obtained from the initialization context in {@link #initialize}. */
    def log

    /** @return the three relationships declared by this processor */
    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS, REL_RESPONSE, REL_FAILURE] as Set }

    /** @return an unmodifiable list of the supported property descriptors */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([PROP_METHOD, PROP_URL]) as List<PropertyDescriptor>
    }

    /**
     * Looks up a property descriptor by name.
     *
     * @param name the descriptor name as declared via {@code .name(...)}; the keys
     *             {@code 'HTTP METHOD'} and {@code 'HTTP URL'} are still accepted as aliases
     * @return the matching descriptor, or {@code null} when the name is unknown
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        // The original keys never matched the declared names ("HTTP Method" / "Remote URL"),
        // so a lookup by the real name always returned null. Match the declared names
        // first and keep the old keys as aliases.
        switch (name) {
            case PROP_METHOD.getName():
            case 'HTTP METHOD':
                return PROP_METHOD
            case PROP_URL.getName():
            case 'HTTP URL':
                return PROP_URL
            default:
                return null
        }
    }

    /** Stores the logger supplied by the initialization context. */
    @Override
    void initialize(ProcessorInitializationContext context) { log = context.getLogger() }

    /**
     * Creates a new FlowFile, sets {@code execution.status} to {@code 200}, routes it to
     * {@code success} and commits the session. If any step fails the error is logged and
     * the FlowFile is routed to {@code failure}; if that routing cannot be committed
     * either, the session is rolled back.
     *
     * @param context        processor context (properties are not read yet)
     * @param sessionFactory factory used to open the session for this invocation
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = session.create()
        if (!flowFile) return

        // TODO: perform the HTTP call against PROP_URL / PROP_METHOD and emit a second
        // FlowFile carrying REQUEST_URL on REL_RESPONSE.
        try {
            flowFile = session.putAttribute(flowFile, 'execution.status', '200')
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}', [this, t] as Object[])
            try {
                // The failure transfer only takes effect once the session is committed.
                session.transfer(flowFile, REL_FAILURE)
                session.commit()
            } catch (final Throwable routingError) {
                // Nothing more can be done with this session; discard its pending work.
                log.error('{} could not route FlowFile to failure; rolling back session due to {}', [this, routingError] as Object[])
                session.rollback()
            }
        }
    }

    /** No custom validation is performed; an empty result collection is returned. */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { Collections.emptyList() }

    /** Property changes require no action. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    /** @return {@code null}; this processor does not provide its own identifier */
    @Override
    String getIdentifier() { null }
}
// Instantiate the processor and bind it to the script-level variable `processor`.
// NOTE(review): presumably this is the variable the hosting NiFi scripted-processor
// component looks up after evaluating the script — confirm against its documentation.
processor = new CustomInvokeProcessor()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment