Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save kbeathanabhotla/d898644924cf321c890fdedd1dddb223 to your computer and use it in GitHub Desktop.
Save kbeathanabhotla/d898644924cf321c890fdedd1dddb223 to your computer and use it in GitHub Desktop.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def read_offsets(zk, topics):
    """Load previously saved Kafka offsets for ``topics`` from ZooKeeper.

    Offsets are read from ``/consumers/<topic>/<partition>``, the same layout
    that ``save_offsets`` writes; each node's data is the offset as decimal
    text.

    Args:
        zk: Started ZooKeeper client providing ``exists``, ``get_children``
            and ``get`` (a kazoo ``KazooClient`` in this file).
        topics: Iterable of Kafka topic names.

    Returns:
        dict mapping ``TopicAndPartition`` to the stored ``int`` offset.
        Topics without a ``/consumers/<topic>`` node and partitions whose
        node holds no data are left out, so the result may be empty.
    """
    # Imported at call time, as the original does, so the module can be
    # loaded without touching the Kafka integration package.
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        topic_path = f'/consumers/{topic}'
        # On a first run nothing has been saved for this topic yet;
        # get_children on the missing node would raise instead of
        # letting the stream start without explicit offsets.
        if not zk.exists(topic_path):
            continue
        for partition in zk.get_children(topic_path):
            data = zk.get(f'{topic_path}/{partition}')[0]
            # save_offsets creates the node (ensure_path) before writing
            # the value, so a node may exist with no offset in it yet.
            if not data:
                continue
            topic_partition = TopicAndPartition(topic, int(partition))
            from_offsets[topic_partition] = int(data)
    return from_offsets
def save_offsets(rdd):
    """Store the end offset of every Kafka offset range in ``rdd``.

    For each range returned by ``rdd.offsetRanges()`` the range's
    ``untilOffset`` is written, as encoded decimal text, to the ZooKeeper
    node ``/consumers/<topic>/<partition>``. The node is created first if it
    does not exist.

    Args:
        rdd: An RDD exposing ``offsetRanges()``; this function is registered
            with ``foreachRDD`` on the direct Kafka stream in ``main``.
    """
    zk = get_zookeeper_instance()
    for offset_range in rdd.offsetRanges():
        path = f"/consumers/{offset_range.topic}/{offset_range.partition}"
        # set() requires an existing node, so create it (and parents) first.
        zk.ensure_path(path)
        zk.set(path, str(offset_range.untilOffset).encode())
# ZooKeeper connection string handed to KazooClient.
ZOOKEEPER_SERVERS = "127.0.0.1:2181"


def get_zookeeper_instance():
    """Return the shared ``KazooClient``, creating and starting it on first use.

    The client is cached in this module's globals under the key
    ``'KazooSingletonInstance'`` and reused by later calls.

    Returns:
        The started ``KazooClient`` connected to ``ZOOKEEPER_SERVERS``.
    """
    from kazoo.client import KazooClient

    client = globals().get('KazooSingletonInstance')
    if client is None:
        client = KazooClient(ZOOKEEPER_SERVERS)
        client.start()
        # Cache only after start() succeeded; otherwise a failed start
        # would leave an unstarted client to be returned by every later call.
        globals()['KazooSingletonInstance'] = client
    return client
def main(brokers="127.0.0.1:9092", topics=None):
    """Run a direct Kafka stream that records consumed offsets in ZooKeeper.

    Resumes from the offsets returned by ``read_offsets`` and registers
    ``save_offsets`` to run on every batch RDD, then starts the streaming
    context and blocks until it terminates.

    Args:
        brokers: Value for the ``metadata.broker.list`` Kafka parameter.
        topics: List of topic names to consume; defaults to
            ``['test1', 'test2']``.
    """
    # None sentinel instead of a mutable list default; same effective value.
    if topics is None:
        topics = ['test1', 'test2']

    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    direct_kafka_stream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)
    direct_kafka_stream.foreachRDD(save_offsets)

    # Without starting the context the registered output operation never
    # runs and the script exits having consumed nothing.
    ssc.start()
    ssc.awaitTermination()
# Run the streaming job only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment