Skip to content

Instantly share code, notes, and snippets.


djih/ Secret

Forked from jonyo/
Last active Mar 1, 2021
What would you like to do?
Sample import

This is an example script that uploads event data to Amplitude. The input must be a zip archive whose first file contains one JSON object per line. Each object must be in the event format required by Amplitude's HTTP API (see the Amplitude HTTP API documentation). It is highly recommended to include an `insert_id` with each event so that retried requests do not create duplicate events.

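Usage: python3 <script.py> <zip_file> <starting_line_number> — lines up to and including <starting_line_number> are skipped (pass 0 to upload the whole file), which lets an interrupted run be resumed from the last logged row.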

import json
import time
import certifi
import urllib
import urllib3
import logging
import os
from concurrent import futures
from urllib3.exceptions import HTTPError
from urllib.parse import urlencode
from zipfile import ZipFile
from concurrent.futures import as_completed
class Executor(futures.ThreadPoolExecutor):
    """ThreadPoolExecutor subclass that adds:

    - logging (with traceback) of exceptions raised inside submitted callables;
    - tracking of submitted futures while used as a context manager, so that
      ``results()`` can iterate over their results;
    - failure on context-manager exit if any tracked future raised.
    """

    def __init__(self, max_workers):
        super(Executor, self).__init__(max_workers)
        # Tracking is switched on by __enter__; outside a ``with`` block the
        # futures are not retained, so they can be garbage-collected.
        self.track_futures = False
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        """Schedule ``fn(*args, **kwargs)`` and return its Future.

        An exception raised by ``fn`` is logged in the worker thread and then
        re-raised, so it is still stored on the returned Future.
        """
        def wrapped_fn(call_args, call_kwargs):
            try:
                return fn(*call_args, **call_kwargs)
            except Exception:
                logging.exception('Exception raised in executor task %r', fn)
                raise

        # args/kwargs are passed positionally to the wrapper so that a caller
        # kwarg named e.g. ``fn`` cannot collide with submit()'s own params.
        future = super(Executor, self).submit(wrapped_fn, args, kwargs)
        if self.track_futures:
            self.futures.append(future)
        return future

    def results(self):
        """Return a generator over tracked futures' results in completion order.

        Raises:
            Exception: if future tracking is off (i.e. outside a ``with`` block).
            Any exception raised by a task is re-raised when its result is reached.
        """
        if not self.track_futures:
            raise Exception('Cannot get results from executor without future tracking')
        return (future.result() for future in as_completed(self.futures))

    def __enter__(self):
        self.track_futures = True
        return super(Executor, self).__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Wait for tracked futures, re-raise the first task failure, shut down.

        If the ``with`` body itself raised, that exception propagates instead
        (task failures are still logged by ``submit``'s wrapper).
        """
        try:
            if exc_type is None:
                for future in as_completed(self.futures):
                    # result() re-raises a task's exception, failing the block.
                    future.result()
        finally:
            self.futures = []
            # Always shut the pool down (waiting for outstanding tasks), even
            # when a task failed, so no worker threads are left running.
            super(Executor, self).__exit__(exc_type, exc_val, exc_tb)
        return False
def run_with_retry(f, tries, failure_callback=None):
    """Call ``f()`` until it succeeds or ``tries`` attempts have failed.

    Args:
        f: zero-argument callable to invoke.
        tries: maximum number of attempts; ``f`` is always called at least once.
        failure_callback: optional zero-argument callable invoked after each
            failed attempt that is going to be retried (e.g. to sleep/back off).

    Returns:
        Whatever ``f()`` returns on the first successful attempt.

    Raises:
        The exception from the final attempt once all tries are used up.
    """
    while True:
        try:
            return f()
        except Exception as e:
            tries -= 1
            if tries <= 0:
                logging.error(
                    '[%s] Failed to run %s Encountered %s (0 tries left, giving up)',
                    os.getpid(), f, e.__class__.__name__)
                raise
            logging.warning(
                '[%s] Raised %s, retrying (%s tries left)',
                os.getpid(), e.__class__.__name__, tries)
            if failure_callback:
                failure_callback()
def send_req(json_events, tries=10, retry_delay=10):
    """POST one batch of events to Amplitude, retrying on failure.

    Relies on module-level ``API_KEY`` and ``ENDPOINT``, which are not visible
    in this view — presumably defined near the imports; confirm.

    Args:
        json_events: list of event dicts to send in a single request.
        tries: maximum number of attempts (passed to ``run_with_retry``).
        retry_delay: seconds to sleep between failed attempts.

    Raises:
        Exception: if the final attempt still gets a non-200 response, or any
            error raised by the HTTP request on the final attempt.
    """
    data = {'api_key': API_KEY, 'events': json_events}
    # Serialize once; the body is identical on every retry.
    body = json.dumps(data).encode('utf-8')

    # One pool per batch, reused across retries and cleared on exit.
    with urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where()) as http:
        def do_send():
            response = http.request(
                'POST', ENDPOINT, body=body,
                headers={'Content-Type': 'application/json'})
            if response.status != 200:
                # NOTE(review): every non-200 status is retried, including
                # client errors (4xx) that are unlikely to succeed on retry.
                raise Exception(
                    'Bad response: ' + str(response.status) + ', body: ' +
                    response.data.decode('utf-8', 'replace'))

        run_with_retry(do_send, tries=tries,
                       failure_callback=lambda: time.sleep(retry_delay))
def upload(events, batch_size=10, max_workers=64):
    """Upload ``events`` concurrently in batches of ``batch_size``.

    Blocks until every batch has been sent. Per the ``Executor`` contract
    (failure on exit when futures are tracked), a batch that ultimately fails
    is expected to make this call raise.

    Args:
        events: list of event dicts.
        batch_size: number of events per HTTP request.
        max_workers: number of concurrent upload threads.

    Raises:
        ValueError: if ``batch_size`` is less than 1.
    """
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1, got %r' % (batch_size,))
    # monotonic() is unaffected by system clock adjustments, unlike time().
    start = time.monotonic()
    with Executor(max_workers=max_workers) as executor:
        for i in range(0, len(events), batch_size):
            executor.submit(send_req, events[i:i + batch_size])
    diff = time.monotonic() - start
    logging.info('uploading %s events took %s', len(events), diff)
def main():
    """Upload events from a zipped JSON-lines file to Amplitude.

    Command-line arguments:
        argv[1]: path to the zip archive; only its first member is read.
        argv[2]: starting line number; lines 1..N (inclusive) are skipped, so
            an interrupted run can be resumed from the last logged row.

    Lines that are blank, or whose event lacks ``event_type`` or both
    ``user_id`` and ``device_id``, are skipped. Events are buffered and
    uploaded 1000 at a time.
    """
    import sys

    # Log INFO and above to stdout for the whole process.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.INFO)
    handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(handler)

    if API_KEY == 'YOUR_API_KEY':
        logging.error('Must set API_KEY')
        sys.exit(1)

    if len(sys.argv) != 3:
        logging.error('Usage: %s <zip_file> <starting_line_number>', sys.argv[0])
        sys.exit(2)

    filename = sys.argv[1]
    start = int(sys.argv[2])
    upload_threshold = 1000
    rownum = 0
    cur_events = []

    with ZipFile(filename, 'r', allowZip64=True) as zf:
        members = zf.infolist()
        if not members:
            logging.error('Zip archive %s is empty', filename)
            sys.exit(1)
        with zf.open(members[0]) as f:
            for line in f:
                rownum += 1
                if rownum <= start:
                    continue
                line = line.strip()
                if not line:
                    continue
                event = json.loads(line)
                if (
                    'event_type' not in event or
                    ('user_id' not in event and 'device_id' not in event)
                ):
                    logging.warning(
                        'skipping row %s: missing event_type or user_id/device_id',
                        rownum)
                    continue
                cur_events.append(event)
                if len(cur_events) >= upload_threshold:
                    logging.info('uploading %s events, row %s', len(cur_events), rownum)
                    upload(cur_events)
                    cur_events = []

    # Flush the final partial buffer.
    if cur_events:
        logging.info('uploading %s events, row %s', len(cur_events), rownum)
        upload(cur_events)
# Run the uploader only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

This comment has been minimized.

Copy link

@samjewell samjewell commented Apr 4, 2018

This script is for Python 2, not Python 3, for anyone else who hits this error:

  File "", line 62
    except Exception, e:
SyntaxError: invalid syntax

This comment has been minimized.

Copy link
Owner Author

@djih djih commented Apr 9, 2018

@samjewell thanks! I've updated the script to work for both Python 2 and 3.


This comment has been minimized.

Copy link

@khose khose commented May 29, 2019

Just FYI, I needed to update the script because it wasn't running with Python3. xrange and urllib2...


This comment has been minimized.

Copy link

@dana11235 dana11235 commented Oct 1, 2019

We just updated this to support Python3 and Amplitude's Batch API

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment