Skip to content

Instantly share code, notes, and snippets.

@wolfhechel
Created July 7, 2016 10:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save wolfhechel/f3e5ac620f96ddfc26440cddbc307f91 to your computer and use it in GitHub Desktop.
Save wolfhechel/f3e5ac620f96ddfc26440cddbc307f91 to your computer and use it in GitHub Desktop.
Testing KEvent notifications with asyncio
import asyncio
import socket
import fcntl
import struct
import sys
# ioctl request passed to fcntl.ioctl() on the event socket together with a
# packed (vendor, class, subclass) triple to select which events are delivered.
SIOCSKEVFILT = 0x800c6502
# Protocol number used when opening the PF_SYSTEM raw socket.
SYSPROTO_EVENT = 1

# Filter values handed to SIOCSKEVFILT.
# NOTE(review): numeric values presumably mirror Apple's kernel headers --
# verify against <sys/kern_event.h> before changing any of them.
KEV_VENDOR_APPLE = 1
KEV_NETWORK_CLASS = 1
KEV_ANY_SUBCLASS = 0

# Event codes of the inet subclass.
KEV_INET_SUBCLASS = 1 # inet subclass identifier
KEV_INET_NEW_ADDR = 1 # Userland configured IP address
KEV_INET_CHANGED_ADDR = 2 # Address changed event
KEV_INET_ADDR_DELETED = 3 # Address was deleted (inet subclass, so not IPv6-specific)

# Event codes of the inet6 subclass.
KEV_INET6_SUBCLASS = 6 # inet6 subclass identifier
KEV_INET6_NEW_USER_ADDR = 1 # Userland configured IPv6 address
KEV_INET6_CHANGED_ADDR = 2 # Address changed event (future)
KEV_INET6_ADDR_DELETED = 3 # IPv6 address was deleted
KEV_INET6_NEW_LL_ADDR = 4 # Autoconf LL address appeared
KEV_INET6_NEW_RTADV_ADDR = 5 # Autoconf address has appeared

# Event codes of the data-link subclass; the monitor maps LINK_OFF/LINK_ON to
# its interface_down/interface_up hooks.
KEV_DL_SUBCLASS = 2
KEV_DL_LINK_OFF = 12
KEV_DL_LINK_ON = 13

# Message header: six native unsigned ints, unpacked by the reader as
# (total_size, vendor_code, kev_class, kev_subclass, id, event_code).
kern_event_msg_hdr = struct.Struct('6I')
# Leading part of the event payload: two native unsigned ints followed by a
# 16-byte name field.  The handler uses the second int as the unit number and
# the byte field as the interface base name; the first int is unused here
# (presumably the interface family -- TODO confirm against the kernel header).
net_event_data = struct.Struct('2I16s')
def sysproto_event_socket(event_vendor, event_class, event_subclass):
    """Open a kernel-event socket filtered to the given vendor/class/subclass.

    A raw ``PF_SYSTEM`` socket is created with the ``SYSPROTO_EVENT``
    protocol, and the filter triple is installed with the ``SIOCSKEVFILT``
    ioctl.

    Args:
        event_vendor: Vendor code to filter on.
        event_class: Event class to filter on.
        event_subclass: Event subclass to filter on.

    Returns:
        The configured socket.  The caller owns it and must close it.

    Raises:
        OSError: If the socket cannot be created or the ioctl is rejected.
        struct.error: If a filter value does not fit an unsigned int.
    """
    sock = socket.socket(socket.PF_SYSTEM, socket.SOCK_RAW, SYSPROTO_EVENT)
    try:
        kev_filter = struct.pack('3I', event_vendor, event_class, event_subclass)
        fcntl.ioctl(sock, SIOCSKEVFILT, kev_filter)
    except Exception:
        # Don't leak the descriptor when the filter cannot be installed.
        sock.close()
        raise
    return sock
class KEventNetworkMonitor(object):
    """Dispatch kernel network events to overridable coroutine hooks.

    On construction the monitor opens a non-blocking kernel-event socket and
    schedules a reader task on ``loop``.  Link on/off events are routed to
    :meth:`interface_up` / :meth:`interface_down`, and inet address events to
    :meth:`interface_changed`.  The default hooks only print a line; override
    them in a subclass to react differently.
    """

    _sock = None
    _loop = None
    # Task currently waiting for the next message.  A reference is kept so
    # that close() can cancel the pending read.
    _reader_task = None

    def __init__(self, loop):
        """Open the event socket and start reading on ``loop``."""
        self._loop = loop
        self._sock = self._construct_socket()
        self._reader_task = self._loop.create_task(self._start_reading())

    def _construct_socket(self):
        """Return a non-blocking socket subscribed to all network subclasses."""
        sock = sysproto_event_socket(KEV_VENDOR_APPLE, KEV_NETWORK_CLASS, KEV_ANY_SUBCLASS)
        sock.setblocking(False)
        return sock

    async def _receive_msg(self):
        """Read from the socket and return the first message in the buffer.

        The first four bytes are interpreted as the message's total size in
        native byte order; anything received beyond that size is dropped.

        Raises:
            EOFError: If the socket returned no data at all.
        """
        buffer = await self._loop.sock_recv(self._sock, 1024)
        if not buffer:
            # Without this the reader would re-arm forever on an empty read.
            raise EOFError('kernel event socket returned no data')
        total_size = int.from_bytes(buffer[:4], sys.byteorder)
        if len(buffer) > total_size:
            print('discarded data, shit')
        return buffer[:total_size]

    async def interface_down(self, interface):
        """Hook invoked when a link-off event names ``interface``."""
        print('%s down' % interface)

    async def interface_up(self, interface):
        """Hook invoked when a link-on event names ``interface``."""
        print('%s up' % interface)

    async def interface_changed(self, interface):
        """Hook invoked when an inet address event names ``interface``."""
        print('%s changed' % interface)

    @staticmethod
    def _interface_name(if_name, if_unit):
        """Build a name such as ``en0`` from the raw name field and unit.

        The name field is a fixed-size, NUL-terminated buffer, so it is cut at
        the first NUL; bytes after the terminator are not part of the name.
        """
        base = if_name.split(b'\0', 1)[0]
        return '%s%d' % (base.decode('ascii'), if_unit)

    # ``id`` shadows the builtin but is part of the public signature.
    async def handle_event(self, vendor_code, kev_class, kev_subclass, id, event_code, event_data):
        """Route one decoded event to the matching hook, if there is one.

        Args:
            vendor_code: Vendor code from the message header (unused).
            kev_class: Event class from the message header (unused).
            kev_subclass: Event subclass; selects the family of event codes.
            id: Event id from the message header (unused).
            event_code: Event code within the subclass.
            event_data: Message payload following the header.

        Events that match no hook, or whose payload is too short to hold the
        interface description, are ignored.
        """
        method = None
        if kev_subclass == KEV_DL_SUBCLASS:
            if event_code == KEV_DL_LINK_ON:
                method = self.interface_up
            elif event_code == KEV_DL_LINK_OFF:
                method = self.interface_down
        elif kev_subclass == KEV_INET_SUBCLASS:
            if event_code in (KEV_INET_NEW_ADDR, KEV_INET_CHANGED_ADDR, KEV_INET_ADDR_DELETED):
                method = self.interface_changed
        if method is None:
            return
        if len(event_data) < net_event_data.size:
            # Truncated payload: nothing reliable to report.
            return
        _, if_unit, if_name = net_event_data.unpack_from(event_data)
        await method(self._interface_name(if_name, if_unit))

    async def _start_reading(self):
        """Receive one message, re-arm the reader, then dispatch the event.

        The next read is scheduled before the message is parsed, so neither a
        malformed message nor a failing or slow hook stops later events from
        being received.
        """
        message = await self._receive_msg()
        self._reader_task = self._loop.create_task(self._start_reading())
        if len(message) < kern_event_msg_hdr.size:
            # Too short to contain a header; skip it.
            return
        _total_size, vendor_code, kev_class, kev_subclass, event_id, event_code = \
            kern_event_msg_hdr.unpack_from(message)
        event_data = message[kern_event_msg_hdr.size:]
        await self.handle_event(vendor_code, kev_class, kev_subclass, event_id, event_code, event_data)

    def close(self):
        """Cancel the pending read and close the event socket."""
        task = self._reader_task
        self._reader_task = None
        # Cancelling schedules callbacks on the loop, which a closed loop
        # rejects; in that case just close the socket.
        if task is not None and not self._loop.is_closed():
            task.cancel()
        self._sock.close()
# Script entry: monitor network events until the loop is stopped.
loop = asyncio.get_event_loop()
monitor = KEventNetworkMonitor(loop)
try:
    loop.run_forever()
finally:
    # run_forever() normally ends via an exception such as Ctrl-C, so the
    # socket is released here rather than after it returns.
    monitor.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment