Skip to content

Instantly share code, notes, and snippets.

@ttsiodras
Created September 1, 2015 09:31
Show Gist options
  • Save ttsiodras/463fb9f6c4b4de8e9364 to your computer and use it in GitHub Desktop.
MixPanel importer
#!/usr/bin/env python
"""Mixpanel event importer
Usage:
mp_import_evts.py -c configFile -t token -a api_key -i fname -d delta
mp_import_evts.py (-h | --help)
mp_import_evts.py --version
Options:
-c configFile config file with settings
-t token MixPanel token
-a api_key MixPanel API key
-i filename Input filename (as exported from MixPanel)
-d delta The MixPanel project's UTC offset in hours
(e.g. -8 for PST, 1 for EEST)
-h --help Show this usage screen
--version Show version
"""
# NOTE: docopt parses the module docstring above to build the command-line
# interface, so its exact text (Usage/Options sections) is runtime-significant
# and must not be reworded or reformatted casually.
from docopt import docopt
import os
import sys
import json
import base64
import urllib
import cPickle
from time import strftime
from collections import defaultdict
import logging
import eventlet
from eventlet.green import urllib2
from pyramid.paster import get_appsettings
from sqlalchemy import engine_from_config
from pyzendoc.models import (
DBSession,
User,
UserCompany)
def panic(msg):
    """Log *msg* as a fatal error, then abort the process (exit code 1)."""
    g_logger.error(msg)
    sys.exit(1)
class EventImporter(object):
    """Pushes batches of backdated events to MixPanel's /import endpoint.

    Events are read one JSON object per line, re-attributed from users to
    companies where the mapping is unambiguous, and uploaded in batches of
    ``BATCH_SIZE`` on eventlet green threads so the HTTP round-trips overlap.
    """

    # MixPanel's /import endpoint accepts at most 50 events per request.
    BATCH_SIZE = 50

    def __init__(self, token, api_key, time_offset):
        # token:       MixPanel project token, stamped on events lacking one.
        # api_key:     MixPanel API key (mandatory for /import).
        # time_offset: the project's UTC offset in hours; subtracted from
        #              each event timestamp to convert it back to UTC.
        self.token = token
        self.api_key = api_key
        self.time_offset = time_offset

    def update(self, event_list):
        """Upload one batch of events to MixPanel.

        Each event must carry "time" and "distinct_id" in its properties.
        Timestamps are shifted back to UTC and the project token is filled
        in where missing (events are modified in place).

        :raises RuntimeError: if MixPanel reports a non-success status.
        """
        url = "http://api.mixpanel.com/import/?"
        batch = []
        for event in event_list:
            assert "time" in event['properties'], \
                "Must specify a backdated time"
            assert "distinct_id" in event['properties'], \
                "Must specify a distinct ID"
            # /import expects UTC timestamps; undo the project's offset.
            event['properties']['time'] = str(
                int(event['properties']['time']) -
                (self.time_offset * 3600))
            if "token" not in event['properties']:
                event['properties']["token"] = self.token
            batch.append(event)
        payload = {
            "data": base64.b64encode(json.dumps(batch)),
            "verbose": 1,
            "api_key": self.api_key
        }
        response = urllib2.urlopen(url, urllib.urlencode(payload))
        try:
            message = response.read()
        finally:
            # Bug fix: the original leaked the (green) HTTP connection.
            response.close()
        if json.loads(message)['status'] != 1:
            raise RuntimeError('import failed')

    def batch_update(self, filename, companies_of_user):
        """Stream events from *filename* (one JSON event per line) into
        MixPanel, re-attributing each event from its user to the user's
        single active company where possible.

        :param filename: MixPanel export file, one JSON event per line.
        :param companies_of_user: dict mapping user id -> list of company
            ids (expected to hold at most one entry per user).
        """
        pool = eventlet.GreenPool(size=10)
        events = []
        total = 0
        with open(filename, 'r') as f:
            for line in f:
                evt_data = json.loads(line)
                user_id = evt_data.get('properties', {}).get('id')
                if user_id is None:
                    distinct_id = evt_data.get('properties', {}).get(
                        'distinct_id')
                    try:
                        user_id = int(distinct_id)
                    except (TypeError, ValueError):
                        # Bug fix: int(None) raises TypeError, which the
                        # original did not catch — an event with neither
                        # 'id' nor 'distinct_id' crashed the whole run.
                        continue
                # As per suggestion from Marco Sanchez Junco of MixPanel:
                # attribute the event to the user's single active company.
                company_id = companies_of_user.get(user_id)
                if company_id is not None and len(company_id) == 1:
                    company_id = company_id[0]
                    evt_data['properties']['id'] = company_id
                    evt_data['properties']['distinct_id'] = str(company_id)
                events.append(evt_data)
                if len(events) == self.BATCH_SIZE:
                    total += self.BATCH_SIZE
                    if (total % 1000) == 0:
                        g_logger.info("Sent %d events" % total)
                    pool.spawn(self.update, events)
                    events = []
        # Bug fix: wait for every in-flight green thread before returning;
        # without waitall() spawned batches could be abandoned (and their
        # events silently lost) when the process exits.
        pool.waitall()
        if events:
            self.update(events)
            g_logger.info(str(events) + "\n" +
                          "Sent remaining %d events!" % len(events))
def main(args):
    """Script entry point.

    Wires up logging, builds (or loads from cache) the user -> company
    mapping, then streams the exported event file into MixPanel.

    :param args: docopt argument dictionary (see the module docstring).
    """
    # --- Logging setup -------------------------------------------------
    logging.getLogger("segment").addHandler(logging.StreamHandler())
    logger = logging.getLogger("mixpanel_client")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    global g_logger
    g_logger = logger

    # --- Configuration -------------------------------------------------
    config_file = args.get('-c', None)
    if config_file is None:
        panic("Configuration file is mandatory! Aborting...")
    settings = get_appsettings(config_file)

    if not os.path.exists("mappings.db"):
        # Build the user -> [company_id] mapping from the database, then
        # cache it on disk so subsequent runs skip the DB entirely.
        sqlalchemy_url = settings.get('sqlalchemy.url', None)
        if sqlalchemy_url is None:
            panic("Missing sqlalchemy.url config line! Aborting...")
        engine = engine_from_config(settings, 'sqlalchemy.')
        DBSession.configure(bind=engine, autocommit=False)
        query = DBSession.query(
            User.id, UserCompany.company_id
        ).join(
            UserCompany, UserCompany.user_id == User.id
        ).filter(
            UserCompany.active.is_(True)
        )
        companies_of_user_temp = defaultdict(list)
        for user_id, company_id in query:
            companies_of_user_temp[user_id].append(company_id)
        # Keep only unambiguous users (at most one active company) —
        # batch_update re-attributes events only for these.
        companies_of_user = {
            k: v
            for k, v in companies_of_user_temp.iteritems()
            if len(v) <= 1
        }
        # Bug fix: the original passed an anonymous open(...) to cPickle
        # and leaked the file handle; also use binary mode as pickle data
        # is not text.
        with open("mappings.db", "wb") as db_file:
            cPickle.dump(companies_of_user, db_file)
        g_logger.info("Created user/company mappings.db")
    else:
        g_logger.info("Using cached user/company mappings.db")
        # Bug fix: same file-handle leak on the load path.
        with open("mappings.db", "rb") as db_file:
            companies_of_user = cPickle.load(db_file)

    # --- Import --------------------------------------------------------
    if not os.path.exists(args.get("-i")):
        panic("input file '%s' not found!" % unicode(args.get("-i")))
    import_event = EventImporter(
        args.get("-t"), args.get("-a"), int(args.get("-d")))
    import_event.batch_update(args.get("-i"), companies_of_user)
if __name__ == "__main__":
    # docopt parses the module docstring into the CLI argument dict.
    main(docopt(__doc__, version='MixPanel Importer 0.1'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment