Skip to content

Instantly share code, notes, and snippets.

@danthelion
Created June 27, 2022 06:52
Show Gist options
  • Save danthelion/c5399d42dc4d6ff08345a3319239d852 to your computer and use it in GitHub Desktop.
Save danthelion/c5399d42dc4d6ff08345a3319239d852 to your computer and use it in GitHub Desktop.
Load Wikimedia change events into Kafka
import json
from kafka import KafkaProducer
from sseclient import SSEClient as EventSource
def produce_events_from_url(
    url: str, topic: str, *, producer=None, events=None
) -> None:
    """Stream server-sent events from *url* and publish each one to a Kafka topic.

    Only SSE events whose type is ``"message"`` are forwarded. Each payload is
    parsed as JSON, re-serialized, UTF-8 encoded and sent with the payload's
    ``server_name`` field (UTF-8 encoded) as the message key, so messages are
    partitioned by ``server_name``.

    Payloads that are not valid JSON, are not JSON objects, or have no string
    ``server_name`` are skipped (logged at DEBUG level) instead of aborting the
    stream.

    Args:
        url: SSE endpoint to subscribe to.
        topic: Kafka topic to publish to.
        producer: Object exposing ``send(topic, value=..., key=...)`` and
            ``flush()``, e.g. a ``KafkaProducer``. Defaults to the module-level
            ``producer`` global created by the ``__main__`` block, for
            backward compatibility with the original script.
        events: Optional iterable of SSE events (objects with ``.event`` and
            ``.data`` attributes). Defaults to ``EventSource(url)``; useful for
            testing or replaying captured events.

    Raises:
        NameError: if no producer is passed and no module-level ``producer``
            global exists.
    """
    import logging

    log = logging.getLogger(__name__)

    if producer is None:
        # The original script relied on a global created under ``__main__``;
        # fall back to it so existing callers keep working.
        producer = globals().get("producer")
        if producer is None:
            raise NameError(
                "name 'producer' is not defined; pass producer=... explicitly"
            )
    if events is None:
        events = EventSource(url)

    try:
        for event in events:
            if event.event != "message":
                continue
            try:
                parsed_event = json.loads(event.data)
            except ValueError:
                # Empty or non-JSON payloads are skipped (best-effort stream).
                log.debug("Skipping non-JSON event payload: %r", event.data)
                continue
            key = (
                parsed_event.get("server_name")
                if isinstance(parsed_event, dict)
                else None
            )
            if not isinstance(key, str):
                # One malformed event must not kill the long-running stream.
                log.debug("Skipping event without string server_name: %r", event.data)
                continue
            # Partition by server_name
            producer.send(
                topic,
                value=json.dumps(parsed_event).encode("utf-8"),
                key=key.encode("utf-8"),
            )
    finally:
        # send() only enqueues; push out anything still buffered when the
        # stream ends or is interrupted.
        producer.flush()
if __name__ == "__main__":
    import os

    # Kept as a module-level global: produce_events_from_url falls back to a
    # global named ``producer`` when none is passed explicitly.
    # The bootstrap address can be overridden via KAFKA_BOOTSTRAP_SERVERS;
    # the default is the address the script originally hard-coded.
    producer = KafkaProducer(
        bootstrap_servers=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:63248"),
        client_id="wikidata-producer",
    )
    try:
        produce_events_from_url(
            url="https://stream.wikimedia.org/v2/stream/recentchange",
            topic="recentchange",
        )
    finally:
        # Release the producer on any exit (including Ctrl-C) so in-flight
        # sends are completed rather than silently dropped.
        producer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment