Last active
March 19, 2020 01:11
-
-
Save davesteele/49e0fd6fa24ab2d36de8cada233ddfb4 to your computer and use it in GitHub Desktop.
Python asyncio MAC address monitoring
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import socket | |
import logging | |
import asyncio | |
import time | |
from collections import defaultdict | |
import json | |
logging.getLogger("scapy.runtime").setLevel(logging.ERROR) | |
from scapy.all import Ether, IP, send, ARP, sendp | |
async def get_macs(loop, macs): | |
s = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(3)) | |
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 212992) | |
s.bind(('wlan1', 3)) | |
while True: | |
data = await loop.sock_recv(s, 65536) | |
packet = Ether(data) | |
try: | |
ip = packet.getlayer(IP).src | |
except: | |
ip = 'None' | |
try: | |
if ip == 'None': | |
ip = macs[packet[Ether].src][1] | |
except: | |
pass | |
try: | |
macs[packet[Ether].src] = (int(time.time()), ip) | |
except: | |
pass | |
await asyncio.sleep(0) | |
async def save_macs(macs, macs_max): | |
while True: | |
for key in macs: | |
age = int(time.time()) - macs[key][0] | |
if key in macs_max: | |
macs_max[key] = max(macs_max[key], age) | |
else: | |
macs_max[key] = age | |
with open('macs.json', 'w') as fp: | |
json.dump(macs, fp, sort_keys=True, indent=4) | |
with open('macs_max.json', 'w') as fp: | |
json.dump(macs_max, fp, sort_keys=True, indent=4) | |
print(time.time()) | |
print(json.dumps(macs_max, sort_keys=True, indent=4)) | |
await asyncio.sleep(10) | |
async def arp_refresh(macs): | |
while True: | |
maclist = [x for x in macs] | |
for mac in maclist: | |
ip = macs[mac][1] | |
if ip != 'None': | |
print("arping ", ip) | |
sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip), verbose=False) | |
#await asyncio.sleep(0) | |
await asyncio.sleep(0) | |
await asyncio.sleep(10*60*0.9) | |
def get_json_dict(filename): | |
thedict = {} | |
try: | |
with open(filename, 'r') as fp: | |
thedict = json.load(fp) | |
except: | |
pass | |
return thedict | |
macs = get_json_dict('macs.json') | |
macs_max = get_json_dict('macs_max.json') | |
loop = asyncio.get_event_loop() | |
tasks = [ | |
loop.create_task(get_macs(loop, macs)), | |
loop.create_task(save_macs(macs, macs_max)), | |
loop.create_task((arp_refresh(macs))), | |
] | |
wait_tasks = asyncio.wait(tasks) | |
loop.run_until_complete(wait_tasks) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment