Last active
April 16, 2024 07:40
-
-
Save dreness/2dd40adff5e8cf03d528f1c542012db6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!python -u | |
import re | |
from datetime import datetime | |
import click | |
import subprocess | |
import json | |
""" | |
NOTE: this requires a log config profile that enables 'private data' logging.
Run a `log` query for mDNSResponder resolver events and produce concise DNS questions and answers.
The output is grouped by query ID and sorted by timestamp or query ID.
Each query ID groups all questions and answers for that query.
The default output mode is human-readable, but a CSV output mode is also available.
""" | |
@click.command()
@click.option(
    "--sort_by",
    type=click.Choice(["ts", "id"]),
    default="ts",
    help="Sort output groups by either timestamp or query ID",
)
@click.option(
    "--last",
    type=str,
    default="10m",
    help='Limit the amount of log data returned using an offset like 5m or 1h, passed to `log`s "--last" option',
)
@click.option(
    "--csv",
    is_flag=True,
    default=False,
    help="Output in CSV format"
)
def main(sort_by, last, csv):
    """Summarize mDNSResponder resolver log events as DNS questions and answers.

    Runs `log show` with a predicate selecting resolver "Sent"/"Received"
    events, parses the JSON it prints, groups the events by query ID and
    prints them either as a human-readable report or as CSV.

    Progress messages are written to stderr so that stdout carries only the
    report (this keeps `--csv` output machine-readable).
    """
    predicate = r'sender == "mDNSResponder" and category == "resolver" and (formatString contains "%{public}sSent" or formatString contains "%{public}sReceived")'
    log_command = ["log", "show", "--last", last, "--predicate", predicate, "--style", "json"]
    try:
        proc = subprocess.run(log_command, stdout=subprocess.PIPE, text=True)
    except FileNotFoundError as err:
        # The `log` executable is not on PATH.
        raise click.ClickException(f"could not run `log`: {err}") from err
    if proc.returncode != 0:
        # stderr of `log` is not captured, so its own diagnostics are already visible.
        raise click.ClickException(f"`log show` exited with status {proc.returncode}")

    lines = proc.stdout.split("\n")
    click.echo(f"Found {len(lines)} log lines...", err=True)
    try:
        log_output = json.loads(proc.stdout)
    except json.JSONDecodeError as err:
        raise click.ClickException(f"could not parse `log show` output as JSON: {err}") from err
    click.echo(f"Found {len(log_output)} interesting resolver log events...", err=True)

    results = process_log_entries(log_output, sort_by=sort_by)
    if csv:
        print_csv(results)
    else:
        print_sorted_results(sort_by, results)
def print_csv(results):
    """Write every question and answer event to stdout as CSV.

    Args:
        results: Either a mapping of query ID -> {"questions": [...],
            "answers": [...]} or an iterable of (query_id, data) pairs, which
            is the shape produced by `sorted(mapping.items(), ...)`. Each
            event is a (timestamp, text) tuple whose timestamp supports
            `strftime`.

    For each query, all questions ("Sent") are written before all answers
    ("Received"), preserving the order inside each list.
    """
    # Local imports: this function is the only user of these modules.
    import csv
    import sys

    # Accept both the dict form and the already-sorted list of pairs; calling
    # .items() unconditionally fails on the list form.
    groups = results.items() if hasattr(results, "items") else results

    # csv.writer quotes fields containing commas or quotes; plain fields are
    # emitted unchanged. "\n" matches what print() would have produced.
    writer = csv.writer(sys.stdout, lineterminator="\n")
    writer.writerow(["Timestamp", "Query ID", "Event Type", "Event Data"])

    ts_format = "%Y-%m-%d %H:%M:%S.%f%z"
    for query_id, data in groups:
        for event_type, key in (("Sent", "questions"), ("Received", "answers")):
            for timestamp, event_data in data[key]:
                writer.writerow(
                    [timestamp.strftime(ts_format), query_id, event_type, event_data]
                )
def process_log_entries(entries, sort_by):
    """Group resolver log entries by query ID and sort the groups.

    Args:
        entries: Iterable of log entry dicts, passed through to
            `parseDNSMessage`.
        sort_by: Sort mode passed through to `sortResults` ("ts" or "id").

    Returns:
        Whatever `sortResults` returns for the parsed groups.
    """
    # Raw string so that \[ \d \s reach the regex engine unchanged instead of
    # being treated as (invalid) string escapes.
    # Groups: 1 = query ID such as "Q123", 2 = "Sent" or "Received",
    # 3 = the text after the last comma in the message.
    rex = re.compile(r"\[(Q\d+)\] (Sent|Received).*,\s*(.*)$")
    parsed = parseDNSMessage(entries, {}, rex)
    # Sort the groups either by timestamp or by query ID.
    return sortResults(sort_by, parsed)
def sortResults(sort_by, results):
    """Return the query groups as a sorted list of (query_id, data) pairs.

    Args:
        sort_by: "ts" to order by time; any other value orders by the numeric
            part of the query ID (the ID minus its first character).
        results: Mapping of query ID -> {"questions": [...], "answers": [...]},
            where each event is a (timestamp, text) tuple.

    Returns:
        A list of (query_id, data) tuples.

        In "ts" mode each group's questions and answers are sorted by
        timestamp, and groups are ordered by their earliest question. A group
        with no questions is ordered by its earliest answer instead; a group
        with no events at all is dropped. The input mapping is not modified.

        In the other mode the original data dicts are returned unchanged.
    """
    if sort_by != "ts":
        # The query ID is the first element of each item, e.g. "Q42" -> 42.
        return sorted(results.items(), key=lambda item: int(item[0][1:]))

    def event_ts(event):
        return event[0]

    ordered = {}
    for query_id, data in results.items():
        # sorted() is stable, so events with equal timestamps keep their
        # original relative order and each event appears exactly once.
        questions = sorted(data["questions"], key=event_ts)
        answers = sorted(data["answers"], key=event_ts)
        if not questions and not answers:
            continue
        ordered[query_id] = {"questions": questions, "answers": answers}

    def group_ts(item):
        data = item[1]
        # Fall back to the answers when the question was not captured.
        events = data["questions"] or data["answers"]
        return events[0][0]

    return sorted(ordered.items(), key=group_ts)
def parseDNSMessage(entries, results, rex):
    """Collect matching log entries into `results`, grouped by query ID.

    Args:
        entries: Iterable of dicts, each with "timestamp" and "eventMessage"
            keys.
        results: Dict that is updated in place; maps query ID ->
            {"questions": [...], "answers": [...]}.
        rex: Compiled pattern applied with `.match()` to each event message.
            Group 1 is the query ID, group 2 is "Sent" or "Received", and
            group 3 is the text stored with the event.

    Returns:
        The same `results` dict that was passed in.
    """
    bucket_for = {"Sent": "questions", "Received": "answers"}

    for entry in entries:
        # Both fields are read up front, before the message is matched.
        raw_timestamp = entry["timestamp"]
        message = entry["eventMessage"]

        found = rex.match(message)
        if not found:
            continue

        query_id, direction, payload = found.group(1, 2, 3)
        # Timestamps are expected to look like "2024-04-16 07:40:00.123456-0700".
        when = datetime.strptime(raw_timestamp, "%Y-%m-%d %H:%M:%S.%f%z")

        group = results.setdefault(query_id, {"questions": [], "answers": []})
        bucket = bucket_for.get(direction)
        if bucket is not None:
            group[bucket].append((when, payload))

    return results
def print_sorted_results(sort_by, sresults):
    """Print a human-readable report of questions and answers per query.

    Args:
        sort_by: Unused here; kept so existing callers keep working.
        sresults: Iterable of (query_id, data) pairs, where data is
            {"questions": [...], "answers": [...]} and each event is a
            (timestamp, text) tuple.

    For each group the header line shows the first question's time
    (HH:MM:SS.mmm), the query ID and the question text. Further questions
    show the delay in milliseconds since the previous question. Each answer
    shows the delay in milliseconds since the most recent question sent at
    or before it.
    """
    for query_id, data in sresults:
        questions = data["questions"]
        answers = data["answers"]
        if not questions and not answers:
            continue

        # Header line: timestamp of the first question plus the requested record.
        if questions:
            header_ts, header_text = questions[0]
        else:
            # The question was not captured (no "Sent" event in this group);
            # anchor the group on its first answer instead of failing.
            header_ts, header_text = answers[0][0], "(question not captured)"
        # Keep only the h, m, s, ms components.
        header_printable = header_ts.strftime("%H:%M:%S.%f")[:-3]
        print(f"{header_printable} {query_id:<8} {header_text:<38}")

        # Pair each later question with its positional predecessor. Looking
        # the predecessor up by value would pick the wrong one when the same
        # (timestamp, text) tuple occurs more than once.
        for previous, question in zip(questions, questions[1:]):
            delta_ms = round((question[0] - previous[0]).total_seconds() * 1000, 0)
            print(f"{delta_ms:12} {' ':8} {question[1]:<30}")

        for answer in answers:
            # Measure from the closest *previous* question, so that a later
            # retransmission cannot produce a negative delay.
            earlier = [q[0] for q in questions if q[0] <= answer[0]]
            if earlier:
                reference_ts = max(earlier)
            elif questions:
                # Every question is later than this answer: use the nearest one.
                reference_ts = min(q[0] for q in questions)
            else:
                reference_ts = header_ts
            delta_ms = round((answer[0] - reference_ts).total_seconds() * 1000, 2)
            # Left edge of answers lines up with the questions above.
            print(f"{delta_ms:>12} {' ':8} {answer[1]:<31}")
# Script entry point: when this file is executed directly, invoke the click
# command defined above. `main` is called without arguments here.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment