Skip to content

Instantly share code, notes, and snippets.

@alexwoolford
Created February 21, 2021 20:26
Show Gist options
  • Save alexwoolford/71db537f74b468a68f2d0d037a94ce79 to your computer and use it in GitHub Desktop.
Save alexwoolford/71db537f74b468a68f2d0d037a94ce79 to your computer and use it in GitHub Desktop.
import concurrent.futures
import os

import confluent_kafka
from confluent_kafka.admin import AdminClient, ConfigResource
from neo4j import GraphDatabase
"""
This script builds a graph of all the topics and properties for those topics. The graph can then be queried to identify unusual properties:
MATCH (topic:Topic)-[:HAS_PROPERTY]->(property:Property)
WHERE SIZE(()-[:HAS_PROPERTY]->(property)) < 10
RETURN topic, property
"""
kafka_conf = {'bootstrap.servers': 'cp01.woolford.io:9092'}
admin_client = AdminClient(kafka_conf)
neo4j_conf = {'uri': "bolt://localhost:7687", 'user': 'neo4j', 'password': '********'}
def get_topic_properties(admin_client, topic, timeout=None):
    """Return the configuration of one Kafka topic as a ``{name: value}`` dict.

    :param admin_client: the ``AdminClient`` used to issue the describe request.
    :param topic: name of the topic whose configuration is fetched.
    :param timeout: optional number of seconds to wait for the response;
        ``None`` (the default) waits indefinitely, matching the original behaviour.
    :return: dict mapping each config entry's ``name`` to its ``value``
        (values may be ``None`` for some entries - TODO confirm which).
    :raises concurrent.futures.TimeoutError: if ``timeout`` elapses before the
        broker answers.
    """
    resource = ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, topic)
    # describe_configs() returns a dict of {resource: future}; only one resource
    # is requested here, so the loop below runs once.
    futures = admin_client.describe_configs([resource]).values()

    properties = {}
    for future in concurrent.futures.as_completed(futures, timeout=timeout):
        # as_completed() only yields finished futures, so result() does not block.
        config_entries = future.result()
        properties.update({entry.name: entry.value for entry in config_entries.values()})
    return properties
def get_topics_properties(client=None):
    """Flatten the configuration of every topic in the cluster into rows.

    :param client: ``AdminClient`` to query. Defaults to the module-level
        ``admin_client``, which was the only option before this parameter existed.
    :return: list of ``{'topic': ..., 'key': ..., 'value': ...}`` dicts, one per
        (topic, config entry) pair, ordered by topic then by config entry.
    """
    if client is None:
        client = admin_client
    # list_topics() returns cluster metadata whose ``topics`` mapping is keyed by
    # topic name.
    return [
        {'topic': topic, 'key': key, 'value': value}
        for topic in client.list_topics().topics
        for key, value in get_topic_properties(client, topic).items()
    ]
# Idempotently upserts each topic node, each property (key/value) node and the
# HAS_PROPERTY relationship between them. Values are passed as query parameters,
# never spliced into the query text, so quotes or backslashes in a topic or
# config value cannot break or alter the statement.
TOPIC_PROPERTY_MERGE_CYPHER = """
UNWIND $rows AS row
MERGE (topic:Topic {topic: row.topic})
MERGE (property:Property {property_key: row.property_key, property_value: row.property_value})
MERGE (topic)-[:HAS_PROPERTY]->(property)
"""

# Maximum number of rows sent in a single query, to bound the size of each transaction.
MERGE_BATCH_SIZE = 1000


def to_merge_rows(topics_properties):
    """Convert ``{'topic', 'key', 'value'}`` dicts into Cypher parameter rows.

    Every field is passed through ``str`` so the stored node properties are the
    same strings the earlier string-formatted query produced. In particular, a
    ``None`` config value becomes ``'None'``; a null would make the MERGE fail.

    :param topics_properties: iterable of dicts as returned by ``get_topics_properties()``.
    :return: list of dicts with keys ``topic``, ``property_key`` and ``property_value``.
    """
    return [
        {
            'topic': str(row.get('topic')),
            'property_key': str(row.get('key')),
            'property_value': str(row.get('value')),
        }
        for row in topics_properties
    ]


def merge_topics_properties(session, topics_properties, batch_size=MERGE_BATCH_SIZE):
    """Write topic/property rows into Neo4j, ``batch_size`` rows per query.

    :param session: an open Neo4j session.
    :param topics_properties: iterable of ``{'topic', 'key', 'value'}`` dicts.
    :param batch_size: maximum number of rows per query.
    :raises ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1, got {}".format(batch_size))
    rows = to_merge_rows(topics_properties)
    for start in range(0, len(rows), batch_size):
        # consume() drains the result so a server-side error surfaces at the
        # batch that caused it.
        session.run(TOPIC_PROPERTY_MERGE_CYPHER, rows=rows[start:start + batch_size]).consume()


if __name__ == "__main__":
    # Read everything from Kafka before connecting to Neo4j, so a Kafka failure
    # does not leave a database session open while it waits.
    topics_properties = get_topics_properties()
    with GraphDatabase.driver(neo4j_conf.get('uri'), auth=(neo4j_conf.get('user'), neo4j_conf.get('password'))) as driver:
        with driver.session() as session:
            merge_topics_properties(session, topics_properties)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment