Skip to content

Instantly share code, notes, and snippets.

@LoranKloeze
Last active January 9, 2020 21:47
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save LoranKloeze/6b713022619c2b32b32c6400a55a8433 to your computer and use it in GitHub Desktop.
Save LoranKloeze/6b713022619c2b32b32c6400a55a8433 to your computer and use it in GitHub Desktop.
This script grabs 802.11 probe requests from the air and puts them in a sqlite3-db
#!/usr/bin/env python2.7
# 802.11 probe requests processor
# Copyright (C) 2017 Ralon cybersecurity
# Loran Kloeze - loran@ralon.nl - @lorankloeze
# MIT license: do with it what you want but use it for good
#
# This script grabs probe requests from the air and puts them in a sqlite3-db.
# It cleans up the database to prevent old entries from filling up too much disk
# space. Check the regex before starting this script because it assumes 3
# antennas on the wireless device.
#
#
import subprocess
import re
import sqlite3
import time
import threading
### Settings
# The interface that is to be inspected. Make sure monitor mode is enabled for
# this interface.
monitor_dev = "wlan1"
# Path of the sqlite3-database.
db_path = "/html/db/probes.db"
# Tcpdump may stop/crash if monitor_dev goes offline. Restart tcpdump in
# restart_delay_sec seconds after it stops/crashes.
restart_delay_sec = 5
# Purge the database every purge_db_sec seconds.
purge_db_sec = 1 * 60
# Purge entries old than max_age_sec seconds from the database.
max_age_sec = 3 * 60
### Program
mainConn = 0
mainCursor = 0
# Thread to purge the database. Entries older than max_age_sec are removed.
class CleanDBThread(threading.Thread):
def __init__(self):
super(CleanDBThread, self).__init__()
def run(self):
while True:
remove_before_ts = int(time.time()) - max_age_sec
print "Removing probes older than", remove_before_ts
cleanConn = sqlite3.connect(db_path, 60)
cClean = cleanConn.cursor()
cClean.execute("DELETE FROM probes WHERE last_seen < ?", [remove_before_ts])
cleanConn.commit()
cleanConn.close()
time.sleep(purge_db_sec)
def startProbing():
# If you want to copy/paste: tcpdump -l -I -i wlan1 -e -s 256 type mgt subtype probe-req
proc = subprocess.Popen(['tcpdump', '-l', '-I', '-i', monitor_dev, '-e', '-s', '256', 'type', 'mgt', 'subtype', 'probe-req'],stdout=subprocess.PIPE)
patt = '(-\d+)dBm signal antenna 0 (-\d+)dBm signal antenna 1 (-\d+)dBm signal antenna 2.+SA:([0-9a-f]+:[0-9a-f]+:[0-9a-f]+:[0-9a-f]+:[0-9a-f]+:[0-9a-f]+) .+(Probe Request) \((.+)\)'
while True:
line = proc.stdout.readline()
if line != '':
m = re.search(patt, line)
if m is not None and len(m.groups()) == 6:
ant0 = m.group(1).rstrip()
ant1 = m.group(2).rstrip()
ant2 = m.group(3).rstrip()
# Calculate average of three antennas
ant_avg = (int(ant0)+int(ant1)+int(ant2))/3
mac = m.group(4).rstrip()
ssid = m.group(6).rstrip()
timestamp = int(time.time())
mainCursor.execute("INSERT OR REPLACE INTO probes VALUES (?,?,?,?)", (ssid, mac, ant_avg, timestamp))
mainConn.commit()
else:
break
def main():
global mainConn
global mainCursor
mainConn = sqlite3.connect(db_path)
mainCursor = mainConn.cursor()
mainCursor.execute('''CREATE TABLE IF NOT EXISTS probes
(ssid text, mac text, ant numeric, last_seen numeric)''')
mainCursor.execute('''CREATE UNIQUE INDEX IF NOT EXISTS ssid_index ON probes
(ssid)''')
mainCursor.execute('''CREATE INDEX IF NOT EXISTS last_seen_index ON probes
(last_seen)''')
mainConn.commit()
purgeThread = CleanDBThread()
purgeThread.daemon = True
purgeThread.start()
while True:
print "Starting probe search on", monitor_dev, "saving to", db_path
startProbing()
print "Probe search stopped, waiting for", restart_delay_sec, "seconds to restart"
time.sleep(restart_delay_sec)
mainConn.close()
if __name__ == "__main__":
main()
@LoranKloeze
Copy link
Author

Deze is voor routers en andere hardware wat Linux draait.
Zie https://gist.github.com/LoranKloeze/0b21b2a563340091b5b3b9069ea51168 voor de versie voor OSX.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment