Skip to content

Instantly share code, notes, and snippets.

@davesteele
Last active March 19, 2020 01:11
Show Gist options
  • Save davesteele/49e0fd6fa24ab2d36de8cada233ddfb4 to your computer and use it in GitHub Desktop.
Save davesteele/49e0fd6fa24ab2d36de8cada233ddfb4 to your computer and use it in GitHub Desktop.
Python asyncio MAC address monitoring
#!/usr/bin/python3
import socket
import logging
import asyncio
import time
from collections import defaultdict
import json
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
from scapy.all import Ether, IP, send, ARP, sendp
async def get_macs(loop, macs, iface='wlan1'):
    """Sniff frames on *iface* and record when each source MAC was last seen.

    Runs until cancelled. For every received frame ``macs`` is updated in
    place with ``macs[src_mac] = (unix_seconds, ip)``, where ``ip`` is:

    * the frame's IP-layer source address, if it has an IP layer;
    * otherwise the address already stored for that MAC, if any;
    * otherwise the string ``'None'``.

    Args:
        loop: asyncio event loop used to await data on the raw socket.
        macs: dict of MAC string -> (timestamp, ip) entries; mutated in place.
        iface: name of the network interface to bind the raw socket to.
    """
    eth_p_all = 3  # ETH_P_ALL from <linux/if_ether.h>: receive every protocol
    log = logging.getLogger(__name__)

    # The context manager closes the socket when the task is cancelled or
    # fails, instead of leaking the descriptor.
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                       socket.htons(eth_p_all)) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 212992)
        s.bind((iface, eth_p_all))
        # loop.sock_recv() is documented to require a non-blocking socket.
        s.setblocking(False)

        while True:
            data = await loop.sock_recv(s, 65536)
            try:
                packet = Ether(data)
                src = packet[Ether].src
                ip_layer = packet.getlayer(IP)
                if ip_layer is not None:
                    ip = ip_layer.src
                else:
                    # No IP layer in this frame: keep whatever address was
                    # recorded for this MAC earlier, if any.
                    known = macs.get(src)
                    ip = known[1] if known else 'None'
                macs[src] = (int(time.time()), ip)
            except Exception:
                # Best effort: one unparseable frame must not stop the
                # sniffer, but leave a trace for debugging.
                log.debug("could not process frame", exc_info=True)
            # Explicit yield so the other tasks get a turn even when
            # sock_recv returns without suspending (data already queued).
            await asyncio.sleep(0)
def _dump_json_atomic(obj, filename):
    """Write *obj* as JSON to *filename* via a temp file and atomic rename."""
    import os  # local import: keeps this block independent of the file header

    tmpname = filename + '.tmp'
    with open(tmpname, 'w') as fp:
        json.dump(obj, fp, sort_keys=True, indent=4)
    # os.replace swaps the file in one step, so a reader (or the next start
    # of this script) never sees a half-written state file.
    os.replace(tmpname, filename)


async def save_macs(macs, macs_max):
    """Every 10 seconds, update per-MAC maximum ages and persist both dicts.

    For each entry in ``macs`` the age (seconds since its stored timestamp)
    is computed and ``macs_max[mac]`` is raised to that age if it is larger
    than the stored value (or set to it if the MAC is new). ``macs`` is then
    written to ``macs.json`` and ``macs_max`` to ``macs_max.json``, and the
    current time and ``macs_max`` are printed. Runs until cancelled.

    Args:
        macs: dict of MAC string -> (timestamp, ip) sequences; read only.
        macs_max: dict of MAC string -> largest observed age in seconds;
            mutated in place.
    """
    while True:
        now = int(time.time())
        for key, entry in macs.items():
            age = now - entry[0]
            macs_max[key] = max(macs_max.get(key, age), age)

        _dump_json_atomic(macs, 'macs.json')
        _dump_json_atomic(macs_max, 'macs_max.json')

        print(time.time())
        print(json.dumps(macs_max, sort_keys=True, indent=4))
        await asyncio.sleep(10)
async def arp_refresh(macs):
    """Periodically send an ARP packet for every MAC with a known IP.

    Each round walks a snapshot of the keys of ``macs``; for every entry
    whose stored IP is not the string ``'None'`` it prints the address and
    sends a broadcast Ethernet frame carrying an ARP layer aimed at that IP.
    Control is yielded to the event loop after each entry, and the coroutine
    then sleeps 540 seconds (10 * 60 * 0.9) before the next round. Runs until
    cancelled.

    Args:
        macs: dict of MAC string -> (timestamp, ip) sequences; read only.
    """
    broadcast = "ff:ff:ff:ff:ff:ff"
    round_pause = 10*60*0.9

    while True:
        # Snapshot the keys: other tasks may add entries while this loop
        # yields between hosts.
        for mac in list(macs):
            target = macs[mac][1]
            if target != 'None':
                print("arping ", target)
                frame = Ether(dst=broadcast)/ARP(pdst=target)
                sendp(frame, verbose=False)
            # Give the other tasks a turn after every entry.
            await asyncio.sleep(0)
        await asyncio.sleep(round_pause)
def get_json_dict(filename):
    """Load a JSON object from *filename*, falling back to an empty dict.

    Args:
        filename: path of the JSON file to read.

    Returns:
        The decoded dict, or ``{}`` when the file cannot be opened or read,
        does not contain valid JSON, or its top-level value is not an object.
    """
    try:
        with open(filename, 'r') as fp:
            loaded = json.load(fp)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError covers
        # json.JSONDecodeError and text decoding errors. Anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        return {}
    # Callers index the result by key, so reject non-object JSON.
    return loaded if isinstance(loaded, dict) else {}
# Restore state saved by a previous run (empty dicts on first start).
macs = get_json_dict('macs.json')
macs_max = get_json_dict('macs_max.json')

# Create the loop explicitly: asyncio.get_event_loop() is deprecated when no
# loop is running.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

tasks = [
    loop.create_task(get_macs(loop, macs)),
    loop.create_task(save_macs(macs, macs_max)),
    loop.create_task(arp_refresh(macs)),
]

try:
    # All three tasks loop forever, so wait() only returns when one of them
    # fails. FIRST_EXCEPTION makes that failure visible instead of leaving
    # the remaining tasks running with the error never reported.
    finished, _ = loop.run_until_complete(
        asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    )
    for task in finished:
        task.result()  # re-raise the failure of the task that stopped
except KeyboardInterrupt:
    pass  # Ctrl-C is the normal way to stop the monitor
finally:
    # Cancel whatever is still running and let the cancellations complete
    # before the loop is closed.
    for task in tasks:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment