Skip to content

Instantly share code, notes, and snippets.

@joar
Created October 10, 2019 18:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joar/ffbb0193ee6525c2119754d6ef86ce71 to your computer and use it in GitHub Desktop.
Save joar/ffbb0193ee6525c2119754d6ef86ce71 to your computer and use it in GitHub Desktop.
import json
import multiprocessing
import datetime as dt
import logging
import time
from queue import Empty
import pandas as pd
from colorama import Fore
import pydivert
# Module-level logger named after this module; PacketStats uses it to report
# when its capture process is started and terminated.
_log = logging.getLogger(__name__)
class PacketStats(object):
    """Intercept UDP port-6672 traffic in a child process and publish stats.

    The capture loop (:meth:`run`) executes in a separate daemon process. For
    every intercepted packet it pushes ``{"remote_addr": ..., "ts": ...}``
    onto :attr:`stats_queue` and then reinjects the packet so traffic keeps
    flowing.

    Attributes:
        process: The ``multiprocessing.Process`` that executes :meth:`run`.
        ips: Manager-backed list of the distinct remote addresses seen so
            far, in order of first appearance.
        stats_queue: Manager-backed queue of per-packet stat dicts, meant to
            be drained by the parent process.
    """

    # WinDivert filter expression: packets whose UDP source or destination
    # port is 6672, additionally restricted by the ``ip`` term.
    _FILTER = "(udp.SrcPort == 6672 or udp.DstPort == 6672) and ip"

    # Seconds to wait for the terminated child to be reaped in stop().
    _JOIN_TIMEOUT = 5

    def __init__(self):
        """Create (but do not start) the capture process and shared state."""
        self.process = multiprocessing.Process(target=self.run, args=())
        self.process.daemon = True
        # A single manager server process backs both shared objects. It is
        # kept as a local on purpose: the proxies reference it, and `self`
        # carries no extra state that would have to travel to the child.
        manager = multiprocessing.Manager()
        self.ips = manager.list()
        self.stats_queue = manager.Queue()

    def start(self):
        """Launch the capture process."""
        self.process.start()
        _log.info('Dispatched PacketStats process')

    def stop(self):
        """Terminate the capture process and log how many IPs were seen."""
        self.process.terminate()
        # Reap the child so it does not linger; bounded so stop() cannot hang.
        self.process.join(self._JOIN_TIMEOUT)
        _log.info('Terminated PacketStats process')
        _log.info('Collected a total of %d IPs', len(self.ips))

    def _record(self, packet, seen):
        """Publish stats for one packet and remember its remote address.

        Args:
            packet: Object exposing ``is_inbound`` plus ``ip.src_addr`` and
                ``ip.dst_addr`` (as the packets yielded by WinDivert do).
            seen: Process-local set of addresses already appended to
                :attr:`ips`; updated in place. It avoids a round trip to the
                manager for a membership test on every packet.
        """
        # The remote peer is the sender of inbound packets and the receiver
        # of outbound ones.
        if packet.is_inbound:
            remote_addr = packet.ip.src_addr
        else:
            remote_addr = packet.ip.dst_addr

        if remote_addr not in seen:
            seen.add(remote_addr)
            self.ips.append(remote_addr)

        self.stats_queue.put(dict(
            remote_addr=remote_addr,
            ts=dt.datetime.now(),
        ))

    def run(self):
        """Capture loop; target of :attr:`process`.

        Opens a WinDivert handle with the class filter, records every
        intercepted packet and reinjects it. Loops until the process is
        terminated or an error propagates.
        """
        seen = set()
        with pydivert.WinDivert(self._FILTER) as w:
            for packet in w:
                try:
                    self._record(packet, seen)
                finally:
                    # Always hand the packet back, even if bookkeeping
                    # failed, so intercepted traffic is never swallowed.
                    w.send(packet)
def run():
    """Collect packet stats and print a per-address count once per second.

    Starts a ``PacketStats`` capture process, then loops forever:

    1. drain every stat currently waiting on its queue,
    2. discard stats older than one second,
    3. if anything remains, print the number of packets per remote address,
       busiest first,
    4. sleep for one second.

    On ``KeyboardInterrupt`` the capture process is stopped and the stats
    still inside the one-second window are written to ``stats.json``
    (timestamps serialized with ``isoformat()``).
    """
    window = dt.timedelta(seconds=1)

    stats_coll = PacketStats()
    stats_coll.start()
    packet_stats = []
    try:
        while True:
            # Pull in everything the capture process queued since last pass.
            while True:
                try:
                    packet_stats.append(stats_coll.stats_queue.get_nowait())
                except Empty:
                    break

            # Keep only the entries that are still inside the window.
            cutoff = dt.datetime.now() - window
            packet_stats = [
                stat
                for stat in packet_stats
                if stat["ts"] > cutoff
            ]

            if packet_stats:
                df = pd.DataFrame(packet_stats)
                df.ts = pd.to_datetime(df.ts)
                stats = df.groupby("remote_addr").count() \
                    .sort_values(["ts"], ascending=False)
                print(stats)

            # Sleep on every pass, including idle ones; previously an empty
            # window skipped the sleep and the loop spun at full speed.
            time.sleep(1)
    except KeyboardInterrupt:
        stats_coll.stop()
        with open("stats.json", "w") as fd:
            json.dump(packet_stats, fd, default=lambda o: o.isoformat())
        print(f"{Fore.LIGHTWHITE_EX}Stopped collecting stats{Fore.RESET}")
if __name__ == '__main__':
    # Script entry point: make sure the WinDivert service is registered for
    # the duration of the run, and unregister it again on the way out.
    logging.basicConfig(level=logging.INFO)
    try:
        driver = pydivert.WinDivert
        needs_registration = not driver.is_registered()
        if needs_registration:
            driver.register()
        run()
    finally:
        # Runs on normal exit and on errors alike.
        driver_cls = pydivert.WinDivert
        still_registered = driver_cls.is_registered()
        if still_registered:
            driver_cls.unregister()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment