Skip to content

Instantly share code, notes, and snippets.

View bsnyder788's full-sized avatar

Brett Snyder bsnyder788

View GitHub Profile
### Keybase proof
I hereby claim:
* I am bsnyder788 on github.
* I am bsnyder788 (https://keybase.io/bsnyder788) on keybase.
* I have a public key ASDwUdJUxx442IJGnpEs9Ic1IDi2rkTSPX28-JKzaBs42wo
To claim this, I am signing this object:
def save_offset_ranges(csc, config, rdd):
"""Save offset ranges for a given RDD to Cassandra."""
for o in rdd.offsetRanges():
save_offset_to_cassandra(csc, config, o)
def save_offset_to_cassandra(csc, config, offset):
"""Save offset range to Cassandra."""
rdd = csc.parallelize([{
"topic": offset.topic,
"partition": offset.partition,
def read_offsets(csc, config):
"""Read offset ranges from Cassandra."""
rows = csc.cassandraTable(config.get_ks(),
config.get_offsets_cf())\
.select("topic", "partition", "from_offset", "until_offset")\
.where("topic=?", config.get_kafka_topic())\
.collect()
offset_ranges = {}
for r in rows:
tp = TopicAndPartition(r.topic, r.partition)