Skip to content

Instantly share code, notes, and snippets.

Created April 11, 2017 22:58
Show Gist options
  • Save metadaddy/3e2b7447b2eaa3c6c117a1a4e00f0373 to your computer and use it in GitHub Desktop.
Save metadaddy/3e2b7447b2eaa3c6c117a1a4e00f0373 to your computer and use it in GitHub Desktop.
Manipulate fields in StreamSets Data Collector via Groovy
"pipelineConfig" : {
"schemaVersion" : 2,
"version" : 5,
"uuid" : "1fba4dbe-5855-4765-92ec-722fe832405a",
"title" : "Field Manipulations - Groovy",
"description" : "",
"configuration" : [ {
"name" : "executionMode",
"value" : "STANDALONE"
}, {
"name" : "deliveryGuarantee",
"value" : "AT_LEAST_ONCE"
}, {
"name" : "shouldRetry",
"value" : true
}, {
"name" : "retryAttempts",
"value" : -1
}, {
"name" : "memoryLimit",
"value" : "${jvm:maxMemoryMB() * 0.65}"
}, {
"name" : "memoryLimitExceeded",
"value" : "STOP_PIPELINE"
}, {
"name" : "notifyOnStates",
"value" : [ "RUN_ERROR", "STOPPED", "FINISHED" ]
}, {
"name" : "emailIDs",
"value" : [ ]
}, {
"name" : "constants",
"value" : [ ]
}, {
"name" : "badRecordsHandling",
"value" : "streamsets-datacollector-basic-lib::com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget::1"
}, {
"name" : "clusterSlaveMemory",
"value" : 1024
}, {
"name" : "clusterSlaveJavaOpts",
"value" : "-XX:PermSize=128M -XX:MaxPermSize=256M -Dhttps.protocols=TLSv1.2,TLSv1.1 -Dlog4j.debug"
}, {
"name" : "clusterLauncherEnv",
"value" : [ ]
}, {
"name" : "mesosDispatcherURL",
"value" : null
}, {
"name" : "hdfsS3ConfDir",
"value" : null
}, {
"name" : "rateLimit",
"value" : 0
}, {
"name" : "statsAggregatorStage",
"value" : ""
} ],
"uiInfo" : {
"previewConfig" : {
"previewSource" : "CONFIGURED_SOURCE",
"batchSize" : 10,
"timeout" : 10000,
"writeToDestinations" : false,
"showHeader" : false,
"showFieldType" : true,
"rememberMe" : false
"stages" : [ {
"instanceName" : "DevRawDataSource_01",
"library" : "streamsets-datacollector-dev-lib",
"stageName" : "com_streamsets_pipeline_stage_devtest_rawdata_RawDataDSource",
"stageVersion" : "2",
"configuration" : [ {
"name" : "dataFormat",
"value" : "JSON"
}, {
"name" : "dataFormatConfig.compression",
"value" : "NONE"
}, {
"name" : "dataFormatConfig.filePatternInArchive",
"value" : "*"
}, {
"name" : "dataFormatConfig.charset",
"value" : "UTF-8"
}, {
"name" : "dataFormatConfig.removeCtrlChars",
"value" : false
}, {
"name" : "dataFormatConfig.textMaxLineLen",
"value" : 1024
}, {
"name" : "dataFormatConfig.useCustomDelimiter",
"value" : false
}, {
"name" : "dataFormatConfig.customDelimiter",
"value" : "\\r\\n"
}, {
"name" : "dataFormatConfig.includeCustomDelimiterInTheText",
"value" : false
}, {
"name" : "dataFormatConfig.jsonContent",
}, {
"name" : "dataFormatConfig.jsonMaxObjectLen",
"value" : 4096
}, {
"name" : "dataFormatConfig.csvFileFormat",
"value" : "CSV"
}, {
"name" : "dataFormatConfig.csvHeader",
"value" : "NO_HEADER"
}, {
"name" : "dataFormatConfig.csvMaxObjectLen",
"value" : 1024
}, {
"name" : "dataFormatConfig.csvCustomDelimiter",
"value" : "|"
}, {
"name" : "dataFormatConfig.csvCustomEscape",
"value" : "\\"
}, {
"name" : "dataFormatConfig.csvCustomQuote",
"value" : "\""
}, {
"name" : "dataFormatConfig.csvEnableComments",
"value" : false
}, {
"name" : "dataFormatConfig.csvCommentMarker",
"value" : "#"
}, {
"name" : "dataFormatConfig.csvIgnoreEmptyLines",
"value" : true
}, {
"name" : "dataFormatConfig.csvRecordType",
"value" : "LIST_MAP"
}, {
"name" : "dataFormatConfig.csvSkipStartLines",
"value" : 0
}, {
"name" : "dataFormatConfig.parseNull",
"value" : false
}, {
"name" : "dataFormatConfig.nullConstant",
"value" : "\\\\N"
}, {
"name" : "dataFormatConfig.xmlRecordElement",
"value" : null
}, {
"name" : "dataFormatConfig.xPathNamespaceContext",
"value" : [ ]
}, {
"name" : "dataFormatConfig.xmlMaxObjectLen",
"value" : 4096
}, {
"name" : "dataFormatConfig.logMode",
}, {
"name" : "dataFormatConfig.logMaxObjectLen",
"value" : 1024
}, {
"name" : "dataFormatConfig.retainOriginalLine",
"value" : false
}, {
"name" : "dataFormatConfig.customLogFormat",
"value" : "%h %l %u %t \"%r\" %>s %b"
}, {
"name" : "dataFormatConfig.regex",
"value" : "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+) (\\S+)\" (\\d{3}) (\\d+)"
}, {
"name" : "dataFormatConfig.fieldPathsToGroupName",
"value" : [ {
"fieldPath" : "/",
"group" : 1
} ]
}, {
"name" : "dataFormatConfig.grokPatternDefinition",
"value" : null
}, {
"name" : "dataFormatConfig.grokPattern",
"value" : "%{COMMONAPACHELOG}"
}, {
"name" : "dataFormatConfig.onParseError",
"value" : "ERROR"
}, {
"name" : "dataFormatConfig.maxStackTraceLines",
"value" : 50
}, {
"name" : "dataFormatConfig.enableLog4jCustomLogFormat",
"value" : false
}, {
"name" : "dataFormatConfig.log4jCustomLogFormat",
"value" : "%r [%t] %-5p %c %x - %m%n"
}, {
"name" : "dataFormatConfig.avroSchemaSource",
"value" : null
}, {
"name" : "dataFormatConfig.avroSchema",
"value" : null
}, {
"name" : "dataFormatConfig.schemaRegistryUrls",
"value" : [ ]
}, {
"name" : "dataFormatConfig.schemaLookupMode",
"value" : "SUBJECT"
}, {
"name" : "dataFormatConfig.subject",
"value" : null
}, {
"name" : "dataFormatConfig.schemaId",
"value" : null
}, {
"name" : "dataFormatConfig.protoDescriptorFile",
"value" : null
}, {
"name" : "dataFormatConfig.messageType",
"value" : null
}, {
"name" : "dataFormatConfig.isDelimited",
"value" : true
}, {
"name" : "dataFormatConfig.binaryMaxObjectLen",
"value" : 1024
}, {
"name" : "dataFormatConfig.datagramMode",
"value" : "SYSLOG"
}, {
"name" : "dataFormatConfig.typesDbPath",
"value" : null
}, {
"name" : "dataFormatConfig.convertTime",
"value" : false
}, {
"name" : "dataFormatConfig.excludeInterval",
"value" : true
}, {
"name" : "dataFormatConfig.authFilePath",
"value" : null
}, {
"name" : "dataFormatConfig.wholeFileMaxObjectLen",
"value" : 8192
}, {
"name" : "dataFormatConfig.rateLimit",
"value" : "-1"
}, {
"name" : "dataFormatConfig.verifyChecksum",
"value" : false
}, {
"name" : "rawData",
"value" : "{\n \"status\": 0,\n \"results\": [\n {\n \"name\": \"StreamSets\",\n \"address\" : {\n \"street\": \"2 Bryant St\",\n \"city\": \"San Francisco\",\n \"state\": \"CA\",\n \"zip\": \"94105\"\n },\n \"phone\": \"(415) 851-1018\"\n },\n {\n \"name\": \"Salesforce\",\n \"address\" : {\n \"street\": \"1 Market St\",\n \"city\": \"San Francisco\",\n \"state\": \"CA\",\n \"zip\": \"94105\"\n },\n \"phone\": \"(415) 901-7000\"\n }\n ]\n}"
}, {
"name" : "stageOnRecordError",
"value" : "TO_ERROR"
} ],
"uiInfo" : {
"description" : "",
"label" : "Dev Raw Data Source 1",
"xPos" : 60,
"yPos" : 50,
"stageType" : "SOURCE"
"inputLanes" : [ ],
"outputLanes" : [ "DevRawDataSource_01OutputLane14919318303350" ],
"eventLanes" : [ ]
}, {
"instanceName" : "GroovyEvaluator_01",
"library" : "streamsets-datacollector-groovy_2_4-lib",
"stageName" : "com_streamsets_pipeline_stage_processor_groovy_GroovyDProcessor",
"stageVersion" : "1",
"configuration" : [ {
"name" : "processingMode",
"value" : "BATCH"
}, {
"name" : "script",
"value" : "for (record in records) {\n try {\n // Keep a counter for source ids\n i = 0\n // Iterate through the results list\n for (result in record.value['results']) {\n // Flatten + rename the address fields in one step\n result['address'].each{ k, v -> result[k] = v }\n result.remove('address')\n // Split the street field\n parts = result['street'].split(/\\s+/, 2)\n result['street_number'] = parts[0]\n result['street_name'] = parts[1]\n result.remove('street')\n // Create a new record with the result\n outRecord = sdcFunctions.createRecord(record.sourceId + '::' + i)\n outRecord.value = result\n // Write it to the output\n\t output.write(outRecord)\n i++\n }\n } catch (e) {\n // Write a record to the error pipeline\n log.error(e.toString(), e)\n error.write(record, e.toString())\n }\n}"
}, {
"name" : "invokeDynamic",
"value" : false
}, {
"name" : "stageOnRecordError",
"value" : "TO_ERROR"
}, {
"name" : "stageRequiredFields",
"value" : [ ]
}, {
"name" : "stageRecordPreconditions",
"value" : [ ]
} ],
"uiInfo" : {
"description" : "",
"label" : "Groovy Evaluator 1",
"xPos" : 280,
"yPos" : 50,
"stageType" : "PROCESSOR"
"inputLanes" : [ "DevRawDataSource_01OutputLane14919318303350" ],
"outputLanes" : [ "GroovyEvaluator_01OutputLane14919492774480" ],
"eventLanes" : [ ]
}, {
"instanceName" : "Trash_01",
"library" : "streamsets-datacollector-basic-lib",
"stageName" : "com_streamsets_pipeline_stage_destination_devnull_NullDTarget",
"stageVersion" : "1",
"configuration" : [ ],
"uiInfo" : {
"description" : "",
"label" : "Trash 1",
"xPos" : 500,
"yPos" : 50,
"stageType" : "TARGET"
"inputLanes" : [ "GroovyEvaluator_01OutputLane14919492774480" ],
"outputLanes" : [ ],
"eventLanes" : [ ]
} ],
"errorStage" : {
"instanceName" : "Discard_ErrorStage",
"library" : "streamsets-datacollector-basic-lib",
"stageName" : "com_streamsets_pipeline_stage_destination_devnull_ToErrorNullDTarget",
"stageVersion" : "1",
"configuration" : [ ],
"uiInfo" : {
"description" : "",
"label" : "Error Records - Discard",
"xPos" : 60,
"yPos" : 50,
"stageType" : "TARGET"
"inputLanes" : [ ],
"outputLanes" : [ ],
"eventLanes" : [ ]
"info" : {
"name" : "7f3c6fd9-17cc-49de-bc2c-3375c4b5540b",
"title" : "Field Manipulations - Groovy",
"description" : "",
"created" : 1491949147714,
"lastModified" : 1491950456047,
"creator" : "admin",
"lastModifier" : "admin",
"lastRev" : "0",
"uuid" : "1fba4dbe-5855-4765-92ec-722fe832405a",
"valid" : true,
"metadata" : {
"labels" : [ ]
"metadata" : {
"labels" : [ ]
"statsAggregatorStage" : null,
"previewable" : true,
"issues" : {
"stageIssues" : { },
"pipelineIssues" : [ ],
"issueCount" : 0
"valid" : true
"pipelineRules" : {
"metricsRuleDefinitions" : [ {
"id" : "badRecordsAlertID",
"alertText" : "High incidence of Error Records",
"metricId" : "pipeline.batchErrorRecords.counter",
"metricType" : "COUNTER",
"metricElement" : "COUNTER_COUNT",
"condition" : "${value() > 100}",
"sendEmail" : false,
"enabled" : false,
"timestamp" : 1491931813412,
"valid" : true
}, {
"id" : "stageErrorAlertID",
"alertText" : "High incidence of Stage Errors",
"metricId" : "pipeline.batchErrorMessages.counter",
"metricType" : "COUNTER",
"metricElement" : "COUNTER_COUNT",
"condition" : "${value() > 100}",
"sendEmail" : false,
"enabled" : false,
"timestamp" : 1491931813412,
"valid" : true
}, {
"id" : "idleGaugeID",
"alertText" : "Pipeline is Idle",
"metricId" : "RuntimeStatsGauge.gauge",
"metricType" : "GAUGE",
"condition" : "${time:now() - value() > 120000}",
"sendEmail" : false,
"enabled" : false,
"timestamp" : 1491931813412,
"valid" : true
}, {
"id" : "batchTimeAlertID",
"alertText" : "Batch taking more time to process",
"metricId" : "RuntimeStatsGauge.gauge",
"metricType" : "GAUGE",
"metricElement" : "CURRENT_BATCH_AGE",
"condition" : "${value() > 200}",
"sendEmail" : false,
"enabled" : false,
"timestamp" : 1491931813412,
"valid" : true
}, {
"id" : "memoryLimitAlertID",
"alertText" : "Memory limit for pipeline exceeded",
"metricId" : "pipeline.memoryConsumed.counter",
"metricType" : "COUNTER",
"metricElement" : "COUNTER_COUNT",
"condition" : "${value() > (jvm:maxMemoryMB() * 0.65)}",
"sendEmail" : false,
"enabled" : false,
"timestamp" : 1491931813412,
"valid" : true
} ],
"dataRuleDefinitions" : [ ],
"driftRuleDefinitions" : [ ],
"emailIds" : [ ],
"uuid" : "b09d5150-fbe9-4ebe-bdd7-ffe82eb9af8f",
"ruleIssues" : [ ]
"libraryDefinitions" : null
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment