Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Last active July 15, 2023 20:27
Show Gist options
  • Save onefoursix/fc962724d7f5042e8ed38367818d0331 to your computer and use it in GitHub Desktop.
Save onefoursix/fc962724d7f5042e8ed38367818d0331 to your computer and use it in GitHub Desktop.
A StreamSets SDK for Python script for Control Hub 3.x to capture CPU and memory metrics as well as running pipelines from Data Collector
#!/usr/bin/env python
'''
This script writes a rolling log file that contains CPU usage and JVM heap memory metrics
for a given Data Collector registered with Control Hub 3.x, with a user definable refresh
interval, along with the number and names of Jobs running on the Data Collector at the time
of metrics collection.
Prerequisites:
- Python 3.6 - 3.8
- StreamSets SDK for Python v3.x
See: https://docs.streamsets.com/sdk/latest/installation.html
- Control Hub login and password for a user with Organization Administrator role
- Set the following variable in the script:
output_dir - a pre-existing directory to write the Data Collector metric to
metrics_capture_interval_seconds - how frequently to capture metrics
- To avoid including credentials in the script, export these environment variables
prior to running the script:
export SCH_USER=<the Control Hub user>
export SCH_PASSWORD=<the Control Hub password>
- Run the script with an argument that is the URL of the Data Collector to monitor, like this:
$ python3 get-sdc-memory-and-cpu-metrics.py <SDC_URL>
for example:
$ python3 get-sdc-memory-and-cpu-metrics.py https://sequoia.onefoursix.com:18631
- To run the script as a background process you can set the variable print_metrics_to_console in the
script to False and then launch the script using a command like this:
$ nohup python3 get-sdc-memory-and-cpu-metrics.py \ https://sequoia.onefoursix.com:18631 > /dev/null 2>&1 &
- Sample output looks like this:
$ python3 get-sdc-memory-and-cpu-metrics.py https://sequoia.onefoursix.com:18631
Getting resource metrics for Data Collector at https://sequoia.onefoursix.com:18631
{"sdc_url": "https://sequoia.onefoursix.com:18631", "metric_timestamp": "2023-07-14 21:21:57",
"heap_memory_used": 335584696, "heap_memory_max": 1073741824, "heap_memory_percentage": 31,
"cpu_load_percentage": 9, "number_of_running_jobs": 3, "running_jobs": ["Job 2", "Job 1", "Job 3"]}
{"sdc_url": "https://sequoia.onefoursix.com:18631", "metric_timestamp": "2023-07-14 21:22:13",
"heap_memory_used": 450928056, "heap_memory_max": 1073741824, "heap_memory_percentage": 41,
"cpu_load_percentage": 7, "number_of_running_jobs": 3, "running_jobs": ["Job 2", "Job 1", "Job 3"]}
...
'''
# Imports
import os, sys, json, time, logging
from datetime import datetime
from streamsets.sdk import ControlHub, DataCollector
from logging.handlers import RotatingFileHandler
# Control Hub URL
sch_url = 'https://cloud.streamsets.com'
# Control Hub credentials read from the environment
sch_user = os.getenv('SCH_USER')
sch_password = os.getenv('SCH_PASSWORD')
# The directory and name for the rolling log file
output_dir = '/path/to/your-output-dir'
log_file_name = 'sdc-resource-metrics.log'
# How often to capture SDC metrics
metrics_capture_interval_seconds = 15
# Whether or not to print metrics to the console
print_metrics_to_console = True
# Rolling Logfile config
log_file = output_dir + '/' + log_file_name
max_bytes_pre_log_file = 100 * 1024 * 1024 # 100MB
number_of_rolling_logfiles = 5
# Method to create a rolling log file
def create_rotating_log():
logger = logging.getLogger("Rotating Log")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(log_file, maxBytes = max_bytes_pre_log_file, backupCount = number_of_rolling_logfiles)
logger.addHandler(handler)
return logger
# Method that returns a list of currently running Jobs on the Data Collector
def get_running_jobs():
sch_sdc = sch.data_collectors.get(url=sdc_url)
jobs_list = []
for job in sch_sdc.jobs:
jobs_list.append(job.job_name)
return jobs_list
# Validate command line args
if len(sys.argv) != 2:
print('Incorrect number of arguments')
print('Usage: python3 get-sdc-memory-and-cpu-metrics.py <SDC_URL>')
sys.exit(-1)
# Confirm the logging directory exists]
if not os.path.isdir(output_dir):
print('Error: the directory \'' + output_dir + '\' does not exist')
print('Please create that directory in advance')
sys.exit(-1)
# Get the SDC URL from the command line
sdc_url = sys.argv[1]
# Create the log file
logger = create_rotating_log()
# Connect to Control Hub
sch = None
try:
sch = ControlHub(sch_url, username=sch_user, password=sch_password)
except Exception as e:
print('Error: Could not connect to Control Hub.')
print('Check your credentials and the Control Hub URL,')
print ('and you must have Organization Administrator role')
print('Exception: '+ str(e))
sys.exit(-1)
# Connect to the Data Collector
sdc = None
try:
sdc = DataCollector(sdc_url, control_hub = sch)
except Exception as e:
print('Error: Could not connect to Data Collector')
print('Error; ' + str(e))
sys.exit(-1)
print('Getting resource metrics for Data Collector at ' + sdc_url)
# Get Data Collector metrics in an endless loop until this script is stopped
while(True):
try:
jmx_metrics = sdc.get_jmx_metrics()
heap_metrics = jmx_metrics.get('java.lang:type=Memory')['HeapMemoryUsage']
cpu_metrics = jmx_metrics.get('java.lang:type=OperatingSystem')
metrics = {}
metrics['sdc_url'] = sdc_url
metrics['metric_timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
metrics['heap_memory_used'] = heap_metrics['used']
metrics['heap_memory_max'] = heap_metrics['max']
metrics['heap_memory_percentage'] = int((heap_metrics['used'] / heap_metrics['max']) * 100)
metrics['cpu_load_percentage'] = int(cpu_metrics['CpuLoad'] * 100)
running_jobs = get_running_jobs()
metrics['number_of_running_jobs'] = len(running_jobs)
if metrics['number_of_running_jobs'] > 0:
metrics['running_jobs'] = running_jobs
# Convert the metrics to JSON
data = json.dumps(metrics)
# Print messages to the console if needed
if print_metrics_to_console:
print(data)
# Write metrics to the rolling logfile
logger.info(data)
except Exception as e:
print('Exception occurred while reading SDC metrics: ' + str(e))
# Sleep
time.sleep(metrics_capture_interval_seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment