Last active
January 24, 2019 22:02
-
-
Save Atticuss/2934ae296ff931d07d146bc9c051d4ce to your computer and use it in GitHub Desktop.
eat vpc flow logs -> dump to dgraph
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import json | |
import boto3 | |
import sqlite3 | |
import pydgraph | |
from uuid import uuid4 | |
# Address (host:port) of the Dgraph server this script talks to.
dgraph_server = 'localhost:9080'

# One client is created at import time and shared by every helper in this file.
client_stub = pydgraph.DgraphClientStub(dgraph_server)
dgraph_client = pydgraph.DgraphClient(client_stub)
def load_events_into_mem(db_path='/Users/jcallahan/Desktop/projects/flow_log_ids/events.sqlite3'):
    """Read every stored flow-log event from SQLite and group them by ENI.

    Args:
        db_path: Path of the SQLite database containing an ``events`` table
            with ``eni`` and ``event`` columns. Defaults to the location the
            script was originally hard-wired to, so existing callers that
            pass no argument behave as before.

    Returns:
        A plain dict mapping each ``eni`` value to the list of its events,
        each decoded from JSON, in the order the rows were returned.

    Raises:
        sqlite3.Error: If the database cannot be queried.
        ValueError: If an ``event`` column does not contain valid JSON
            (``json.JSONDecodeError`` is a subclass).
    """
    print('Loading events into memory')
    events_by_eni = {}
    conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
    try:
        # Iterate the cursor directly so rows are consumed as they arrive
        # instead of being copied into an intermediate list by fetchall().
        for eni, event in conn.execute('select eni, event from events'):
            # setdefault replaces the former bare ``except:`` grouping, which
            # would also have intercepted unrelated errors.
            events_by_eni.setdefault(eni, []).append(json.loads(event))
    finally:
        # The connection was previously never closed; release it even when
        # the query or the JSON decoding fails.
        conn.close()
    print('Done')
    return events_by_eni
def delete_all():
    """Send a ``drop_all`` alter operation to Dgraph, wiping what is stored there."""
    dgraph_client.alter(pydgraph.Operation(drop_all=True))
def build_schema():
    """Install the schema this script relies on.

    ``xid`` is declared as a term-indexed string and ``communicates_with``
    as a uid predicate.
    """
    schema_text = """
xid: string @index(term) .
communicates_with: uid .
"""
    dgraph_client.alter(pydgraph.Operation(schema=schema_text))
def get_uid_by_xid(xid):
    """Look up the uid of the node whose ``xid`` equals the given value.

    Returns:
        The uid of the first matching node, or ``None`` when the query
        returns no match.
    """
    lookup_query = """
query get_xid($a: string) {
get_uid(func: eq(xid, $a))
{
uid
}
}
"""
    response = dgraph_client.query(lookup_query, variables={'$a': xid})
    matches = json.loads(response.json)['get_uid']
    if len(matches) == 0:
        return None
    return matches[0]['uid']
def add_node(xid):
    """Create a new node carrying the given ``xid`` and return its uid.

    Args:
        xid: External identifier to store on the node (the script passes an
            IP address string).

    Returns:
        The uid Dgraph assigned to the blank node ``_:node``.
    """
    # Escape backslashes and double quotes so the value cannot close the
    # N-Quad string literal early and smuggle in extra triples.
    literal = str(xid).replace('\\', '\\\\').replace('"', '\\"')
    nquads = '_:node <xid> "%s" .' % literal

    txn = dgraph_client.txn()
    try:
        res = txn.mutate(set_nquads=nquads)
        txn.commit()
    finally:
        # A no-op after a successful commit; cleans up the transaction when
        # mutate() or commit() raised.
        txn.discard()
    return res.uids['node']
def add_edge(src_uid, dst_uid):
    """Add a ``communicates_with`` edge from ``src_uid`` to ``dst_uid``.

    The edge is tagged with a freshly generated ``edge_uuid`` facet.

    Args:
        src_uid: uid of the node the edge starts at.
        dst_uid: uid of the node the edge points to.

    Returns:
        The UUID string stored in the edge's ``edge_uuid`` facet.
    """
    edge_uuid = str(uuid4())
    nquads = '<%s> <communicates_with> <%s> (edge_uuid="%s") .' % (src_uid, dst_uid, edge_uuid)

    txn = dgraph_client.txn()
    try:
        txn.mutate(set_nquads=nquads)
        txn.commit()
    finally:
        # A no-op after a successful commit; cleans up the transaction when
        # mutate() or commit() raised.
        txn.discard()
    return edge_uuid
def does_edge_exist(src_uid, dst_uid):
    """Report whether the edge query for ``src_uid`` -> ``dst_uid`` returns anything.

    Returns:
        ``True`` when the ``get_edge`` result list is non-empty, otherwise
        ``False``.
    """
    edge_query = """
query get_edge($a: string, $b: string) {
get_edge(func: uid($a)) {
communicates_with @filter(uid($b)) {
uid
}
}
}
"""
    response = dgraph_client.query(edge_query, variables={'$a': src_uid, '$b': dst_uid})
    return len(json.loads(response.json)['get_edge']) > 0
def is_ip_public(ip_address):
    """Return ``True`` unless ``is_ip_in_range`` places the address in a private block.

    The blocks checked are the three RFC 1918 ranges: 192.168.0.0/16,
    172.16.0.0/12 and 10.0.0.0/8.
    """
    private_blocks = (
        ('192.168.0.0', '192.168.255.255'),
        ('172.16.0.0', '172.31.255.255'),
        ('10.0.0.0', '10.255.255.255'),
    )
    # Every block is evaluated before deciding, as in the original.
    hits = [is_ip_in_range(ip_address, low, high) for low, high in private_blocks]
    return not any(hits)
def is_ip_in_range(target_ip, ip_min, ip_max):
    """Return whether ``target_ip`` lies in the inclusive IPv4 range ``ip_min``..``ip_max``.

    Addresses are compared numerically. The previous implementation compared
    octets as strings, so e.g. ``'9' > '255'`` held and ``10.9.0.1`` was
    reported as outside ``10.0.0.0``-``10.255.255.255``.

    Args:
        target_ip: Dotted-quad IPv4 address to test. Anything that is not a
            valid IPv4 address (for example the ``-`` placeholder or an IPv6
            address) is treated as not in range.
        ip_min: Lowest address of the range, inclusive.
        ip_max: Highest address of the range, inclusive.

    Returns:
        ``True`` if ``ip_min <= target_ip <= ip_max``, otherwise ``False``.

    Raises:
        ValueError: If ``ip_min`` or ``ip_max`` is not a valid IPv4 address.
    """
    # Imported here so the block does not depend on a file-level import.
    import ipaddress

    try:
        target = ipaddress.IPv4Address(target_ip)
    except ValueError:
        # AddressValueError subclasses ValueError; unparseable input cannot
        # be inside any range.
        return False
    return ipaddress.IPv4Address(ip_min) <= target <= ipaddress.IPv4Address(ip_max)
if __name__ == '__main__':
    # Start from an empty graph on every run, then install the schema.
    delete_all()
    build_schema()

    events_by_eni = load_events_into_mem()

    # src_ip -> set of dst_ips already seen, so each directed (src, dst) pair
    # is handled at most once across all ENIs. Sets keep the membership test
    # constant-time inside the hot loop.
    ip_pairs = {}
    num_edges = 0
    num_nodes = 0
    num_enis = len(events_by_eni)

    # Each value of `events_by_eni` is a list of flow log events, with an
    # event having the structure of:
    # {
    #     'message': '2 079622674598 eni-00074b07 77.72.82.147 172.31.1.26 57880 1823 6 1 40 1518808575 1518808633 REJECT OK',
    #     'timestamp': 1518808575000,
    #     'ingestionTime': 1518808774254
    # }
    for eni_idx, event_list in enumerate(events_by_eni.values()):
        # Progress indicator, rewritten in place on a single line.
        sys.stdout.write('\r%s/%s\t' % (eni_idx, num_enis))
        sys.stdout.flush()

        for event in event_list:
            # Strict 14-field unpack: a record with a different field count
            # raises ValueError rather than being silently misread.
            (version, account_id, interface_id, src_ip, dst_ip,
             src_port, dst_port, protocol, num_packets, num_bytes,
             start_time, end_time, action, log_status) = event['message'].split(' ')

            seen_dsts = ip_pairs.setdefault(src_ip, set())
            if dst_ip in seen_dsts:
                continue
            seen_dsts.add(dst_ip)

            # Only traffic between two private addresses is graphed.
            if is_ip_public(src_ip) or is_ip_public(dst_ip):
                continue

            num_edges += 1

            src_uid = get_uid_by_xid(src_ip)
            if src_uid is None:
                num_nodes += 1
                src_uid = add_node(src_ip)

            dst_uid = get_uid_by_xid(dst_ip)
            if dst_uid is None:
                num_nodes += 1
                dst_uid = add_node(dst_ip)

            # No does_edge_exist() check is needed: `ip_pairs` already
            # guarantees each pair reaches this point only once per run.
            add_edge(src_uid, dst_uid)

    print('\n\nnodes: %s -- edges: %s' % (num_nodes, num_edges))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment