Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save onefoursix/bd163ac4594d317881fc1f9362c2b3a2 to your computer and use it in GitHub Desktop.
Save onefoursix/bd163ac4594d317881fc1f9362c2b3a2 to your computer and use it in GitHub Desktop.
streamsets_job_metrics.py
import os
from datetime import datetime
from time import time
import sys
from streamsets.sdk import ControlHub
import json
# --- Script configuration ---------------------------------------------------

# Set to true to echo the metrics to stdout
print_metrics = True

# File the Job run metrics are written to (one JSON object per line)
output_file = '/Users/mark/Desktop/streamsets_job_metrics.json'

# Control Hub Credentials.
# Read from the CRED_ID / CRED_TOKEN environment variables when they are set,
# so that secrets do not have to be stored in this file; the second argument
# of each call is the fallback used when the variable is not set.
cred_id = os.environ.get('CRED_ID', '')
cred_token = os.environ.get('CRED_TOKEN', '')
def print_usage_and_exit():
    """Print the command line usage of this script and exit with status 1.

    The script name shown in the usage text is taken from ``sys.argv[0]`` so
    that it always matches the file that was actually invoked.

    Raises:
        SystemExit: always, with exit code 1.
    """
    # Fall back to this file's name if the interpreter supplied an empty argv[0]
    script_name = os.path.basename(sys.argv[0]) or 'streamsets_job_metrics.py'
    print('Usage: $ python3 {} <lookback_minutes>'.format(script_name))
    print('Usage Example: $ python3 {} 60'.format(script_name))
    sys.exit(1)
def convert_millis_to_datetime_string(millis):
    """Format a timestamp as a local-time ``YYYY-MM-DD HH:MM:SS`` string.

    NOTE: despite the name, ``millis`` is handed unchanged to
    ``datetime.fromtimestamp``, which interprets its argument as seconds
    since the epoch.
    """
    moment = datetime.fromtimestamp(millis)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
# Check the number of command line args (script name + lookback_minutes)
if len(sys.argv) != 2:
    print('Error: Wrong number of arguments')
    print_usage_and_exit()

# Validate the command line args: lookback_minutes must parse as an integer
lookback_minutes = sys.argv[1]
try:
    lookback_minutes = int(lookback_minutes)
except ValueError:
    print('Error: lookback_minutes arg \'{}\' is not an integer'.format(lookback_minutes))
    print_usage_and_exit()

# A negative lookback would place the start of the window in the future
if lookback_minutes < 0:
    print('Error: lookback_minutes arg \'{}\' must not be negative'.format(lookback_minutes))
    print_usage_and_exit()
# Current time, in (fractional) seconds since the epoch
current_time_seconds = time()

# Starting time to look for Jobs: "now" minus the lookback, truncated to
# whole seconds, plus the same instant expressed in milliseconds
start_time_seconds = int(current_time_seconds - (60 * lookback_minutes))
start_time_millis = 1000 * start_time_seconds

# Print the settings, framed by a separator line above and below
for banner_part in ('header', 'body', 'footer'):
    if banner_part != 'body':
        print('-------------------------------------')
        continue
    print('Current time is {}'.format(
        convert_millis_to_datetime_string(current_time_seconds)))
    print('Lookback minutes is {}'.format(lookback_minutes))
    print('Will get metrics for Jobs started after {}'.format(
        convert_millis_to_datetime_string(start_time_seconds)))
# Connect to Control Hub with the configured credentials.
# A failure here is fatal: the error is reported and the script stops with
# exit status 1; the success message is only printed when no error occurred.
sch = None
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token)
except Exception as connect_error:
    print('Error connecting to Control Hub')
    print(str(connect_error))
    sys.exit(1)
else:
    print ('Connected to Control Hub')
# Job runs to get metrics for.
# One dict per qualifying Job run is appended to this list by the loop over
# sch.jobs further down; the list is then written to output_file.
job_runs = []
# Get Job runs that were started after the lookback time
def get_run_metrics(job, job_run, metrics):
    """Return the entry of ``metrics`` that belongs to ``job_run``.

    Entries are matched on ``run_count`` and the first match wins. When no
    entry matches, an error naming the run and the Job is printed and
    ``None`` is returned.
    """
    matching = next(
        (entry for entry in metrics if entry.run_count == job_run.run_count),
        None,
    )
    if matching is None:
        print('Error finding metrics for run #{} for Job {}'.format(job_run.run_count, job.job_name))
    return matching
def _build_run_record(job, job_run, run_metrics):
    """Flatten one Job run and its record counts into a plain dict.

    ``run_metrics`` may be ``None`` (no metrics found for the run), in which
    case the three record counts are set to -1.
    """
    record = {
        'ID': job.job_id,
        'NAME': job.job_name,
        'CREATETIME': job.created_on,
        'LASTMODIFIEDON': job.last_modified_on,
        'PIPELINENAME': job.pipeline_name,
        'PIPELINECOMMITLABEL': job.commit_label,
        'RUNCOUNT': job_run.run_count,
        'STARTTIME': job_run.start_time,
        'FINISHTIME': job_run.finish_time,
        'ERRORMESSAGE': job_run.error_message,
        'COLOR': job_run.color,
        'STATUS': job_run.status,
    }
    if run_metrics is not None:
        record['INPUTRECORDS'] = run_metrics.input_count
        record['OUTPUTRECORDS'] = run_metrics.output_count
        record['ERRORRECORDS'] = run_metrics.error_count
    else:
        # -1 marks counts that could not be looked up for this run
        record['INPUTRECORDS'] = -1
        record['OUTPUTRECORDS'] = -1
        record['ERRORRECORDS'] = -1
    return record


# Collect every Job run that was started at or after the lookback time
for job in sch.jobs:
    history = job.history
    metrics = job.metrics
    for job_run in history:
        # NOTE(review): stopping at the first older run assumes job.history
        # is ordered newest-first -- confirm against the SDK documentation.
        if job_run.start_time < start_time_millis:
            break
        run_metrics = get_run_metrics(job, job_run, metrics)
        job_runs.append(_build_run_record(job, job_run, run_metrics))
# Write the collected Job runs to output_file, one JSON object per line.
# When print_metrics is set, each line is also echoed to stdout, which is
# what that configuration flag is documented to do.
with open(output_file, 'w', encoding='utf-8') as f:
    for run in job_runs:
        line = json.dumps(run)
        if print_metrics:
            print(line)
        f.write(line + '\n')
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment