Skip to content

Instantly share code, notes, and snippets.

@AlmightyOatmeal
Last active October 20, 2020 19:15
Show Gist options
  • Save AlmightyOatmeal/c26eb3139d6a483fa71ec23a7cb8a0e3 to your computer and use it in GitHub Desktop.
Save AlmightyOatmeal/c26eb3139d6a483fa71ec23a7cb8a0e3 to your computer and use it in GitHub Desktop.
Wrapper for SignalFx's Python client library for executing SignalFlow programs and returning a dictionary object that can be serialized to JSON. This also features exception handling to retry program text execution which is especially helpful for long-running SignalFlow programs that should be restarted in the event of a failure.
"""
Disclaimer
==========
This has been designed as a solution for a specific task, or set of tasks, by the code author which is outside
the locus of SignalFx supportability. This code is provided as-is and if any support is needed then it should
be provided by the original code author on a best-effort only basis; it cannot be supported through break/fix
support channels.
Synopsis
========
#
# SignalFlow execute - large result body
#
pt_parts = [
"HOSTS = filter('host', 'irony')",
"data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x).publish('Process Count')",
]
pt = '\n'.join(pt_parts)
now_utc = datetime.datetime.utcnow()
now_utc = now_utc.replace(second=0)
pt_start = now_utc - datetime.timedelta(days=1)
pt_start_ms = convert_dt_to_milliseconds(pt_start)
pt_stop = now_utc - datetime.timedelta(minutes=10)
pt_stop_ms = convert_dt_to_milliseconds(pt_stop)
results = get_signalflow_results(
program=pt,
start=pt_start_ms,
stop=pt_stop_ms,
resolution=1000,
api_token=api_token,
immediate=True
)
#
# SignalFlow execute - generator
#
pt_parts = [
"HOSTS = filter('host', 'irony')",
"data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x).publish('Process Count')",
]
pt = '\n'.join(pt_parts)
now_utc = datetime.datetime.utcnow()
now_utc = now_utc.replace(second=0)
pt_start = now_utc - datetime.timedelta(days=1)
pt_start_ms = convert_dt_to_milliseconds(pt_start)
pt_stop = now_utc - datetime.timedelta(minutes=10)
pt_stop_ms = convert_dt_to_milliseconds(pt_stop)
results = get_signalflow_results(
program=pt,
start=pt_start_ms,
stop=pt_stop_ms,
resolution=1000,
api_token=api_token,
generator=True,
immediate=True
)
for result in results:
do_something(result)
#
# SignalFlow execute - callback
#
pt_parts = [
"HOSTS = filter('host', 'irony')",
"data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x).publish('Process Count')",
]
pt = '\n'.join(pt_parts)
now_utc = datetime.datetime.utcnow()
now_utc = now_utc.replace(second=0)
pt_start = now_utc - datetime.timedelta(days=1)
pt_start_ms = convert_dt_to_milliseconds(pt_start)
pt_stop = now_utc - datetime.timedelta(minutes=10)
pt_stop_ms = convert_dt_to_milliseconds(pt_stop)
results = get_signalflow_results(
program=pt,
start=pt_start_ms,
stop=pt_stop_ms,
resolution=1000,
api_token=api_token,
generator=True,
data_callback=do_something_with_data,
immediate=True
)
for _ in results:
    pass  # drain the generator so the callback fires for each result
#
# SignalFlow preflight
#
pt_parts = [
"HOSTS = filter('host', 'irony')",
"d = data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x)",
"detect(when(d < 1, lasting='30s')).publish('Service Not Running')",
"detect(when(d > 500, lasting='30s')).publish('Process has too many children')"
]
pt = '\n'.join(pt_parts)
results = get_signalflow_results(
program=pt,
start=1522950265000,
stop=1522950736000,
max_delay=60000,
api_token=api_token,
preflight=True
)
#
# Using SignalFlow execute to get more information than preflight
#
pt_parts = [
"HOSTS = filter('host', 'irony')",
"d = data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x)",
"detect(when(d < 1, lasting='30s')).publish('Service Not Running')",
"detect(when(d > 500, lasting='30s')).publish('Process has too many children')"
]
pt = '\n'.join(pt_parts)
results = get_signalflow_results(
program=pt,
start=1522950265000,
stop=1522950736000,
max_delay=60000,
api_token=api_token
)
#
# Using SignalFlow execute to get more information than preflight but get *ONLY* events
#
pt_parts = [
"HOSTS = filter('host', 'irony')",
"d = data('ps_count.processes', filter=filter('plugin', 'processes') and HOSTS).map(lambda x: 0 if x is None else x)",
"detect(when(d < 1, lasting='30s')).publish('Service Not Running')",
"detect(when(d > 500, lasting='30s')).publish('Process has too many children')"
]
pt = '\n'.join(pt_parts)
results = get_signalflow_results(
program=pt,
start=1522950265000,
stop=1522950736000,
max_delay=60000,
api_token=api_token,
events_only=True
)
Abstract
========
Demonstrating executing a SignalFlow program in Python and converting the results into a Python dictionary/list
structure that can be iterated, processed, otherwise interacted-with, or serialized to JSON.
Features:
* Supports passing additional keyword arguments directly into the SignalFlow execute/preflight methods (as long
as the keyword arguments are supported by their respective method).
* Retry program execution in the event of a failure which can be advantageous when using a dynamic start time
and no end time for continuous streaming.
* Simple parameter to switch between generating a large result object (useful for short time ranges) or a
generator so results can be iterated over as they become available.
* Callback functionality to call an outside function and pass in the resulting dictionary object.
License
=======
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without
limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
Software, and to permit persons to whom the Software is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
:author: Jamie Ivanov <jamie <-at-> signalfx.com>
:organization: SignalFx <https://www.signalfx.com>
:contact: <jamie <-at-> signalfx.com>
:version: 1.0.0
:copyright: Copyright (c): Jamie Ivanov and contributors
"""
import datetime
import json
import logging
import time
from copy import deepcopy

import signalfx
# OPTIONAL HELPER
def convert_dt_to_milliseconds(dt_obj):
    """Return *dt_obj* as whole milliseconds since the Unix epoch.

    :param dt_obj: Naive datetime object to be converted.
    :type dt_obj: datetime.datetime
    :return: Milliseconds elapsed since 1970-01-01T00:00:00.
    :rtype: int or long
    """
    unix_epoch = datetime.datetime(1970, 1, 1)
    elapsed = dt_obj - unix_epoch
    return int(elapsed.total_seconds() * 1000)
def get_signalflow_results(program, start, stop=None, realm=None, resolution=None, max_execution_retries=50,
                           api_token=None, preflight=False, generator=False, data_callback=None, event_callback=None,
                           events_only=False, **kwargs):
    """Use the SignalFlow Python client library to execute/preflight SignalFlow programs.

    Results (whether collected into one dict or yielded one-by-one) all share this shape;
    datapoint results carry "@value" plus the timeseries metadata, event results carry only
    "@event", and preflight results follow the datapoint shape:

    .. example: json
        {
            "<TIMESTAMP IN MILLISECONDS>": {
                "<TIMESERIES ID>": {
                    "@value": "<DATAPOINT VALUE>",
                    "@job_id": "<SignalFlow job ID>",
                    "@event": {
                        "@job_id": "<SignalFlow job ID>",
                        "_metadata": "<EVENT METADATA>",
                        "_properties": "<EVENT PROPERTIES INCLUDING INPUT TIMESERIES WITH DIMENSIONS, STATUS, ETC>",
                        "_timestamp_ms": "<TIMESTAMP IN MILLISECONDS>",
                        "_tsid": "<TIMESERIES ID (redundant but it is part of the metadata so it was kept)>"
                    },
                    "_properties": "<DATAPOINT PROPERTIES, DIMENSIONS, ETC>",
                    "_tsid": "<TIMESERIES ID (redundant but it is part of the metadata so it was kept)>"
                }
            }
        }

    :param program: Program text to be executed.
    :type program: str or unicode
    :param start: SignalFlow start time in milliseconds.
    :type start: int or long
    :param stop: (optional) SignalFlow stop time in milliseconds.
                 (default: None)
    :type stop: int or long
    :param realm: (optional) Which SignalFx realm to execute against. If the wrong realm is used then execution
                  will fail with an unauthorized response. As of October 2019, the available realms are: us0, us1,
                  us2, eu0, and ap0. (default: 'us0')
    :type realm: str or unicode
    :param resolution: (optional) Data resolution. (default: None)
    :type resolution: int, long, str, or unicode
    :param max_execution_retries: (optional) Number of times to retry in case of an exception with a 1 minute grace
                                  period before the next attempt.
                                  (default: 50)
    :type max_execution_retries: int
    :param api_token: SignalFx API token to be used when executing the SignalFlow program.
    :type api_token: str or unicode
    :param preflight: (optional) When true, execute the SignalFlow program via the preflight API otherwise use the
                      execute API.
                      (default: False)
    :type preflight: bool
    :param generator: (optional) When true, returns a generator instance to yield individual result objects rather
                      than one large results object. This is especially useful for low-memory environments, where
                      each result needs to be acted upon, or when continuously streaming program text. When false,
                      generates a large result object.
                      (default: False)
    :type generator: bool
    :param data_callback: (optional) If specified, this function will be called for every DataMessage result and
                          said result will be passed into it.
                          (default: None)
    :type data_callback: function
    :param event_callback: (optional) If specified, this function will be called for every EventMessage result and
                           said result will be passed into it.
                           (default: None)
    :type event_callback: function
    :param events_only: (optional) Return only EventMessage results when using the SignalFlow execute API in the
                        same fashion as the preflight API. This will not apply to the preflight API.
                        (default: False)
    :type events_only: bool
    :param kwargs: Miscellaneous keyword arguments to be used in :meth:`signalfx.SignalFx.signalflow().execute()`.
    :type kwargs: keyword arguments
    :return: Dictionary of results (or generator thereof); empty dictionary if execution retries were exceeded.
    :rtype: dict or generator
    :raises ValueError: If ``api_token`` is not provided.
    """
    # TODO: Cleanup kwargs that are passed into the SignalFlow executioner for the sake of good data hygiene.
    if api_token is None:
        raise ValueError('Missing SignalFx API token! Please consult the documentation!')

    # Module previously referenced an undefined ``logger``; define one explicitly.
    logger = logging.getLogger(__name__)

    # Setting the default to "us0" here (not in the signature) so callers may pass realm=None.
    if not realm:
        realm = 'us0'

    # Switch stream URL in order to accommodate the appropriate organization realm,
    # unless the caller supplied an explicit endpoint_url via kwargs.
    endpoint_url = kwargs.pop('endpoint_url', None)
    if not endpoint_url:
        endpoint_url = 'wss://stream.{}.signalfx.com'.format(realm.lower())
    if not endpoint_url.startswith('wss://'):
        endpoint_url = 'wss://{}'.format(endpoint_url)

    results = {}
    argz = {
        'program': program,
        'start': start,
        'stop': stop,
        'resolution': resolution
    }
    argz.update(**kwargs)
    logger.debug('endpoint_url={}'.format(endpoint_url))

    def _pretty_json(obj):
        """Serialize *obj* to indented JSON, stringifying anything non-serializable."""
        return json.dumps(obj, indent=4, sort_keys=True, default=str)

    # Executioner selection: preflight validates detectors; execute streams data.
    sflow = signalfx.SignalFx().signalflow(endpoint=endpoint_url, token=api_token)
    if preflight:
        executioner = sflow.preflight(**argz)
    else:
        executioner = sflow.execute(**argz)

    def execute_signalflow():
        """Yield ``{timestamp_ms: {tsid: {...}}}`` dicts from the stream, retrying on failure."""
        meta_objs = []   # raw MetadataMessage payloads (for the optional debug dump below)
        metadata = {}    # tsid -> metadata dict, merged into each datapoint result
        try_counter = 0
        while True:
            job_id = None
            if metadata:
                metadata.clear()
            if try_counter >= max_execution_retries:
                # Give up; callers receive whatever was yielded before the failures.
                return
            try:
                logger.debug('Waiting for events from executioner stream.')
                for msg in executioner.stream():
                    if isinstance(msg, signalfx.signalflow.messages.JobStartMessage):
                        job_id = msg.__dict__.get('_handle')
                        logger.info('SignalFlow job: {}'.format(job_id))
                        continue
                    # [ ] TODO: Add option to remove metadata for timeseries that leave the membership.
                    if isinstance(msg, signalfx.signalflow.messages.MetadataMessage):
                        md = msg.__dict__
                        metadata[md.get('_tsid')] = {k: md[k] for k in md if not k.startswith('__')}
                        meta_objs.append(md)
                        continue
                    if isinstance(msg, signalfx.signalflow.messages.DataMessage):
                        if not events_only and not preflight:
                            result = {}
                            for tsid, val in msg.data.items():
                                # Deep-copy so later metadata updates cannot mutate
                                # results that were already yielded/dispatched.
                                result[tsid] = deepcopy(metadata.get(tsid, {}))
                                result[tsid]['@value'] = val
                                result[tsid]['@job_id'] = job_id
                            logger.info('DataMessage: {} datapoints.'.format(len(msg.data)))
                            dispatchable_result = {msg.logical_timestamp_ms: result}
                            if data_callback is not None:
                                data_callback(dispatchable_result)
                            yield dispatchable_result
                        continue
                    if isinstance(msg, signalfx.signalflow.messages.EventMessage):
                        result = {
                            msg._tsid: {
                                '@event': deepcopy(msg.__dict__)
                            }
                        }
                        result[msg._tsid]['@event']['@job_id'] = job_id
                        dispatchable_result = {msg._timestamp_ms: result}
                        logger.info('EVENT FOUND:\n{}'.format(_pretty_json(result)))
                        if event_callback is not None:
                            event_callback(dispatchable_result)
                        yield dispatchable_result
                        continue
                    if isinstance(msg, signalfx.signalflow.messages.JobProgressMessage):
                        continue
                    # *OPTIONAL* Exploration aid for message types not handled above.
                    logger.debug('Received unhandled "{}" object: {}'.format(type(msg), msg.__dict__))
                # *OPTIONAL* Dump metadata for offline inspection. Guarded so an
                # unwritable working directory cannot masquerade as a stream failure
                # and burn a retry.
                try:
                    with open('signalflow_metadata.json', 'w') as f:
                        f.write(_pretty_json(metadata))
                    with open('signalflow_metadata_raw-ish_objects.json', 'w') as f:
                        f.write(_pretty_json(meta_objs))
                except (IOError, OSError) as dump_err:
                    logger.warning('Unable to dump metadata files: {}'.format(dump_err))
                return
            except Exception as err:
                # The stream can die mid-flight (network blips, server restarts);
                # wait a minute and re-attach rather than losing the whole run.
                # NOTE: If Python logging is not configured, fall back to print().
                try:
                    logger.error('HANDLING EXCEPTION; RETRY IN 60 SECONDS')
                    logger.exception(err)
                except Exception as log_err:
                    print('HANDLING EXCEPTION; RETRY IN 60 SECONDS')
                    print(str(log_err))
                try_counter += 1
                time.sleep(60)

    if generator:
        return execute_signalflow()
    for item in execute_signalflow():
        results.update(item)
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment