Simple Prometheus exporter for DSC Data
"""
© Copyright 2022 Adobe. All rights reserved.
This is a Prometheus exporter which watches for JSON files produced by dsc.
As files are generated, they are parsed and the metrics/data therein are
added to the Prometheus metrics in the running exporter.
"""
#!/usr/bin/env python
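# Usage sketch (derived from the __main__ handling at the bottom of this file;
# the URL below assumes the default EXPORTER_PORT of 8000):
#
#   ./dsc_exporter.py       # run daemonized (the default)
#   ./dsc_exporter.py -f    # run in the foreground
#
# Metrics are then scrapeable at http://localhost:8000/metrics, and dsc's
# run_dir is discovered from /etc/dsc/dsc.conf.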
from prometheus_client import start_http_server, Counter, Summary
from syslog import syslog, openlog, closelog
import pyinotify
import json
import daemon
import sys
import re
import signal
import base64

class DSCConf():
    """Parse dataset definitions and the run_dir out of a dsc.conf file."""

    def __init__(self, dsc_conf_file):
        self.dsc_conf_file = dsc_conf_file
        self._parse_dsc_conf()

    def _parse_dsc_conf(self):
        syslog("Parsing datasets and dsc run dir from %s" % self.dsc_conf_file)
        dataset_re = re.compile(r'^dataset\s+(?P<dataset_name>[^\s]+)\s+(?P<indexer_type>[^\s]+)\s+(?P<d1_label>[^\s:]+):(?P<d1_indexer>[^\s]+)\s+(?P<d2_label>[^\s:]+):(?P<d2_indexer>[^\s]+)\s+(?P<params>.+);$')
        rundir_re = re.compile(r'^run_dir\s+"(?P<run_dir>[^"\s]+)";$')
        datasets = {}
        with open(self.dsc_conf_file, 'r') as conf:
            for line in conf:
                m = dataset_re.search(line)
                if m:
                    matches = m.groupdict()
                    datasets[matches['dataset_name']] = matches
                m = rundir_re.search(line)
                if m:
                    self.dsc_run_dir = m.groupdict()['run_dir']
        self.dsc_datasets = datasets

    def reload_dsc_conf(self):
        self._parse_dsc_conf()

    def dataset(self, dataset_name):
        return self.dsc_datasets[dataset_name]
# Example of the resulting self.dsc_datasets structure:
#
# self.dsc_datasets = {
#     'rcode' : {
#         'indexer_type' : 'dns',
#         'd1_label'     : 'All',
#         'd1_indexer'   : 'null',
#         'd2_label'     : 'Rcode',
#         'd2_indexer'   : 'rcode',
#         'params'       : 'replies-only',
#     },
#     'opcode' : ...
# }
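# For reference, the kind of dsc.conf line the dataset regex above matches
# (an illustrative example; see dsc.conf(5) for the authoritative syntax):
#
#     dataset rcode dns All:null Rcode:rcode replies-only;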
DSC_LOG_NAME = 'dsc_exporter'
DSC_CONF = '/etc/dsc/dsc.conf'
CONF = DSCConf(DSC_CONF)
EXPORTER_PORT = 8000
METRICS = {} # Metric object cache
PROCESS_FILE = Summary('file_processing_seconds', 'Time spent processing file')
PROCESS_DATASET = Summary('dataset_processing_seconds', 'Time spent processing dataset')
DNS_CODES = {
    'qtype' : {
        '1'   : 'A',
        '2'   : 'NS',
        '5'   : 'CNAME',
        '6'   : 'SOA',
        '12'  : 'PTR',
        '13'  : 'HINFO',
        '15'  : 'MX',
        '16'  : 'TXT',
        '24'  : 'SIG',
        '25'  : 'KEY',
        '28'  : 'AAAA',
        '29'  : 'LOC',
        '30'  : 'NXT',
        '33'  : 'SRV',
        '38'  : 'A6',
        '41'  : 'OPT',
        '43'  : 'DS',
        '46'  : 'RRSIG',
        '47'  : 'NSEC',
        '48'  : 'DNSKEY',
        '50'  : 'NSEC3',
        '99'  : 'SPF',
        '250' : 'TSIG',
        '251' : 'IXFR',
        '252' : 'AXFR',
        '255' : 'ANY',
    },
    'rcode' : {
        '0'  : 'NOERROR',
        '1'  : 'FORMERR',
        '2'  : 'SERVFAIL',
        '3'  : 'NXDOMAIN',
        '4'  : 'NOTIMPL',
        '5'  : 'REFUSED',
        '6'  : 'YXDomain',
        '7'  : 'YXRRSet',
        '8'  : 'NXRRSET',
        '9'  : 'NotAuth',
        '10' : 'NotZone',
    },
    'opcode' : {
        '0' : 'Query',
        '1' : 'Iquery',
        '2' : 'Status',
        '4' : 'Notify',
        '5' : 'Update_',
    },
}
# Summary / Histogram indexers - leaving this here for future reference for
# when we add better support / treatment for such indexers (a sketch of how
# their bucket labels could be derived follows this list).
summary_indexers = [
    # Divided up into 1024-port chunks (e.g., 0-1023, 1024-2047, etc.)
    # Indexer name = dns_sport_range, dataset name = client_port_range
    'dns_sport_range',
    # Buckets are based on the response_time_mode parameter:
    #   'bucket' - buckets are sized according to response_time_bucket_size
    #   'log10'  - buckets are sized on a log10 scale
    #   'log2'   - buckets are sized on a log2 scale
    'response_time',
    # The EDNS buffer size in 512-byte chunks (e.g., 0-511, 512-1023, etc.)
    'edns_bufsize',
]
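
# A minimal sketch (not wired into the exporter) of how fixed-width bucket
# labels for the range-style indexers above could be derived if we later add
# Summary/Histogram handling; the function name and widths are illustrative.
def range_bucket_label(value, width):
    """Return the 'lo-hi' label of the fixed-width bucket containing value."""
    lo = (value // width) * width
    return '%d-%d' % (lo, lo + width - 1)

# e.g. range_bucket_label(24580, 1024) -> '24576-25599'  (client_port_range)
#      range_bucket_label(100, 512)    -> '0-511'        (edns_bufsize)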
# Help text derived from https://github.com/DNS-OARC/dsc/blob/develop/src/dsc.conf.5.in
# License for which can be found at https://github.com/DNS-OARC/dsc/blob/develop/LICENSE
HELP_TEXT = {
    'ip_direction' : 'One of three values: sent, recv or else. Direction is determined based on the setting for local_address in the configuration file.',
    'ip_proto' : 'The IP protocol type, e.g.: tcp, udp or icmp.',
    'ip_version' : 'The IP version number, e.g.: 4 or 6.',
    'certain_qnames' : 'This indexer isolates the two most popular query names seen by DNS root servers: localhost and [a-m].root-servers.net.',
    'client_subnet' : "Subnet of the client's IP address.",
    'client' : 'The IP (v4 and v6) address of the DNS client.',
    'server' : 'The IP (v4 and v6) address of the DNS server.',
    'country' : 'The country code of the client IP address.',
    'asn' : 'The AS (autonomous system) number of the IP (v4 and v6).',
    'do_bit' : '0 or 1. It indicates whether or not the "DO" bit is set in a DNS query. According to RFC 3225: Setting the DO bit to one in a query indicates to the server that the resolver is able to accept DNSSEC security RRs.',
    'edns_version' : 'The EDNS version number, if any, in a DNS query. EDNS Version 0 is documented in RFC 2671.',
    'edns_bufsiz' : 'The EDNS buffer size per 512-byte chunks (0-511, 512-1023, etc.).',
    'idn_qname' : '1 when the first QNAME in the DNS message question section is an internationalized domain name (i.e., containing non-ASCII characters). Such QNAMEs begin with the string "xn--". This convention is documented in RFC 3490.',
    'msglen' : 'The overall length (size) of the DNS message.',
    'null' : 'No Help',
    'opcode' : 'The DNS message opcode: QUERY, IQUERY, STATUS, NOTIFY, or UPDATE.',
    'qclass' : 'The DNS message query class (QCLASS): IN, CHAOS, HS, NONE, or ANY.',
    'qname' : 'The full QNAME string from the first (and usually only) QNAME in the question section of a DNS message.',
    'qnamelen' : 'The length of the first (and usually only) QNAME in a DNS message question section.',
    'qtype' : 'The query type (QTYPE) for the first QNAME in the DNS message question section. Well-known query types include: A, AAAA, A6, CNAME, PTR, MX, NS, SOA, and ANY.',
    'query_classification' : 'A stateless classification of bogus queries (see dsc.conf(5) for more details).',
    'rcode' : 'The RCODE value in a DNS response. The most common response codes are NOERROR and NXDOMAIN.',
    'rd_bit' : '1 if the RD (recursion desired) bit is set in the query. Usually only stub resolvers set the RD bit. Usually authoritative servers do not offer recursion to their clients.',
    'tc_bit' : "1 if the TC (truncated) bit is set (in a response). An authoritative server sets the TC bit when the entire response won't fit into a UDP message.",
    'tld' : "The TLD of the first QNAME in a DNS message's question section.",
    'second_ld' : "The second-level domain of the first QNAME in a DNS message's question section.",
    'third_ld' : "The third-level domain of the first QNAME in a DNS message's question section.",
    'transport' : 'Indicates whether the DNS message is carried via UDP or TCP.',
    'dns_ip_version' : 'The IP version number that carried the DNS message.',
    'dns_source_port' : 'The source port of the DNS message.',
    'dns_sport_range' : 'The source port of the DNS message per 1024-port chunks (0-1023, 1024-2047, etc.).',
    'qr_aa_bits' : 'The "qr_aa_bits" dataset may be useful when dsc is monitoring an authoritative name server. This dataset counts the number of DNS messages received with each combination of QR,AA bits. Normally the authoritative name server should *receive* only *queries*. If the name server is the target of a DNS reflection attack, it will probably receive DNS *responses* which have the QR bit set.',
    'response_time' : 'Response time of a query.',
}

def translate_val(indexer, stat):
    """Translate a dsc value into a human-readable Prometheus label value."""
    if isinstance(stat, dict):  # D2 stats arrive as dicts with 'val'/'count'
        if stat.get('base64'):
            return base64.b64decode(stat['val']).decode()
        if indexer in DNS_CODES:
            return DNS_CODES[indexer].get(stat['val'], 'Other')
        return stat['val']
    else:  # D1 values arrive as plain strings
        if indexer in DNS_CODES:
            return DNS_CODES[indexer].get(stat, 'Other')
        return stat

def handle_pcap_stats(dataset):
    # Emits:
    #   dsc_pcap_stats_captured_packets
    #   dsc_pcap_stats_filter_received_packets
    for item in dataset['data']:
        for stat in item['pcap_stat']:
            if stat['val'] == 'pkts_captured':
                metric_name = 'dsc_pcap_stats_captured_packets'
            elif stat['val'] == 'filter_received':
                metric_name = 'dsc_pcap_stats_filter_received_packets'
            else:
                continue  # Skip pcap stats we don't map to a metric
            if metric_name not in METRICS:
                METRICS[metric_name] = Counter(metric_name,
                                               'libpcap statistics for DNS traffic',
                                               ['ifname'])
            METRICS[metric_name].labels(item['ifname']).inc(stat['count'])

def make_help_text(dataset):
    dataset_conf = CONF.dataset(dataset['name'])
    d1_indexer = dataset_conf['d1_indexer']
    d2_indexer = dataset_conf['d2_indexer']
    if d1_indexer == 'null':
        return HELP_TEXT.get(dataset['name'], 'No Help')
    return "2-dimensional DSC metric composing data from %s and %s indexers. %s: %s %s: %s" % (
        d1_indexer, d2_indexer,
        d1_indexer, HELP_TEXT.get(d1_indexer, 'No Help'),
        d2_indexer, HELP_TEXT.get(d2_indexer, 'No Help'))

@PROCESS_DATASET.time()
def process_dataset(dataset):
    name = dataset['name']
    # Handle pcap_stats specially
    if name == 'pcap_stats':
        return handle_pcap_stats(dataset)
    dataset_conf = CONF.dataset(name)
    d1_label = dataset['dimensions'][0]
    d2_label = dataset['dimensions'][1]
    d1_indexer = dataset_conf['d1_indexer']
    d2_indexer = dataset_conf['d2_indexer']
    for item in dataset['data']:  # For each D1 value
        for stat in item[d2_label]:  # For each D2 stat associated with the D1 value
            labels = {d2_label : translate_val(d2_indexer, stat)}
            if d1_indexer != 'null':  # A 2D dataset
                labels[d1_label] = translate_val(d1_indexer, item[d1_label])
            if name not in METRICS:
                METRICS[name] = Counter('dsc_%s_count' % (name),
                                        make_help_text(dataset),
                                        labels.keys())
            METRICS[name].labels(**labels).inc(stat['count'])

@PROCESS_FILE.time()
def process_file(file):
    syslog("Processing file: %s" % file)
    with open(file) as fin:
        datasets = json.load(fin)
    for dataset in datasets:
        process_dataset(dataset)

def handle_inotify(event):
    if event.pathname.endswith('.json'):
        process_file(event.pathname)

def handle_reload(signum, frame):
    syslog("Caught SIGHUP, reloading dsc config")
    CONF.reload_dsc_conf()

def main():
    openlog(DSC_LOG_NAME)
    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, handle_inotify)
    # Watch for files moved into the dsc run dir (IN_MOVED_TO)
    wm.add_watch(CONF.dsc_run_dir, pyinotify.IN_MOVED_TO)
    syslog("Starting prometheus_client HTTP server")
    start_http_server(EXPORTER_PORT)
    syslog("Starting event loop")
    notifier.loop()

if __name__ == '__main__':
    # BUG: Need to be able to prevent other instances from running at the same time
    openlog(DSC_LOG_NAME)
    syslog("Starting DSC Prometheus Exporter")
    closelog()
    if len(sys.argv) > 1 and sys.argv[1] == '-f':
        # -f: run in the foreground
        signal.signal(signal.SIGHUP, handle_reload)
        main()
    else:
        with daemon.DaemonContext(signal_map = {signal.SIGHUP : handle_reload}):
            main()
'''
Prometheus metric format:

    # HELP <metric_name> <help_text>
    # TYPE <metric_name> <gauge|counter>
    <metric_name>{label="value", label="value", ...} <value> [<timestamp>]

See dsc.conf(5) for details about the data format.

The 'dimensions' key is always a 2-element array of dimension labels.
The two elements tell us the names of keys in each dict in the 'data' array.
The 'data' key is an array of dicts (and may be empty).
The value of the first key in each dict is a string (the D1 value).
The value of the second key in each dict is always an array of zero or more
dicts. Each of these dicts will have a 'val' key and a 'count' key. There may
be an optional 'base64' key set to true or false, indicating that the value
is base64 encoded.
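For example, a single dataset object might look roughly like this
(illustrative values, not captured output):

    {
        "name": "rcode",
        "start_time": 1642169160,
        "stop_time": 1642169220,
        "dimensions": ["All", "Rcode"],
        "data": [
            {"All": "ALL",
             "Rcode": [{"val": "0", "count": 123},
                       {"val": "3", "count": 4}]}
        ]
    }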
What we're aiming for:
2D Arrays
dsc_pcap_stats_captured_packets{ifname="eth0"} 120
dsc_pcap_stats_filter_received_packets{ifname="eth0"} 120
dsc_pcap_stats_filter_received_packets{ifname="lo"} 120
dsc_direction_vs_ipproto_count{direction="sent", IPProto="udp"} 82
dsc_direction_vs_ipproto_count{direction="recv", IPProto="udp"} 1000
dsc_direction_vs_ipproto_count{direction="sent", IPProto="tcp"} 82
1D Arrays
dsc_response_time_count{ResponseTime="100-1000"} 41
dsc_response_time_count{ResponseTime="10-100"} 32
dsc_response_time_count{ResponseTime="1-10"} 32
dsc_third_ld_count{thirdLD="www.google.com"} 29
dsc_third_ld_count{thirdLD="s3.amazonaws.com"} 29
dsc_client_port_range_count{PortRange="24576-25599"} 3
dsc_rd_bit_count{RD="set"} 41
dsc_do_bit_count{DO="clr"} 41
dsc_edns_bufsiz_count{EDNSBufSize="None"} 41
If we ever want to print out the stats data for the textfile collector:

    def process_file(self, file):
        with open(file) as fin:
            datasets = json.load(fin)
        for dataset in datasets:
            name = dataset['name']
            dimension1 = dataset['dimensions'][0]
            dimension2 = dataset['dimensions'][1]
            for datum in dataset['data']:
                for stat in datum[dimension2]:
                    if dimension1 == 'All':
                        print('dsc_{}{{{}="{}"}} {} {}'.format(
                            name,
                            dimension2,
                            translate_val(dimension2, stat),
                            stat['count'],
                            dataset['stop_time']))
                    else:
                        print('dsc_{}{{{}="{}", {}="{}"}} {} {}'.format(
                            name,
                            dimension1,
                            datum[dimension1],
                            dimension2,
                            translate_val(dimension2, stat),
                            stat['count'],
                            dataset['stop_time']))
'''