Skip to content

Instantly share code, notes, and snippets.

@onetown
Created August 2, 2022 01:39
Show Gist options
  • Save onetown/14b28e92f277c610382d860fe1f32fb4 to your computer and use it in GitHub Desktop.
Save onetown/14b28e92f277c610382d860fe1f32fb4 to your computer and use it in GitHub Desktop.
import time
import re
from datetime import datetime
# Captures everything that precedes " dnsmasq[...]:" on a line; parse() reads
# that capture as a "%b %d %H:%M:%S" timestamp.
DATE_REGEX = re.compile(r"(.*?) dnsmasq\[.*?\]:.*$")
# "forwarded <name> to <destination>" lines; groups: name, destination.
FORWARDED_REGEX = re.compile(r".*: forwarded (.*?) to (.*?)$")
# "query[<type>] <name> from <source>" lines; groups: type, name, source.
QUERY_REGEX = re.compile(r".*: query\[(.*?)\] (.*?) from (.*?)$")
# "reply <name> is <answer>" lines; groups: name, answer.
# NOTE(review): not referenced by any code visible in this file.
REPLY_REGEX = re.compile(r".*: reply (.*?) is (.*?)$")
# Cache statistics lines; groups: queries forwarded, queries answered locally.
CACHE_HITS_REGEX = re.compile(r".*: queries forwarded (\d+), queries answered locally (\d+)$")
# Per-server statistics lines; groups: queries sent, queries retried or failed.
QUERIES_REGEX = re.compile(r".*: server .*?: queries sent (\d+), retried or failed (\d+)$")
def parse(logger, line):
    """Convert one dnsmasq log line into a list of metric tuples.

    Every tuple has the shape ``(metric_name, unix_timestamp, value,
    attributes)``. The log timestamp carries no year, so the current year is
    assumed, and the result is converted with ``time.mktime`` (local time).

    Args:
        logger: Not used by this function.
        line: A single log line, for example
            ``"Mar 10 01:21:18 dnsmasq[4340]: forwarded example.com to 10.0.0.1"``.

    Returns:
        A list of metric tuples for query, forwarded, per-server statistics
        and cache statistics lines. ``None`` for an empty line, a line that
        is not a dnsmasq entry, a line whose timestamp cannot be parsed, or
        a dnsmasq line of any other type.
    """
    if not line:
        return None

    m = DATE_REGEX.match(line)
    if m is None:
        # Not a dnsmasq entry at all; previously this raised AttributeError.
        return None

    # The year is parsed together with the rest of the timestamp so that
    # "Feb 29" is validated against the real year rather than strptime's
    # default of 1900 (which is never a leap year).
    year = datetime.today().year
    try:
        date = datetime.strptime("%d %s" % (year, m.group(1)), "%Y %b %d %H:%M:%S")
    except ValueError:
        # The prefix is not a bare "Mon DD HH:MM:SS" timestamp (for example
        # it also contains a hostname), so the line cannot be timestamped.
        return None
    t = int(time.mktime(date.timetuple()))

    m = QUERY_REGEX.match(line)
    if m:
        return [("dnsmasq.query", t, 1, {"metric_type": "counter", "unit": "queries", "tags": ["query_type:" + m.group(1), "query:" + m.group(2)]})]

    m = FORWARDED_REGEX.match(line)
    if m:
        return [("dnsmasq.forwarded", t, 1, {"metric_type": "counter", "unit": "queries", "tags": ["query:" + m.group(1)]})]

    m = QUERIES_REGEX.match(line)
    if m:
        return [
            ("dnsmasq.queries.sent", t, int(m.group(1)), {"metric_type": "gauge", "unit": "queries"}),
            ("dnsmasq.queries.retried_or_failed", t, int(m.group(2)), {"metric_type": "gauge", "unit": "queries"}),
        ]

    m = CACHE_HITS_REGEX.match(line)
    if m:
        miss = float(m.group(1))
        hit = float(m.group(2))
        total = miss + hit
        # Both counters are 0 until dnsmasq has handled a query; report a
        # ratio of 0.0 instead of raising ZeroDivisionError.
        ratio = hit / total if total else 0.0
        return [
            ("dnsmasq.cache.miss", t, miss, {"metric_type": "gauge", "unit": "queries"}),
            ("dnsmasq.cache.hit", t, hit, {"metric_type": "gauge", "unit": "queries"}),
            ("dnsmasq.cache.ratio", t, ratio, {"metric_type": "gauge", "unit": "queries"}),
        ]

    # A dnsmasq line of a type this parser does not turn into metrics.
    return None
def test_queries(logging):
    """Check that a per-server statistics line yields the sent and retried/failed gauges.

    Args:
        logging: Logger-like object forwarded to ``parse``.
    """
    line = "Mar 10 01:02:44 dnsmasq[4340]: server 10.128.0.2#53: queries sent 356, retried or failed 0"
    # parse() assumes the current year and converts with time.mktime (local
    # time), so the expected epoch is derived the same way. A hard-coded
    # value only holds for one year and one timezone.
    ts = int(time.mktime(datetime(datetime.today().year, 3, 10, 1, 2, 44).timetuple()))
    expected = [
        ("dnsmasq.queries.sent", ts, 356, {"metric_type": "gauge", "unit": "queries"}),
        ("dnsmasq.queries.retried_or_failed", ts, 0, {"metric_type": "gauge", "unit": "queries"}),
    ]
    actual = parse(logging, line)
    assert expected == actual, "%s != %s" % (expected, actual)
def test_query(logging):
    """Check that a ``query[...]`` line yields one counter tagged with type and name.

    Args:
        logging: Logger-like object forwarded to ``parse``.
    """
    line = "Mar 10 17:59:48 dnsmasq[31712]: query[A] collector-176.newrelic.com from 172.17.0.47"
    # parse() assumes the current year and converts with time.mktime (local
    # time), so the expected epoch is derived the same way. A hard-coded
    # value only holds for one year and one timezone.
    ts = int(time.mktime(datetime(datetime.today().year, 3, 10, 17, 59, 48).timetuple()))
    expected = [("dnsmasq.query", ts, 1, {"metric_type": "counter", "unit": "queries", "tags": ["query_type:A", "query:collector-176.newrelic.com"]})]
    actual = parse(logging, line)
    assert expected == actual, "%s != %s" % (expected, actual)
def test_forwarded(logging):
    """Check that ``forwarded`` lines yield one counter tagged with the queried name.

    Covers a line with a trailing newline and one without.

    Args:
        logging: Logger-like object forwarded to ``parse``.
    """
    year = datetime.today().year
    cases = [
        (
            "Mar 10 01:21:18 dnsmasq[4340]: forwarded 5-6-3-app.agent.datadoghq.com to 10.128.0.2\n",
            datetime(year, 3, 10, 1, 21, 18),
            "5-6-3-app.agent.datadoghq.com",
        ),
        (
            "Mar 10 17:17:25 dnsmasq[31712]: forwarded rabbitmqpurple.empire to 10.128.0.2",
            datetime(year, 3, 10, 17, 17, 25),
            "rabbitmqpurple.empire",
        ),
    ]
    for line, when, name in cases:
        # parse() assumes the current year and converts with time.mktime
        # (local time), so the expected epoch is derived the same way. A
        # hard-coded value only holds for one year and one timezone.
        ts = int(time.mktime(when.timetuple()))
        expected = [("dnsmasq.forwarded", ts, 1, {"metric_type": "counter", "unit": "queries", "tags": ["query:" + name]})]
        actual = parse(logging, line)
        assert actual == expected, "%s != %s" % (expected, actual)
def test_cache_hit(logging):
    """Check that a cache statistics line yields miss, hit and ratio gauges.

    Args:
        logging: Logger-like object forwarded to ``parse``.
    """
    line = "Mar 10 20:18:01 dnsmasq[3555]: queries forwarded 7, queries answered locally 9"
    # parse() assumes the current year and converts with time.mktime (local
    # time), so the expected epoch is derived the same way. A hard-coded
    # value only holds for one year and one timezone.
    ts = int(time.mktime(datetime(datetime.today().year, 3, 10, 20, 18, 1).timetuple()))
    expected = [
        ("dnsmasq.cache.miss", ts, 7, {"metric_type": "gauge", "unit": "queries"}),
        ("dnsmasq.cache.hit", ts, 9, {"metric_type": "gauge", "unit": "queries"}),
        # 9 answered locally out of 16 total queries.
        ("dnsmasq.cache.ratio", ts, 0.5625, {"metric_type": "gauge", "unit": "queries"}),
    ]
    actual = parse(logging, line)
    assert actual == expected, "%s != %s" % (expected, actual)
def test_unknown(logging):
    """A dnsmasq line of an unrecognised type must produce ``None``.

    Args:
        logging: Logger-like object forwarded to ``parse``.
    """
    result = parse(
        logging,
        "Mar 10 01:02:44 dnsmasq[4340]: dualstack.agent-520-209329848.us-east-1. 54.243.66.16 4F",
    )
    assert result is None, "%s != %s" % (None, result)
def test_empty_string(logging):
    """An empty line must produce ``None``.

    Args:
        logging: Logger-like object forwarded to ``parse``.
    """
    result = parse(logging, "")
    assert result is None, "%s != %s" % (None, result)
# Run the self-tests when the file is executed as a script.
if __name__ == '__main__':
    import logging
    logging.basicConfig(level=logging.DEBUG)
    # The logging module itself is passed as the logger argument.
    test_queries(logging)
    test_forwarded(logging)
    test_query(logging)
    test_cache_hit(logging)
    test_unknown(logging)
    test_empty_string(logging)
    # Parenthesised form prints the same text on Python 2 and is also valid
    # syntax on Python 3, unlike the bare print statement.
    print('tests passed')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment