Skip to content

Instantly share code, notes, and snippets.

@Atticuss
Last active January 24, 2019 22:02
Show Gist options
  • Save Atticuss/2934ae296ff931d07d146bc9c051d4ce to your computer and use it in GitHub Desktop.
Save Atticuss/2934ae296ff931d07d146bc9c051d4ce to your computer and use it in GitHub Desktop.
Ingest VPC flow logs and dump them into Dgraph
import ipaddress
import json
import sqlite3
import sys
from uuid import uuid4

import boto3
import pydgraph
# Address of the Dgraph server that pydgraph connects to.
# NOTE(review): hard-coded to a local instance (9080 is presumably Dgraph's
# gRPC port) — confirm before running against another deployment.
dgraph_server = 'localhost:9080'
# One client, created at import time and shared by the Dgraph helper
# functions in this file (drop, schema, queries and mutations all go through it).
client_stub = pydgraph.DgraphClientStub(dgraph_server)
dgraph_client = pydgraph.DgraphClient(client_stub)
def load_events_into_mem(db_path='/Users/jcallahan/Desktop/projects/flow_log_ids/events.sqlite3'):
    """Read every cached flow-log event from SQLite, grouped by ENI.

    Args:
        db_path: Path to a SQLite database with an ``events`` table that has
            ``eni`` and ``event`` columns, where ``event`` holds a JSON string.
            Defaults to the original hard-coded location, so existing callers
            behave the same.

    Returns:
        dict mapping each ENI to the list of its decoded (``json.loads``)
        events, in the order SQLite returned the rows.
    """
    print('Loading events into memory')
    events_by_eni = {}
    conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
    try:
        rows = conn.execute('select eni, event from events').fetchall()
    finally:
        # The original never closed the connection; release it even if the query fails.
        conn.close()
    for eni, event in rows:
        # setdefault replaces a bare try/except used as a "missing key" test.
        # That except also caught BaseException and could overwrite an
        # already-populated list.
        events_by_eni.setdefault(eni, []).append(json.loads(event))
    print('Done')
    return events_by_eni
def delete_all():
    """Clear the whole Dgraph database by issuing a ``drop_all`` alter operation."""
    dgraph_client.alter(pydgraph.Operation(drop_all=True))
def build_schema():
    """Install the two predicates this script writes.

    * ``xid``: the node's external id (an IP address here), string-typed and
      indexed so it can be looked up with ``eq``.
    * ``communicates_with``: uid-typed edge between two address nodes.
    """
    # NOTE(review): newer Dgraph releases treat a bare `uid` predicate as
    # single-valued and need `[uid]` for many edges per node — confirm which
    # server version this runs against.
    schema_text = (
        "\n"
        "xid: string @index(term) .\n"
        "communicates_with: uid .\n"
    )
    dgraph_client.alter(pydgraph.Operation(schema=schema_text))
def get_uid_by_xid(xid):
    """Return the ``uid`` of the first node whose ``xid`` equals *xid*.

    Returns ``None`` when the query yields no matching node.
    """
    lookup = (
        "\n"
        "query get_xid($a: string) {\n"
        "get_uid(func: eq(xid, $a))\n"
        "{\n"
        "uid\n"
        "}\n"
        "}\n"
    )
    response = dgraph_client.query(lookup, variables={'$a': xid})
    matches = json.loads(response.json)['get_uid']
    if not matches:
        return None
    return matches[0]['uid']
def add_node(xid):
    """Create a node whose ``xid`` predicate is *xid* and return its new uid.

    The mutation is committed immediately in its own transaction.

    Args:
        xid: External identifier to store on the node (this script passes IP
            address strings). Non-string values are stored via ``str()``, as
            the original ``"%s"`` formatting did.

    Returns:
        The uid Dgraph assigned to the ``_:node`` blank node.
    """
    # json.dumps produces a double-quoted, ASCII-safe literal with '"', '\\' and
    # control characters escaped, which N-Quad string literals accept.
    # Ordinary IPs come out exactly as the old '"%s"' formatting did.
    nquads = '_:node <xid> %s .' % json.dumps(str(xid))
    txn = dgraph_client.txn()
    try:
        res = txn.mutate(set_nquads=nquads)
        txn.commit()
    finally:
        # A no-op after a successful commit; otherwise it aborts the txn.
        txn.discard()
    return res.uids['node']
def add_edge(src_uid, dst_uid):
    """Create a ``communicates_with`` edge from *src_uid* to *dst_uid*.

    The edge carries an ``edge_uuid`` facet set to a fresh UUID4 string. The
    mutation is committed immediately in its own transaction.

    Args:
        src_uid: Dgraph uid of the source node.
        dst_uid: Dgraph uid of the destination node.

    Returns:
        The generated ``edge_uuid`` string.
    """
    edge_uuid = str(uuid4())
    nquads = '<%s> <communicates_with> <%s> (edge_uuid="%s") .' % (src_uid, dst_uid, edge_uuid)
    txn = dgraph_client.txn()
    try:
        txn.mutate(set_nquads=nquads)
        txn.commit()
    finally:
        # A no-op after a successful commit; aborts the txn if mutate/commit raised.
        txn.discard()
    return edge_uuid
def does_edge_exist(src_uid, dst_uid):
    """Report whether the ``get_edge`` query for *src_uid* -> *dst_uid* returns anything.

    The query starts at *src_uid* and keeps only ``communicates_with`` targets
    equal to *dst_uid*. The result is truthy iff the ``get_edge`` list is non-empty.
    """
    # NOTE(review): this relies on Dgraph omitting the source node from
    # `get_edge` when the filtered edge list is empty — confirm against the
    # server version in use.
    edge_query = (
        "\n"
        "query get_edge($a: string, $b: string) {\n"
        "get_edge(func: uid($a)) {\n"
        "communicates_with @filter(uid($b)) {\n"
        "uid\n"
        "}\n"
        "}\n"
        "}\n"
    )
    response = dgraph_client.query(edge_query, variables={'$a': src_uid, '$b': dst_uid})
    return bool(json.loads(response.json)['get_edge'])
def is_ip_public(ip_address):
    """Return True unless *ip_address* lies in one of the RFC 1918 private blocks.

    Membership in 192.168/16, 172.16/12 and 10/8 is decided by
    ``is_ip_in_range``. Anything it does not place in one of those blocks is
    reported as public.
    """
    private_blocks = (
        ('192.168.0.0', '192.168.255.255'),
        ('172.16.0.0', '172.31.255.255'),
        ('10.0.0.0', '10.255.255.255'),
    )
    return not any(is_ip_in_range(ip_address, first, last) for first, last in private_blocks)
def is_ip_in_range(target_ip, ip_min, ip_max):
    """Return True if *target_ip* lies in the inclusive range [ip_min, ip_max].

    Addresses are compared numerically as whole addresses using ``ipaddress``.
    The previous version compared octets as strings, which misclassified many
    addresses. For example, '10.5.0.1' fell "outside" 10.0.0.0-10.255.255.255
    because '5' > '255' lexicographically.

    Args:
        target_ip: Address to test, e.g. '172.31.1.26'.
        ip_min: Lowest address of the range (inclusive).
        ip_max: Highest address of the range (inclusive).

    Returns:
        bool. Returns False instead of raising in two cases:
        - unparseable input, such as a '-' placeholder in a flow-log record;
        - a mix of IPv4 and IPv6 addresses, which cannot be ordered.
    """
    try:
        return ipaddress.ip_address(ip_min) <= ipaddress.ip_address(target_ip) <= ipaddress.ip_address(ip_max)
    except (ValueError, TypeError):
        return False
if __name__ == '__main__':
    # Rebuild the graph from scratch: wipe Dgraph, reinstall the schema, then
    # load every cached flow-log event.
    delete_all()
    build_schema()
    events_by_eni = load_events_into_mem()

    # src_ip -> set of dst_ips already seen. A set keeps the duplicate check
    # O(1) instead of scanning an ever-growing list for every event.
    ip_pairs = {}
    # ip -> Dgraph uid for every node looked up or created in this run, so each
    # address costs at most one lookup round trip instead of one per edge.
    uid_by_ip = {}
    num_edges = 0
    num_nodes = 0
    num_enis = len(events_by_eni)

    # Each event looks like:
    # {
    #     'message': '2 079622674598 eni-00074b07 77.72.82.147 172.31.1.26 57880 1823 6 1 40 1518808575 1518808633 REJECT OK',
    #     'timestamp': 1518808575000,
    #     'ingestionTime': 1518808774254
    # }
    # 'message' must split into exactly 14 space-separated fields, or the
    # unpacking below raises ValueError.
    for eni_idx, event_list in enumerate(events_by_eni.values(), start=1):
        # 1-based so the final progress line reads N/N rather than (N-1)/N.
        sys.stdout.write('\r%s/%s\t' % (eni_idx, num_enis))
        sys.stdout.flush()
        for event in event_list:
            (_version, _account_id, _interface_id, src_ip, dst_ip,
             _src_port, _dst_port, _protocol, _num_packets, _num_bytes,
             _start_time, _end_time, _action, _log_status) = event['message'].split(' ')

            # Process each (src, dst) pair only once per run.
            seen_dsts = ip_pairs.setdefault(src_ip, set())
            if dst_ip in seen_dsts:
                continue
            seen_dsts.add(dst_ip)

            # Only private-to-private traffic is graphed.
            if is_ip_public(src_ip) or is_ip_public(dst_ip):
                continue
            num_edges += 1

            # Resolve (or create) the node for src then dst, in that order.
            endpoint_uids = []
            for ip in (src_ip, dst_ip):
                uid = uid_by_ip.get(ip)
                if uid is None:
                    uid = get_uid_by_xid(ip)
                if uid is None:
                    num_nodes += 1
                    uid = add_node(ip)
                uid_by_ip[ip] = uid
                endpoint_uids.append(uid)

            # No does_edge_exist() check is needed: the graph was dropped at
            # startup and ip_pairs already filters duplicate pairs in this run.
            add_edge(*endpoint_uids)

    print('\n\nnodes: %s -- edges: %s' % (num_nodes, num_edges))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment