Skip to content

Instantly share code, notes, and snippets.

@cgbystrom
Created October 12, 2011 21:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cgbystrom/1282615 to your computer and use it in GitHub Desktop.
Save cgbystrom/1282615 to your computer and use it in GitHub Desktop.
Simple proof-of-concept implementing an activity stream on top of Cassandra
# Naive implementation of an activity stream service using Cassandra.
# Just a proof of concept and not anything that is for production use.
# Probably flawed in many ways like proper key usage, writing and features.
import pycassa
import datetime
import uuid
pool = pycassa.connect('Actstream')
subscribers = pycassa.ColumnFamily(pool, 'Subscribers')
streams = pycassa.ColumnFamily(pool, 'Streams')
activities = pycassa.ColumnFamily(pool, 'Activities')
def subscribe(from_stream_id, to_stream_id):
"""
Subscribes an activity stream to another activity stream
Example: subscribe('elvis', 'hector') will subscribe user 'elvis' to activities generated by 'hector'
"""
subscribers.insert('subscribers_' + str(from_stream_id), {str(to_stream_id): '\0'})
def unsubscribe(from_stream_id, to_stream_id):
"""
Unsubscribes an activity stream from another activity stream
Example: unsubscribe('elvis', 'hector') will unsubscribe user 'elvis' from activities generated by 'hector'
"""
return subscribers.remove('subscribers_' + str(from_stream_id), [str(to_stream_id)])
def get_subscribers(stream_id):
"""
Get all subscribers for a given activity stream
Example: get_subscribers('elvis') will get the all users subscribing to user 'elvis' activity stream
"""
return [int(k) for k in subscribers.get('subscribers_' + str(stream_id)).keys()]
def publish_activity(stream_id, message):
"""
Publishes an activity to an activity stream and all the subscribing activity streams
Example: publish_activity('hector', 'Ranked up to level 27') will publish the activity
to activity stream for user 'hector' and all his subscribers.
"""
key = str(uuid.uuid4())
activities.insert('activity_%s' % key, {'message': message})
stream_ids = get_subscribers(stream_id)
value = {datetime.datetime.utcnow(): key}
rows = {'streams_%s' % stream_id: value}
for sid in stream_ids:
rows['streams_%s' % sid] = value
streams.batch_insert(rows)
def get_stream(stream_id, limit):
"""
Get an activity stream
Example: get_stream('elvis', 10) will get the ten latest activities in user 'elvis' activity stream
"""
activity_ids = streams.get('streams_%s' % stream_id, column_count=limit, column_reversed=True).values()
activity_ids = ['activity_%s' % id for id in activity_ids]
return [row['message'] for row in activities.multiget(activity_ids).values()]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment