Skip to content

Instantly share code, notes, and snippets.

@pthrasher
Created May 29, 2012 18:24
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save pthrasher/2829878 to your computer and use it in GitHub Desktop.
Save pthrasher/2829878 to your computer and use it in GitHub Desktop.
GistDeck for Charlotte Python Meetup

Who am I?

Philip Thrasher

Web hacker at a private cyber security company.

What I Do

  • Build webapps with Django
  • Build API's with Tornado (or django)
  • Deal with very large data sets.
  • Create interfaces used to interact with both real-time data, real-time statistics, and large static datasets.

Technologies I Work With on a Daily, or Almost Daily Basis.

  • Python
  • Django
  • Tornado
  • MySQL
  • Rabbit MQ
  • Hadoop
  • Javascript

What I Want to Show You Today

  1. How to manage real-time data simply using python and redis.
  2. How to show real-time data to users via websockets (using socket.io).

Don't worry... I'll be using socket.io with python (using Tornado), instead of node on the server side.

Problems With Realtime

  1. It never stops coming.
  2. Some sources only allow one consumer connection (twitter).
  3. Potentially LOTS of data.
  4. Hard to store.
  5. Hard to show in a meaningful way to the user depending on the dataset.

How The Pros Do It

  1. Many data sources post single items (or multiple in batch) onto a queue that another process consumes. There could be multiple queues depending on the data, multiple consumers, etc.
  2. The process that reads from the queue looks at the raw data, possibly "Hydrating" the data, by adding more meaningful data points. This process can be parallelized easily since it just processes incoming messages. Usually order is maintained by timestamps on cleaned data.
  3. Post processing is done, and actions are taken out on the events. Usually this means placing the cleaned data on new queues for a client process to consume.

What We're Going to Do

  1. Consume tweets from twitter in real time.
  2. See how to process each message as it comes in, in real time.
  3. Handle the one connection problem, and potential backups using redis pub/sub.
  4. Display this data in ways that may be useful to the user.

Step-1: Consume the Twitter Streaming API.

Python modules required:

  • tweepy
  • redis

Step-1: consumer.py

import tweepy
import redis
from twitter_credentials import *


class Listener(tweepy.StreamListener):
    def on_data(self, data):
        if data: # don't worry about blank lines
            redis.publish('twitter_raw', data)


if __name__ == '__main__':
    try:
        """
        The following need to be specified in './twitter_credentials.py'
        CONSUMER_KEY
        CONSUMER_SECRET
        ACCESS_TOKEN
        ACCESS_TOKEN_SECRET
        """
    
        auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

        redis = redis.StrictRedis(host='localhost', port=6379, db=0)

        streaming_api = tweepy.streaming.Stream(auth, Listener(), timeout=60)

        # Geo box that covers the entire earth to ensure we only get geo-located tweets
        streaming_api.filter(locations=[ -180,-90,180,90 ])
    except KeyboardInterrupt:
        print "\nBye!"

Step-1: What consumer.py does:

  1. Authenticates with Twitter's streaming API using the tweepy Module.
  2. Asks for all tweets in real-time that are geo-located here on earth. ;-)
  3. Publishes all valid tweets to the 'twitter_raw' channel on the local redis server.

Not very exciting. But now we have one connection to twitter (twitter only allows one at a time for the streaming api) and we're publishing to a queue that someone else can consume from.

Step-2: Subscribe to the twitter_raw Channel

  1. Connect to Redis
  2. Subscribe to the channel.
  3. Print each tweet as it comes in, to stdout.

Step-2: subscriber.py

import redis
import json
import datetime


def log_tweet(username, name, tweet_body):
    now = datetime.datetime.now()
    print "[%s] @%s (%s): %s" % (now, username, name, tweet_body,)


def consume_subscription(tweets):
    while True:
        message = tweets.next()

        raw_tweet = message.get('data')
        if not raw_tweet:
            continue

        tweet = json.loads(raw_tweet)

        # Sometimes we just get back spurious data, like integers and stuff
        if not isinstance(tweet, dict):
            continue

        # Data extraction
        tweet_body = tweet.get('text')
        user = tweet.get('user', {})
        username = user.get('screen_name')
        name = user.get('name')

        # Data presentation
        if tweet_body and name and username:
            log_tweet(username, name, tweet_body)


if __name__ == '__main__':
    try:
        redis = redis.StrictRedis(host='localhost', port=6379, db=0)
        pubsub = redis.pubsub()
        pubsub.subscribe('twitter_raw')
        tweets = pubsub.listen()
        consume_subscription(tweets)

    except KeyboardInterrupt:
        print "\nBye!"

Step-3: Display Data Using Websockets

Python modules needed:

  1. tornado
  2. tornadio2
  3. brukva - tornado compatible redis client.

What we'll do

  1. Flesh out subscriber from previous step to allow for generic hydration functions. (instead of just extracting the user, name, and tweet text.)
  2. Write Tornado App to push these results to the client.

Step-3: enrich.py,

import redis
import json
import datetime
import hydrators
import events


HYDRATORS = [
    hydrators.user_data,
    hydrators.body,
]

EVENTS = {
    "simple_tweet": events.simple_tweet,
}

def consume_raw_subscription(tweets):
    """
    This method consumes the raw tweets, parses them, hands them to each
    hydrator, the hydrator cleans the data, then we run through a list of event
    handlers, each of which will publish to a redis queue if it has the data it
    needs.
    """
    while True:
        message = tweets.next()

        raw_tweet = message.get('data')
        if not raw_tweet:
            continue

        try:
            tweet = json.loads(raw_tweet)
        except ValueError, e:
            continue

        # Sometimes we just get back spurious data, like integers and stuff
        if not isinstance(tweet, dict):
            continue

        # Data extraction
        clean_obj = {}

        for hydrator in HYDRATORS:
            # Allow the method to overwrite our clean data
            hydrated = hydrator(tweet, clean_obj)
            if hydrated:
                clean_obj.update(hydrated)

        for kind, fn in EVENTS.items():
            # Publish events
            data = fn(clean_obj)
            json_data = json.dumps(data)
            event = json.dumps({
                'kind': kind,
                'body': json_data,
            })
            redis.publish('events', event)


if __name__ == '__main__':
    try:
        redis = redis.StrictRedis(host='localhost', port=6379, db=0)
        pubsub = redis.pubsub()
        pubsub.subscribe('twitter_raw')
        tweets = pubsub.listen()
        consume_raw_subscription(tweets)

    except KeyboardInterrupt:
        print "\nBye!"

Step-3: hydrators.py

def body(tweet, clean_obj):
    """
    Simpy grabs the tweet body.
    """
    tweet_body = tweet.get('text')
    if tweet_body:
        clean_obj.update({
            "body": tweet_body,
        })
    return clean_obj

def user_data(tweet, clean_obj):
    """
    Simply grabs username, and name.
    """
    user = tweet.get('user', {})
    username = user.get('screen_name')
    name = user.get('name')

    if username and name:
        clean_obj.update({
            "username": username,
            "name": name,
        })

    return clean_obj

Step-3: events.py

def simple_tweet(data):
    return {
        'username': data.get('username', ''),
        'name': data.get('name', ''),
        'body': data.get('body', ''),
    }

Step-3: webapp.py - Tornado Settings

# Create tornadio router
WSRouter = TornadioRouter(WebSocketHandler,
                            dict(enabled_protocols=['websocket', 'xhr-polling',
                                                    'jsonp-polling', 'htmlfile']))

# Create socket application
application = web.Application(
    WSRouter.apply_routes([(r"/step-(\d)+[/]?", StepPageHandler)]),
    flash_policy_port = 843,
    flash_policy_file = os.path.join(ROOT, 'flashpolicy.xml'),
    template_path = os.path.join(ROOT, 'templates'),
    static_path = os.path.join(ROOT, 'static'),
    socket_io_port = 8001,
    enable_pretty_logging = True
)

if __name__ == '__main__':
    socketio_server = SocketServer(application, auto_start=False)

    STATS = ServerStats()

    redis = brukva.Client(host='localhost', port=6379, selected_db=0)
    redis.connect()
    redis.subscribe('events')
    redis.listen(broadcast_events)

    ioloop.IOLoop.instance().start()

Step-3: webapp.py - Tornado Request Handlers

class StepPageHandler(web.RequestHandler):
    """
    Renders each step page for the talk.
    """
    def get(self, step):
        self.render('step-%s.html' % step)


class WebSocketHandler(SocketConnection):
    """
    Manages the global list of subscribers.
    """
    def on_open(self, *args, **kwargs):
        SUBSCRIBERS.append(self)

    def on_close(self):
        if self in SUBSCRIBERS:
            SUBSCRIBERS.remove(self)

Step-3: webapp.py - broadcast_events

def broadcast_events(event):
    """
    Generic event broadcaster. We subscribe to the 'events' redis queue, and
    anytime we get anything new from there, we emit that event to all of our
    http clients. This method is a brukva callback.
    """
    # Get the message body from the redis message.
    raw_event = event.body
    if not raw_event:
        # If there was no body to be had, let's just return.
        STATS.log("Recieved empty event body.", level="warn")
        return

    try:
        # Let's try to parse the JSON obj.
        event = json.loads(raw_event)
    except ValueError, e:
        # Ruh roh raggy... Wouldn't parse. Let's log it.
        STATS.log("Couldn't parse JSON object: %s" % str(raw_event), level="warn")
        return

    # Let's ensure this is a properly formed event (eg. kind, and json body.)
    kind = event.get('kind')
    body = event.get('body')
    if not kind or not body:
        STATS.log("Not a proper event: %s" % event, level="warn")
        return

    for subscriber in SUBSCRIBERS:
        STATS.incr('sent_messages', 1)
        subscriber.emit(kind, body)

    STATS.incr(kind, 1)

Step-3: What's happening so far?

  1. consumer.py is connected to the twitter streaming API, and pushing all tweets to the raw_tweets redis channel
  2. enrich.py is taking all tweets from raw_tweets in redis, and applying any configured hydration to it, then publishing any events that need to be derived from this hydrated data to the events channel in redis. (currently only the 'simple_tweet' event.)
  3. webapp.py is a tornado app that is consuming from the events channel in redis, and emitting a socket.io event to the browser based on this.

Step-3: Client-Side Code - socket.io

var host = window.location.hostname,
    port = 8001,
    socket = io.connect(host, port);

// watching for the `simple_tweet` event.
socket.on('simple_tweet', function(tweet) {

    renderTweet(JSON.parse(tweet));

});

function renderTweet(tweet) {

    // ...

}

Now what?

All data sent in real-time via an open connection to the browser... Really cool.

Not cool enough though. There's a lot of data in the average tweet, including geo-data. Let's draw this on a map.

Step-4: Draw Geo-Located Tweets as a blip on a map using SVG.

We'll need to add:

  1. A new hydrator that extracts geo-coords from each tweet
  2. A new event called "geo_blip" that the webapp will subscribe to.
  3. Add map and blip code to client.

Step-4: hydrators.py

def lat_lon(tweet, clean_obj):
    """
    Searches for the lat and lon and returns it.
    """
    geo = tweet.get('geo')
    if geo:
        geo_type = geo.get('type')
        if geo_type.lower() != 'point':
            return None

        lat, lon = geo.get('coordinates')
    else:
        place = tweet.get('place')
        if not place:
            return None

        bounding_box = place.get('bounding_box')
        if not bounding_box:
            return None

        coords = bounding_box.get('coordinates')
        if not coords:
            return None

        lat, lon = coords[0][0]

    if lat and lon:
        clean_obj.update({
            "lat": lat,
            "lon": lon,
        })

    return clean_obj

Step-4: events.py

def simple_tweet(data):
    return {
        'username': data.get('username', ''),
        'name': data.get('name', ''),
        'body': data.get('body', ''),
    }


def geo_blip(data):
    return {
        'lat': data.get('lat', 0),
        'lon': data.get('lon', 0),
    }

Step-4: enrich.py

def simple_tweet(data):
    return {
        'username': data.get('username', ''),
        'name': data.get('name', ''),
        'body': data.get('body', ''),
    }


def geo_blip(data):
    return {
        'lat': data.get('lat', 0),
        'lon': data.get('lon', 0),
    }


# ...

Step-4: socket.io Event Subscriber

socket.on('geo_blip', function(geo) {
    
    drawBlip(JSON.parse(geo));

}

function drawBlip(geo) {

    // ...

}

Recap

  1. Consumed events from twitter's feed.
  2. Enriched them in real-time.
  3. Created events from the data that the browser can subscribe to.
  4. Display this information in a friendly way on the client.

Fin

  • A lot more JS/CSS/HTML used for each page.
  • Some stats counting methods in python and js.

You can see this stuff in the public git-repo:

https://github.com/pthrasher/twitter-python-talk

Socket.IO: http://socket.io/ TornadoWeb: http://tornadoweb.org Redis: redis.io

Questions?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment