Python script to get pipeline and engine metrics for StreamSets Platform
#!/usr/bin/env python3
'''
This script writes a rolling log file that contains running pipeline names and record counts,
along with SDC CPU usage and JVM heap memory metrics, for all Data Collectors registered
with StreamSets Platform that match the specified set of labels.

The script writes sdc-resource-metrics.log, a rolling log file with the pipeline and SDC metrics,
as well as sdc-resource-metrics-messages-and-errors.log, which records the SDCs that are
discovered and whether connections to them are successful.
Prerequisites:
- Python 3.6+
- StreamSets Platform SDK for Python v5 or v6
See: https://docs.streamsets.com/platform-sdk/latest/learn/installation.html
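  For example, the SDK can usually be installed with pip; check the linked docs
  for the exact package version to pin:
  $ pip3 install streamsets~=6.0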
- Control Hub API Credentials
- Set the following variables in the script:
# Comma delimited list of SDC Engine Labels to get metrics for
target_engine_label_list = 'label1,label2,label3'
# The output directory to write metrics and logs to
output_dir = '/path/to/sdc-metrics'
# Control Hub API credentials
cred_id = '<your CRED ID>'
cred_token = '<your CRED TOKEN>'
# Control Hub URL
sch_url = 'https://na02.hub.streamsets.com'
# Set to True if WebSocket Communication is enabled
# Set to False if Direct REST APIs are used
websockets_enabled = False
# How often to capture SDC metrics
metrics_capture_interval_seconds = 5 * 60 # five minutes
# How often to query Control Hub to refresh the list of SDCs
sdc_list_refresh_seconds = 5 * 60 # five minutes
# Rolling Logfile config
max_bytes_per_log_file = 100 * 1024 * 1024 # 100MB
number_of_rolling_logfiles = 5
- Run the script
$ python get-sdc-metrics.py
- To run the script as a background process, launch the script using a command like this:
$ nohup python get-sdc-metrics.py > /dev/null 2>&1 &
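- To watch the metrics as they are written (using the example output_dir above):
$ tail -f /path/to/sdc-metrics/sdc-resource-metrics.log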
- Sample metrics in the rolling log file look like this:
{"timestamp": "2023-08-27 22:12:37", "sdc_url": "https://sequoia.onefoursix.com:11119",
"pipeline_name": "Get Weather Events", "pipeline_state": "RUNNING",
"input_record_count": 135, "output_record_count": 135, "error_record_count": 0,
"cpu_load_percentage": 46, "heap_memory_used": 523244216, "heap_memory_max": 32125353984,
"heap_memory_percentage": 1}
{"timestamp": "2023-08-27 22:12:37", "sdc_url": "https://sequoia.onefoursix.com:11119",
"pipeline_name": "Weather Raw to Refined", "pipeline_state": "RUNNING",
"input_record_count": 134, "output_record_count": 134, "error_record_count": 0,
"cpu_load_percentage": 33, "heap_memory_used": 591783112, "heap_memory_max": 32125353984,
"heap_memory_percentage": 1}
{"timestamp": "2023-08-27 22:12:37", "sdc_url": "https://sequoia.onefoursix.com:11119",
"pipeline_name": "Weather to S3", "pipeline_state": "RUNNING",
"input_record_count": 132, "output_record_count": 132, "error_record_count": 0,
"cpu_load_percentage": 28, "heap_memory_used": 653891480, "heap_memory_max": 32125353984,
"heap_memory_percentage": 2}
...
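- Because the log holds one JSON object per line, it can be filtered with standard
JSON-lines tools; for example, with jq (shown here as one option):
$ jq -c 'select(.cpu_load_percentage > 80)' sdc-resource-metrics.log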
'''
# Imports
import json
import logging
import os
import sys
import time
from datetime import datetime
from logging.handlers import RotatingFileHandler

from streamsets.sdk import ControlHub
# Comma delimited list of SDC Engine Labels to get metrics for
target_engine_label_list = 'label1,label2,label3'
# The output directory to write metrics and logs to
output_dir = '/path/to/sdc-metrics'
# Control Hub API credentials
cred_id = '<your CRED ID>'
cred_token = '<your CRED TOKEN>'
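# Note: as an alternative to hardcoding the credentials above, they could be
# read from the environment, for example:
#   cred_id = os.environ['CRED_ID']
#   cred_token = os.environ['CRED_TOKEN']
# (CRED_ID / CRED_TOKEN are example variable names, not ones the SDK looks for.)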
# Control Hub URL
sch_url = 'https://na02.hub.streamsets.com'
# Set to True if WebSocket Communication is enabled
# Set to False if Direct REST APIs are used
websockets_enabled = False
# How often to capture SDC metrics
metrics_capture_interval_seconds = 5 * 60 # five minutes
# How often to query Control Hub to refresh the list of SDCs
sdc_list_refresh_seconds = 5 * 60 # five minutes
# Rolling Logfile config
max_bytes_per_log_file = 100 * 1024 * 1024  # 100MB
number_of_rolling_logfiles = 5
# Method to create a rolling log file
def create_rotating_log():
    log_file = os.path.join(output_dir, 'sdc-resource-metrics.log')
    logger = logging.getLogger('Rotating Log')
    logger.setLevel(logging.INFO)
    handler = RotatingFileHandler(log_file,
                                  maxBytes=max_bytes_per_log_file,
                                  backupCount=number_of_rolling_logfiles)
    logger.addHandler(handler)
    return logger
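# Note that no logging Formatter is attached to the handler above, so each
# record is written exactly as passed in. The main loop below passes a JSON
# string, which keeps the metrics log to one JSON object per line.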
# Write a timestamped message to the messages-and-errors log
def log_message(message):
    messages_and_errors_log.write(datetime.now().strftime('%Y-%m-%d %H:%M:%S') + ' ' + message + '\n')
    messages_and_errors_log.flush()
# Confirm the output directory exists before any log files are opened there
if not os.path.isdir(output_dir):
    print('Error: the directory \'' + output_dir + '\' does not exist', file=sys.stderr)
    print('Please create that directory in advance', file=sys.stderr)
    sys.exit(-1)

# Open the event messages and errors log
messages_and_errors_log_file_name = 'sdc-resource-metrics-messages-and-errors.log'
messages_and_errors_log = open(os.path.join(output_dir, messages_and_errors_log_file_name), mode='a')

# Parse the label list, dropping surrounding whitespace and empty entries
target_engine_labels = [label.strip() for label in target_engine_label_list.split(',') if label.strip()]
if len(target_engine_labels) == 0:
    log_message('Error: no engine labels provided')
    sys.exit(-1)

# Log the target engine labels
log_message('Starting metrics collection for SDCs with labels ' + str(target_engine_labels))
# Create the rotating metrics log file
metrics_logger = create_rotating_log()
# Connect to Control Hub
sch = None
try:
    sch = ControlHub(credential_id=cred_id, token=cred_token, use_websocket_tunneling=websockets_enabled)
except Exception as e:
    log_message('Error: Could not connect to Control Hub.')
    log_message('Check your API credentials and the Control Hub URL')
    log_message('Exception: ' + str(e))
    sys.exit(-1)
# A method to filter the SDCs to get metrics from, out of the full set of
# SDCs registered with Control Hub. You can implement your own filtering logic here.
# For this example, only SDCs with at least one engine label contained in the
# given label list are included.
# Returns a list of SDCs to get metrics for
def get_sdcs_to_get_metrics_for(sch):
    log_message('Refreshing the list of Data Collectors to get metrics for')
    sdcs_to_include = []
    sdcs_to_exclude = []
    for sdc in sch.data_collectors:
        try:
            include_sdc = False

            # Get the SDC's labels (both assigned and reported labels)
            engine_labels = list(sdc.labels)
            engine_labels.extend(sdc.reported_labels)
            for label in engine_labels:
                if label in target_engine_labels:
                    include_sdc = True
                    break
            if include_sdc:
                sdcs_to_include.append(sdc)
            else:
                sdcs_to_exclude.append(sdc)
        except Exception:
            log_message('Error connecting to SDC with URL: ' + sdc.engine_url)
            sdcs_to_exclude.append(sdc)
    log_message('SDCs that will be included in metrics collection:')
    for sdc in sdcs_to_include:
        log_message(sdc.engine_url)
    log_message('SDCs that will NOT be included in metrics collection:')
    for sdc in sdcs_to_exclude:
        log_message(sdc.engine_url)
    return sdcs_to_include
# The initial list of SDCs to get metrics for
sdcs = get_sdcs_to_get_metrics_for(sch)

# The time when we last refreshed the list of SDCs
sdc_refresh_time = time.time()

# The time when the current metrics capture pass started
metrics_capture_time = 0
try:
    # Loop forever
    while True:

        # Refresh the SDC list from Control Hub if it's been longer than sdc_list_refresh_seconds
        if time.time() > sdc_refresh_time + sdc_list_refresh_seconds:
            sdcs = get_sdcs_to_get_metrics_for(sch)
            sdc_refresh_time = time.time()

        # Record the start time of this metrics collection pass
        metrics_capture_time = time.time()

        # For every SDC this script is getting metrics for
        for sdc in sdcs:
            sdc_instance = None

            # Try to connect directly to the SDC
            try:
                sdc_instance = sdc._instance
            except Exception:
                log_message('Error connecting to SDC with URL: ' + sdc.engine_url)

            # If we connected to the SDC...
            if sdc_instance is not None:
                try:
                    # For each pipeline on the SDC
                    for pipeline in sdc_instance.pipelines:

                        # Get the pipeline state
                        pipeline_status = sdc_instance.get_pipeline_status(pipeline)
                        pipeline_state = json.loads(pipeline_status.response.text)['status']

                        # Only get metrics for pipelines that should be running
                        if pipeline_state not in ['EDITED', 'FINISHED', 'STOPPED']:
                            metrics = {}
                            metrics['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                            metrics['sdc_url'] = sdc.engine_url
                            metrics['pipeline_name'] = pipeline.title
                            metrics['pipeline_state'] = pipeline_state

                            # Get pipeline record-count metrics
                            pipeline_metrics = sdc_instance.get_pipeline_metrics(pipeline)
                            metrics['input_record_count'] = pipeline_metrics.pipeline.input_record_count
                            metrics['output_record_count'] = pipeline_metrics.pipeline.output_record_count
                            metrics['error_record_count'] = pipeline_metrics.pipeline._data['counters']['pipeline.batchErrorRecords.counter']['count']

                            # Get CPU metrics from the SDC's JMX metrics
                            jmx_metrics = sdc_instance.get_jmx_metrics()
                            cpu_metrics = jmx_metrics.get('java.lang:type=OperatingSystem')
                            metrics['cpu_load_percentage'] = int(cpu_metrics['SystemCpuLoad'] * 100)

                            # Get heap metrics from the pipeline metrics' JVM gauges
                            metrics['heap_memory_used'] = pipeline_metrics.pipeline._data['gauges']['jvm.memory.heap.used']['value']
                            metrics['heap_memory_max'] = pipeline_metrics.pipeline._data['gauges']['jvm.memory.heap.max']['value']
                            metrics['heap_memory_percentage'] = int((metrics['heap_memory_used'] / metrics['heap_memory_max']) * 100)

                            # Convert the metric data to JSON
                            data = json.dumps(metrics)

                            # Write the metrics to the rolling logfile
                            metrics_logger.info(data)
                except Exception as e:
                    log_message('Error getting metrics for SDC URL: ' + sdc.engine_url + ' ' + str(e))

        # Sleep until the next metrics capture time
        end_metrics_capture_time = time.time()
        if end_metrics_capture_time < metrics_capture_time + metrics_capture_interval_seconds:
            sleep_seconds = int(metrics_capture_time + metrics_capture_interval_seconds - end_metrics_capture_time)
            log_message('Metrics capture sleeping for ' + str(sleep_seconds) + ' seconds')
            time.sleep(sleep_seconds)
            log_message('Metrics capture resuming')
except Exception as e:
    log_message('Error getting metrics ' + str(e))
finally:
    messages_and_errors_log.close()