Created
January 14, 2022 14:46
-
-
Save danhanks/9c59734f380ac56a8c1bdb7bec54bdb4 to your computer and use it in GitHub Desktop.
Simple Prometheus exporter for DSC Data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
© Copyright 2022 Adobe. All rights reserved. | |
This is a Prometheus exporter which watches for JSON files produced by dsc. | |
As files are generated, they are parsed and the metrics/data therein are | |
added to the Prometheus metrics in the running exporter. | |
""" | |
#!/usr/bin/env python | |
from prometheus_client import start_http_server, Counter, Summary | |
from syslog import syslog, openlog, closelog, LOG_INFO | |
from pprint import pprint | |
import pyinotify | |
import json | |
import daemon | |
import sys | |
import re | |
import signal | |
import base64 | |
class DSCConf():
    """Parsed view of a dsc.conf file.

    Exposes:
      dsc_run_dir  -- the directory dsc writes its JSON output into
                      (NOTE(review): left unset if the config has no
                      run_dir line; main() would then raise AttributeError)
      dsc_datasets -- dataset definitions keyed by dataset name, e.g.:
          'rcode' : {
              'dataset_name' : 'rcode',
              'indexer_type' : 'dns',
              'd1_label'     : 'All',   'd1_indexer' : 'null',
              'd2_label'     : 'Rcode', 'd2_indexer' : 'rcode',
              'params'       : 'replies-only',
          }
    """

    # Compiled once at class-definition time instead of on every (re)parse.
    # Raw strings so escapes like \s reach the regex engine literally
    # (non-raw '\s' is an invalid-escape warning on modern Python).
    _DATASET_RE = re.compile(
        r'^dataset\s+(?P<dataset_name>[^\s]+)\s+(?P<indexer_type>[^\s]+)\s+'
        r'(?P<d1_label>[^\s:]+):(?P<d1_indexer>[^\s]+)\s+'
        r'(?P<d2_label>[^\s:]+):(?P<d2_indexer>[^\s]+)\s+(?P<params>.+);$')
    _RUNDIR_RE = re.compile(r'^run_dir\s+"(?P<run_dir>[^\"\s]+)";$')

    def __init__(self, dsc_conf_file):
        """Parse dsc_conf_file immediately and cache the results."""
        self.dsc_conf_file = dsc_conf_file
        self._parse_dsc_conf()

    def _parse_dsc_conf(self):
        """Scan the config file for dataset and run_dir directives."""
        syslog("Parsing datasets and dsc run dir from %s" % self.dsc_conf_file)
        datasets = {}
        with open(self.dsc_conf_file, 'r') as conf:
            for line in conf:
                m = self._DATASET_RE.search(line)
                if m:
                    matches = m.groupdict()
                    datasets[matches['dataset_name']] = matches
                m = self._RUNDIR_RE.search(line)
                if m:
                    self.dsc_run_dir = m.groupdict()['run_dir']
        self.dsc_datasets = datasets

    def reload_dsc_conf(self):
        """Re-parse the config file (invoked from the SIGHUP handler)."""
        self._parse_dsc_conf()

    def dataset(self, dataset_name):
        """Return the parsed definition for dataset_name (KeyError if unknown)."""
        return self.dsc_datasets[dataset_name]
# Syslog identifier used by openlog() in main() and the __main__ guard.
DSC_LOG_NAME = 'dsc_exporter'
# Path of the dsc collector's configuration file; parsed for dataset
# definitions and for the run_dir that is watched for JSON output.
DSC_CONF = '/etc/dsc/dsc.conf'
# NOTE(review): parsed at import time, so importing this module fails if
# /etc/dsc/dsc.conf is absent — confirm that is acceptable.
CONF = DSCConf(DSC_CONF)
# TCP port the prometheus_client HTTP server listens on.
EXPORTER_PORT = 8000
METRICS = {} # Metric object cache
# Self-instrumentation: wall-clock time spent in process_file()/process_dataset().
PROCESS_FILE = Summary('file_processing_seconds', 'Time spent processing file')
PROCESS_DATASET = Summary('dataset_processing_seconds', 'Time spent processing dataset')
# Numeric-to-mnemonic translation tables, keyed by indexer name.
# translate_val() consults these and maps any value not listed to 'Other'.
DNS_CODES = {
    'qtype' : {
        '1' : 'A',
        '2' : 'NS',
        '5' : 'CNAME',
        '6' : 'SOA',
        '12' : 'PTR',
        '13' : 'HINFO',
        '15' : 'MX',
        '16' : 'TXT',
        '24' : 'SIG',
        '25' : 'KEY',
        '28' : 'AAAA',
        '29' : 'LOC',
        '30' : 'NXT',
        '33' : 'SRV',
        '38' : 'A6',
        '41' : 'OPT',
        '43' : 'DS',
        '46' : 'RRSIG',
        '47' : 'NSEC',
        '48' : 'DNSKEY',
        '50' : 'NSEC3',
        '99' : 'SPF',
        '250' : 'TSIG',
        '251' : 'IXFR',
        '252' : 'AXFR',
        '255' : 'ANY',
    },
    'rcode' : {
        '0' : 'NOERROR',
        '1' : 'FORMERR',
        '2' : 'SERVFAIL',
        '3' : 'NXDOMAIN',
        '4' : 'NOTIMPL',
        '5' : 'REFUSED',
        '6' : 'YXDomain',
        '7' : 'YXRRSet',
        '8' : 'NXRRSET',
        '9' : 'NotAuth',
        '10' : 'NotZone',
    },
    'opcode' : {
        '0' : 'Query',
        '1' : 'Iquery',
        '2' : 'Status',
        '4' : 'Notify',
        '5' : 'Update_',
    }
}
# Summary / Histogram indexers - Leaving this here for future reference
# when we add better support / treatment for such
# NOTE(review): currently unused — datasets built on these indexers are
# still exported as plain Counters by process_dataset().
summary_indexers = [
    # Divided up into 1024 port chunks (e.g., 0-1023, 1023-2047, etc.)
    # Indexer name = dns_sport_range, dataset name = client_port_range
    'dns_sport_range',
    # Buckets are based on the response_time_mode parameter.
    # 'bucket' = buckets are sized according to the response_time_bucket_size
    # 'log10' Buckets are sized on a log10
    # 'log2' Buckets are sized on a log2
    'response_time',
    # The EDNS buffer size in chunks of 512 size (e.g., 0-511, 512-1023, etc.)
    'edns_bufsize',
]
# Prometheus HELP text per dsc indexer (or 1-D dataset) name; consumed by
# make_help_text(), which falls back to 'No Help' for keys missing here.
# Help text derived from https://github.com/DNS-OARC/dsc/blob/develop/src/dsc.conf.5.in
# License for which can be found at https://github.com/DNS-OARC/dsc/blob/develop/LICENSE
HELP_TEXT = {
    'ip_direction' : 'One of three values: sent, recv or else. Direction is determined based on the setting for local_address in the configuration file.',
    'ip_proto' : 'The IP protocol type, e.g.: tcp, udp or icmp.',
    'ip_version' : 'The IP version number, e.g.: 4 or 6',
    'certain_qnames' : 'This indexer isolates the two most popular query names seen by DNS root servers: localhost and [a--m].root-servers.net.',
    'client_subnet' : "Subnet of the client's IP address.",
    'client' : 'The IP (v4 and v6) address of the DNS client.',
    'server' : 'The IP (v4 and v6) address of the DNS server.',
    'country' : 'The country code of the client IP address.',
    'asn' : 'The AS (autonomous system) number of the IP (v4 and v6).',
    'do_bit' :'0 or 1. It indicates whether or not the "DO" bit is set in a DNS query. According to RFC 2335: Setting the DO bit to one in a query indicates to the server that the resolver is able to accept DNSSEC security RRs.',
    'edns_version' : 'The EDNS version number, if any, in a DNS query. EDNS Version 0 is documented in RFC 2671.',
    'edns_bufsiz' : 'The EDNS buffer size per 512 chunks (0-511, 512-1023 etc).',
    'idn_qname' : '1 when the first QNAME in the DNS message question section is an internationalized domain name (i.e., containing non-ASCII characters). Such QNAMEs begin with the string "xn--". This convention is documented in RFC 3490.',
    'msglen' : 'The overall length (size) of the DNS message.',
    'null' : 'No Help',
    'opcode' : 'The DNS message opcode: QUERY, IQUERY, STATUS, NOTIFY, or UPDATE',
    'qclass' : 'The DNS message query class (QCLASS). IN, CHAOS, HS, NONE, or ANY',
    'qname' : 'The full QNAME string from the first (and usually only) QNAME in the question section of a DNS message.',
    'qnamelen' : 'The length of the first (and usually only) QNAME in a DNS message question section.',
    'qtype' :'The query type (QTYPE) for the first QNAME in the DNS message question section. Well-known query types include: A, AAAA, A6, CNAME, PTR, MX, NS, SOA, and ANY.',
    'query_classification' : 'A stateless classification of bogus queries (See dsc.conf(5) for more details).',
    'rcode' : 'The RCODE value in a DNS response. The most common response codes are NO ERROR and NXDOMAIN',
    'rd_bit' : '1 if the RD (recursion desired) bit is set in the query. Usually only stub resolvers set the RD bit. Usually authoritative servers do not offer recursion to their clients.',
    'tc_bit' : "1 if the TC (truncated) bit is set (in a response). An authoritative server sets the TC bit when the entire response won't fit into a UDP message.",
    'tld' : "The TLD of the first QNAME in a DNS message's question section.",
    'second_ld' : "The Second LD of the first QNAME in a DNS message's question section.",
    'third_ld' : "The Third LD of the first QNAME in a DNS message's question section.",
    'transport' : 'Indicates whether the DNS message is carried via UDP or TCP.',
    'dns_ip_version' : 'The IP version number that carried the DNS message.',
    'dns_source_port' : 'The source port of the DNS message.',
    'dns_sport_range' : 'The source port of the DNS message per 1024 chunks (0-1023, 1024-2047 etc).',
    'qr_aa_bits' : 'The "qr_aa_bits" dataset may be useful when dsc is monitoring an authoritative name server. This dataset counts the number of DNS messages received with each combination of QR,AA bits. Normally the authoritative name server should *receive* only *queries*. If the name server is the target of a DNS reflection attack, it will probably receive DNS *responses* which have the QR bit set.',
    'response_time' : 'Response time of a query',
}
def translate_val(indexer, stat):
    """Turn a raw dsc stat value into a human-readable Prometheus label value.

    stat is either a dict like {'val': ..., 'count': ..., 'base64': bool}
    (per the dsc JSON format notes at the bottom of this file) or a bare
    value. Numeric qtype/rcode/opcode values are translated via DNS_CODES,
    with 'Other' for anything unlisted.
    """
    if isinstance(stat, dict):  # From 1D metrics
        # Only decode when the optional 'base64' key is actually true;
        # the original decoded whenever the key was *present*, even if false.
        if stat.get('base64'):
            # b64decode returns bytes on Python 3; decode so label values
            # are str, not b'...' reprs.
            return base64.b64decode(stat['val']).decode('utf-8', 'replace')
        if indexer in DNS_CODES:
            return DNS_CODES[indexer].get(stat['val'], 'Other')
        return stat['val']
    if indexer in DNS_CODES:  # From 1D metrics
        return DNS_CODES[indexer].get(stat, 'Other')
    return stat
def handle_pcap_stats(dataset):
    """Export the special pcap_stats dataset as per-interface counters.

    Produces:
      dsc_pcap_stats_captured_packets{ifname=...}
      dsc_pcap_stats_filter_received_packets{ifname=...}
    """
    # Map dsc stat names to metric names. The original left metric_name
    # unbound on the first unknown stat value and, worse, *reused the
    # previous iteration's* metric_name for later unknown stats, silently
    # miscounting them. Unknown stats are now skipped explicitly.
    stat_to_metric = {
        'pkts_captured': 'dsc_pcap_stats_captured_packets',
        'filter_received': 'dsc_pcap_stats_filter_received_packets',
    }
    for item in dataset['data']:
        for stat in item['pcap_stat']:
            metric_name = stat_to_metric.get(stat['val'])
            if metric_name is None:
                continue
            if metric_name not in METRICS:
                METRICS[metric_name] = Counter(metric_name,
                                               'libpcap statistics for DNS traffic',
                                               ['ifname'])
            METRICS[metric_name].labels(item['ifname']).inc(stat['count'])
def make_help_text(dataset):
    """Build the Prometheus HELP text for a dataset's Counter.

    1-dimensional datasets (d1 indexer 'null') use the dataset's own
    HELP_TEXT entry; 2-dimensional datasets combine the descriptions of
    both indexers.
    """
    dataset_conf = CONF.dataset(dataset['name'])
    d1_indexer = dataset_conf['d1_indexer']
    d2_indexer = dataset_conf['d2_indexer']
    if d1_indexer == 'null':
        return HELP_TEXT.get(dataset['name'], 'No Help')
    # Default to 'No Help' for *both* lookups; the original omitted the
    # default on the first, rendering the literal text "None" for indexers
    # missing from HELP_TEXT.
    return "2-dimensional DSC metric composing data from %s and %s indexers. %s: %s %s: %s" % (
        d1_indexer, d2_indexer,
        d1_indexer, HELP_TEXT.get(d1_indexer, 'No Help'),
        d2_indexer, HELP_TEXT.get(d2_indexer, 'No Help'))
@PROCESS_DATASET.time()
def process_dataset(dataset):
    """Fold one dsc dataset's data points into the cached Prometheus Counters.

    Creates the Counter lazily on first sight of a dataset; 1-D datasets
    (d1 indexer 'null') get a single label, 2-D datasets get two.
    """
    name = dataset['name']
    # pcap_stats is special-cased: fixed metric names, ifname label.
    if name == 'pcap_stats':
        return handle_pcap_stats(dataset)
    conf = CONF.dataset(name)
    dim1, dim2 = dataset['dimensions']
    idx1, idx2 = conf['d1_indexer'], conf['d2_indexer']
    for row in dataset['data']:          # each D1 value
        for stat in row[dim2]:           # each D2 stat under that D1 value
            labels = {dim2: translate_val(idx2, stat)}
            if idx1 != 'null':           # 2D dataset: add the D1 label too
                labels[dim1] = translate_val(idx1, row[dim1])
            if name not in METRICS:
                METRICS[name] = Counter('dsc_%s_count' % (name),
                                        make_help_text(dataset),
                                        labels.keys())
            METRICS[name].labels(**labels).inc(stat['count'])
@PROCESS_FILE.time()
def process_file(file):
    """Load one dsc JSON output file and process every dataset in it."""
    syslog("Processing file: %s" % file)
    with open(file) as fin:
        for dataset in json.load(fin):
            process_dataset(dataset)
def handle_inotify(event):
    """pyinotify callback: process a file newly moved into the run_dir.

    Only acts on files whose name ends in '.json'. The original substring
    test ('.json' in pathname) also matched partial/temp names such as
    'foo.json.tmp', which could pick up incomplete files.
    """
    if event.pathname.endswith('.json'):
        process_file(event.pathname)
def handle_reload(signum, frame):
    # SIGHUP handler: re-read dsc.conf so changed dataset definitions are
    # picked up without restarting the exporter.
    syslog("Caught SIGHUP, reloading dsc config")
    CONF.reload_dsc_conf()
def main():
    # Entry point: wire up the inotify watch on dsc's run_dir, start the
    # Prometheus HTTP endpoint, then block in the inotify event loop.
    openlog(DSC_LOG_NAME)
    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, handle_inotify)
    # IN_MOVED_TO: react when a file is renamed into the run_dir —
    # presumably dsc writes to a temp name and renames when complete;
    # confirm against dsc's output behavior.
    wm.add_watch(CONF.dsc_run_dir, pyinotify.IN_MOVED_TO)
    syslog("Starting prometheus_client HTTP server")
    start_http_server(EXPORTER_PORT)
    syslog("Starting event loop")
    # Blocks forever, dispatching events to handle_inotify().
    notifier.loop()
if __name__ == '__main__':
    # BUG: Need to be able to prevent other instances from running at the same time
    openlog(DSC_LOG_NAME)
    syslog("Starting DSC Prometheus Exporter")
    closelog()
    # '-f' runs in the foreground (for debugging / process supervisors);
    # otherwise detach as a classic Unix daemon. Either way SIGHUP triggers
    # a config reload via handle_reload.
    if(len(sys.argv) > 1 and sys.argv[1] == '-f'):
        signal.signal(signal.SIGHUP, handle_reload)
        main()
    else:
        # DaemonContext installs the SIGHUP -> handle_reload mapping itself.
        with daemon.DaemonContext(signal_map = {signal.SIGHUP : handle_reload}):
            main()
''' | |
Prometheus metric format: | |
# HELP <metric_name> <help_text> | |
# TYPE <metric_name> <gauge|counter> | |
<metric name>{label="value", label="value", ...} <value> [<timestamp>] | |
See dsc.conf(5) for details about the data format. | |
'dimensions' key is always a 2-element array of dimension labels | |
The two elements tell us the names of keys in each dict in the 'data' array. | |
'data' key is an array of dicts (and may be empty) | |
The value of the first key in each dict is a string ('Label1'). | |
The value of the second key in each dict is always an array of zero or more dicts | |
Each of these dicts will have a 'val' key, and a 'count' key. | |
There may be an optional 'base64' key set to true or false, indicating | |
that the value is base64 encoded | |
What we're aiming for: | |
2D Arrays | |
dsc_pcap_stats_captured_packets{ifname="eth0"} 120 | |
dsc_pcap_stats_filter_received_packets{ifname="eth0"} 120 | |
dsc_pcap_stats_filter_received_packets{ifname="lo"} 120 | |
dsc_direction_vs_ipproto_count{direction="sent", IPProto="udp"} 82 | |
dsc_direction_vs_ipproto_count{direction="recv", IPProto="udp"} 1000 | |
dsc_direction_vs_ipproto_count{direction="sent", IPProto="tcp"} 82 | |
1D Arrays | |
dsc_response_time_count{ResponseTime="100-1000"} 41 | |
dsc_response_time_count{ResponseTime="10-100"} 32 | |
dsc_response_time_count{ResponseTime="1-10"} 32 | |
dsc_third_ld_count{thirdLD="www.google.com"} 29 | |
dsc_third_ld_count{thirdLD="s3.amazonaws.com"} 29 | |
dsc_client_port_range_count_{PortRange="24576-25599"} 3 | |
dsc_rd_bit_count_{RD="set"} 41 | |
dsc_do_bit_count_{D0="clr"} 41 | |
dsc_edns_bufsiz_count{EDNSBufSize="None"} 41 | |
If we ever want to print out the stats data for textfile collector: | |
def process_file(self, file): | |
with open(file) as fin: | |
datasets = json.load(fin) | |
for dataset in datasets: | |
name = dataset['name'] | |
dimension1 = dataset['dimensions'][0] | |
dimension2 = dataset['dimensions'][1] | |
for datum in dataset['data']: | |
for stat in datum[dimension2]: | |
if dimension1 == 'All': | |
print 'dsc_{}{{{}="{}"}} {} {}'.format( | |
name, | |
dimension2, | |
translate_val(dimension2, stat), | |
stat['count'], | |
dataset['stop_time'] | |
) | |
else: | |
print 'dsc_{}{{{}="{}", {}="{}"}} {}'.format( | |
name, | |
dimension1, | |
datum[dimension1], | |
dimension2, | |
translate_val(dimension2, stat), | |
stat['count'], | |
dataset['stop_time'] | |
) | |
''' | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment