Created
March 18, 2022 19:55
-
-
Save tomchavakis/eac02217c3658cbbe749ef39c19eaa69 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import groovy.json.JsonBuilder | |
import groovy.json.JsonSlurper | |
import groovy.sql.Sql | |
import java.nio.charset.StandardCharsets | |
import org.apache.commons.io.IOUtils | |
import org.apache.nifi.annotation.documentation.CapabilityDescription | |
import org.apache.nifi.components.PropertyDescriptor | |
import org.apache.nifi.dbcp.DBCPService | |
import org.apache.nifi.processor.Relationship | |
import org.apache.nifi.processor.exception.ProcessException | |
import org.apache.nifi.processor.io.StreamCallback | |
import java.util.concurrent.atomic.AtomicReference; | |
import org.apache.nifi.processor.util.StandardValidators | |
import org.apache.nifi.expression.AttributeExpression; | |
/**
 * Skeleton of a scripted NiFi processor intended to act as an HTTP client.
 *
 * As written, no HTTP request is performed: each trigger creates a new FlowFile,
 * stamps it with {@code execution.status = 200} and routes it to {@code success}.
 * The {@code response} relationship and the {@link #REQUEST_URL} attribute name are
 * declared but not yet used by {@link #onTrigger}.
 */
@CapabilityDescription("An HTTP Client")
class CustomInvokeProcessor implements Processor {

    /*
     * Define Relationships (https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#documenting-relationships)
     */
    final static Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description('FlowFiles that were successfully processed and had any data enriched are routed here')
            .build()

    final static Relationship REL_RESPONSE = new Relationship.Builder()
            .name("response")
            .description('A Response FlowFile will be routed upon success (2xx status codes)')
            .build()

    final static Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description('FlowFiles that were not successfully processed are routed here')
            .build()

    /*
     * Define Properties
     */
    final static PropertyDescriptor PROP_METHOD = new PropertyDescriptor.Builder()
            .name("HTTP Method")
            .displayName("HTTP request method")
            .description("HTTP request method (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS)")
            .required(true)
            .defaultValue("GET")
            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
            .build()

    final static PropertyDescriptor PROP_URL = new PropertyDescriptor.Builder()
            .name("Remote URL")
            .displayName("Remote URL")
            .description("Remote URL which will be connected to, including scheme, host, port, path.")
            .required(true)
            .addValidator(StandardValidators.URL_VALIDATOR)
            .build()

    /** Attribute name reserved for the request URL on a response FlowFile (not written yet). */
    final static String REQUEST_URL = "invokehttp.request.url";

    /** Logger obtained from the initialization context in {@link #initialize}. */
    def log

    /** @return the three relationships this processor declares: success, response and failure. */
    @Override
    Set<Relationship> getRelationships() { return [REL_SUCCESS, REL_RESPONSE, REL_FAILURE] as Set }

    /** @return an unmodifiable list holding the HTTP method and remote URL descriptors. */
    @Override
    List<PropertyDescriptor> getPropertyDescriptors() {
        Collections.unmodifiableList([PROP_METHOD, PROP_URL]) as List<PropertyDescriptor>
    }

    /**
     * Looks up a supported property descriptor by name.
     *
     * The descriptors are registered as "HTTP Method" and "Remote URL"; the original
     * switch only matched 'HTTP METHOD' / 'HTTP URL', so lookups by the real names
     * returned null. The old spellings are kept as aliases for backward compatibility.
     *
     * @param name the property name to resolve
     * @return the matching descriptor, or null when the name is not recognised
     */
    @Override
    PropertyDescriptor getPropertyDescriptor(String name) {
        switch (name) {
            case PROP_METHOD.name:
            case 'HTTP METHOD': // legacy alias
                return PROP_METHOD
            case PROP_URL.name:
            case 'HTTP URL': // legacy alias
                return PROP_URL
            default:
                return null
        }
    }

    /** Captures the component logger for later use by {@link #onTrigger}. */
    @Override
    void initialize(ProcessorInitializationContext context) { log = context.getLogger() }

    /**
     * Creates a FlowFile, marks it with {@code execution.status = 200}, routes it to
     * {@code success} and commits the session.
     *
     * On any error the failure is logged and the FlowFile is routed to {@code failure}
     * with the session committed; if even that is impossible the session is rolled back
     * so it is never left open.
     *
     * @param context        gives access to the configured property values
     * @param sessionFactory used to open the session this invocation works in
     * @throws ProcessException declared by the Processor contract; not thrown directly here
     */
    @Override
    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        def flowFile = null
        try {
            flowFile = session.create()
            // Read the property's value; assigning the PropertyValue object itself to a
            // String would only capture its toString() representation.
            final String urlstr = context.getProperty(PROP_URL).getValue()
            flowFile = session.putAttribute(flowFile, 'execution.status', '200')
            session.transfer(flowFile, REL_SUCCESS)
            // TODO: emit a response FlowFile carrying REQUEST_URL = urlstr on REL_RESPONSE.
            session.commit()
        } catch (final Throwable t) {
            log.error('{} failed to process due to {}', [this, t] as Object[])
            routeToFailure(session, flowFile)
        }
    }

    /**
     * Finishes the session after an error: sends the FlowFile to failure and commits,
     * falling back to a rollback when there is no FlowFile or the transfer/commit fails.
     */
    private void routeToFailure(session, flowFile) {
        try {
            if (flowFile == null) {
                session.rollback()
                return
            }
            session.transfer(flowFile, REL_FAILURE)
            session.commit()
        } catch (final Throwable routingError) {
            log.error('{} could not route to failure due to {}', [this, routingError] as Object[])
            session.rollback()
        }
    }

    /** No custom validation is performed; an empty collection means "no problems found". */
    @Override
    Collection<ValidationResult> validate(ValidationContext context) { Collections.emptyList() }

    /** Property changes require no action in this processor. */
    @Override
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }

    /** This scripted processor does not supply its own identifier. */
    @Override
    String getIdentifier() { null }
}
// Expose the instance through the script binding: NiFi's InvokeScriptedProcessor
// looks for a variable named `processor` holding the Processor implementation.
processor = new CustomInvokeProcessor()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment