Skip to content

Instantly share code, notes, and snippets.

@eddyg
Created January 5, 2023 13:49
Show Gist options
  • Save eddyg/e0822d02c9d08102af86edacce4e1292 to your computer and use it in GitHub Desktop.
Save eddyg/e0822d02c9d08102af86edacce4e1292 to your computer and use it in GitHub Desktop.
AnyConnect Log Watcher
#!/usr/bin/env python
#
# Watch the macOS logging system for certain AnyConnect-related events
#
# EddyG
# June 2021
# Create a venv to run this:
#
# python3 -m venv aclogwatch (only required once)
# source aclogwatch/bin/activate
# pip install dictdiffer (only required once)
# sudo python3 aclogwatch/aclogwatch.py
# Ctrl-C to stop
# deactivate
# rm -fr aclogwatch (to remove python venv)
import asyncio
import json
import re
from dictdiffer import diff
# Snapshot ({interface name: set of addresses}) taken from the most recent
# "IP addresses from active interfaces" event.  It is compared with the next
# snapshot so that only the differences get printed.
prev_intfs = {}

_ACTIVE_INTFS_PREFIX = "IP addresses from active interfaces: "
# One "<name><digits>: <addresses>" group inside such an event message.
_INTF_RE = re.compile(r"\w+\d+: [\dA-F:,. ]+")


def _parse_interfaces(event_message):
    """Return ``{interface: {address, ...}}`` parsed from an active-interfaces event."""
    listing = event_message.replace(_ACTIVE_INTFS_PREFIX, "")
    intfs = {}
    for group in _INTF_RE.findall(listing):
        intf, _, addrs = group.partition(" ")
        # The regex may swallow a trailing ", " separator, which would
        # otherwise leave an empty "address" in the set.
        intfs[intf.rstrip(":")] = {
            addr.strip() for addr in addrs.split(",") if addr.strip()
        }
    return intfs


def _report_event(msg):
    """Print one decoded log record, or the interface address changes it implies."""
    global prev_intfs
    ts = msg["timestamp"].split(".")[0]  # drop everything after the seconds
    em = msg["eventMessage"]
    if em.startswith(_ACTIVE_INTFS_PREFIX):
        intfs = _parse_interfaces(em)
        # The first snapshot only establishes the baseline; nothing to compare.
        if prev_intfs:
            for change, key, what in diff(prev_intfs, intfs, expand=True):
                # An empty key means a whole interface was added or removed,
                # in which case its name is carried inside `what`.
                name = key if key else what[0][0]
                # Sorted so the output does not depend on set iteration order.
                addrs = ", ".join(sorted(what[0][1]))
                print(f"{ts} Address {change}: {name} {addrs}")
        prev_intfs = intfs
    elif not em.startswith("PLATDiscovery"):
        # PLATDiscovery messages are noise we don't care about.
        print(f"{ts} {em}")


async def exec_command_and_parse(cmd):
    """Run *cmd* in a shell and report each JSON log record it writes to stdout.

    The output is expected to be a pretty-printed JSON array in which every
    record ends on a line that starts with ``}`` (``},{`` between records).
    Records are decoded one at a time as they arrive, so this also works for
    a command that keeps streaming.

    Args:
        cmd: Shell command line to execute.

    Returns:
        The exit status of the command.

    Raises:
        json.JSONDecodeError: If a reassembled record is not valid JSON.
    """
    proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE)
    json_str = ""
    # readline() returns b"" only at EOF, so test the raw bytes: a blank line
    # in the output must not be mistaken for the end of the stream.
    while raw := await proc.stdout.readline():
        line = raw.decode("utf-8").rstrip()
        if not line:
            continue
        if line.startswith("}"):
            record = json_str.lstrip("[,") + "}"
            # Reset the buffer *before* handling the record so that no code
            # path (e.g. a skipped message) can leave stale text behind.
            json_str = "{" if line.endswith("{") else ""
            _report_event(json.loads(record))
        elif not line.startswith("Filtering the log data using"):
            json_str += line
    await proc.wait()
    return proc.returncode
def main():
    """Replay the last two hours of matching log records, then follow new ones.

    Both phases use the same predicate.  A KeyboardInterrupt (Ctrl-C) during
    either phase ends the program quietly.
    """
    # Written one clause per line for readability; the whitespace is collapsed
    # afterwards so the predicate becomes a single-line, single-quoted argument.
    predicate_text = """
        (process == "vpnagentd" and (
            eventMessage beginswith "Automatic correction" or
            eventMessage contains "Cisco AnyConnect" or
            eventMessage contains "VPN connection" or
            eventMessage contains "Primary SSL" or
            eventMessage contains "network control state" or
            eventMessage contains "network interface" or
            eventMessage beginswith "IP addresses from" or
            eventMessage beginswith "A routing table change" or
            eventMessage beginswith "The entire VPN"
        ))
        or
        (process = "configd" and (
            (subsystem == "com.apple.SystemConfiguration" and (
                category == "IPConfiguration" or
                category == "IPMonitor"
            )) or (
                subsystem == "com.apple.IPConfiguration"
            )
        ))
    """
    predicate = ' '.join(predicate_text.split())

    commands = (
        # history first: everything from the last 2 hours
        f"""log show --style json --last 2h --predicate '{predicate}'""",
        # then keep streaming new log messages
        f"""log stream --style json --predicate '{predicate}'""",
    )
    try:
        for command in commands:
            asyncio.run(exec_command_and_parse(command))
    except KeyboardInterrupt:
        pass


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment