Manipulate fields in StreamSets Data Collector via Groovy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"pipelineConfig" : { | |
"schemaVersion" : 2, | |
"version" : 5, | |
"uuid" : "1fba4dbe-5855-4765-92ec-722fe832405a", | |
"title" : "Field Manipulations - Groovy", | |
"description" : "", | |
"configuration" : [ { | |
"name" : "executionMode", | |
"value" : "STANDALONE" | |
}, { | |
"name" : "deliveryGuarantee", | |
"value" : "AT_LEAST_ONCE" | |
}, { | |
"name" : "shouldRetry", | |
"value" : true | |
}, { | |
"name" : "retryAttempts", | |
"value" : -1 | |
}, { | |
"name" : "memoryLimit", | |
"value" : "${jvm:maxMemoryMB() * 0.65}" | |
}, { | |
"name" : "memoryLimitExceeded", | |
"value" : "STOP_PIPELINE" | |
}, { | |
"name" : "notifyOnStates", | |
"value" : [ "RUN_ERROR", "STOPPED", "FINISHED" ] | |
}, { | |
"name" : "emailIDs", | |
"value" : [ ] | |
}, { | |
"name" : "constants", | |
"value" : [ ] | |
}, { | |
"name" : "badRecordsHandling", | |
"value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1" | |
}, { | |
"name" : "clusterSlaveMemory", | |
"value" : 1024 | |
}, { | |
"name" : "clusterSlaveJavaOpts", | |
"value" : "-XX:PermSize=128M -XX:MaxPermSize=256M -Dhttps.protocols=TLSv1.2,TLSv1.1 -Dlog4j.debug" | |
}, { | |
"name" : "clusterLauncherEnv", | |
"value" : [ ] | |
}, { | |
"name" : "mesosDispatcherURL", | |
"value" : null | |
}, { | |
"name" : "hdfsS3ConfDir", | |
"value" : null | |
}, { | |
"name" : "rateLimit", | |
"value" : 0 | |
}, { | |
"name" : "statsAggregatorStage", | |
"value" : "" | |
} ], | |
"uiInfo" : { | |
"previewConfig" : { | |
"previewSource" : "CONFIGURED_SOURCE", | |
"batchSize" : 10, | |
"timeout" : 10000, | |
"writeToDestinations" : false, | |
"showHeader" : false, | |
"showFieldType" : true, | |
"rememberMe" : false | |
} | |
}, | |
"stages" : [ { | |
"instanceName" : "DevRawDataSource_01", | |
"library" : "streamsets-datacollector-dev-lib", | |
"stageName" : "com_streamsets_pipeline_stage_devtest_rawdata_RawDataDSource", | |
"stageVersion" : "2", | |
"configuration" : [ { | |
"name" : "dataFormat", | |
"value" : "JSON" | |
}, { | |
"name" : "dataFormatConfig.compression", | |
"value" : "NONE" | |
}, { | |
"name" : "dataFormatConfig.filePatternInArchive", | |
"value" : "*" | |
}, { | |
"name" : "dataFormatConfig.charset", | |
"value" : "UTF-8" | |
}, { | |
"name" : "dataFormatConfig.removeCtrlChars", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.textMaxLineLen", | |
"value" : 1024 | |
}, { | |
"name" : "dataFormatConfig.useCustomDelimiter", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.customDelimiter", | |
"value" : "\\r\\n" | |
}, { | |
"name" : "dataFormatConfig.includeCustomDelimiterInTheText", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.jsonContent", | |
"value" : "MULTIPLE_OBJECTS" | |
}, { | |
"name" : "dataFormatConfig.jsonMaxObjectLen", | |
"value" : 4096 | |
}, { | |
"name" : "dataFormatConfig.csvFileFormat", | |
"value" : "CSV" | |
}, { | |
"name" : "dataFormatConfig.csvHeader", | |
"value" : "NO_HEADER" | |
}, { | |
"name" : "dataFormatConfig.csvMaxObjectLen", | |
"value" : 1024 | |
}, { | |
"name" : "dataFormatConfig.csvCustomDelimiter", | |
"value" : "|" | |
}, { | |
"name" : "dataFormatConfig.csvCustomEscape", | |
"value" : "\\" | |
}, { | |
"name" : "dataFormatConfig.csvCustomQuote", | |
"value" : "\"" | |
}, { | |
"name" : "dataFormatConfig.csvEnableComments", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.csvCommentMarker", | |
"value" : "#" | |
}, { | |
"name" : "dataFormatConfig.csvIgnoreEmptyLines", | |
"value" : true | |
}, { | |
"name" : "dataFormatConfig.csvRecordType", | |
"value" : "LIST_MAP" | |
}, { | |
"name" : "dataFormatConfig.csvSkipStartLines", | |
"value" : 0 | |
}, { | |
"name" : "dataFormatConfig.parseNull", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.nullConstant", | |
"value" : "\\\\N" | |
}, { | |
"name" : "dataFormatConfig.xmlRecordElement", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.xPathNamespaceContext", | |
"value" : [ ] | |
}, { | |
"name" : "dataFormatConfig.xmlMaxObjectLen", | |
"value" : 4096 | |
}, { | |
"name" : "dataFormatConfig.logMode", | |
"value" : "COMMON_LOG_FORMAT" | |
}, { | |
"name" : "dataFormatConfig.logMaxObjectLen", | |
"value" : 1024 | |
}, { | |
"name" : "dataFormatConfig.retainOriginalLine", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.customLogFormat", | |
"value" : "%h %l %u %t \"%r\" %>s %b" | |
}, { | |
"name" : "dataFormatConfig.regex", | |
"value" : "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)" | |
}, { | |
"name" : "dataFormatConfig.fieldPathsToGroupName", | |
"value" : [ { | |
"fieldPath" : "/", | |
"group" : 1 | |
} ] | |
}, { | |
"name" : "dataFormatConfig.grokPatternDefinition", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.grokPattern", | |
"value" : "%{COMMONAPACHELOG}" | |
}, { | |
"name" : "dataFormatConfig.onParseError", | |
"value" : "ERROR" | |
}, { | |
"name" : "dataFormatConfig.maxStackTraceLines", | |
"value" : 50 | |
}, { | |
"name" : "dataFormatConfig.enableLog4jCustomLogFormat", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.log4jCustomLogFormat", | |
"value" : "%r [%t] %-5p %c %x - %m%n" | |
}, { | |
"name" : "dataFormatConfig.avroSchemaSource", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.avroSchema", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.schemaRegistryUrls", | |
"value" : [ ] | |
}, { | |
"name" : "dataFormatConfig.schemaLookupMode", | |
"value" : "SUBJECT" | |
}, { | |
"name" : "dataFormatConfig.subject", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.schemaId", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.protoDescriptorFile", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.messageType", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.isDelimited", | |
"value" : true | |
}, { | |
"name" : "dataFormatConfig.binaryMaxObjectLen", | |
"value" : 1024 | |
}, { | |
"name" : "dataFormatConfig.datagramMode", | |
"value" : "SYSLOG" | |
}, { | |
"name" : "dataFormatConfig.typesDbPath", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.convertTime", | |
"value" : false | |
}, { | |
"name" : "dataFormatConfig.excludeInterval", | |
"value" : true | |
}, { | |
"name" : "dataFormatConfig.authFilePath", | |
"value" : null | |
}, { | |
"name" : "dataFormatConfig.wholeFileMaxObjectLen", | |
"value" : 8192 | |
}, { | |
"name" : "dataFormatConfig.rateLimit", | |
"value" : "-1" | |
}, { | |
"name" : "dataFormatConfig.verifyChecksum", | |
"value" : false | |
}, { | |
"name" : "rawData", | |
"value" : "{\n \"status\": 0,\n \"results\": [\n {\n \"name\": \"StreamSets\",\n \"address\" : {\n \"street\": \"2 Bryant St\",\n \"city\": \"San Francisco\",\n \"state\": \"CA\",\n \"zip\": \"94105\"\n },\n \"phone\": \"(415) 851-1018\"\n },\n {\n \"name\": \"Salesforce\",\n \"address\" : {\n \"street\": \"1 Market St\",\n \"city\": \"San Francisco\",\n \"state\": \"CA\",\n \"zip\": \"94105\"\n },\n \"phone\": \"(415) 901-7000\"\n }\n ]\n}" | |
}, { | |
"name" : "stageOnRecordError", | |
"value" : "TO_ERROR" | |
} ], | |
"uiInfo" : { | |
"description" : "", | |
"label" : "Dev Raw Data Source 1", | |
"xPos" : 60, | |
"yPos" : 50, | |
"stageType" : "SOURCE" | |
}, | |
"inputLanes" : [ ], | |
"outputLanes" : [ "DevRawDataSource_01OutputLane14919318303350" ], | |
"eventLanes" : [ ] | |
}, { | |
"instanceName" : "GroovyEvaluator_01", | |
"library" : "streamsets-datacollector-groovy_2_4-lib", | |
"stageName" : "com_streamsets_pipeline_stage_processor_groovy_GroovyDProcessor", | |
"stageVersion" : "1", | |
"configuration" : [ { | |
"name" : "processingMode", | |
"value" : "BATCH" | |
}, { | |
"name" : "script", | |
"value" : "for (record in records) {\n try {\n // Keep a counter for source ids\n i = 0\n // Iterate through the results list\n for (result in record.value['results']) {\n // Flatten + rename the address fields in one step\n result['address'].each{ k, v -> result[k] = v }\n result.remove('address')\n // Split the street field\n parts = result['street'].split(/\\s+/, 2)\n result['street_number'] = parts[0]\n result['street_name'] = parts[1]\n result.remove('street')\n // Create a new record with the result\n outRecord = sdcFunctions.createRecord(record.sourceId + '::' + i)\n outRecord.value = result\n // Write it to the output\n\t output.write(outRecord)\n i++\n }\n } catch (e) {\n // Write a record to the error pipeline\n log.error(e.toString(), e)\n error.write(record, e.toString())\n }\n}" | |
}, { | |
"name" : "invokeDynamic", | |
"value" : false | |
}, { | |
"name" : "stageOnRecordError", | |
"value" : "TO_ERROR" | |
}, { | |
"name" : "stageRequiredFields", | |
"value" : [ ] | |
}, { | |
"name" : "stageRecordPreconditions", | |
"value" : [ ] | |
} ], | |
"uiInfo" : { | |
"description" : "", | |
"label" : "Groovy Evaluator 1", | |
"xPos" : 280, | |
"yPos" : 50, | |
"stageType" : "PROCESSOR" | |
}, | |
"inputLanes" : [ "DevRawDataSource_01OutputLane14919318303350" ], | |
"outputLanes" : [ "GroovyEvaluator_01OutputLane14919492774480" ], | |
"eventLanes" : [ ] | |
}, { | |
"instanceName" : "Trash_01", | |
"library" : "streamsets-datacollector-basic-lib", | |
"stageName" : "com_streamsets_pipeline_stage_destination_devnull_NullDTarget", | |
"stageVersion" : "1", | |
"configuration" : [ ], | |
"uiInfo" : { | |
"description" : "", | |
"label" : "Trash 1", | |
"xPos" : 500, | |
"yPos" : 50, | |
"stageType" : "TARGET" | |
}, | |
"inputLanes" : [ "GroovyEvaluator_01OutputLane14919492774480" ], | |
"outputLanes" : [ ], | |
"eventLanes" : [ ] | |
} ], | |
"errorStage" : { | |
"instanceName" : "Discard_ErrorStage", | |
"library" : "streamsets-datacollector-basic-lib", | |
"stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget", | |
"stageVersion" : "1", | |
"configuration" : [ ], | |
"uiInfo" : { | |
"description" : "", | |
"label" : "Error Records - Discard", | |
"xPos" : 60, | |
"yPos" : 50, | |
"stageType" : "TARGET" | |
}, | |
"inputLanes" : [ ], | |
"outputLanes" : [ ], | |
"eventLanes" : [ ] | |
}, | |
"info" : { | |
"name" : "7f3c6fd9-17cc-49de-bc2c-3375c4b5540b", | |
"title" : "Field Manipulations - Groovy", | |
"description" : "", | |
"created" : 1491949147714, | |
"lastModified" : 1491950456047, | |
"creator" : "admin", | |
"lastModifier" : "admin", | |
"lastRev" : "0", | |
"uuid" : "1fba4dbe-5855-4765-92ec-722fe832405a", | |
"valid" : true, | |
"metadata" : { | |
"labels" : [ ] | |
} | |
}, | |
"metadata" : { | |
"labels" : [ ] | |
}, | |
"statsAggregatorStage" : null, | |
"previewable" : true, | |
"issues" : { | |
"stageIssues" : { }, | |
"pipelineIssues" : [ ], | |
"issueCount" : 0 | |
}, | |
"valid" : true | |
}, | |
"pipelineRules" : { | |
"metricsRuleDefinitions" : [ { | |
"id" : "badRecordsAlertID", | |
"alertText" : "High incidence of Error Records", | |
"metricId" : "pipeline.batchErrorRecords.counter", | |
"metricType" : "COUNTER", | |
"metricElement" : "COUNTER_COUNT", | |
"condition" : "${value() > 100}", | |
"sendEmail" : false, | |
"enabled" : false, | |
"timestamp" : 1491931813412, | |
"valid" : true | |
}, { | |
"id" : "stageErrorAlertID", | |
"alertText" : "High incidence of Stage Errors", | |
"metricId" : "pipeline.batchErrorMessages.counter", | |
"metricType" : "COUNTER", | |
"metricElement" : "COUNTER_COUNT", | |
"condition" : "${value() > 100}", | |
"sendEmail" : false, | |
"enabled" : false, | |
"timestamp" : 1491931813412, | |
"valid" : true | |
}, { | |
"id" : "idleGaugeID", | |
"alertText" : "Pipeline is Idle", | |
"metricId" : "RuntimeStatsGauge.gauge", | |
"metricType" : "GAUGE", | |
"metricElement" : "TIME_OF_LAST_RECEIVED_RECORD", | |
"condition" : "${time:now() - value() > 120000}", | |
"sendEmail" : false, | |
"enabled" : false, | |
"timestamp" : 1491931813412, | |
"valid" : true | |
}, { | |
"id" : "batchTimeAlertID", | |
"alertText" : "Batch taking more time to process", | |
"metricId" : "RuntimeStatsGauge.gauge", | |
"metricType" : "GAUGE", | |
"metricElement" : "CURRENT_BATCH_AGE", | |
"condition" : "${value() > 200}", | |
"sendEmail" : false, | |
"enabled" : false, | |
"timestamp" : 1491931813412, | |
"valid" : true | |
}, { | |
"id" : "memoryLimitAlertID", | |
"alertText" : "Memory limit for pipeline exceeded", | |
"metricId" : "pipeline.memoryConsumed.counter", | |
"metricType" : "COUNTER", | |
"metricElement" : "COUNTER_COUNT", | |
"condition" : "${value() > (jvm:maxMemoryMB() * 0.65)}", | |
"sendEmail" : false, | |
"enabled" : false, | |
"timestamp" : 1491931813412, | |
"valid" : true | |
} ], | |
"dataRuleDefinitions" : [ ], | |
"driftRuleDefinitions" : [ ], | |
"emailIds" : [ ], | |
"uuid" : "b09d5150-fbe9-4ebe-bdd7-ffe82eb9af8f", | |
"ruleIssues" : [ ] | |
}, | |
"libraryDefinitions" : null | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment