Created
October 30, 2009 20:06
-
-
Save bradjasper/222663 to your computer and use it in GitHub Desktop.
Dead simple event tracking with celery and git
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Simple mixpanel wrapper. Does everything synchronously as we will be | |
calling this from a worker queue | |
""" | |
import base64 | |
import json | |
import urllib | |
from django.conf import settings | |
API_TOKEN = getattr(settings, "MIXPANEL_API_TOKEN") | |
API_URL = getattr(settings, "MIXPANEL_API_URL") | |
def track(event, properties=None): | |
"""Track mixpanel events""" | |
if properties is None: | |
properties = {} | |
properties["token"] = API_TOKEN | |
if "time" in properties: | |
properties["time"] = int(properties["time"]) | |
params = {"event": event, "properties": properties} | |
data = base64.b64encode(json.dumps(params)) | |
conn = urllib.urlopen(API_URL + data) | |
contents = conn.read() | |
assert contents == "1", "Mixpanel returned unknown value: %s" % contents | |
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from mixpanel.tasks import TrackEvent | |
# Inside a view | |
def my_view(request): | |
TrackEvent.delay("search_view", { | |
"ip": request.META['REMOTE_ADDR'], | |
"distinct_id": session_id, | |
"time": time.time()}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Mixpanel tasks | |
""" | |
from celery.task import Task, tasks | |
from base import track | |
class TrackEvent(Task): | |
"""Asynchrnously track a Mixpanel event""" | |
def run(self, event, properties=None, *args, **kwargs): | |
return track(event, properties) | |
tasks.register(TrackEvent) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment