Created
February 21, 2021 20:26
-
-
Save alexwoolford/71db537f74b468a68f2d0d037a94ce79 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka.admin import AdminClient, ConfigResource | |
import confluent_kafka | |
import concurrent.futures | |
from neo4j import GraphDatabase | |
""" | |
This script builds a Neo4j graph that links every Kafka topic to each of its configuration properties (key/value pairs).
The graph can then be queried to find unusual settings, e.g. key/value pairs that fewer than 10 topics have:
    MATCH (topic:Topic)-[:HAS_PROPERTY]->(property:Property)
    WHERE SIZE(()-[:HAS_PROPERTY]->(property)) < 10
    RETURN topic, property
""" | |
# Kafka cluster whose topic configurations are read.
kafka_conf = {
    'bootstrap.servers': 'cp01.woolford.io:9092',
}
# Single admin client, created at import time and shared through this module-level name.
admin_client = AdminClient(kafka_conf)

# Bolt endpoint and credentials used to open the Neo4j driver.
neo4j_conf = dict(
    uri="bolt://localhost:7687",
    user='neo4j',
    password='********',
)
def get_topic_properties(admin_client, topic):
    """Fetch the configuration of one Kafka topic as a name -> value mapping.

    Args:
        admin_client: Client whose ``describe_configs`` is called for the topic.
        topic: Name of the topic to describe.

    Returns:
        dict mapping each returned config entry's ``name`` to its ``value``.
    """
    resource = ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, topic)
    pending = admin_client.describe_configs([resource])

    settings = {}
    # The values of the returned mapping are futures; handle each one as it completes.
    for finished in concurrent.futures.as_completed(iter(pending.values())):
        entries = finished.result(timeout=1)
        settings.update((entry.name, entry.value) for entry in entries.values())
    return settings
def get_topics_properties():
    """Collect every config entry of every topic as a flat list of records.

    Uses the module-level ``admin_client`` to list the topics and to look up
    each topic's configuration.

    Returns:
        list of dicts with the keys 'topic', 'key' and 'value', one per
        (topic, config entry) pair.
    """
    records = []
    topic_names = admin_client.list_topics().topics
    for name in topic_names:
        config = get_topic_properties(admin_client, name)
        records.extend(
            {'topic': name, 'key': cfg_key, 'value': cfg_value}
            for cfg_key, cfg_value in config.items()
        )
    return records
if __name__ == "__main__":
    # Script entry point: read every topic's configuration from Kafka and MERGE it into
    # Neo4j as (:Topic)-[:HAS_PROPERTY]->(:Property) so repeated runs do not duplicate data.
    #
    # Values are sent as query parameters instead of being formatted into the Cypher text,
    # so a quote or backslash inside a config value can neither break nor alter the statement.
    merge_cypher = (
        "MERGE (topic:Topic {topic: $topic}) "
        "MERGE (property:Property {property_key: $property_key, property_value: $property_value}) "
        "MERGE (topic)-[:HAS_PROPERTY]->(property)"
    )
    neo4j_auth = (neo4j_conf.get('user'), neo4j_conf.get('password'))
    with GraphDatabase.driver(neo4j_conf.get('uri'), auth=neo4j_auth) as driver:
        with driver.session() as session:
            for topics_property in get_topics_properties():
                # define topic, property, and create the relationship
                session.run(
                    merge_cypher,
                    topic=topics_property.get('topic'),
                    property_key=topics_property.get('key'),
                    # str() keeps the text form that was stored before (e.g. None -> 'None');
                    # a null parameter is not accepted as a MERGE property value.
                    property_value=str(topics_property.get('value')),
                )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment