Skip to content

Instantly share code, notes, and snippets.

@jster1357
Created October 31, 2022 19:27
Show Gist options
  • Save jster1357/0878a1fecb98c93414c1378583826a6a to your computer and use it in GitHub Desktop.
PipelineMetadataExtractionPipeline.json
{
"artifact": {
"name": "cdap-data-pipeline",
"version": "6.7.1",
"scope": "SYSTEM"
},
"description": "Data Pipeline Application",
"name": "getRunIDMetrics_v4",
"config": {
"resources": {
"memoryMB": 2048,
"virtualCores": 1
},
"driverResources": {
"memoryMB": 2048,
"virtualCores": 1
},
"connections": [
{
"from": "getRunDetails",
"to": "jobDetails"
},
{
"from": "jobDetails",
"to": "Joiner"
},
{
"from": "Joiner",
"to": "BigQuery"
},
{
"from": "getMetrics",
"to": "modifyMetrics"
},
{
"from": "modifyMetrics",
"to": "addMetadataFields"
},
{
"from": "addMetadataFields",
"to": "Joiner"
}
],
"comments": [],
"postActions": [],
"properties": {},
"processTimingEnabled": true,
"stageLoggingEnabled": false,
"stages": [
{
"name": "getRunDetails",
"plugin": {
"name": "HTTP",
"type": "batchsource",
"label": "getRunDetails",
"artifact": {
"name": "http-plugins",
"version": "1.2.2",
"scope": "USER"
},
"properties": {
"referenceName": "test",
"url": "${endpoint}/v3/namespaces/${namespace}/apps/${pipeline_name}/workflows/DataPipelineWorkflow/runs/${runid}",
"httpMethod": "GET",
"format": "text",
"oauth2Enabled": "true",
"httpErrorsHandling": "2..:Success,.*:Fail",
"errorHandling": "stopOnError",
"retryPolicy": "exponential",
"linearRetryInterval": "30",
"maxRetryDuration": "600",
"connectTimeout": "120",
"readTimeout": "120",
"paginationType": "None",
"waitTimeBetweenPages": "0",
"verifyHttps": "true",
"keystoreType": "Java KeyStore (JKS)",
"keystoreKeyAlgorithm": "SunX509",
"trustStoreType": "Java KeyStore (JKS)",
"trustStoreKeyAlgorithm": "SunX509",
"transportProtocols": "TLSv1.2",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
"authUrl": "https://accounts.google.com/o/oauth2/auth",
"tokenUrl": "https://accounts.google.com/o/oauth2/token",
"clientId": "[client id]",
"clientSecret": "[client secret]",
"scopes": "https://www.googleapis.com/auth/cloud-platform",
"refreshToken": "[refresh token]"
}
},
"outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
"id": "getRunDetails"
},
{
"name": "jobDetails",
"plugin": {
"name": "Wrangler",
"type": "transform",
"label": "jobDetails",
"artifact": {
"name": "wrangler-transform",
"version": "4.7.1",
"scope": "SYSTEM"
},
"properties": {
"field": "*",
"precondition": "false",
"directives": "parse-as-json :body 1\nrename body_runid runid\nrename body_starting job_start\nrename job_start job_starting\nrename body_start job_start\nrename body_end job_end\nrename body_status job_status\nrename body_properties job_properties\nrename body_cluster dataproc_cluster\nrename body_profile profile\nrename dataproc_cluster dataproc_cluster_status\nrename profile dataproc_profile\nset-type :job_properties string\nfind-and-replace :job_properties s/\\\\\"/\"/g\nfind-and-replace :job_properties s/\\}\\\"/\\}/g\nfind-and-replace :job_properties s/\\\"\\{/\\{/g\nparse-timestamp :job_starting 'seconds'\nparse-timestamp :job_start 'seconds'\nparse-timestamp :job_end 'seconds'\ndiff-date :job_start :job_starting :pipeline_starting_duration\ndiff-date :job_end :job_starting :pipeline_total_duration\ndiff-date :job_end :job_start :pipeline_runtime_duration\nset-column :pipeline_starting_duration pipeline_starting_duration / 1000\nset-column :pipeline_total_duration pipeline_total_duration / 1000\nset-column :pipeline_runtime_duration pipeline_runtime_duration / 1000\nset-type :dataproc_cluster_status string\nset-type :dataproc_profile string",
"on-error": "fail-pipeline",
"schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]}]}",
"workspaceId": "6652df20-a842-4944-9c9c-134ae558fe9c"
}
},
"outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]}]}",
"inputSchema": [
{
"name": "getRunDetails",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}"
}
],
"id": "jobDetails"
},
{
"name": "Joiner",
"plugin": {
"name": "Joiner",
"type": "batchjoiner",
"label": "Joiner",
"artifact": {
"name": "core-plugins",
"version": "2.9.1",
"scope": "SYSTEM"
},
"properties": {
"selectedFields": "jobDetails.runid as runid,jobDetails.job_starting as job_starting,jobDetails.job_start as job_start,jobDetails.job_end as job_end,jobDetails.job_status as job_status,jobDetails.job_properties as job_properties,jobDetails.dataproc_cluster_status as dataproc_cluster_status,jobDetails.dataproc_profile as dataproc_profile,jobDetails.pipeline_starting_duration as pipeline_starting_duration,jobDetails.pipeline_total_duration as pipeline_total_duration,jobDetails.pipeline_runtime_duration as pipeline_runtime_duration,addMetadataFields.metricName as metricName,addMetadataFields.metricValue as metricValue,addMetadataFields.namespace as namespace,addMetadataFields.pipeline_name as pipeline_name",
"requiredInputs": "jobDetails,addMetadataFields",
"conditionType": "basic",
"joinKeys": "addMetadataFields.runid = jobDetails.runid",
"joinNullKeys": "true",
"distributionEnabled": "false",
"schema": "{\"type\":\"record\",\"name\":\"join.typeoutput\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]}]}"
}
},
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": "{\"type\":\"record\",\"name\":\"join.typeoutput\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]}]}"
}
],
"inputSchema": [
{
"name": "jobDetails",
"schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]}]}"
},
{
"name": "addMetadataFields",
"schema": "{\"type\":\"record\",\"name\":\"record.typeadded\",\"fields\":[{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]},{\"name\":\"runid\",\"type\":[\"string\",\"null\"]}]}"
}
],
"id": "Joiner"
},
{
"name": "BigQuery",
"plugin": {
"name": "BigQueryTable",
"type": "batchsink",
"label": "BigQuery",
"artifact": {
"name": "google-cloud",
"version": "0.20.1",
"scope": "SYSTEM"
},
"properties": {
"useConnection": "false",
"project": "auto-detect",
"serviceAccountType": "filePath",
"serviceFilePath": "auto-detect",
"referenceName": "BQGetMetrics",
"dataset": "demo_data",
"table": "df_metrics2",
"operation": "insert",
"truncateTable": "false",
"allowSchemaRelaxation": "true",
"location": "US",
"createPartitionedTable": "false",
"partitioningType": "NONE",
"schema": "{\"type\":\"record\",\"name\":\"join.typeoutput\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]}]}"
}
},
"outputSchema": "{\"type\":\"record\",\"name\":\"join.typeoutput\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]}]}",
"inputSchema": [
{
"name": "Joiner",
"schema": "{\"type\":\"record\",\"name\":\"join.typeoutput\",\"fields\":[{\"name\":\"runid\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_starting\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_start\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_end\",\"type\":[{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},\"null\"]},{\"name\":\"job_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"job_properties\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_cluster_status\",\"type\":[\"string\",\"null\"]},{\"name\":\"dataproc_profile\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_starting_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_total_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"pipeline_runtime_duration\",\"type\":[\"long\",\"null\"]},{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]}]}"
}
],
"id": "BigQuery"
},
{
"name": "getMetrics",
"plugin": {
"name": "HTTP",
"type": "batchsource",
"label": "getMetrics",
"artifact": {
"name": "http-plugins",
"version": "1.2.2",
"scope": "USER"
},
"properties": {
"referenceName": "test",
"url": "${endpoint}/v3/metrics/query",
"httpMethod": "POST",
"requestBody": "{\n \"query\": {\n \"tags\": {\n \"namespace\": \"${namespace}\",\n \"app\": \"${pipeline_name}\",\n \"workflow\": \"DataPipelineWorkflow\",\n \"run\": \"${runid}\"\n },\n \"metrics\": [\n \"user.WriteGCS.records.in\",\n \"user.WriteGCS.records.out\",\n \"user.GCS.records.in\",\n \"user.GCS.records.out\", \n \"user.GCS.records.error\",\n \"user.WriteGCS.records.error\",\n \"user.GCS.process.time.total\",\n \"user.WriteGCS.process.time.total\",\n \"user.GCS.process.time.avg\", \n \"user.WriteGCS.process.time.avg\",\n \"user.GCS.process.time.max\",\n \"user.WriteGCS.process.time.max\",\n \"user.GCS.process.time.min\",\n \"user.WriteGCS.process.time.min\",\n \"user.GCS.process.time.stddev\",\n \"user.WriteGCS.process.time.stddev\"], \"timeRange\" : {\n \"aggregate\": true\n }\n }\n}",
"format": "text",
"oauth2Enabled": "true",
"httpErrorsHandling": "2..:Success,.*:Fail",
"errorHandling": "stopOnError",
"retryPolicy": "exponential",
"linearRetryInterval": "30",
"maxRetryDuration": "600",
"connectTimeout": "120",
"readTimeout": "120",
"paginationType": "None",
"waitTimeBetweenPages": "0",
"verifyHttps": "true",
"keystoreType": "Java KeyStore (JKS)",
"keystoreKeyAlgorithm": "SunX509",
"trustStoreType": "Java KeyStore (JKS)",
"trustStoreKeyAlgorithm": "SunX509",
"transportProtocols": "TLSv1.2",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
"authUrl": "https://accounts.google.com/o/oauth2/auth",
"tokenUrl": "https://accounts.google.com/o/oauth2/token",
"clientId": "[client id]",
"clientSecret": "[client secret]",
"scopes": "https://www.googleapis.com/auth/cloud-platform",
"refreshToken": "[refresh token]"
}
},
"outputSchema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}",
"id": "getMetrics"
},
{
"name": "modifyMetrics",
"plugin": {
"name": "Wrangler",
"type": "transform",
"label": "modifyMetrics",
"artifact": {
"name": "wrangler-transform",
"version": "4.7.1",
"scope": "SYSTEM"
},
"properties": {
"field": "*",
"precondition": "false",
"directives": "parse-as-json :body 2\nparse-as-json :body_query_series 1\nparse-as-json :body_query_series 1\ndrop :body_query_series_grouping\nparse-as-json :body_query_series_data 1\nparse-as-json :body_query_series_data 1\ndrop :body_query_startTime\ndrop :body_query_endTime\ndrop :body_query_resolution\ndrop :body_query_series_data_time\nrename body_query_series_metricName metricName\nrename body_query_series_data_value metricValue",
"on-error": "fail-pipeline",
"schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]}]}",
"workspaceId": "15493673-4d71-4e57-a9d1-d741cbb4f3f6"
}
},
"outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]}]}",
"inputSchema": [
{
"name": "getMetrics",
"schema": "{\"type\":\"record\",\"name\":\"etlSchemaBody\",\"fields\":[{\"name\":\"body\",\"type\":\"string\"}]}"
}
],
"id": "modifyMetrics"
},
{
"name": "addMetadataFields",
"plugin": {
"name": "MultiFieldAdder",
"type": "transform",
"label": "addMetadataFields",
"artifact": {
"name": "field-adder",
"version": "2.2.0",
"scope": "USER"
},
"properties": {
"fieldValue": "runid:${runid},namespace:${namespace},pipeline_name:${pipeline_name}"
}
},
"outputSchema": [
{
"name": "etlSchemaBody",
"schema": "{\"type\":\"record\",\"name\":\"record.typeadded\",\"fields\":[{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"]},{\"name\":\"pipeline_name\",\"type\":[\"string\",\"null\"]},{\"name\":\"runid\",\"type\":[\"string\",\"null\"]}]}"
}
],
"inputSchema": [
{
"name": "modifyMetrics",
"schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"metricName\",\"type\":[\"string\",\"null\"]},{\"name\":\"metricValue\",\"type\":[\"long\",\"null\"]}]}"
}
],
"id": "addMetadataFields"
}
],
"schedule": "0 1 */1 * *",
"engine": "spark",
"numOfRecordsPreview": 100,
"rangeRecordsPreview": {
"min": 1,
"max": 5000
},
"description": "Data Pipeline Application",
"maxConcurrentRuns": 1
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment