Skip to content

Instantly share code, notes, and snippets.

@bradjasper
Created October 30, 2009 20:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bradjasper/222663 to your computer and use it in GitHub Desktop.
Save bradjasper/222663 to your computer and use it in GitHub Desktop.
Dead simple event tracking with celery and git
"""
Simple mixpanel wrapper. Does everything synchronously as we will be
calling this from a worker queue
"""
import base64
import json
import urllib
from django.conf import settings
API_TOKEN = getattr(settings, "MIXPANEL_API_TOKEN")
API_URL = getattr(settings, "MIXPANEL_API_URL")
def track(event, properties=None):
"""Track mixpanel events"""
if properties is None:
properties = {}
properties["token"] = API_TOKEN
if "time" in properties:
properties["time"] = int(properties["time"])
params = {"event": event, "properties": properties}
data = base64.b64encode(json.dumps(params))
conn = urllib.urlopen(API_URL + data)
contents = conn.read()
assert contents == "1", "Mixpanel returned unknown value: %s" % contents
return True
from mixpanel.tasks import TrackEvent
# Inside a view
def my_view(request):
TrackEvent.delay("search_view", {
"ip": request.META['REMOTE_ADDR'],
"distinct_id": session_id,
"time": time.time()})
"""
Mixpanel tasks
"""
from celery.task import Task, tasks
from base import track
class TrackEvent(Task):
"""Asynchrnously track a Mixpanel event"""
def run(self, event, properties=None, *args, **kwargs):
return track(event, properties)
tasks.register(TrackEvent)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment