Created
May 2, 2014 10:18
-
-
Save thinrhino/8f961f54492ad67b4409 to your computer and use it in GitHub Desktop.
A piece of code that retrieves raw data from Mixpanel and dumps it into a bucket on AWS S3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Code to download and upload raw data from mix-panel | |
""" | |
import hashlib | |
import datetime | |
import time | |
import tempfile | |
import os | |
import bz2 | |
import logging | |
from boto.s3.connection import S3Connection | |
from boto.s3.key import Key | |
import requests | |
logger = logging.getLogger(__name__) | |
def get_data(date):
    """Download one day of raw Mixpanel export data.

    date: 'YYYY-MM-DD' string; used as both from_date and to_date, so
          exactly one day of events is exported.
    Returns the path of a bz2-compressed temp file holding the raw
    export, one JSON event per line.
    """
    end_point = 'https://data.mixpanel.com/api/2.0/export/'
    api_key = '<api_key>'
    api_secret = '<api_secret>'
    payload = {'api_key': api_key,
               'from_date': date,
               'expire': int(time.time()) + 600,  # signature valid for 10 minutes
               'to_date': date}
    # Mixpanel's legacy request signing: md5 of the parameters as
    # key=value pairs sorted by key, concatenated, then the API secret.
    sorted_string = ''.join('%s=%s' % (k, payload[k]) for k in sorted(payload))
    payload_hash = hashlib.md5((sorted_string + api_secret).encode('utf-8'))
    payload['sig'] = payload_hash.hexdigest()
    r = requests.get(end_point, params=payload, stream=True)
    handle, file_name = tempfile.mkstemp(suffix='.txt.bz2', prefix='mix_')
    os.close(handle)  # BZ2File reopens by name; release the raw descriptor
    tf = bz2.BZ2File(filename=file_name, mode='w', compresslevel=9)
    try:
        for line in r.iter_lines():
            # write(), not writelines(): we emit one string per event line
            tf.write('%s\n' % line)
    finally:
        # Close even if the download fails mid-stream so the temp file
        # handle is never leaked (original left it open on error).
        tf.close()
    return file_name
def upload_s3(data_date, dump_bucket, file):
    """Upload a local bz2 dump to S3 under a year/month/day key path.

    data_date: 'YYYY-MM-DD' string used to derive the key path.
    dump_bucket: boto Bucket object to create the key in.
    file: path of the local file to upload; deleted only after the
          uploaded byte count matches the local file size.
    """
    # Guard clause: the original fell through silently when the file was
    # missing; make the condition visible in the log instead.
    if not os.path.exists(file):
        logger.error('file not found, nothing to upload : %s' % file)
        return
    k = Key(dump_bucket)
    day = datetime.datetime.strptime(data_date, '%Y-%m-%d').date()
    key_path = '%s/%s/%s.bz2' % (day.year, day.month, day.day)
    k.key = key_path
    key_size = k.set_contents_from_filename(file)
    # Verify the upload by size before removing the local copy.
    if os.stat(file).st_size == key_size:
        logger.info('data uploaded : %s' % key_path)
        logger.info('file size uploaded : %s' % key_size)  # typo fixed ('uplaoded')
        os.remove(file)
    else:
        logger.error('File upload %s failed' % file)
if __name__ == '__main__':
    # Log to a file and mirror INFO+ messages to the console.
    logging.basicConfig(filename="upload_data.log", level=logging.INFO, filemode='a')
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logging.getLogger('').addHandler(console)

    AWSAccessKeyId = '<aws_access_key>'
    AWSSecretKey = '<aws_access_secret_key>'
    conn = S3Connection(aws_secret_access_key=AWSSecretKey,
                        aws_access_key_id=AWSAccessKeyId)
    dump_bucket = conn.get_bucket('<bucket_name>')

    # Retrieve raw data one day at a time, from a historical start date
    # up to (but not including) today.
    # Hoisted out of the loop: the base date never changes per iteration.
    base_date = datetime.datetime.strptime('<YYYY-MM-DD>', '%Y-%m-%d').date()
    delta = 0
    while True:
        start_date = base_date + datetime.timedelta(days=delta)
        # BUG FIX: the original compared a datetime.datetime against
        # datetime.date.today(); that equality is never True, so the loop
        # never terminated. Compare date-to-date, and use >= so a future
        # start date also exits immediately.
        if start_date >= datetime.date.today():
            break
        data_date = start_date.strftime('%Y-%m-%d')
        logger.info('Getting data for : %s' % data_date)
        temp_file = get_data(data_date)
        upload_s3(data_date, dump_bucket, temp_file)
        delta += 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment