Last active
October 2, 2018 16:50
-
-
Save GregorStocks/63d0605cc5e069ab686e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
# Loads raw event data from Mixpanel's export API and shoves it into a Postgres database.
# Setup: | |
# sudo apt-get install python-dev libpq-dev | |
# sudo easy_install psycopg2 | |
# Usage: | |
# python export-mixpanel-to-rds.py --hostname $HOSTNAME --database $DATABASE --username $USERNAME --password $PASSWORD --key $KEY --secret $SECRET --start 2013-12-01 --end 2014-01-10
# ******************************************************************** | |
# MIXPANEL LIBRARY | |
# ******************************************************************** | |
# Mixpanel, Inc. -- http://mixpanel.com/ | |
# | |
# Python API client library to consume mixpanel.com analytics data. | |
# | |
# Copyright 2010-2013 Mixpanel, Inc | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
# This file has Level modifications to it. | |
import hashlib | |
import urllib | |
import time | |
try: | |
import json | |
except ImportError: | |
import simplejson as json | |
class Mixpanel(object):
    """Minimal client for Mixpanel's signed data API.

    Every request carries api_key, an expiry time, the response format and an
    MD5 signature (``sig``) computed by ``hash_args`` from the parameters and
    the API secret.
    """

    # HTTPS so exported event data is not sent over the network in plaintext.
    ENDPOINT = 'https://data.mixpanel.com/api'
    VERSION = '2.0'
    # The text (not bytes) string type: ``unicode`` on Python 2, ``str`` on Python 3.
    _TEXT_TYPE = type(u'')

    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret

    def request(self, methods, params, format='json'):
        """
        Call a Mixpanel API method and decode its newline-delimited JSON body.

        methods - List of path components to be joined, e.g. ['events', 'properties', 'values']
                  will give us <ENDPOINT>/2.0/events/properties/values/
        params  - Extra query parameters for the method. The caller's dict is not modified.
        format  - Value sent as the ``format`` query parameter.

        Returns a list with one decoded JSON value per non-blank response line
        (an empty list when there is no data). Raises IOError when the server
        answers with a status other than 200.
        """
        # Work on a copy so the caller's dict doesn't pick up api_key/sig/etc.
        params = dict(params)
        params['api_key'] = self.api_key
        params['expire'] = int(time.time()) + 600  # Grant this request 10 minutes.
        params['format'] = format
        # A stale signature must not be fed into the new signature.
        params.pop('sig', None)
        params['sig'] = self.hash_args(params)
        request_url = ('/'.join([self.ENDPOINT, str(self.VERSION)] + methods)
                       + '/?' + self.unicode_urlencode(params))
        response = urllib.urlopen(request_url)
        try:
            status = response.getcode()
            data = response.read()
        finally:
            response.close()
        if status != 200:
            # urllib.urlopen does not raise on HTTP errors; without this check the
            # error body would be returned as if it were an event.
            raise IOError('Mixpanel request failed with HTTP %s: %s' % (status, data[:500]))
        return self._parse_lines(data)

    @staticmethod
    def _parse_lines(data):
        """Decode newline-delimited JSON, skipping blank lines (so '' -> [])."""
        return [json.loads(line) for line in data.split('\n') if line.strip()]

    def unicode_urlencode(self, params):
        """
        Convert lists to JSON encoded strings, UTF-8 encode unicode values, and
        URL-encode the result.

        params - dict or sequence of (key, value) pairs; not modified.
        """
        if isinstance(params, dict):
            params = params.items()
        pairs = []
        for key, value in params:
            if isinstance(value, list):
                value = json.dumps(value)
            if isinstance(value, self._TEXT_TYPE):
                value = value.encode('utf-8')
            pairs.append((key, value))
        return urllib.urlencode(pairs)

    @classmethod
    def _to_utf8(cls, value):
        """Return *value* as a UTF-8 byte string; non-string values go through str() first."""
        if isinstance(value, bytes):
            return value
        if not isinstance(value, cls._TEXT_TYPE):
            value = str(value)
        if isinstance(value, cls._TEXT_TYPE):
            value = value.encode('utf-8')
        return value

    def hash_args(self, args, secret=None):
        """
        Return the request signature for *args*.

        Joins key=value for every argument in sorted key order (list values are
        JSON-encoded, text is UTF-8 encoded), appends *secret* -- or this
        client's api_secret when *secret* is falsy -- and returns the MD5 hex
        digest. *args* is not modified.
        """
        parts = []
        for key in sorted(args):
            value = args[key]
            if isinstance(value, list):
                value = json.dumps(value)
            parts.append(self._to_utf8(key) + b'=' + self._to_utf8(value))
        digest = hashlib.md5(b''.join(parts))
        signing_secret = secret or self.api_secret
        if signing_secret:
            digest.update(self._to_utf8(signing_secret))
        return digest.hexdigest()
# ******************************************************************** | |
# END MIXPANEL LIBRARY | |
# ******************************************************************** | |
import hashlib | |
import psycopg2 | |
import json | |
import datetime | |
import multiprocessing | |
from argparse import ArgumentParser | |
# Postgres column types used by the schema below.
STRING = "text"
INT = "integer"
TIMESTAMP = "timestamp"

# Column name -> Postgres type for the events table. Event property names are
# normalized (leading '$' stripped, spaces -> '_', lowercased) before being
# matched against these names; unmatched properties end up in "extra".
DATAPOINTS = dict([
    ("mp_device_model", STRING),
    ("manufacturer", STRING),
    ("os_version", STRING),
    ("model", STRING),
    ("region", STRING),
    ("lib_version", STRING),
    ("radio", STRING),
    ("ios_ifa", STRING),  # presumably the iOS advertising identifier (IDFA) -- TODO confirm
    ("mp_lib", STRING),
    ("carrier", STRING),
    ("screen_height", INT),
    ("distinct_id", STRING),
    ("app_release", STRING),
    ("city", STRING),
    ("time", TIMESTAMP),
    ("app_version", STRING),
    ("os", STRING),
    ("screen_width", INT),
    ("mp_country_code", STRING),
    ("event", STRING),
    ("id", STRING),
    # If you have any fields you want to be first-class citizens, add them here.
    # It's best if you do it before running this script, so you don't have to alter the table.
    ("userid", STRING),
    ("wifi", STRING),
    ("controller_class_name", STRING),
    ("extra", STRING),  # JSON-encoded "everything else" field, because we need a schema :(
])
def conncur(args):
    """Open a psycopg2 connection from the parsed CLI arguments.

    Reads args.hostname, args.database, args.username and args.password.
    Returns a (connection, cursor) tuple; the caller is responsible for closing both.
    """
    connection = psycopg2.connect(
        host=args.hostname,
        database=args.database,
        user=args.username,
        password=args.password,
    )
    return connection, connection.cursor()
# Name of the destination table. It is %-formatted directly into the SQL text in
# set_up_db and shove_event_in_db (identifiers cannot be passed as query
# parameters), so it must be a trusted, valid SQL identifier.
TABLE_NAME = 'mixpanel'
def set_up_db(args):
    """Create the events table if it does not exist yet.

    Columns are chartio_id (text primary key, holding the event hash) plus one
    column per DATAPOINTS entry. The connection is closed even if the DDL fails.
    """
    conn, cur = conncur(args)
    try:
        # Assume that if the table exists, it has all the columns we need.
        # If not, you're on your own.
        columns = ",".join("%s %s" % (name, col_type) for name, col_type in DATAPOINTS.items())
        cur.execute("CREATE TABLE IF NOT EXISTS %s (chartio_id text PRIMARY KEY, %s);" % (TABLE_NAME, columns))
        # Uncomment for faster charts. CREATE INDEX IF NOT EXISTS needs PostgreSQL 9.5+,
        # which is why this stays disabled by default:
        # cur.execute("CREATE INDEX IF NOT EXISTS dates ON %s(time)" % TABLE_NAME)
        conn.commit()
    finally:
        cur.close()
        conn.close()
def hash_event(event):
    """Return a SHA-1 hex digest identifying *event*; it is stored as the row's chartio_id.

    The digest is taken over ``str(event)`` exactly as before, so on Python 2
    the IDs match rows inserted by earlier runs. Changing the hashed input would
    defeat the duplicate check in shove_event_in_db. Text is UTF-8 encoded first
    because hashlib requires bytes on Python 3. On Python 2, ``str`` is already
    bytes and is hashed unchanged.

    NOTE(review): str() of a dict follows its iteration order, so an ID is only
    stable while events are decoded the same way (same key order, no hash
    randomization). The Python 3 repr also differs from the Python 2 repr, so
    the same event gets a different ID under Python 3.
    """
    text = str(event)
    if not isinstance(text, bytes):
        text = text.encode('utf-8')
    return hashlib.sha1(text).hexdigest()
def _flatten_event(event):
    """Flatten one exported event into (row, columns) ready for insertion.

    row     - dict of column name -> value, including 'chartio_id' (the event hash).
    columns - sorted schema columns present in row (excluding chartio_id), plus
              'extra' when some properties have no column of their own.
    """
    row = {'event': event['event'], 'chartio_id': hash_event(event)}
    for key, value in event['properties'].items():
        # "$city" -> "city", "Screen Width" -> "screen_width", etc.
        row[key.lstrip('$').replace(' ', '_').lower()] = value
    # Get something that basically follows our schema
    row_keys = set(row)
    schema_keys = row_keys & set(DATAPOINTS)
    leftover = row_keys - schema_keys - set(["chartio_id"])
    if leftover:
        # Properties without a dedicated column are kept together as one JSON blob.
        row["extra"] = json.dumps({key: row[key] for key in leftover})
        schema_keys.add("extra")
    if "time" in row:
        # NOTE(review): fromtimestamp() interprets the value in the host's local timezone.
        row["time"] = datetime.datetime.fromtimestamp(row["time"])
    return row, sorted(schema_keys)


def shove_event_in_db(event, conn, cur):
    """Insert *event* unless a row with the same chartio_id already exists, then commit.

    conn/cur are an open psycopg2 connection and one of its cursors.
    """
    row, columns = _flatten_event(event)
    # Column names come from DATAPOINTS (trusted); values travel as query parameters.
    # "%%" survives this first %-format as a literal "%" placeholder for psycopg2.
    query = "INSERT INTO %s (chartio_id, %s) SELECT %%(chartio_id)s, %s WHERE NOT EXISTS (SELECT 1 FROM %s WHERE chartio_id=%%(chartio_id)s);" % (
        TABLE_NAME,
        ','.join(columns),
        ','.join('%%(%s)s' % column for column in columns),
        TABLE_NAME)
    try:
        cur.execute(query, row)
    except psycopg2.IntegrityError as err:
        # NOT EXISTS is not atomic: another worker process can insert the same
        # chartio_id (an identical event in a different chunk) between our check
        # and our insert. A unique violation therefore means "already there",
        # which is the intended outcome; anything else is a real error.
        if err.pgcode != '23505':  # unique_violation
            raise
        conn.rollback()
        return
    conn.commit()
def chunks(l, n):
    """Split sequence *l* into exactly *n* contiguous slices whose lengths differ by at most one.

    Every element appears in exactly one slice, in the original order. When
    len(l) < n, the trailing slices are empty. Raises ValueError if n < 1.
    """
    if n < 1:
        raise ValueError("n must be at least 1, got %r" % (n,))
    base, extra = divmod(len(l), n)
    result = []
    start = 0
    for i in range(n):
        # The first `extra` slices each take one of the leftover elements.
        stop = start + base + (1 if i < extra else 0)
        result.append(l[start:stop])
        start = stop
    return result
def shove_chunk_in_db(args, chunk):
    """Insert every event in *chunk* using one dedicated database connection.

    Used as the target of a worker process. The connection and cursor are
    closed even if an insert raises.
    """
    conn, cur = conncur(args)
    try:
        for event in chunk:
            shove_event_in_db(event, conn, cur)
    finally:
        cur.close()
        conn.close()
if __name__ == '__main__':
    parser = ArgumentParser(description='Shove data from Mixpanel into a database')
    # The export request cannot be signed without a key and secret, so fail fast
    # here instead of after a confusing API error.
    parser.add_argument("--key", required=True, help="your Mixpanel API key")
    parser.add_argument("--secret", required=True, help="your Mixpanel API secret")
    parser.add_argument("--hostname", help="your database hostname, like 'mixpanel.luirdusapdorxx.us-east-1.rds.amazonaws.com'")
    parser.add_argument("--database", help="the name of your database")
    parser.add_argument("--username", help="the database username")
    parser.add_argument("--password", help="the database user's password")
    parser.add_argument("--start", help="start date, like '2014-01-09'", default=(datetime.date.today() - datetime.timedelta(days=5)).isoformat())
    parser.add_argument("--end", help="end date, like '2014-01-09'", default=datetime.date.today().isoformat())
    parser.add_argument("--processes", type=int, default=32,
                        help="number of worker processes inserting events (default: 32)")
    args = parser.parse_args()
    if args.processes < 1:
        parser.error("--processes must be at least 1")
    set_up_db(args)
    api = Mixpanel(api_key=args.key, api_secret=args.secret)
    data = api.request(['export'], {
        'from_date': args.start,
        'to_date': args.end
    })
    print("Processing %s events" % len(data))
    # Each worker opens its own DB connection, so don't start any for empty chunks.
    workers = [multiprocessing.Process(target=shove_chunk_in_db, args=(args, chunk))
               for chunk in chunks(data, args.processes) if chunk]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # A worker that died on an exception exits non-zero; report it instead of
    # silently exiting with success.
    failed = [worker for worker in workers if worker.exitcode != 0]
    if failed:
        raise SystemExit("%d of %d worker processes failed" % (len(failed), len(workers)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment