Skip to content

Instantly share code, notes, and snippets.

@djih

djih/README.md Secret

Forked from jonyo/README.md
Last active July 13, 2023 17:26
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save djih/2a7e7fb2c1d45c8277f7aef64b682ed6 to your computer and use it in GitHub Desktop.
Save djih/2a7e7fb2c1d45c8277f7aef64b682ed6 to your computer and use it in GitHub Desktop.
Sample import

This is an example script that uploads data to Amplitude. The data should be a zipped file in which each line is a JSON object in the same format required by the HTTP API docs: https://amplitude.zendesk.com/hc/en-us/articles/204771828. It is highly recommended to include an insert_id with each event to prevent data duplication on retries.

Usage: python3 sample_import.py data.zip starting_line_number

import json
import time
import certifi
import urllib
import urllib3
import logging
import os
from concurrent import futures
from urllib3.exceptions import HTTPError
from urllib.parse import urlencode
from zipfile import ZipFile
from concurrent.futures import as_completed
API_KEY = 'YOUR_API_KEY'
ENDPOINT = 'https://api.amplitude.com/batch'
# subclass of ThreadPoolExecutor that provides:
# - proper exception logging from futures
# - ability to track futures and get results of all submitted futures
# - failure on exit if a future throws an exception (when futures are tracked)
class Executor(futures.ThreadPoolExecutor):
    """Thread pool that remembers submitted futures while used as a context manager."""

    def __init__(self, max_workers):
        super().__init__(max_workers)
        # Tracking is switched on by __enter__; plain (non-context) use stays untracked.
        self.track_futures = False
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        """Schedule fn(*args, **kwargs) and remember the future when tracking."""
        def call(packed_args, packed_kwargs):
            return fn(*packed_args, **packed_kwargs)

        future = super().submit(call, args, kwargs)
        if self.track_futures:
            self.futures.append(future)
        return future

    def results(self):
        """Yield results of all tracked futures as they complete."""
        if not self.track_futures:
            raise Exception('Cannot get results from executor without future tracking')
        return (f.result() for f in as_completed(self.futures))

    def __enter__(self):
        self.track_futures = True
        return super().__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Re-raise the first exception raised inside any tracked future.
            for f in as_completed(self.futures):
                f.result()
            self.futures = []
            self.shutdown(wait=False)
            return False
        finally:
            super().__exit__(exc_type, exc_val, exc_tb)
def run_with_retry(f, tries, failure_callback=None):
    """Call f() until it succeeds or `tries` attempts are exhausted.

    Returns f()'s result on success, or None when every attempt raised.
    failure_callback, if given, runs between attempts (e.g. a backoff sleep).
    """
    while True:
        try:
            return f()
        except Exception as e:
            # Fix: route failure output through logging like the rest of the
            # script (the original had a stray debug print(e) here).
            tries -= 1
            if tries <= 0:
                logging.info(
                    '[%s] Failed to run %s Encountered %s (0 tries left, giving up)',
                    os.getpid(), f, e.__class__.__name__)
                break
            if failure_callback:
                failure_callback()
            logging.info(
                '[%s] Raised %s, retrying (%s tries left)',
                os.getpid(), e.__class__.__name__, tries)
def send_req(json_events):
    """POST one batch of events to the Amplitude batch endpoint, retrying on failure.

    Retries up to 10 times with a 10-second sleep between attempts; a non-200
    response is treated as a failure and raised for the retry loop.
    """
    payload = {'api_key': API_KEY, 'events': json_events}

    def attempt():
        # Fresh pool manager per attempt; TLS verification via certifi's CA bundle.
        pool = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
        resp = pool.request('POST', ENDPOINT, body=json.dumps(payload).encode('utf-8'))
        resp.read()
        resp.close()
        if resp.status != 200:
            raise Exception('Bad response: ' + str(resp.status) + ', body: ' + str(resp.data))

    run_with_retry(attempt, tries=10, failure_callback=lambda: time.sleep(10))
def upload(events):
    """Upload events concurrently in chunks of 10 and log the elapsed time."""
    started = time.time()
    with Executor(max_workers=64) as pool:
        for offset in range(0, len(events), 10):
            pool.submit(send_req, events[offset:offset + 10])
    elapsed = time.time() - started
    logging.info('uploading %s events took %s', len(events), elapsed)
def main():
    """Stream events from the first file in a zip archive and upload in 1000-event batches.

    Usage: python3 sample_import.py data.zip starting_line_number
    Rows numbered <= starting_line_number are skipped, allowing resumption.
    """
    import sys

    # Send all log output (DEBUG and up) to stdout.
    rootLogger = logging.getLogger()
    rootLogger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler(sys.stdout)
    ch.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    rootLogger.addHandler(ch)

    if API_KEY == 'YOUR_API_KEY':
        logging.info('Must set API_KEY')
        return

    filename = sys.argv[1]    # path to the zip archive of newline-delimited JSON events
    start = int(sys.argv[2])  # row number to resume from (rows <= start are skipped)

    rownum = 0
    cur_events = []
    # Fix: use a context manager so the archive is always closed
    # (the original leaked the ZipFile handle).
    with ZipFile(filename, 'r', allowZip64=True) as zf:
        with zf.open(zf.infolist()[0]) as f:
            for line in f:
                rownum += 1
                if rownum <= start:
                    continue
                event = json.loads(line.strip())
                # Skip rows missing the fields Amplitude requires:
                # event_type plus at least one of user_id / device_id.
                if (
                    'event_type' not in event or
                    ('user_id' not in event and 'device_id' not in event)
                ):
                    continue
                cur_events.append(event)
                if len(cur_events) >= 1000:
                    logging.info('uploading %s events, row %s', len(cur_events), rownum)
                    upload(cur_events)
                    cur_events = []
    # Flush the final partial batch.
    if cur_events:
        logging.info('uploading %s events, row %s', len(cur_events), rownum)
        upload(cur_events)


if __name__ == '__main__':
    main()
@samjewell
Copy link

This script is for Python 2, not Python 3, for anyone else who hits this error:

  File "sample_import.py", line 62
    except Exception, e:
                    ^
SyntaxError: invalid syntax

@djih
Copy link
Author

djih commented Apr 9, 2018

@samjewell thanks! I've updated the script to work for both Python 2 and 3.

@khose
Copy link

khose commented May 29, 2019

Just FYI, I needed to update the script because it wasn't running with Python3. xrange and urllib2...

@dana11235
Copy link

We just updated this to support Python3 and Amplitude's Batch API

@441742696
Copy link

Hi, I have a problem here, Do you guys know why?

2022-08-04 15:56:14,490 - urllib3.connectionpool - WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x00000254755642B0>: Failed to establish a new connection: [WinError 10060] The connection attempt failed because the connected party did not reply properly after a period of time or the connected host did not respond.')': /batch

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment