Created
January 5, 2023 13:49
-
-
Save eddyg/e0822d02c9d08102af86edacce4e1292 to your computer and use it in GitHub Desktop.
AnyConnect Log Watcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
#
# Watch the macOS logging system for certain AnyConnect-related events
#
# EddyG
# June 2021
# Create a venv to run this:
#
#   python3 -m venv aclogwatch              (only required once)
#   source aclogwatch/bin/activate
#   pip install dictdiffer                  (only required once)
#   sudo python3 aclogwatch/aclogwatch.py
#   Ctrl-C to stop
#   deactivate
#   rm -fr aclogwatch                       (to remove python venv)
import asyncio | |
import json | |
import re | |
from dictdiffer import diff | |
# Interface name -> set of address strings taken from the most recent
# "IP addresses from active interfaces" message. Empty until the first such
# message has been seen, so the first report only establishes a baseline.
prev_intfs = {}

_INTFS_PREFIX = "IP addresses from active interfaces: "
# One "<name><digits>: <addr>, <addr>, ..." group per interface.
_INTF_RE = re.compile(r"\w+\d+: [\dA-F:,. ]+")


def _parse_interfaces(em):
    """Return ``{interface name: set of addresses}`` parsed from *em*.

    *em* is an event message that starts with ``_INTFS_PREFIX``.
    """
    intfs = {}
    for intf_str in _INTF_RE.findall(em[len(_INTFS_PREFIX):]):
        intf, _, addrs = intf_str.partition(" ")
        # The regex's character class includes "," and " ", so a match can
        # swallow the separator in front of the next interface; skip the
        # empty entry that split(",") then yields.
        intfs[intf.rstrip(":")] = {
            addr.strip() for addr in addrs.split(",") if addr.strip()
        }
    return intfs


def _handle_message(msg):
    """Print one decoded log record, or the address changes it reports."""
    global prev_intfs
    # Keep only the part of the timestamp before the first "."
    ts = msg["timestamp"].split(".")[0]
    em = msg["eventMessage"]
    if em.startswith(_INTFS_PREFIX):
        intfs = _parse_interfaces(em)
        if prev_intfs:
            for change, key, what in diff(prev_intfs, intfs, expand=True):
                # NOTE(review): relies on dictdiffer reporting an empty key
                # for whole-interface changes (name carried in the payload)
                # and the interface name as key for per-address set changes.
                name = key if key else what[0][0]
                # Sorted so the printed order does not depend on set ordering.
                addrs = ", ".join(sorted(what[0][1]))
                print(f"{ts} Address {change}: {name} {addrs}")
        prev_intfs = intfs
    elif em.startswith("PLATDiscovery"):
        # skip messages we don't care about
        return
    else:
        print(f"{ts} {em}")


async def exec_command_and_parse(cmd: str) -> int:
    """Run *cmd* in a shell and report each JSON log record it prints.

    The output is expected to be a JSON array of objects in which every
    top-level object ends on a line starting with ``}`` (continuing with
    ``},{`` when another object follows). Each object is decoded as soon as
    its closing line arrives and handed to ``_handle_message``.

    Args:
        cmd: Shell command line to execute.

    Returns:
        The exit status of the command.

    Raises:
        json.JSONDecodeError: If an accumulated record is not valid JSON.
        KeyError: If a record lacks ``timestamp`` or ``eventMessage``.
    """
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE)
    parts = []
    # readline() returns b"" only at EOF; testing the raw bytes (rather than
    # the stripped text) keeps a blank output line from ending the loop early.
    while raw := await proc.stdout.readline():
        line = raw.decode("utf-8", errors="replace").rstrip()
        if line.startswith("}"):
            json_str = "".join(parts).lstrip("[,") + "}"
            # Start the next record *before* dispatching so that a skipped
            # message can never leave stale text in the buffer.
            parts = ["{"] if line.endswith("{") else []
            _handle_message(json.loads(json_str))
        elif not line.startswith("Filtering the log data using"):
            parts.append(line)
    await proc.wait()
    return proc.returncode
def main():
    """Replay the last two hours of matching log entries, then stream new ones.

    Both passes use the same predicate, which selects a set of vpnagentd
    messages plus configd's IPConfiguration/IPMonitor output. Ctrl-C
    (KeyboardInterrupt) ends the program quietly.
    """
    predicate_layout = """
        (process == "vpnagentd" and (
            eventMessage beginswith "Automatic correction" or
            eventMessage contains "Cisco AnyConnect" or
            eventMessage contains "VPN connection" or
            eventMessage contains "Primary SSL" or
            eventMessage contains "network control state" or
            eventMessage contains "network interface" or
            eventMessage beginswith "IP addresses from" or
            eventMessage beginswith "A routing table change" or
            eventMessage beginswith "The entire VPN"
        ))
        or
        (process = "configd" and (
            (subsystem == "com.apple.SystemConfiguration" and (
                category == "IPConfiguration" or
                category == "IPMonitor"
            )) or (
                subsystem == "com.apple.IPConfiguration"
            )
        ))
    """
    # Collapse the layout whitespace so the predicate becomes one line that
    # can sit inside the single-quoted shell argument.
    predicate = " ".join(predicate_layout.split())

    commands = (
        # start with stuff in the last 2 hours
        f"log show --style json --last 2h --predicate '{predicate}'",
        # then start streaming new log messages
        f"log stream --style json --predicate '{predicate}'",
    )
    try:
        for command in commands:
            asyncio.run(exec_command_and_parse(command))
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment