Created
July 11, 2023 12:18
-
-
Save rdkls/1c8cd5f3895f7f96fddd9477f367a86a to your computer and use it in GitHub Desktop.
Query Athena NFW logs and insert them into Neo4j.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# Loads IP addresses and src->dest connections from an Athena NFW log table
# into a Neo4j graph.
#
# Prerequisites:
#   * a local Neo4j, e.g.
#       docker run --rm -ti -p 7474:7474 -p7687:7687 --env NEO4J_AUTH=none neo4j
#   * the Athena table created per
#       https://gist.github.com/rdkls/4d2228795b3a64d9a728f94e1441222f
import threading

from halo import Halo  # terminal progress spinners
from neo4j import GraphDatabase
from pyathena import connect

# ---- Connection parameters ------------------------------------------------
AWS_PROFILE_NAME = "my-authd-aws-profile"
ATHENA_URI = "s3://athena-results-xxxxxxxxxx/"
NEO4J_URI = "bolt://localhost:7687"
# When True, the script also MERGEs an :IP node for every address in the logs.
create_nodes = False

# Athena connection; query results are staged under ATHENA_URI.
athena_conn = connect(s3_staging_dir=ATHENA_URI, profile_name=AWS_PROFILE_NAME)

# Neo4j driver. No credentials are passed, matching a server started with
# NEO4J_AUTH=none; add auth=(user, password) here if your server needs it.
neo4j_driver = GraphDatabase.driver(NEO4J_URI)
def create_nodes_and_relationships(tx, cipher):
    """Transaction function: execute one Cypher statement in ``tx``.

    Meant to be passed to ``session.execute_write(...)`` with the statement
    given as the ``cipher`` keyword argument (the name is a misspelling of
    "Cypher", kept because callers pass it by keyword). The value returned by
    ``tx.run`` is discarded, so this function returns None.
    """
    statement = cipher
    tx.run(statement)
def create_nodes_task():
    """MERGE one ``:IP`` node per distinct source/destination address in the logs.

    Athena builds one Cypher ``MERGE`` statement per address (the SQL ``UNION``
    removes duplicates). Each statement is run in its own Neo4j write
    transaction while a Halo spinner reports progress. The Athena cursor and
    the Neo4j session are always closed, and the spinner is marked as failed
    before any exception is re-raised.
    """
    athena_cursor = athena_conn.cursor()
    neo4j_session = neo4j_driver.session()
    node_spinner = Halo(text="Creating nodes 0%", spinner="dots", color="cyan")
    try:
        athena_cursor.execute("""
            SELECT 'MERGE (ip:IP {address: "' || event.src_ip || '"})' AS cipher
            FROM "aws-accelerator-subscription-database"."anf_logs"
            UNION
            SELECT 'MERGE (ip:IP {address: "' || event.dest_ip || '"})' AS cipher
            FROM "aws-accelerator-subscription-database"."anf_logs"
        """)
        node_spinner.start()
        # PEP 249 allows rowcount to be -1 when the driver cannot know the size
        # of a SELECT result up front; fall back to a plain row counter then
        # instead of displaying negative percentages.
        total_rows = athena_cursor.rowcount
        for done, row in enumerate(athena_cursor, start=1):
            cipher = row[0]
            # A NULL address makes the SQL concatenation NULL; skip it rather
            # than handing None to tx.run.
            if cipher:
                neo4j_session.execute_write(create_nodes_and_relationships, cipher=cipher)
            if total_rows > 0:
                node_spinner.text = f"Creating nodes {done / total_rows * 100:.2f}%"
            else:
                node_spinner.text = f"Creating nodes ({done} rows)"
        node_spinner.succeed("Creating nodes 100%")
    except Exception:
        node_spinner.fail("Creating nodes failed")
        raise
    finally:
        # Release the cursor and session even when a query fails midway.
        athena_cursor.close()
        neo4j_session.close()
def create_relationships_task():
    """MERGE a ``CONNECTS_TO`` relationship for every src -> dest IP pair in the logs.

    Athena builds one Cypher statement per distinct (src_ip, dest_ip) pair.
    The statement MATCHes both endpoints, so a relationship is only created
    when both ``:IP`` nodes already exist in Neo4j. No packet-count (``pkts``)
    or other property is set on the relationship.
    TODO: aggregate per-pair packet counts in SQL if that is wanted.

    The Athena cursor and the Neo4j session are always closed, and the spinner
    is marked as failed before any exception is re-raised.
    """
    athena_cursor = athena_conn.cursor()
    neo4j_session = neo4j_driver.session()
    rel_spinner = Halo(text="Creating relationships 0%", spinner="dots", color="cyan")
    try:
        # DISTINCT: MERGE on an already-linked pair changes nothing, so sending
        # the same statement once per log line only adds Neo4j round trips.
        athena_cursor.execute("""
            SELECT DISTINCT 'MATCH (src:IP {address: "' || event.src_ip || '"}), (dst:IP {address: "' || event.dest_ip || '"}) MERGE (src)-[r:CONNECTS_TO]->(dst)' AS cipher
            FROM "aws-accelerator-subscription-database"."anf_logs"
        """)
        rel_spinner.start()
        # PEP 249 allows rowcount to be -1 when the driver cannot know the size
        # of a SELECT result up front; fall back to a plain row counter then.
        total_rows = athena_cursor.rowcount
        for done, row in enumerate(athena_cursor, start=1):
            cipher = row[0]
            # A NULL src/dest IP makes the SQL concatenation NULL; skip it.
            if cipher:
                neo4j_session.execute_write(create_nodes_and_relationships, cipher=cipher)
            if total_rows > 0:
                rel_spinner.text = f"Creating relationships {done / total_rows * 100:.2f}%"
            else:
                rel_spinner.text = f"Creating relationships ({done} rows)"
        rel_spinner.succeed("Creating relationships 100%")
    except Exception:
        rel_spinner.fail("Creating relationships failed")
        raise
    finally:
        # Release the cursor and session even when a query fails midway.
        athena_cursor.close()
        neo4j_session.close()
if __name__ == "__main__":
    try:
        if create_nodes:
            # Node creation must finish before relationships are created: the
            # relationship statements use MATCH, so any IP node not merged yet
            # would make its relationship silently disappear. Running both
            # tasks on concurrent threads raced on exactly that.
            create_nodes_task()
        create_relationships_task()
    finally:
        # Close the Athena connection and Neo4j driver even if a task raised.
        athena_conn.close()
        neo4j_driver.close()
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment