Skip to content

Instantly share code, notes, and snippets.

@josiahcarlson
Created June 24, 2011 22:07
Show Gist options
  • Star 20 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save josiahcarlson/1045789 to your computer and use it in GitHub Desktop.
Save josiahcarlson/1045789 to your computer and use it in GitHub Desktop.
A way of implementing a poll-based chat inside Redis
'''
redis_simple_chat.py
Written June 24, 2011 by Josiah Carlson
Released under the GNU GPL v2
available: http://www.gnu.org/licenses/gpl-2.0.html
Other licenses may be available upon request.
This module intends to offer a simple way for Redis to hold state for chat
channels over time. Clients send messages, check for messages since they last
checked, and some cron daemon calls a cleanup function on occasion.
Requires a recent version of https://github.com/andymccurdy/redis-py/ or a
work-alike.
'''
import time
import redis
def send_message(conn, channel, message):
    '''
    Send a message to the provided channel.

    conn    -- Redis client; must provide lock(), zincrby() and pipeline().
    channel -- channel name; used as the suffix of every per-channel key.
    message -- payload stored in the 'messages:<channel>' hash.

    Returns None.
    '''
    # NOTE(review): zincrby/zadd below keep the member-before-score argument
    # order of the original code (legacy redis-py `Redis` client). Newer
    # redis-py releases use different signatures -- confirm the client in use.

    # The channel lock serializes id allocation and the writes that follow
    # against other writers, readers and the cleanup job for this channel.
    with conn.lock('lock:channel:' + channel, timeout=5):
        # get the id for this message
        message_id = conn.zincrby('ids:', channel, 1)
        pipeline = conn.pipeline(transaction=True)
        # store the data
        pipeline.hset('messages:' + channel, message_id, message)
        # update the timeline; the id doubles as the score so that the
        # timeline is ordered by message id
        pipeline.zadd('timeline:' + channel, message_id, message_id)
        # and keep a record about when this channel last got a message
        pipeline.zadd('updated:', channel, time.time())
        pipeline.execute()
def check_messages(conn, client, channel, limit=10):
    '''
    Check messages for a client on a given channel.

    conn    -- Redis client; must provide lock(), pipeline(), zunionstore()
               and hmget().
    client  -- client identifier.
    channel -- channel name.
    limit   -- maximum number of message ids to fetch per call; None
               disables the cap.

    Returns the string 'new connection' when the client/channel pair had no
    entry in 'clients:' before this call, otherwise a list of the non-empty
    message payloads fetched from 'messages:<channel>'.
    '''
    # NOTE(review): zadd below keeps the member-before-score argument order
    # of the original code (legacy redis-py `Redis` client) -- confirm the
    # client version in use.
    cl_ch = client + ':' + channel
    progress = 'clients:' + cl_ch
    scratch = progress + ':tmp'
    timeline = 'timeline:' + channel

    # we must ensure that only one instance of this client is fetching data
    # at a time, and that the channel is not modified underneath us
    with conn.lock('lock:client:' + client, timeout=5):
        with conn.lock('lock:channel:' + channel, timeout=5):
            pipeline = conn.pipeline(True)
            # find out which messages we already know about
            # NOTE(review): this stores the ids present in BOTH the timeline
            # and the client's progress set -- confirm that this is the
            # intended selection of messages to hand back.
            pipeline.zinterstore(scratch, [timeline, progress], aggregate='MAX')
            # set the last time this client checked for messages
            pipeline.zadd('clients:', cl_ch, time.time())
            results = pipeline.execute()

            if results[-1]:
                # zadd reported a newly added member: client timed out, or
                # client is new. Mark the whole timeline as known.
                pipeline.zunionstore(progress, [timeline])
                pipeline.delete(scratch)
                pipeline.execute()
                return 'new connection'

            if not results[0]:
                # First messages in a new channel, or a channel that had been
                # deleted due to timeout. The destination must be the same
                # ':tmp' key that is read below.
                conn.zunionstore(scratch, [timeline])

            if limit is not None:
                # keep only the `limit` lowest-ranked ids
                pipeline.zremrangebyrank(scratch, limit, -1)
            # get the ids, update the known ids
            pipeline.zrange(scratch, 0, -1)
            pipeline.zunionstore(progress, [progress, scratch], aggregate='MAX')
            # discard memory of timed-out messages
            pipeline.zinterstore(progress, [progress, timeline], aggregate='MAX')
            pipeline.delete(scratch)
            # the zrange reply is always fourth from the end, whether or not
            # the optional zremrangebyrank was queued
            message_ids = pipeline.execute()[-4]

            if not message_ids:
                # HMGET requires at least one field
                return []
            # return the known messages
            payloads = conn.hmget('messages:' + channel, message_ids)
            return [msg for msg in payloads if msg]
def clean_out_channel_backlog(conn, backlog=100, channel_timeout=900, client_timeout=300):
    '''
    Clean out old messages from channels, old channels, and information
    about old client/channel pairs.

    conn            -- Redis client; must provide lock(), pipeline(),
                       zrangebyscore(), zscore(), zunionstore(), zrange()
                       and delete().
    backlog         -- number of most recent messages to keep per channel.
    channel_timeout -- seconds without a new message after which a channel
                       and all of its data are removed.
    client_timeout  -- seconds without a check after which a client/channel
                       progress record is removed.

    Returns None.
    '''
    # only one backlog cleanup function call can run at a time
    with conn.lock('lock:cleanup:'):
        pipeline = conn.pipeline(True)

        # find those channels that haven't been updated for a while
        ch_timeout = time.time() - channel_timeout
        stale_channels = conn.zrangebyscore(
            'updated:', 0, ch_timeout, withscores=True)
        for channel, score in stale_channels:
            with conn.lock('lock:channel:' + channel, timeout=5):
                # re-check under the lock: skip the channel if it received a
                # message after the stale list was read
                if conn.zscore('updated:', channel) == score:
                    pipeline.delete(
                        'messages:' + channel,
                        'timeline:' + channel,
                    )
                    pipeline.zrem('updated:', channel)
                    pipeline.zrem('ids:', channel)
                    pipeline.execute()

        # get a prioritized list of those channels that have the most volume:
        # score = id counter at the previous cleanup minus the current one
        pipeline.zinterstore('ids:tmp', {'ids:cleanup': 1, 'ids:': -1})
        pipeline.zunionstore('ids:cleanup', ['ids:'])
        known = pipeline.execute()[0]
        if not known:
            # if we've never cleaned up before, clean them all
            known = conn.zunionstore('ids:tmp', ['ids:cleanup'])

        # iterate over chunks of channels to clean out old messages
        # (range rather than xrange so this also runs on Python 3)
        for start in range(0, known, 100):
            for channel in conn.zrange('ids:tmp', start, start + 99):
                with conn.lock('lock:channel:' + channel, timeout=5):
                    # everything except the last `backlog` timeline entries
                    message_ids = conn.zrange(
                        'timeline:' + channel, 0, -backlog - 1)
                    # remove old messages from the timeline
                    for message_id in message_ids:
                        pipeline.hdel('messages:' + channel, message_id)
                        pipeline.zrem('timeline:' + channel, message_id)
                    pipeline.execute()
        conn.delete('ids:tmp')

        # clean out old clients that haven't checked recently; the scores in
        # 'clients:' are timestamps, so select by score, not by rank
        cl_timeout = time.time() - client_timeout
        stale_clients = conn.zrangebyscore(
            'clients:', 0, cl_timeout, withscores=True)
        for client, score in stale_clients:
            # NOTE(review): members of 'clients:' are written by
            # check_messages as '<client>:<channel>', so this lock name is
            # not the per-client lock that check_messages takes -- confirm
            # whether the bare client name should be locked here instead.
            with conn.lock('lock:client:' + client, timeout=5):
                if conn.zscore('clients:', client) == score:
                    # remove the notice of when the item was last called
                    pipeline.zrem('clients:', client)
                    # clean out the progress zset
                    pipeline.delete('clients:' + client)
                    pipeline.execute()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment