Skip to content

Instantly share code, notes, and snippets.

@dreness
Last active April 16, 2024 07:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dreness/2dd40adff5e8cf03d528f1c542012db6 to your computer and use it in GitHub Desktop.
Save dreness/2dd40adff5e8cf03d528f1c542012db6 to your computer and use it in GitHub Desktop.
#!python -u
import re
from datetime import datetime
import click
import subprocess
import json
"""
NOTE: this requires a log config profile that enables 'private data' logging.
Run a `log` query for mDNSResponder resolver events and produce concise DNS questions and answers.
The output is grouped by query ID and sorted by timestamp or query ID.
Each query ID groups all questions and answers for that query.
The default output mode is human-readable, but a CSV output mode is also available.
"""
@click.command()
@click.option(
    "--sort_by",
    type=click.Choice(["ts", "id"]),
    default="ts",
    help="Sort output groups by either timestamp or query ID",
)
@click.option(
    "--last",
    type=str,
    default="10m",
    help='Limit the amount of log data returned using an offset like 5m or 1h, passed to `log`s "--last" option',
)
@click.option(
    "--csv",
    is_flag=True,
    default=False,
    help="Output in CSV format"
)
def main(sort_by, last, csv):
    """Summarize mDNSResponder DNS questions and answers found by `log show`."""
    # Only the resolver's "Sent ..." / "Received ..." messages are requested.
    predicate = r'sender == "mDNSResponder" and category == "resolver" and (formatString contains "%{public}sSent" or formatString contains "%{public}sReceived")'
    log_command = ["log", "show", "--last", last, "--predicate", predicate, "--style", "json"]
    proc = subprocess.run(log_command, stdout=subprocess.PIPE, text=True)
    if proc.returncode != 0:
        # stderr of `log` is not captured, so its own message is already on
        # the terminal; fail cleanly instead of choking on partial JSON below.
        raise click.ClickException(f"`log show` exited with status {proc.returncode}")

    # Status messages go to stderr so that stdout carries only the report
    # (otherwise they would end up as bogus rows in --csv output).
    lines = proc.stdout.split("\n")
    click.echo(f"Found {len(lines)} log lines...", err=True)
    # Treat completely empty output as "no events" rather than a JSON error.
    log_output = json.loads(proc.stdout) if proc.stdout.strip() else []
    click.echo(f"Found {len(log_output)} interesting resolver log events...", err=True)

    results = process_log_entries(log_output, sort_by=sort_by)
    if csv:
        print_csv(results)
    else:
        print_sorted_results(sort_by, results)
def print_csv(results):
    """Print every question and answer as one CSV row on standard output.

    Args:
        results: Either a mapping of query ID -> ``{"questions": [...],
            "answers": [...]}`` or an iterable of ``(query_id, group)`` pairs.
            The pair form is what ``sorted(mapping.items())`` yields, which
            is what the sorting step hands over; calling ``.items()`` on it
            unconditionally would raise ``AttributeError``.

    Each event is a ``(timestamp, text)`` tuple; questions are emitted as
    ``Sent`` rows and answers as ``Received`` rows, questions first.
    """
    groups = results.items() if hasattr(results, "items") else results
    print("Timestamp,Query ID,Event Type,Event Data")
    for query_id, data in groups:
        labelled = (("Sent", data["questions"]), ("Received", data["answers"]))
        for event_type, events in labelled:
            for timestamp, event_data in events:
                print(f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f%z')},{query_id},{event_type},{event_data}")
def process_log_entries(entries, sort_by):
    """Group resolver log entries by query ID and return them sorted.

    Args:
        entries: Iterable of log-entry dicts carrying ``"timestamp"`` and
            ``"eventMessage"`` keys.
        sort_by: ``"ts"`` to order groups by timestamp, anything else to
            order them by numeric query ID.

    Returns:
        Whatever ``sortResults`` produces for the parsed groups.
    """
    # Raw string: "\[", "\d" and "\s" are regex escapes, not string escapes.
    # Captures: (1) query ID such as Q1234, (2) direction, (3) the text after
    # the last comma of the message.
    rex = re.compile(r"\[(Q\d+)\] (Sent|Received).*,\s*(.*)$")
    results = parseDNSMessage(entries, {}, rex)
    return sortResults(sort_by, results)
def sortResults(sort_by, results):
    """Return the query groups as a sorted list of ``(query_id, data)`` pairs.

    Args:
        sort_by: ``"ts"`` sorts by time; any other value sorts by the numeric
            part of the query ID (the ID minus its one-character prefix).
        results: Mapping of query ID -> ``{"questions": [...],
            "answers": [...]}`` where each event is a ``(timestamp, text)``
            tuple.

    Returns:
        A list of ``(query_id, data)`` tuples. In ``"ts"`` mode each group's
        questions and answers are themselves ordered by timestamp, and groups
        are ordered by their earliest question (or earliest answer when the
        group has no question). Groups with no events at all are omitted in
        ``"ts"`` mode. In ID mode the per-group event order is left as given.
    """
    if sort_by != "ts":
        # Query ID is the first element of each (query_id, data) pair.
        return sorted(results.items(), key=lambda item: int(item[0][1:]))

    def event_ts(event):
        return event[0]

    ordered = []
    for query_id, data in results.items():
        # sorted() is stable, so events sharing a timestamp keep their log
        # order and each event appears exactly once.
        questions = sorted(data["questions"], key=event_ts)
        answers = sorted(data["answers"], key=event_ts)
        if not questions and not answers:
            continue
        ordered.append((query_id, {"questions": questions, "answers": answers}))

    # A group may hold only answers (its question can be missing from the
    # input); fall back to the first answer instead of indexing an empty list.
    ordered.sort(key=lambda item: (item[1]["questions"] or item[1]["answers"])[0][0])
    return ordered
def parseDNSMessage(entries, results, rex):
    """Fold matching log entries into ``results``, grouped by query ID.

    Args:
        entries: Iterable of dicts with ``"timestamp"`` and ``"eventMessage"``
            keys.
        results: Mapping to update in place; query ID ->
            ``{"questions": [...], "answers": [...]}``.
        rex: Compiled pattern matched at the start of each message. Group 1 is
            the query ID, group 2 the direction (``Sent``/``Received``) and
            group 3 the text to record.

    Returns:
        The same ``results`` mapping, with a ``(timestamp, text)`` tuple
        appended for every matching entry.
    """
    bucket_for = {"Sent": "questions", "Received": "answers"}
    for record in entries:
        raw_ts = record["timestamp"]
        message = record["eventMessage"]
        found = rex.match(message)
        if not found:
            continue
        qid, direction, payload = found.group(1, 2, 3)
        # Timestamps carry microseconds and a numeric UTC offset.
        when = datetime.strptime(raw_ts, "%Y-%m-%d %H:%M:%S.%f%z")
        group = results.setdefault(qid, {"questions": [], "answers": []})
        bucket = bucket_for.get(direction)
        if bucket is not None:
            group[bucket].append((when, payload))
    return results
def print_sorted_results(sort_by, sresults):
    """Print a human-readable report, one block of lines per query ID.

    Args:
        sort_by: Unused; kept so existing callers keep working.
        sresults: Iterable of ``(query_id, data)`` pairs where ``data`` has
            ``"questions"`` and ``"answers"`` lists of ``(timestamp, text)``
            tuples.

    Each block starts with a header line (time of the first question, query
    ID, requested record). Further questions show the milliseconds elapsed
    since the preceding question; answers show the milliseconds relative to
    the question nearest to them in time.
    """
    for query_id, data in sresults:
        questions = data["questions"]
        answers = data["answers"]

        if questions:
            header_ts, requested_record = questions[0]
        elif answers:
            # The question may be absent from the data; still show the answers.
            header_ts, requested_record = answers[0][0], "(no question logged)"
        else:
            continue

        # Show only the h, m, s, ms components (trim microseconds to millis).
        header_printable = header_ts.strftime("%H:%M:%S.%f")[:-3]
        print(f"{header_printable} {query_id:<8} {requested_record:<38}")

        # Pair each question with its positional predecessor; looking it up
        # by value would pick the wrong one when two questions are equal.
        for previous, question in zip(questions, questions[1:]):
            delta_ms = round((question[0] - previous[0]).total_seconds() * 1000, 0)
            print(f"{delta_ms:12} {' ':8} {question[1]:<30}")

        for answer in answers:
            if questions:
                # Nearest question in time, whether before or after the answer.
                closest = min(questions, key=lambda q: abs(q[0] - answer[0]))
                delta_ms = round((answer[0] - closest[0]).total_seconds() * 1000, 2)
            else:
                delta_ms = ""
            # Left edge of answers lines up with the questions above.
            print(f"{delta_ms:>12} {' ':8} {answer[1]:<31}")
# Script entry point: click parses the command-line options and calls main().
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment