Skip to content

Instantly share code, notes, and snippets.

@thinrhino
Created May 2, 2014 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thinrhino/8f961f54492ad67b4409 to your computer and use it in GitHub Desktop.
Save thinrhino/8f961f54492ad67b4409 to your computer and use it in GitHub Desktop.
A piece of code to retrieve raw data from mixpanel and dump into a bucket on AWS S3
"""
Code to download and upload raw data from mix-panel
"""
import hashlib
import datetime
import time
import tempfile
import os
import bz2
import logging
from boto.s3.connection import S3Connection
from boto.s3.key import Key
import requests
logger = logging.getLogger(__name__)
def get_data(date):
    """Download one day of raw Mixpanel export data into a local bz2 file.

    Parameters
    ----------
    date : str
        Day to export, formatted 'YYYY-MM-DD' (used as both from_date
        and to_date, so exactly one day is fetched).

    Returns
    -------
    str
        Path of a temporary '.txt.bz2' file containing one JSON event
        per line. The caller is responsible for deleting it.

    Raises
    ------
    requests.HTTPError
        If Mixpanel answers with a non-2xx status.
    """
    end_point = 'https://data.mixpanel.com/api/2.0/export/'
    api_key = '<api_key>'
    api_secret = '<api_secret>'
    payload = {'api_key': api_key,
               'from_date': date,
               'expire': int(time.time()) + 600,  # signature valid 10 minutes
               'to_date': date}

    # Mixpanel legacy request signing: md5 of the 'key=value' pairs sorted
    # by key and concatenated without separators, followed by the secret.
    # (.encode() keeps the same digest on Python 2 and makes it work on 3.)
    sorted_string = ''.join('%s=%s' % (k, payload[k]) for k in sorted(payload))
    payload['sig'] = hashlib.md5(
        (sorted_string + api_secret).encode('utf-8')).hexdigest()

    r = requests.get(end_point, params=payload, stream=True)
    # Fail loudly instead of silently compressing an HTML/JSON error page.
    r.raise_for_status()

    handle, file_name = tempfile.mkstemp(suffix='.txt.bz2', prefix='mix_')
    os.close(handle)  # BZ2File reopens by name; drop the raw descriptor
    with bz2.BZ2File(file_name, mode='w', compresslevel=9) as tf:
        for line in r.iter_lines():
            # iter_lines yields raw event lines; re-add the newline stripped
            # by requests. (write, not writelines — one string, one write.)
            tf.write(line + b'\n')
    return file_name
def upload_s3(data_date, dump_bucket, file):
    """Upload one day's dump file to S3 and delete it locally on success.

    The object key is '<year>/<month>/<day>.bz2' derived from *data_date*
    (note: month/day are not zero-padded, matching the existing bucket
    layout).

    Parameters
    ----------
    data_date : str
        Day the data belongs to, formatted 'YYYY-MM-DD'.
    dump_bucket : boto.s3.bucket.Bucket
        Destination bucket.
    file : str
        Path of the local bz2 file to upload.
        NOTE(review): the name shadows the Python 2 builtin ``file``; kept
        unchanged so keyword callers are not broken.
    """
    if not os.path.exists(file):
        # Previously a silent no-op; surface it so a failed download is visible.
        logger.error('File %s does not exist, nothing to upload' % file)
        return

    day = datetime.datetime.strptime(data_date, '%Y-%m-%d').date()
    key_path = '%s/%s/%s.bz2' % (day.year, day.month, day.day)

    k = Key(dump_bucket)
    k.key = key_path
    # boto returns the number of bytes written; compare against the local
    # size as a cheap integrity check before removing the local copy.
    key_size = k.set_contents_from_filename(file)
    if os.stat(file).st_size == key_size:
        logger.info('data uploaded : %s' % key_path)
        logger.info('file size uploaded : %s' % key_size)
        os.remove(file)
    else:
        logger.error('File upload %s failed' % file)
if __name__ == '__main__':
    # Log to a file and mirror INFO+ to the console.
    logging.basicConfig(filename="upload_data.log", level=logging.INFO, filemode='a')
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logging.getLogger('').addHandler(console)

    AWSAccessKeyId = '<aws_access_key>'
    AWSSecretKey = '<aws_access_secret_key>'
    conn = S3Connection(aws_secret_access_key=AWSSecretKey,
                        aws_access_key_id=AWSAccessKeyId)
    dump_bucket = conn.get_bucket('<bucket_name>')

    # Walk day by day from a historical start date up to (but excluding) today.
    delta = 0
    while True:
        start_date = datetime.datetime.strptime('<YYYY-MM-DD>', '%Y-%m-%d') \
            + datetime.timedelta(days=delta)
        # BUG FIX: the original compared a datetime.datetime to a
        # datetime.date — they are never equal, so the loop never broke and
        # the script ran forever, requesting future dates. Compare
        # date-to-date, and use >= so a start date in the future also stops.
        if start_date.date() >= datetime.date.today():
            break
        data_date = start_date.strftime('%Y-%m-%d')
        logger.info('Getting data for : %s' % data_date)
        temp_file = get_data(data_date)
        upload_s3(data_date, dump_bucket, temp_file)
        delta += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment