
@curtisliu
Last active September 22, 2020 14:19
Sample import

This is an example script that uploads data to Amplitude. The data should be a zipped file where each line is a JSON object in the same format required by the HTTP API docs: https://amplitude.zendesk.com/hc/en-us/articles/204771828. It is highly recommended to include an insert_id with each event to prevent data duplication on retries.

Usage: python sample_import.py data.zip starting_line_number
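
For illustration, a minimal sketch (with hypothetical field values) of producing an input file in the expected format: one JSON event per line, each carrying the required event_type and user_id/device_id fields plus an insert_id, zipped up for the script to read.

import json
from zipfile import ZipFile

events = [
    {
        'user_id': 'user-123',        # user_id or device_id is required
        'event_type': 'purchase',     # event_type is required
        'time': 1451606400000,        # epoch milliseconds
        'insert_id': 'purchase-user-123-1451606400000',  # dedupes retried sends
    },
]

with ZipFile('data.zip', 'w') as zf:
    zf.writestr('events.json', '\n'.join(json.dumps(e) for e in events))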

import json
import logging
import os
import time
import urllib
import urllib2
from concurrent import futures
from concurrent.futures import as_completed
from urllib2 import HTTPError
from zipfile import ZipFile

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

API_KEY = 'YOUR_API_KEY'
ENDPOINT = 'https://api.amplitude.com/httpapi'


# subclass of ThreadPoolExecutor that provides:
# - proper exception logging from futures
# - ability to track futures and get results of all submitted futures
# - failure on exit if a future throws an exception (when futures are tracked)
class Executor(futures.ThreadPoolExecutor):
    def __init__(self, max_workers):
        super(Executor, self).__init__(max_workers)
        self.track_futures = False
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        # wrap fn so args/kwargs cross the executor boundary as plain tuples
        def wrapped_fn(args, kwargs):
            return fn(*args, **kwargs)
        future = super(Executor, self).submit(wrapped_fn, args, kwargs)
        if self.track_futures:
            self.futures.append(future)
        return future

    def results(self):
        if not self.track_futures:
            raise Exception('Cannot get results from executor without future tracking')
        return (future.result() for future in as_completed(self.futures))

    def __enter__(self):
        self.track_futures = True
        return super(Executor, self).__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # calling result() re-raises any exception a future hit
            for future in as_completed(self.futures):
                future.result()
            self.futures = []
            self.shutdown(wait=False)
            return False
        finally:
            super(Executor, self).__exit__(exc_type, exc_val, exc_tb)


def run_with_retry(f, tries, failure_callback=None):
    while True:
        try:
            return f()
        except Exception as e:
            tries -= 1
            if tries == 0:
                logger.exception('[%s] Failed to run %s Encountered %s',
                                 os.getpid(), f, e.__class__.__name__)
                raise
            else:
                if failure_callback:
                    failure_callback()
                logger.info('[%s] Raised %s, retrying (%s tries left)',
                            os.getpid(), e.__class__.__name__, tries)


def send_req(json_events):
    data = urllib.urlencode({'api_key': API_KEY, 'event': json_events})

    def do_send():
        req = urllib2.Request(ENDPOINT, data)
        response = urllib2.urlopen(req, timeout=60)
        response.read()
        if response.code != 200:
            raise Exception('Bad response: ' + str(response.code))

    run_with_retry(do_send, tries=10, failure_callback=lambda: time.sleep(10))


def upload(events):
    start = time.time()
    with Executor(max_workers=64) as executor:
        # send events in batches of 10 per request
        for i in xrange(0, len(events), 10):
            executor.submit(send_req, json.dumps(events[i:i + 10]))
    diff = time.time() - start
    logging.info('uploading %s events took %s', len(events), diff)


def main():
    import sys
    filename = sys.argv[1]
    start = int(sys.argv[2])
    rownum = 0
    cur_events = []
    zf = ZipFile(filename, 'r', allowZip64=True)
    with zf.open(zf.infolist()[0]) as f:
        for line in f:
            rownum += 1
            if rownum <= start:
                continue
            event = json.loads(line.strip())
            # skip events missing the fields the HTTP API requires
            if ('event_type' not in event or
                    ('user_id' not in event and 'device_id' not in event)):
                continue
            cur_events.append(event)
            # upload in chunks of 1000 so memory stays bounded
            if len(cur_events) >= 1000:
                logging.info('uploading %s events, row %s', len(cur_events), rownum)
                upload(cur_events)
                cur_events = []
    if cur_events:
        logging.info('uploading %s events, row %s', len(cur_events), rownum)
        upload(cur_events)


if __name__ == '__main__':
    main()
@arjun-hashlearn

When I try to use this it throws the following (upload_to_amplitude.py contains the code from this gist):
Traceback (most recent call last):
File "upload_to_amplitude.py", line 120, in
main()
File "upload_to_amplitude.py", line 116, in main
upload(cur_events)
File "upload_to_amplitude.py", line 86, in upload
executor.submit(send_req, json.dumps(events[i:i + 10]))
File "upload_to_amplitude.py", line 45, in exit
for future in as_completed(self.futures):
NameError: global name 'as_completed' is not defined

You'll need to add "from concurrent.futures import as_completed" at the top to fix it.


ghost commented May 5, 2016

If you are re-importing Amplitude data, this program doesn't set the event time. To do so:

  • with the other imports at the top, add
    import calendar
  • after an event passes the required-field check (just before it is appended to cur_events), add:
    x = event['client_event_time']
    if len(x) < 20:
        x = x + '.000000'
    event['time'] = calendar.timegm(time.strptime(x, '%Y-%m-%d %H:%M:%S.%f')) * 1000 + int(x[-6:-3])
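
As a quick sanity check, here is that conversion on a hypothetical timestamp. Note it assumes client_event_time carries a six-digit fractional second; the padding above guarantees that when the fraction is absent:

import calendar
import time

x = '2016-05-05 12:34:56.789000'  # hypothetical exported client_event_time
if len(x) < 20:
    x = x + '.000000'
millis = calendar.timegm(time.strptime(x, '%Y-%m-%d %H:%M:%S.%f')) * 1000 + int(x[-6:-3])
print(millis)  # 1462451696789: epoch milliseconds, as Amplitude's 'time' field expects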

@jonyo

jonyo commented May 16, 2016

@curtisliu I was also getting the global name 'as_completed' is not defined error; I added from concurrent.futures import as_completed as @arjun-hashlearn suggested, and now it gives:

$ python amplitude-import.py /tmp/import.zip 0
...
NameError: global name 'logger' is not defined

I fixed that by changing all logger references to logging; now it gives yet another error:

python amplitude-import.py /tmp/import.zip 0
...
NameError: global name 'os' is not defined

So I added import os, and that fixed that one. But since I had already tried importing the same data (the chunks were importing fine; it only errored before the retry logic kicked in), the second attempt got some 400 errors, and those raised exceptions the script didn't recover from.

So I hacked the code some more, and came up with this fork that fixes all of the above, plus a few minor improvements.
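
In short: if the server rejects a batch outright (a 4xx), log it and skip it rather than letting the exception abort the run. Roughly this (a simplified sketch of the idea, not the exact code in the fork):

def send_req_tolerant(json_events):
    # Treat a 4xx as a permanent failure for this batch: log it and move on
    # instead of letting the exception abort the run via Executor.__exit__.
    # HTTPError is already imported from urllib2 at the top of the script.
    try:
        send_req(json_events)
    except HTTPError as e:
        if 400 <= e.code < 500:
            logging.error('skipping batch, server rejected it with %s', e.code)
        else:
            raise

upload() then submits send_req_tolerant instead of send_req.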
