Skip to content

Instantly share code, notes, and snippets.

@ehazlett
Created December 27, 2011 01:02
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save ehazlett/1522383 to your computer and use it in GitHub Desktop.
Save ehazlett/1522383 to your computer and use it in GitHub Desktop.
Logs Redis Pub/Sub messages to MongoDB
#!/usr/bin/env
#
# Requirements:
# argparse==1.2.1
# pymongo==2.1
# redis==2.4.10
import redis
import pymongo
import argparse
import sys
import logging
from datetime import datetime
import time
logging.basicConfig(level=logging.DEBUG)
def transcribe(redis_host='localhost', redis_port=6379, redis_password=None, redis_db=0,\
redis_channels='*', mongo_host='127.0.0.1', mongo_port=27017, mongo_user=None, \
mongo_password=None, mongo_db='transcribe'):
"""
Logs messages from Redis channels into Mongo
:keyword redis_host: Redis host/ip
:keyword redis_port: Redis port
:keyword redis_password: Redis password
:keyword redis_db: Redis database
:keyword redis_channels: Channel pattern to listen
:keyword mongo_host: MongoDB host/ip
:keyword mongo_port: MongoDB port
:keyword mongo_user: MongoDB user
:keyword mongo_password: MongoDB password
:keyword mongo_db: MongoDB database
"""
log = logging.getLogger('core')
log.debug('Connecting to Redis: {0}:{1} with db {2}'.format(redis_host, redis_port, redis_db))
rds = redis.Redis(host=redis_host, port=redis_port, db=redis_db, password=redis_password)
log.debug('Connecting to MongoDB: {0}:{1} with db {2} (user: {3})'.format(mongo_host, mongo_port, mongo_db, mongo_user))
mgo_db = pymongo.Connection(host=mongo_host, port=mongo_port)[mongo_db]
if mongo_user and mongo_password:
mgo_db.authenticate(mongo_user, mongo_pass)
# subscribe
ps = rds.pubsub()
ps.psubscribe(redis_channels)
# listen
log.info('Listening for messages on channels: {0}'.format(redis_channels))
for m in ps.listen():
data = {
'date': time.time(),
'channel': m['channel'],
'data': m['data'],
}
log.debug(' {0}:{1}: {2}'.format(datetime.fromtimestamp(data['date']), data['channel'], data['data']))
mgo_db.messages.insert(data)
if __name__=='__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--redis-host', action="store", dest="redis_host", default="127.0.0.1", help="Redis host/ip")
parser.add_argument('--redis-port', action="store", type=int, dest="redis_port", default=6379, help="Redis port")
parser.add_argument('--redis-password', action="store", dest="redis_password", default=None, help="Redis password")
parser.add_argument('--redis-db', action="store", type=int, dest="redis_db", default=0, help="Redis DB")
parser.add_argument('--redis-channels', action="store", dest="redis_channels", default='*', help="Redis Pub/Sub channel pattern")
parser.add_argument('--mongo-host', action="store", dest="mongo_host", default='127.0.0.1', help="MongoDB host/ip")
parser.add_argument('--mongo-port', action="store", type=int, dest="mongo_port", default=27017, help="MongoDB port")
parser.add_argument('--mongo-user', action="store", dest="mongo_user", default=None, help="MongoDB user")
parser.add_argument('--mongo-password', action="store", dest="mongo_password", default=None, help="MongoDB password")
parser.add_argument('--mongo-db', action="store", dest="mongo_db", default='transcribe', help="MongoDB DB")
opts = parser.parse_args()
try:
transcribe(**opts.__dict__)
except KeyboardInterrupt:
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment