Python-based StreamSets REST API script that gets SDC CPU and memory metrics as well as running pipeline record counts
#!/usr/bin/env python
'''
This script writes a continuous stream of CPU and memory metrics for a given SDC,
as well as counts of all running pipelines' input records, output records, and error records.

The script uses StreamSets Platform's REST API to pull metrics directly from SDCs; it does
not connect to Control Hub.

On each refresh interval, the script records the CPU and memory metrics and then the record
counts for each running pipeline.
Sample output looks like this:
{"timestamp": "2023-09-17 22:31:19", "sdc_url": "https://sequoia.onefoursix.com:11111", "system_cpu_load": 9,
"heap_max": 4216455168, "heap_used": 1132732984, "heap_percentage": 26}
{"timestamp": "2023-09-17 22:31:19", "sdc_url": "https://sequoia.onefoursix.com:11111",
"pipeline_id": "Weatherto__0dd0db12-7686-4974-bada-b51ade96c695__8030c2e9-1a39-11ec-a5fe-97c8d4369386",
"input_records": 24414, "output_records": 24414, "error_records": 0}
{"timestamp": "2023-09-17 22:31:19", "sdc_url": "https://sequoia.onefoursix.com:11111",
"pipeline_id": "GetWeathe__d18bc85d-659f-45b5-b8cb-27ea7c035689__8030c2e9-1a39-11ec-a5fe-97c8d4369386",
"input_records": 5881, "output_records": 5881, "error_records": 0}
Prerequisites:
- Python 3.6+
- DataOps Platform API Credentials for a user with permissions to read SDC metrics
Set the following variables in the script:
# Cred ID
cred_id = '<your cred id>'
# Cred Token
cred_token = '<your cred token>'
# How often to get metrics
refresh_seconds = 60
Execute the script by passing the URL of the SDC to monitor and the path to the log file the
script should write to as arguments, like this:
$ python get-sdc-metrics.py https://sdc1.onefoursix.com:11111 /Users/mark/data/logs/sdc1.log
If you need to use a custom cacerts file for Python Requests, set and export the environment
variable REQUESTS_CA_BUNDLE before running this script, like this:
$ export REQUESTS_CA_BUNDLE=/path/to/ca-certificates.pem
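
Alternatively (a sketch, not something this script does for you), you could point the
requests Session at a custom CA bundle in code rather than via the environment:

    s = requests.Session()
    s.verify = '/path/to/ca-certificates.pem'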
You can launch multiple instances of this script to run asynchronously, one per SDC to monitor,
with each SDC's metrics written to its own log file, by creating a shell script named
get-sdc-metrics.sh like this:
#!/usr/bin/env bash
nohup python get-sdc-metrics.py https://sdc1.onefoursix.com:11111 /logs/sdc1.log &
nohup python get-sdc-metrics.py https://sdc2.onefoursix.com:11119 /logs/sdc2.log &
nohup python get-sdc-metrics.py https://sdc3.onefoursix.com:11119 /logs/sdc3.log &
Launch that shell script as a background process:
$ nohup ./get-sdc-metrics.sh &
and you should see one log file being written for each line in the shell script.
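
If you later need to stop all of the background instances, a pattern-based kill like this
should work (assuming no other processes on the machine match the script name):

$ pkill -f get-sdc-metrics.py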
'''
# Imports
import datetime
import json
import sys
import time

import requests
## User variables #############
# Cred ID
cred_id = ''
# Cred Token
cred_token = ''
# How often to get metrics
refresh_seconds = 60
## End of User variables #############
# Check command line args
if len(sys.argv) != 3:
    print('Error: wrong number of arguments')
    print('Usage: $ python get-sdc-metrics.py <SDC_URL> </path/to/log_file>')
    print('For example: $ python get-sdc-metrics.py https://sdc1.onefoursix.com:11111 /logs/sdc1.log')
    sys.exit(1)
sdc_url = sys.argv[1]
log_file = sys.argv[2]
# Write a message to the log file; 'log' is the file handle
# bound by the 'with' block below before this function is called
def log_message(message):
    log.write(message + '\n')
    log.flush()
with open(log_file, "a") as log:

    # Ignore pipelines in these states as they are not running
    pipeline_states_to_exclude = ['EDITED', 'FINISHED', 'RUN_ERROR', 'STOPPED']

    # Create a session
    s = requests.Session()

    # Set session headers, including the DataOps Platform API credentials
    s.headers.update({
        'Content-Type': 'application/json',
        'X-Requested-By': 'sdc',
        'X-SS-REST-CALL': 'true',
        'X-SS-App-Component-Id': cred_id,
        'X-SS-App-Auth-Token': cred_token})

    # SDC JMX URL
    sdc_jmx_url = sdc_url + '/rest/v1/system/jmx'

    # URL to get pipeline status
    sdc_pipelines_status_url = sdc_url + '/rest/v1/pipelines/status'
    # Loop forever
    while True:
        start_loop_seconds = time.time()

        # Get the SDC's CPU and memory metrics
        try:
            system_cpu_load = None
            heap_percentage = None
            metrics = s.get(sdc_jmx_url).json()
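            # An abridged sketch of the JMX payload this loop expects, based on the
            # standard JVM MXBean names matched below (values are illustrative):
            # {"beans": [
            #     {"name": "java.lang:type=OperatingSystem", "SystemCpuLoad": 0.09, ...},
            #     {"name": "java.lang:type=Memory", "HeapMemoryUsage": {"max": 4216455168, "used": 1132732984}, ...},
            #     ...]}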
            for bean in metrics['beans']:
                if bean['name'] == 'java.lang:type=OperatingSystem':
                    system_cpu_load = int(bean['SystemCpuLoad'] * 100)
                elif bean['name'] == 'java.lang:type=Memory':
                    heap_max = bean['HeapMemoryUsage']['max']
                    heap_used = bean['HeapMemoryUsage']['used']
                    heap_percentage = int((heap_used / heap_max) * 100)
                if system_cpu_load is not None and heap_percentage is not None:
                    break
            # Log the CPU and memory metrics
            if system_cpu_load is not None and heap_percentage is not None:
                data = {}
                data['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                data['sdc_url'] = sdc_url
                data['system_cpu_load'] = system_cpu_load
                data['heap_max'] = heap_max
                data['heap_used'] = heap_used
                data['heap_percentage'] = heap_percentage
                log_message(json.dumps(data))
            else:
                log_message('Error getting CPU and memory metrics for SDC at ' + sdc_url)
        except Exception as e:
            log_message('Error getting CPU and memory metrics for SDC at ' + sdc_url)
            log_message(str(e))
        # List of pipelines to get metrics for
        pipelines = []
        try:

            # Get all pipelines
            all_pipelines = s.get(sdc_pipelines_status_url).json()

            # Filter out pipelines that are not running
            for key in all_pipelines.keys():
                pipeline = all_pipelines[key]
                if pipeline['status'] not in pipeline_states_to_exclude:
                    pipelines.append(pipeline)
            # For each running pipeline
            for pipeline in pipelines:
                try:

                    # Get metrics for the pipeline
                    pipeline_id = pipeline['pipelineId']
                    pipeline_metrics_url = sdc_url + '/rest/v1/pipeline/' + pipeline_id + '/metrics?rev=0'
                    metrics = s.get(pipeline_metrics_url).json()
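                    # An abridged sketch of the metrics payload this block expects;
                    # the counter names come from the lookups below (counts are illustrative):
                    # {"counters": {
                    #     "pipeline.batchInputRecords.counter": {"count": 24414},
                    #     "pipeline.batchOutputRecords.counter": {"count": 24414},
                    #     "pipeline.batchErrorRecords.counter": {"count": 0},
                    #     ...}}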
                    # Log the metrics for the pipeline
                    data = {}
                    data['timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    data['sdc_url'] = sdc_url
                    data['pipeline_id'] = pipeline_id
                    counters = metrics['counters']
                    data['input_records'] = counters['pipeline.batchInputRecords.counter']['count']
                    data['output_records'] = counters['pipeline.batchOutputRecords.counter']['count']
                    data['error_records'] = counters['pipeline.batchErrorRecords.counter']['count']
                    log_message(json.dumps(data))
                except Exception as e:
                    log_message('Error getting metrics for pipeline ' + pipeline['pipelineId'] + ' on SDC ' + sdc_url)
                    log_message(str(e))
        except Exception as e:
            log_message('Error connecting to SDC at ' + sdc_url)
            log_message(str(e))
        finally:
            # Sleep for the remainder of the refresh interval,
            # accounting for the time this iteration took
            loop_time = time.time() - start_loop_seconds
            if loop_time < refresh_seconds:
                time.sleep(refresh_seconds - loop_time)