Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@ottomata
Last active March 6, 2018 19:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ottomata/d69ba72313c44e8e45e6453f4ea97074 to your computer and use it in GitHub Desktop.
Save ottomata/d69ba72313c44e8e45e6453f4ea97074 to your computer and use it in GitHub Desktop.
import json
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from pprint import pprint
def get_committed_offsets(consumer, partitions):
    """Pair each partition with the offset the consumer reports as committed.

    :param consumer: object exposing a ``committed(partition)`` method
                     (a KafkaConsumer in this script).
    :param partitions: iterable of partitions to look up.
    :returns: list of ``(partition, consumer.committed(partition))`` tuples,
              in the iteration order of ``partitions``.
    """
    committed = []
    for partition in partitions:
        committed.append((partition, consumer.committed(partition)))
    return committed
def _event_dt(event):
    """Return the event's timestamp: top-level 'dt', else event['meta']['dt']."""
    try:
        return event['dt']
    except KeyError:
        # No top-level 'dt' key; fall back to the nested meta.dt field.
        return event['meta']['dt']


def commit(consumer_group, assignments, bootstrap_servers=None):
    """Interactively commit the given offsets for a consumer group.

    Prints the group's currently committed offsets, then for every
    partition in ``assignments`` seeks to the requested offset, reads one
    message and prints its ``dt`` so the operator can sanity-check the
    target position. The offsets are committed only if the operator
    answers ``y`` at the prompt. Finally the committed offsets are printed
    again.

    :param consumer_group: Kafka consumer group id to commit offsets for.
    :param assignments: dict mapping TopicPartition -> OffsetAndMetadata.
    :param bootstrap_servers: optional broker list passed to KafkaConsumer;
        defaults to ``['kafka1012.eqiad.wmnet:9092']``.
    :raises KeyError: if a sampled message has neither ``dt`` nor
        ``meta.dt``.
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['kafka1012.eqiad.wmnet:9092']

    # raw_input only exists on Python 2; input is its Python 3 equivalent.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group,
        enable_auto_commit=False,
    )
    try:
        # print the current committed offsets
        print("\n\n%s currently at: " % consumer_group)
        pprint(get_committed_offsets(consumer, assignments.keys()))

        # print the message dts at these offsets:
        print("Seeking to new offsets:")
        for tp, o in assignments.items():
            # Assign one partition at a time so next() reads from exactly
            # this partition at the requested offset.
            consumer.assign([tp])
            consumer.seek(tp, o.offset)
            # NOTE(review): next() waits for a message; presumably this
            # blocks if the offset is at/after the end of the log.
            message = next(consumer)
            event = json.loads(message.value)
            dt = _event_dt(event)
            print("partition {}, offset {} is at dt {}".format(message.partition, message.offset, dt))

        print("\nCommit these offsets to consumer group %s''? [y/n]:" % consumer_group)
        if read_line() == 'y':
            # Single formatted string so Python 2 does not print a tuple.
            print("Committing... %s" % (assignments,))
            consumer.commit(assignments)
        else:
            print("Not committing.")

        # print the current committed offsets
        print("%s now at: " % consumer_group)
        pprint(get_committed_offsets(consumer, assignments.keys()))
    finally:
        # Always release the broker connection, even on error or Ctrl-C.
        # enable_auto_commit=False above, so closing should not commit.
        consumer.close()
# === EventLogging
#
# eventlogging-valid-mixed has 12 partitions (0 through 11).
topic_partitions = [
    TopicPartition('eventlogging-valid-mixed', p)
    for p in range(12)
]
# Target offsets, listed in partition order (index == partition number).
# Offsets obtained from kafkacat like:
# for p in $(seq 0 11); do kafkacat -C -b kafka1012.eqiad.wmnet -f "%p %o\n%s\n" -t eventlogging-valid-mixed -p $p -c 1 -o -169000; done
assignments = dict(zip(
    topic_partitions,
    (OffsetAndMetadata(offset, b'') for offset in (
        760139545,
        760165349,
        760213636,
        758471379,
        755461088,
        759662862,
        760187732,
        760190481,
        760185377,
        757392297,
        755504356,
        759680897,
    ))
))
commit('eventlogging_consumer_mysql_00', assignments)
# === EventBus
#
# Each topic below is addressed at partition 0 only.
# Offsets obtained like:
# kafkacat -C -b kafka1012.eqiad.wmnet -f "%p %o\n%s\n" -t "eqiad.mediawiki.page-create" -p 0 -c 1 -o -67000
assignments = dict(
    (TopicPartition(topic, 0), OffsetAndMetadata(offset, b''))
    for topic, offset in (
        ('eqiad.mediawiki.page-create', 48395235),
        ('eqiad.mediawiki.page-delete', 5363302),
        ('eqiad.mediawiki.page-undelete', 108336),
        ('eqiad.mediawiki.page-move', 2821346),
    )
)
commit('eventlogging_consumer_mysql_eventbus_00', assignments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment