Skip to content

Instantly share code, notes, and snippets.

@GregorStocks
Last active October 2, 2018 16:50
Show Gist options
  • Save GregorStocks/63d0605cc5e069ab686e to your computer and use it in GitHub Desktop.
Save GregorStocks/63d0605cc5e069ab686e to your computer and use it in GitHub Desktop.
#! /usr/bin/env python
# Exports raw events from Mixpanel's data export API and loads them into a
# PostgreSQL database (for example one hosted on Amazon RDS).
# Setup:
# sudo apt-get install python-dev libpq-dev
# sudo easy_install psycopg2
# Usage:
# python export-mixpanel-to-rds.py --hostname $HOSTNAME --database $DATABASE --username $USERNAME --password $PASSWORD --key $KEY --secret $SECRET --start 2013-12-01 --end 2014-01-10
# ********************************************************************
# MIXPANEL LIBRARY
# ********************************************************************
# Mixpanel, Inc. -- http://mixpanel.com/
#
# Python API client library to consume mixpanel.com analytics data.
#
# Copyright 2010-2013 Mixpanel, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This file has Level modifications to it.
import contextlib
import hashlib
import time
import urllib

try:
    import json
except ImportError:
    import simplejson as json
class Mixpanel(object):
    """Minimal client for Mixpanel's raw data export API.

    Every request is signed with the api_key / api_secret scheme implemented
    in hash_args(). Text handling works on both Python 2 and Python 3.
    """

    # NOTE(review): plain HTTP, so the api_key and request signature travel in
    # clear text; consider the https:// form of this URL if the service accepts it.
    ENDPOINT = 'http://data.mixpanel.com/api'
    VERSION = '2.0'

    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def request(self, methods, params, format='json'):
        """Call a Mixpanel endpoint and decode its JSON-lines response.

        methods - List of URL path segments, e.g. ['events', 'properties', 'values']
                  gives ENDPOINT/2.0/events/properties/values/
        params  - Extra query parameters for the method. The caller's dict is
                  not modified.
        format  - Value sent as the 'format' query parameter.

        Returns a list with one decoded JSON value per non-blank response line
        (an empty list when the response body is empty).
        """
        try:
            from urllib import urlopen  # Python 2
        except ImportError:
            from urllib.request import urlopen  # Python 3
        # Work on a copy so the caller's dict is left untouched.
        params = dict(params)
        params['api_key'] = self.api_key
        params['expire'] = int(time.time()) + 600  # Grant this request 10 minutes.
        params['format'] = format
        # A stale signature must not be part of the arguments being signed.
        params.pop('sig', None)
        params['sig'] = self.hash_args(params)
        request_url = ('/'.join([self.ENDPOINT, str(self.VERSION)] + list(methods))
                       + '/?' + self.unicode_urlencode(params))
        # closing() releases the HTTP connection even if read() raises.
        with contextlib.closing(urlopen(request_url)) as response:
            data = response.read()
        return self._parse_export(data)

    @staticmethod
    def _parse_export(data):
        """Decode a JSON-lines body (bytes or text), skipping blank lines.

        Splits on '\\n' only (not splitlines()) so that characters such as
        U+2028 inside JSON strings never break a record in two.
        """
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        return [json.loads(line) for line in data.split('\n') if line.strip()]

    @staticmethod
    def _utf8(value):
        """Return value as a UTF-8 byte string; non-strings go through str() first."""
        if isinstance(value, bytes):
            return value
        if not isinstance(value, type(u'')):
            value = str(value)
            if isinstance(value, bytes):  # Python 2: str() already produced bytes
                return value
        return value.encode('utf-8')

    def unicode_urlencode(self, params):
        """
        Convert lists to JSON encoded strings, and correctly handle any
        unicode URL parameters.

        params may be a dict or a sequence of (key, value) pairs; it is not
        modified. Returns the urlencoded query string.
        """
        try:
            from urllib import urlencode  # Python 2
        except ImportError:
            from urllib.parse import urlencode  # Python 3
        if isinstance(params, dict):
            params = params.items()
        pairs = []
        for key, value in params:
            if isinstance(value, list):
                value = json.dumps(value)
            if isinstance(value, type(u'')):
                value = value.encode('utf-8')
            pairs.append((key, value))
        return urlencode(pairs)

    def hash_args(self, args, secret=None):
        """
        Hashes arguments by joining key=value pairs (keys sorted, list values
        JSON-encoded), appending a secret, and then taking the MD5 hex digest.

        secret overrides self.api_secret when given; if neither is set, no
        secret is appended. args is not modified.
        """
        pieces = []
        for key in sorted(args.keys()):
            value = args[key]
            if isinstance(value, list):
                value = json.dumps(value)
            pieces.append(self._utf8(key) + b'=' + self._utf8(value))
        digest = hashlib.md5(b''.join(pieces))
        key_material = secret or self.api_secret
        if key_material:
            digest.update(self._utf8(key_material))
        return digest.hexdigest()
# ********************************************************************
# END MIXPANEL LIBRARY
# ********************************************************************
import hashlib
import psycopg2
import json
import datetime
import multiprocessing
from argparse import ArgumentParser
# PostgreSQL column types used by the DATAPOINTS schema below.
STRING = "text"
INT = "integer"
TIMESTAMP = "timestamp"

# Column name -> PostgreSQL type for the event table. Event property names are
# normalized (leading '$' stripped, spaces replaced by '_', lower-cased) before
# they are matched against these keys.
DATAPOINTS = dict(
    mp_device_model=STRING,
    manufacturer=STRING,
    os_version=STRING,
    model=STRING,
    region=STRING,
    lib_version=STRING,
    radio=STRING,
    ios_ifa=STRING,  # presumably the iOS advertising identifier (IDFA) -- TODO confirm
    mp_lib=STRING,
    carrier=STRING,
    screen_height=INT,
    distinct_id=STRING,
    app_release=STRING,
    city=STRING,
    time=TIMESTAMP,
    app_version=STRING,
    os=STRING,
    screen_width=INT,
    mp_country_code=STRING,
    event=STRING,
    id=STRING,
    # Add any other properties you want as first-class columns here. Doing it
    # before the first run is easiest; afterwards the existing table has to be
    # altered by hand.
    userid=STRING,
    wifi=STRING,
    controller_class_name=STRING,
    # JSON-encoded catch-all for every property without a column of its own
    # (the table needs a fixed schema, so everything else lands here).
    extra=STRING,
)
def conncur(args):
    """Open a PostgreSQL connection described by the parsed CLI args.

    Uses args.hostname, args.database, args.username and args.password.
    Returns a (connection, cursor) tuple; the caller closes both.
    """
    connection = psycopg2.connect(
        host=args.hostname,
        database=args.database,
        user=args.username,
        password=args.password,
    )
    return connection, connection.cursor()
TABLE_NAME = "mixpanel"  # PostgreSQL table that events are created in and inserted into
def set_up_db(args):
    """Create the TABLE_NAME table if it does not exist yet.

    The table gets a ``chartio_id text PRIMARY KEY`` column plus one column
    per DATAPOINTS entry. An existing table is left untouched: it is assumed
    to already have every column we need. If not, you're on your own.
    The connection is closed even if the DDL fails.
    """
    conn, cur = conncur(args)
    try:
        columns = ",".join("%s %s" % (name, sql_type) for name, sql_type in DATAPOINTS.items())
        cur.execute("CREATE TABLE IF NOT EXISTS %s (chartio_id text PRIMARY KEY, %s);" % (TABLE_NAME, columns))
        # Optional, for faster charts: index the event time once, e.g.
        # cur.execute("CREATE INDEX dates ON %s(time)" % TABLE_NAME)
        # It is off by default because a plain CREATE INDEX fails once the index
        # exists; PostgreSQL 9.5+ accepts CREATE INDEX IF NOT EXISTS instead.
        conn.commit()
    finally:
        cur.close()
        conn.close()
def hash_event(event):
    """Return the SHA-1 hex digest of ``str(event)``, used as the row's chartio_id.

    Existing rows are keyed by this value, so the hashed input is exactly
    ``str(event)``. It is UTF-8 encoded only when ``str()`` returns text rather
    than bytes (Python 3); Python 2 digests are unchanged.

    NOTE(review): ``str()`` of a dict depends on key order and, across Python
    versions, on repr details (e.g. Python 2 prints u'' prefixes for unicode
    values), so the same event may hash differently under another interpreter.
    """
    text = str(event)
    if not isinstance(text, bytes):
        text = text.encode('utf-8')
    return hashlib.sha1(text).hexdigest()
def _flatten_event(event, chartio_id):
    """Build the INSERT parameters for one Mixpanel export record.

    Returns ``(row, columns)``.

    ``row`` maps column names to values: the event name, ``chartio_id``, and
    every property under its normalized name (leading '$' stripped, spaces
    turned into '_', lower-cased). Properties that are not DATAPOINTS columns
    are JSON-encoded together into ``row['extra']``. ``row['time']``, if
    present, is converted to a datetime.

    ``columns`` is the sorted list of DATAPOINTS columns present in ``row``.
    """
    row = {'event': event['event'], 'chartio_id': chartio_id}
    # A record without 'properties' is stored with just its event name.
    for key, value in event.get('properties', {}).items():
        row[key.lstrip('$').replace(' ', '_').lower()] = value
    present = set(row) & set(DATAPOINTS)
    # Everything without a column of its own goes into the JSON 'extra' column.
    leftovers = set(row) - present - set(['chartio_id'])
    if leftovers:
        row['extra'] = json.dumps(dict((key, row[key]) for key in leftovers))
        present.add('extra')
    if 'time' in row:
        # NOTE(review): fromtimestamp() reads the epoch seconds in the local
        # timezone of the machine running this script -- confirm that matches
        # how the export reports 'time'.
        row['time'] = datetime.datetime.fromtimestamp(row['time'])
    return row, sorted(present)


def shove_event_in_db(event, conn, cur):
    """Insert one Mixpanel event into TABLE_NAME unless its chartio_id exists.

    The row key is hash_event(event), so re-importing an already stored event
    is a no-op. Commits after a successful insert.
    """
    row, columns = _flatten_event(event, hash_event(event))
    # Column names come only from DATAPOINTS (never from event data), so
    # formatting them into the SQL is safe; all values are bound parameters.
    query = "INSERT INTO %s (chartio_id, %s) SELECT %%(chartio_id)s, %s WHERE NOT EXISTS (SELECT 1 FROM %s WHERE chartio_id=%%(chartio_id)s);" % (
        TABLE_NAME,
        ','.join(columns),
        ','.join(['%%(%s)s' % column for column in columns]),
        TABLE_NAME)
    try:
        cur.execute(query, row)
    except psycopg2.IntegrityError:
        # Several worker processes insert in parallel. Another one can store
        # the same chartio_id between our NOT EXISTS check and the INSERT.
        # The row exists either way, so discard the aborted transaction
        # (otherwise every later statement on this connection fails) and
        # move on.
        conn.rollback()
        return
    conn.commit()
def chunks(l, n):
    """Split the sequence l into n contiguous slices, one per worker.

    Slice lengths differ by at most one (the first ``len(l) % n`` slices get
    one extra item), so work is spread evenly. Slices are empty only when
    ``len(l) < n``. Concatenating the result gives back l.

    Raises ValueError if n < 1.
    """
    if n < 1:
        raise ValueError("n must be at least 1, got %r" % (n,))
    base, extra = divmod(len(l), n)
    result = []
    start = 0
    for i in range(n):
        stop = start + base + (1 if i < extra else 0)
        result.append(l[start:stop])
        start = stop
    return result
def shove_chunk_in_db(args, chunk):
    """Insert every event in chunk over one dedicated database connection.

    Used as the target of the worker processes started in the __main__
    block. The cursor and connection are closed even if an insert raises.
    """
    conn, cur = conncur(args)
    try:
        for event in chunk:
            shove_event_in_db(event, conn, cur)
    finally:
        cur.close()
        conn.close()
if __name__ == '__main__':
    parser = ArgumentParser(description='Shove data from Mixpanel into a database')
    # The export request cannot be signed without Mixpanel credentials, so
    # fail at argument parsing rather than with an API error later.
    parser.add_argument("--key", required=True, help="your Mixpanel API key")
    parser.add_argument("--secret", required=True, help="your Mixpanel API secret")
    parser.add_argument("--hostname", help="your database hostname, like 'mixpanel.luirdusapdorxx.us-east-1.rds.amazonaws.com'")
    parser.add_argument("--database", help="the name of your database")
    parser.add_argument("--username", help="the database username")
    parser.add_argument("--password", help="the database user's password")
    parser.add_argument("--start", help="start date, like '2014-01-09'", default=(datetime.date.today() - datetime.timedelta(days=5)).isoformat())
    parser.add_argument("--end", help="end date, like '2014-01-09'", default=datetime.date.today().isoformat())
    parser.add_argument("--processes", type=int, default=32,
                        help="number of worker processes inserting in parallel (default: 32)")
    args = parser.parse_args()
    if args.processes < 1:
        parser.error("--processes must be at least 1")
    set_up_db(args)
    api = Mixpanel(api_key=args.key, api_secret=args.secret)
    data = api.request(['export'], {
        'from_date': args.start,
        'to_date': args.end,
    })
    print("Processing %s events" % len(data))
    # One OS process (not a thread) per non-empty chunk. Each opens its own
    # database connection, so empty chunks are skipped to avoid idle connections.
    workers = [multiprocessing.Process(target=shove_chunk_in_db, args=(args, chunk))
               for chunk in chunks(data, args.processes) if chunk]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # A worker that dies on an exception only leaves a non-zero exitcode. Surface
    # it so the script does not report success after dropping events.
    failed = [worker for worker in workers if worker.exitcode != 0]
    if failed:
        raise SystemExit("%d of %d worker processes failed; some events may not have been imported"
                         % (len(failed), len(workers)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment