Skip to content

Instantly share code, notes, and snippets.

@eddienko
Forked from mrocklin/pcap.py
Created March 12, 2018 15:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eddienko/e61a494fccebe4bc8cc7e7712bbd49cb to your computer and use it in GitHub Desktop.
import pandas as pd
def parse(line):
    """Parse one line of tcpdump text output into a record dict.

    Only IPv4 lines (second field ``IP``) whose source and destination
    addresses end in a numeric ``.port`` suffix are handled, e.g.::

        12:34:56.789012 IP 10.0.0.1.443 > 10.0.0.2.51234: Flags [.], length 0

    Parameters
    ----------
    line : str
        A single line of tcpdump output.

    Returns
    -------
    dict or None
        A dict with keys ``source_ip``, ``source_port``, ``destination_ip``,
        ``destination_port``, ``time`` (``pd.Timestamp``), ``protocol`` and
        ``length``.  ``None`` for lines that are not IPv4, are too short, or
        whose ports are not numeric, so a stream can drop them with
        ``filter(None)`` instead of crashing.
    """
    words = line.split()
    # Need at least: time, protocol, source, '>', destination.
    if len(words) < 5 or words[1] != 'IP':
        return None
    time, protocol = words[0], words[1]
    try:
        src_ip, src_port = words[2].rsplit('.', 1)
        dst_ip, dst_port = words[4].strip(':').rsplit('.', 1)
        src_port = int(src_port)
        dst_port = int(dst_port)
    except ValueError:
        # No '.'-separated port at all, or a symbolic port name such as
        # 'https' (tcpdump run without -n): not a record we can use.
        return None
    # The trailing token is usually the payload length ("length 100");
    # anything non-numeric there is recorded as 0.
    try:
        length = int(words[-1].strip('()'))
    except ValueError:
        length = 0
    return {'source_ip': src_ip,
            'source_port': src_port,
            'destination_ip': dst_ip,
            'destination_port': dst_port,
            'time': pd.Timestamp(time),
            'protocol': protocol,
            'length': length}
from tornado.ioloop import IOLoop
import math
import pcap
from streamz import Stream
import toolz
import pandas as pd
from distributed.utils import log_errors
import sys
with open('text.pcap') as f:
    # Build the schema example from the first few *parseable* records.
    # pcap.parse returns None for lines it does not handle, and
    # pd.DataFrame raises on a list that mixes record dicts with None.
    lines = list(toolz.take(5, filter(None, map(pcap.parse, f))))
columns = ['time', 'source_ip', 'destination_ip', 'source_port',
           'destination_port', 'protocol', 'length']
# Empty-but-typed frame shape used as the streaming DataFrame's example.
example = pd.DataFrame(lines, columns=columns)
import socket
@toolz.memoize
def hostname(ip):
    """Reverse-resolve ``ip`` to a host name, falling back to ``ip`` itself.

    Results are memoized per address, so each distinct IP is looked up at
    most once. Any failure during the lookup is treated as "no name".
    """
    try:
        name, _aliases, _addresses = socket.gethostbyaddr(ip)
    except Exception:
        # Best effort: unresolvable addresses are shown as-is.
        return ip
    return name
def domain(addr):
    """Reduce a host name to its last two dot-separated labels.

    Strings whose final label is numeric are assumed to be IP addresses and
    are returned unchanged; names with fewer than two labels also come back
    unchanged.
    """
    labels = addr.split('.')
    if labels[-1].isnumeric():  # probably an IP address
        return addr
    return '.'.join(labels[-2:])
# Streaming pipeline: tcpdump text lines on stdin -> parsed records ->
# streaming DataFrame indexed by packet time.
source = Stream.from_textfile(sys.stdin)
# rate_limit(0.01) throttles the flow of lines (streamz); pcap.parse returns
# None for lines it does not handle and filter(None) drops those.
records = source.rate_limit(0.01).map(pcap.parse).filter(None)
# Batch records every 0.2 s, drop empty batches, build a DataFrame per batch
# with the same `columns` as `example`, then index by the 'time' column.
full_df = records.timed_window(0.200).filter(None).map(pd.DataFrame, columns=columns).to_dataframe(example=example).set_index('time')
# Derived domain columns: reverse DNS (memoized per IP via `hostname`, each new
# IP costs one synchronous socket.gethostbyaddr call) reduced by `domain`.
full_df['source_domain'] = full_df.source_ip.map(hostname).map(domain)
full_df['destination_domain'] = full_df.destination_ip.map(hostname).map(domain)
# name -> function producing a per-category count frame. Downstream code
# (make_histograms, factor_update) expects an 'index' column of categories
# plus one count column named after the source column.
# NOTE(review): that layout matches pandas < 2.0; from pandas 2.0,
# value_counts().reset_index() yields columns [<col>, 'count'] instead, which
# would break the 'index' lookups — confirm the pinned pandas version.
histograms = {'source-domain': lambda df: df.source_domain.value_counts().reset_index(),
              'source-port': lambda df: df.source_port.value_counts().reset_index(),
              'destination-domain': lambda df: df.destination_domain.value_counts().reset_index(),
              'destination-port': lambda df: df.destination_port.value_counts().reset_index()}
from bokeh.server.server import Server
from bokeh.application.handlers.function import FunctionHandler
from bokeh.application import Application
from bokeh.plotting import ColumnDataSource, figure
from bokeh.models import DataTable, TableColumn
from bokeh.layouts import row, column
def stream(df, source=None, doc=None, backlog=100):
    """Schedule appending the rows of ``df`` to a Bokeh data source.

    The append is deferred to ``doc``'s next tick and performed with
    ``source.stream(<column dict>, backlog)``; ``backlog`` is passed as the
    second positional argument (Bokeh's ``rollover`` row limit).
    """
    payload = dict((name, df[name]) for name in df.columns)

    def _append():
        source.stream(payload, backlog)

    doc.add_next_tick_callback(_append)
def update(df, source=None, doc=None):
    """Schedule merging the columns of ``df`` into ``source.data``.

    Each column is converted to its underlying array (``.values``); the
    ``source.data.update`` call runs on ``doc``'s next tick.
    """
    arrays = {}
    for name in df.columns:
        arrays[name] = df[name].values

    def _replace():
        source.data.update(arrays)

    doc.add_next_tick_callback(_replace)
def factor_update(df, source=None, doc=None, fig=None, n=20):
    """Push the ``n`` largest categories of a count frame into a bar chart.

    ``df`` has an ``'index'`` column of categories and at least one other
    column; the first non-``'index'`` column is treated as the count. Rows are
    sorted by it (descending) and truncated to ``n``. On ``doc``'s next tick
    the figure's x-axis factors are replaced by the stringified categories and
    ``source.data`` is updated with the truncated columns. The body runs
    inside ``log_errors`` so failures are logged.
    """
    with log_errors():
        value_cols = [c for c in df.columns if c != 'index']
        count_col = value_cols[0]
        top = df.sort_values(count_col, ascending=False).iloc[:n]
        new_data = {c: top[c].values for c in top.columns}
        # Categorical axes need string factors.
        labels = [str(v) for v in top['index']]
        new_data['index'] = labels

        def _apply():
            fig.x_range.factors = labels
            source.data.update(new_data)

        doc.add_next_tick_callback(_apply)
def on_selected(attr, old, new):
    """Debug callback: print a separator line, then each argument on its own line."""
    print('-' * 20)
    for value in (attr, old, new):
        print(value)
def make_histograms(histograms, full, selections=None, on_change=None):
    """Create one empty bar chart (and its data source) per histogram.

    Parameters
    ----------
    histograms : dict
        Maps a histogram name to a function that takes a DataFrame and returns
        a frame with an ``'index'`` column of categories plus a count column.
    full : pandas.DataFrame
        Example frame, used only to discover each histogram's column names.
    selections : dict, optional
        When given, tapping bars stores the selected category labels here,
        keyed by the count column's name, and then calls ``on_change``.
    on_change : callable, optional
        Zero-argument callback invoked after ``selections`` is updated.

    Returns
    -------
    dict
        ``{name: {'source': ColumnDataSource, 'fig': figure}}``. All figures
        share the y-range of the last figure created; an empty ``histograms``
        yields an empty dict.
    """
    if selections is not None:
        def on_selected(fig, col, attr, old, new):
            # fig/col are bound per figure via toolz.partial below.
            with log_errors():
                selections[col] = [fig.x_range.factors[ind]
                                   for ind in new['1d']['indices']]
                if on_change:
                    on_change()

    out = {}
    for name, func in histograms.items():
        df = func(full)
        source = ColumnDataSource({c: [] for c in df.columns})
        col = [c for c in df.columns if c != 'index'][0]
        fig = figure(title=col.replace('_', ' ').title(), x_range=[],
                     width=400, height=400, tools='tap')
        fig.xaxis.major_label_orientation = math.pi / 4
        fig.vbar(source=source, x='index', top=col, width=0.9)
        if selections is not None:
            source.on_change('selected', toolz.partial(on_selected, fig, col))
        out[name] = {'source': source, 'fig': fig}

    # Link every figure to one y-range (the last figure's, as before) without
    # relying on the loop variable leaking out of the loop.
    figs = [entry['fig'] for entry in out.values()]
    if figs:
        shared_y_range = figs[-1].y_range
        for fig in figs:
            fig.y_range = shared_y_range
    return out
def stringify(x):
    """Render ``x`` as a literal for a ``DataFrame.query`` expression.

    Strings are rendered with ``repr`` so embedded quotes and backslashes are
    escaped; the values may come from untrusted data (e.g. reverse-DNS names)
    and are spliced into an evaluated expression. Non-strings are returned
    unchanged and formatted by the caller.
    """
    if isinstance(x, str):
        return repr(x)
    return x
def main_page(doc):
    """Bokeh session handler: build the dashboard for one document.

    Layout (top to bottom): a row of "full" histograms over a 20 s window of
    all traffic, a data table of the currently selected traffic, and a row of
    "query" histograms over a 20 s window of that selection. Tapping bars in
    either histogram row updates ``selections`` and calls ``on_change``, which
    rebuilds the query pipelines.
    """
    # Holds the streamz nodes attached to this document (presumably so they
    # stay referenced for the life of the session).
    doc._update_streams = set()
    figs = []  # NOTE(review): unused.
    # count-column name -> list of selected category labels (set by taps).
    selections = dict()

    # Data table
    # Source columns come from the example frame with the 'time' index reset
    # back into a column.
    table_source = ColumnDataSource({c: [] for c in full_df.example.reset_index().columns})
    # NOTE(review): shadows the module-level `columns`; full_df.columns excludes
    # the 'time' index, so time is not shown in the table — confirm intent.
    columns = [TableColumn(field=c, title=c.title()) for c in full_df.columns]
    table = DataTable(source=table_source, columns=columns, width=1000, height=200)

    # One-element list so on_change can replace it without `nonlocal`.
    last_updater = [None]
    # Nodes feeding the query histograms; torn down on every on_change.
    query_updaters = set()
    # name -> {'source', 'fig'} for the query histograms; filled below.
    query_source_figs = dict()

    def on_change():
        """Rebuild the table and query-histogram pipelines from `selections`."""
        if last_updater[0]:
            last_updater[0].destroy()
            # NOTE(review): with this line commented out, destroyed updaters
            # stay in doc._update_streams and the set grows on every change.
            # doc._update_streams.remove(last_updater[0])
        df = full_df
        if selections:
            # OR within a column, AND across columns; columns whose selection
            # list is empty are skipped, e.g.
            #   (source_domain == "a.com" or source_domain == "b.com")
            # NOTE(review): labels come from factor_update's str() of the
            # categories, so port selections are strings compared against the
            # int port columns and may match nothing — confirm.
            cond = ' and '.join('(' + ' or '.join('%s == %s' % (column, stringify(value)) for value in v) + ')'
                                for column, v in selections.items()
                                if v)
            if cond.strip():
                df = df.query(cond, engine='python')
        query_df = df
        # Stream the filtered rows into the table, keeping at most 20 rows.
        updater = query_df.reset_index().stream.map(stream, source=table_source, doc=doc, backlog=20)
        doc._update_streams.add(updater)
        last_updater[0] = updater

        for s in query_updaters:
            s.destroy()
        query_updaters.clear()

        # Re-wire each query histogram to the new filtered stream.
        windowed_query_df = query_df.window(value='20s')
        for name, func in histograms.items():
            d = query_source_figs[name]
            df = histograms[name](windowed_query_df)
            updater = df.stream.map(factor_update, source=d['source'], doc=doc, fig=d['fig'])
            query_updaters.add(updater)

    # Full Histograms
    source_figs = make_histograms(histograms, full_df.example, selections, on_change)
    # Query Histograms
    query_source_figs.update(make_histograms(histograms, full_df.example, selections, on_change))
    # Initial wiring with no selections: the query view starts as all traffic.
    on_change()

    # Full histograms are wired once and never rebuilt.
    windowed_df = full_df.window(value='20s')
    for name, d in source_figs.items():
        df = histograms[name](windowed_df)
        updater = df.stream.map(factor_update, source=d['source'], doc=doc, fig=d['fig'])
        doc._update_streams.add(updater)

    full_figs = [v['fig'] for v in source_figs.values()]
    query_figs = [v['fig'] for v in query_source_figs.values()]
    doc.add_root(column(row(*full_figs), table, row(*query_figs)))
# Serve the dashboard at /main, start reading stdin once the event loop is
# running, then run the loop (blocks until the process is stopped).
server = Server({'/main': main_page})
server.start()

loop = IOLoop.current()
loop.add_callback(source.start)
loop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment