Skip to content

Instantly share code, notes, and snippets.

@akheron
Last active March 14, 2016 11:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akheron/890d0506b803abb3130b to your computer and use it in GitHub Desktop.
Save akheron/890d0506b803abb3130b to your computer and use it in GitHub Desktop.
import logging, logging.config
# Kafka connection settings used by main(). The values look like
# placeholders -- presumably to be replaced with a real topic, consumer
# group and broker address before running (verify for your deployment).
TOPIC_NAME = "TOPIC-NAME"
GROUP_NAME = "GROUP-NAME"
# A list, so that more than one bootstrap broker can be given.
BOOTSTRAP_SERVERS = ["BROKER-HOST"]
def setup_logging():
    """Configure the root logger for verbose console output.

    Installs a single ``logging.StreamHandler`` on the root logger, with
    both the handler and the root logger set to DEBUG, using the
    'verbose' format (timestamp, level, module, process id, thread id,
    message).

    Loggers that already exist when this function is called are left
    enabled, so the result does not depend on whether other modules were
    imported before or after logging was configured.
    """
    logging.config.dictConfig({
        'version': 1,
        # dictConfig defaults this to True, which silently disables every
        # logger created before this call (e.g. by modules imported at
        # the top of the file). Keep them working.
        'disable_existing_loggers': False,
        'formatters': {
            'verbose': {
                'format': '[%(asctime)s] %(levelname)-8s %(module)s %(process)d %(thread)d %(message)s'
            },
            # Shorter alternative format; not referenced by any handler
            # below, kept available for switching the handler over.
            'console': {
                'format': '[%(asctime)s] %(levelname)-8s %(message)s',
            },
        },
        'handlers': {
            'console': {
                'level': 'DEBUG',
                'class': 'logging.StreamHandler',
                'formatter': 'verbose'
            },
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console'],
        },
    })
def main():
    """Consume TOPIC_NAME forever, logging the offset of each message.

    Creates a KafkaConsumer for TOPIC_NAME in consumer group GROUP_NAME,
    connecting through BOOTSTRAP_SERVERS, then polls in an endless loop
    with a 1000 ms timeout. An empty poll result is logged as
    'empty result set'; otherwise the offset of every received message
    is logged at INFO level.

    The function never returns normally. When the loop is left through
    an exception (for example KeyboardInterrupt), the consumer is closed
    before the exception propagates.
    """
    # Imported here rather than at module level, so kafka is only loaded
    # once main() runs (the script entry point calls setup_logging()
    # first).
    from kafka import KafkaConsumer

    log = logging.getLogger(__name__)
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=GROUP_NAME,
        auto_commit_interval_ms=1000,
    )
    try:
        while True:
            # poll() returns a dict with one entry per partition that had
            # records. Every entry must be processed: taking just one
            # (e.g. via popitem()) drops the other partitions' messages.
            batches = consumer.poll(1000)
            if not batches:
                log.info('empty result set')
                continue
            for messages in batches.values():
                for message in messages:
                    # Lazy %-args: formatting is left to the logging module.
                    log.info('received message with offset %d', message.offset)
    finally:
        # Release the consumer's resources however the loop is exited.
        consumer.close()
# Script entry point: runs only when the file is executed directly, not
# when it is imported.
if __name__ == "__main__":
    # Logging is configured first, then the consumer loop is started.
    setup_logging()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment