Skip to content

Instantly share code, notes, and snippets.

@bsnyder788
Created June 28, 2017 19:17
Show Gist options
  • Save bsnyder788/d76fa5bd0777f750529bbdf536e96ad8 to your computer and use it in GitHub Desktop.
Save bsnyder788/d76fa5bd0777f750529bbdf536e96ad8 to your computer and use it in GitHub Desktop.
def save_offset_ranges(csc, config, rdd):
    """Save offset ranges for a given RDD to Cassandra.

    Every range returned by ``rdd.offsetRanges()`` is written with its own
    call to ``save_offset_to_cassandra``.

    Args:
        csc: Context object forwarded unchanged to
            ``save_offset_to_cassandra``.
        config: Configuration object forwarded unchanged to
            ``save_offset_to_cassandra``.
        rdd: Object exposing ``offsetRanges()``.
            NOTE(review): presumably the RDD handed out by a Kafka direct
            stream -- confirm against the caller.
    """
    for offset_range in rdd.offsetRanges():
        save_offset_to_cassandra(csc, config, offset_range)
def save_offset_to_cassandra(csc, config, offset):
    """Save offset range to Cassandra.

    Wraps the offset range in a one-element RDD and writes it to the
    keyspace and table named by ``config``.

    Args:
        csc: Context object providing ``parallelize``; the RDD it returns
            must provide ``saveToCassandra(keyspace, table)``.
        config: Object providing ``get_ks()`` and ``get_offsets_cf()``,
            whose results are passed as the two ``saveToCassandra``
            arguments.
        offset: Offset range exposing ``topic``, ``partition``,
            ``fromOffset`` and ``untilOffset``.
    """
    # Row keys are snake_case while the offset attributes are camelCase.
    row = {
        "topic": offset.topic,
        "partition": offset.partition,
        "from_offset": offset.fromOffset,
        "until_offset": offset.untilOffset,
    }
    csc.parallelize([row]).saveToCassandra(
        config.get_ks(), config.get_offsets_cf()
    )
# Persist the latest Kafka offset ranges for each RDD passed to foreachRDD.
kms.foreachRDD(
    lambda batch_rdd: save_offset_ranges(job.csc, job.get_config(), batch_rdd)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment