Skip to content

Instantly share code, notes, and snippets.

@bsnyder788
Last active June 28, 2017 19:16
Show Gist options
Save bsnyder788/b37dca46772af45e6c82a952e0230670 to your computer and use it in GitHub Desktop.
def read_offsets(csc, config):
    """Read saved Kafka offsets for the configured topic from Cassandra.

    Queries the offsets table ``config.get_offsets_cf()`` in keyspace
    ``config.get_ks()`` for rows whose ``topic`` equals
    ``config.get_kafka_topic()`` and collects them to the driver.

    Args:
        csc: Context exposing ``cassandraTable`` (presumably a
            pyspark-cassandra streaming context -- confirm against caller).
        config: Application config providing the keyspace, offsets table
            and Kafka topic names.

    Returns:
        A dict mapping ``TopicAndPartition`` to that partition's stored
        ``from_offset`` (passed by the caller as ``fromOffsets`` to
        ``KafkaUtils.createDirectStream``), or ``None`` when the query
        returns no rows for the topic.
    """
    rows = (
        csc.cassandraTable(config.get_ks(), config.get_offsets_cf())
        .select("topic", "partition", "from_offset", "until_offset")
        .where("topic=?", config.get_kafka_topic())
        .collect()
    )
    # NOTE(review): this resumes each partition from the stored *from_offset*.
    # If a row records a range that was already fully processed, resuming from
    # until_offset (assuming it is an exclusive end, like Spark's
    # OffsetRange.untilOffset) would avoid re-reading that range -- confirm
    # when rows are written relative to processing. until_offset is selected
    # but currently unused.
    # If several rows share a (topic, partition), the last one collected wins.
    offset_ranges = {
        TopicAndPartition(r.topic, r.partition): r.from_offset for r in rows
    }
    # None (not {}) signals "nothing stored"; the caller passes it through
    # unchanged as fromOffsets.
    return offset_ranges or None
# Look up the per-partition starting offsets saved in Cassandra; this is None
# when no offset rows exist yet for the topic.
from_offsets = read_offsets(cassandra_streaming_context, config)
stream = KafkaUtils.createDirectStream(
    ssc,
    # Use the same topic getter read_offsets() queries with, so the saved
    # offsets and the subscribed topic cannot drift apart.
    topics=[config.get_kafka_topic()],
    kafkaParams={
        # ASCII quotes: the original typographic quotes were a SyntaxError.
        "metadata.broker.list": config.get_kafka_brokers(),
    },
    fromOffsets=from_offsets,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.