Skip to content

Instantly share code, notes, and snippets.

@rdkls
Created July 11, 2023 12:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rdkls/1c8cd5f3895f7f96fddd9477f367a86a to your computer and use it in GitHub Desktop.
Save rdkls/1c8cd5f3895f7f96fddd9477f367a86a to your computer and use it in GitHub Desktop.
Query AWS Network Firewall (NFW) logs with Athena and insert them into Neo4j
#!/usr/bin/env python3
# run neo4j locally with e.g. docker run --rm -ti -p 7474:7474 -p7687:7687 --env NEO4J_AUTH=none neo4j
# athena table should be created per https://gist.github.com/rdkls/4d2228795b3a64d9a728f94e1441222f
from pyathena import connect
from neo4j import GraphDatabase
import threading
from halo import Halo # Import halo lib
# --- Connection settings: adjust these for your environment ---------------
# Named AWS profile handed to PyAthena's connect() for credentials.
AWS_PROFILE_NAME = "my-authd-aws-profile"
# S3 location passed to PyAthena as s3_staging_dir (where Athena writes query results).
ATHENA_URI = "s3://athena-results-xxxxxxxxxx/"
# Bolt endpoint of the Neo4j server.
NEO4J_URI = "bolt://localhost:7687"
# Set to True to also MERGE the IP nodes before building relationships;
# leave False when the nodes already exist in the graph.
create_nodes = False

# Open the Athena DB-API connection.
athena_conn = connect(
    profile_name=AWS_PROFILE_NAME,
    s3_staging_dir=ATHENA_URI,
)

# Open the Neo4j driver. No credentials are passed, which matches a server
# started with NEO4J_AUTH=none as in the header comment; supply
# auth=(user, password) here if your server requires authentication.
neo4j_driver = GraphDatabase.driver(NEO4J_URI)
def create_nodes_and_relationships(tx, cipher):
    """Execute one Cypher statement inside the supplied transaction.

    Intended as the unit of work for ``Session.execute_write``, which passes
    the transaction as ``tx``. ``cipher`` holds the Cypher query text; the
    parameter name is kept because callers pass it as ``cipher=...``.
    Returns None.
    """
    statement = cipher
    tx.run(statement)
def create_nodes_task():
    """Create one ``IP`` node per distinct source/destination address.

    Athena turns every ``event.src_ip`` and ``event.dest_ip`` in the
    ``anf_logs`` table into a Cypher ``MERGE (ip:IP {address: ...})``
    statement (``UNION`` removes duplicates). Each statement is executed in
    its own Neo4j write transaction via ``create_nodes_and_relationships``,
    with progress shown on a Halo spinner.

    Any exception from PyAthena or the Neo4j driver is re-raised after the
    spinner is marked as failed and the Neo4j session / Athena cursor are
    closed.
    """
    # NOTE(review): addresses are spliced into the Cypher text unescaped. That
    # is fine for well-formed IP strings, but a value containing `"` would
    # break the statement; consider parameterised Cypher if the column can
    # hold anything else.
    athena_cursor = athena_conn.cursor()
    node_spinner = Halo(text="Creating nodes 0%", spinner="dots", color="cyan")
    try:
        with neo4j_driver.session() as neo4j_session:
            athena_cursor.execute("""
                SELECT 'MERGE (ip:IP {address: "' || event.src_ip || '"})' AS cipher
                FROM "aws-accelerator-subscription-database"."anf_logs"
                UNION
                SELECT 'MERGE (ip:IP {address: "' || event.dest_ip || '"})' AS cipher
                FROM "aws-accelerator-subscription-database"."anf_logs"
            """)
            node_spinner.start()
            # DB-API allows rowcount == -1 when the count is unknown, and
            # PyAthena reports that for SELECT queries; in that case show a
            # running row count instead of a negative percentage.
            total_rows = athena_cursor.rowcount
            for done, row in enumerate(athena_cursor, start=1):
                cipher = row[0]
                # In Athena (Trino) `||` with a NULL operand yields NULL, so a
                # log row with a missing IP produces no statement; skip it
                # instead of passing None to tx.run().
                if cipher:
                    neo4j_session.execute_write(
                        create_nodes_and_relationships, cipher=cipher
                    )
                if total_rows > 0:
                    node_spinner.text = f"Creating nodes {done / total_rows * 100:.2f}%"
                else:
                    node_spinner.text = f"Creating nodes ({done} rows)"
    except BaseException:
        # Includes KeyboardInterrupt: stop the spinner so the terminal is left
        # clean, then let the error propagate.
        node_spinner.fail("Creating nodes failed")
        raise
    finally:
        athena_cursor.close()
    node_spinner.succeed("Creating nodes 100%")
def create_relationships_task():
    """Create a ``CONNECTS_TO`` relationship for each distinct IP pair in the logs.

    Athena builds one Cypher statement per distinct
    (``event.src_ip``, ``event.dest_ip``) pair. Each statement MATCHes the two
    ``IP`` nodes and MERGEs a ``(src)-[:CONNECTS_TO]->(dst)`` relationship in its
    own Neo4j write transaction via ``create_nodes_and_relationships``. No
    properties (e.g. packet counts) are set on the relationship.

    Because the statement uses MATCH, both IP nodes must already exist (see
    ``create_nodes_task``); pairs whose nodes are missing create nothing.

    Any exception from PyAthena or the Neo4j driver is re-raised after the
    spinner is marked as failed and the Neo4j session / Athena cursor are
    closed.
    """
    # NOTE(review): addresses are spliced into the Cypher text unescaped; see
    # the same caveat on the node-creation query.
    athena_cursor = athena_conn.cursor()
    rel_spinner = Halo(text="Creating relationships 0%", spinner="dots", color="cyan")
    try:
        with neo4j_driver.session() as neo4j_session:
            # DISTINCT: MERGE is idempotent, so repeated log rows for the same
            # pair would only cost extra Neo4j transactions without changing
            # the resulting graph.
            athena_cursor.execute("""
                SELECT DISTINCT 'MATCH (src:IP {address: "' || event.src_ip || '"}), (dst:IP {address: "' || event.dest_ip || '"}) MERGE (src)-[r:CONNECTS_TO]->(dst)' AS cipher
                FROM "aws-accelerator-subscription-database"."anf_logs"
            """)
            rel_spinner.start()
            # DB-API allows rowcount == -1 when the count is unknown, and
            # PyAthena reports that for SELECT queries; in that case show a
            # running row count instead of a negative percentage.
            total_rows = athena_cursor.rowcount
            for done, row in enumerate(athena_cursor, start=1):
                cipher = row[0]
                # A NULL src/dest IP makes the whole concatenation NULL; skip it.
                if cipher:
                    neo4j_session.execute_write(
                        create_nodes_and_relationships, cipher=cipher
                    )
                if total_rows > 0:
                    rel_spinner.text = f"Creating relationships {done / total_rows * 100:.2f}%"
                else:
                    rel_spinner.text = f"Creating relationships ({done} rows)"
    except BaseException:
        # Includes KeyboardInterrupt: stop the spinner so the terminal is left
        # clean, then let the error propagate.
        rel_spinner.fail("Creating relationships failed")
        raise
    finally:
        athena_cursor.close()
    rel_spinner.succeed("Creating relationships 100%")
# Entry point: build the graph from the NFW logs, then release connections.
# The tasks run one after the other rather than in parallel threads. The
# relationship statements use MATCH, which silently creates nothing when an
# IP node does not exist yet, so relationships may only be built once all
# nodes are in place.
if __name__ == "__main__":
    try:
        if create_nodes:
            create_nodes_task()
        create_relationships_task()
    finally:
        # Close the Athena connection and Neo4j driver even if a task fails.
        athena_conn.close()
        neo4j_driver.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment