@leoluyi
Last active November 12, 2018 06:56
Upload eland data for large files
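The script below splits one or more CSV files into chunks of at most MAX_SIZE rows and uploads them to the AIXON csv-upload API: the first chunk is sent with PUT when overwriting (or creating a new source), and every later chunk is appended with PATCH. Rows without a customuid column get a sequential one generated on the fly.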
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import requests
import json
import time
import csv
__version__ = "1.0"
Licensing_string = """
/*******************************************************
* Copyright (C) 2018 Appier Inc.
*
* This program is part of AIXON @ Appier Inc.
*
* AIXON can not be copied and/or distributed without the express
* permission of Appier Inc.
*******************************************************/
"""
print ("Version: %s\n%s"%(__version__, Licensing_string))
# Default Args
MAX_SIZE = 100000
API_PUT_URL = "https://aixon.appier.com/api_upload/v1/csv-upload"
# Config from env
if os.getenv('MAX_SIZE'):
    MAX_SIZE = int(os.getenv('MAX_SIZE'))
    print("Set MAX_SIZE = %d" % MAX_SIZE)
if os.getenv('API_KEY'):
    API_KEY = os.getenv('API_KEY')
    print("Set API_KEY = ************")
# Argparse
if len(sys.argv) < 4:
    print("Usage: %s <API_KEY> <Csv_file/folder> <Notes_for_Source> [SourceID for Append] [OverWrite:0/1, default:1]" % sys.argv[0])
    sys.exit(1)
FirstPut = True
API_KEY = sys.argv[1]
SOURCE_PATH = sys.argv[2]
NOTES_NAME = sys.argv[3]
RESOURCE_ID = ""
if len(sys.argv) >= 5:
    RESOURCE_ID = sys.argv[4]
IS_OVERWRITE = True
if len(sys.argv) >= 6:
    try:
        if int(sys.argv[5]) == 0:
            IS_OVERWRITE = False
    except ValueError:
        IS_OVERWRITE = True
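# Example invocation (the script name "upload_csv.py" is hypothetical; use
# whatever this file is saved as):
#   python upload_csv.py MY_API_KEY ./exports/ "daily export"
#   python upload_csv.py MY_API_KEY data.csv "daily export" <SourceID> 0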
##### Production #####
SourceFileList = []
if os.path.isdir(SOURCE_PATH):
    for strFile in os.listdir(SOURCE_PATH):
        SourceFileList.append(os.path.join(SOURCE_PATH, strFile))
elif os.path.isfile(SOURCE_PATH):
    SourceFileList.append(SOURCE_PATH)
else:
    print("No Files Found: %s" % SOURCE_PATH)
    sys.exit(1)
total_count = 0
total_error_count = 0
data = []
payload = {}
headers = {
    'Content-Type': 'application/json',
    'charset': 'utf-8',
    'x-api-key': API_KEY,
    'x-node-description': NOTES_NAME,
}
start_time = time.time()
current_time = start_time
record_count = 1
is_add_CUID = False
print("MAX_SIZE per upload: %d" % MAX_SIZE)
for SourceFile in SourceFileList:
    print("Start Processing File: %s" % SourceFile)
    with open(SourceFile, encoding='utf8') as fp:
        f_csv = csv.reader(fp, delimiter=',')
        payload['header'] = next(f_csv)
        # If the CSV has no 'customuid' column, add one and fill it with a
        # running record number.
        if 'customuid' not in payload['header']:
            is_add_CUID = True
            payload['header'].append('customuid')
        header_size = len(payload['header'])
        for data_list in f_csv:
            if is_add_CUID:
                data_list.append(str(record_count))
                record_count += 1
            if len(data_list) != header_size:
                # The length of the data fields != length of the header fields
                total_error_count += 1
                continue
            total_count += 1
            data.append(data_list)
            if (total_count % MAX_SIZE) == 0:
                payload['data'] = data
                API_URL = API_PUT_URL
                if len(RESOURCE_ID) > 0:
                    API_URL = API_PUT_URL + "/" + RESOURCE_ID
                print("Total Count: %d" % total_count)
                # The very first request uses PUT to create/overwrite the
                # source; all later chunks are appended with PATCH.
                if FirstPut:
                    FirstPut = False
                    if IS_OVERWRITE:
                        IS_OVERWRITE = False
                        req = requests.put(API_URL, headers=headers, data=json.dumps(payload))
                    else:
                        req = requests.patch(API_URL, headers=headers, data=json.dumps(payload))
                else:
                    req = requests.patch(API_URL, headers=headers, data=json.dumps(payload))
                print(req.url)
                print(req)
                print(req.content)
                print(req.text)
                ret_data = {}
                if req.status_code == 200:
                    ret_data = req.json()
                    if len(RESOURCE_ID) == 0:
                        if 'status' in ret_data:
                            print("Create Status: %s" % ret_data['status'])
                        if 'message' in ret_data:
                            if 'RESOURCE_ID' in ret_data['message']:
                                # Keep the ID of the newly created source so the
                                # remaining chunks append to it.
                                RESOURCE_ID = ret_data['message']['RESOURCE_ID']
                    if 'status' in ret_data:
                        print("Uploading Status: %s" % ret_data['status'])
                data = []
                print("Total %d records uploaded!" % total_count)
                end_time = time.time()
                elapsed_time = end_time - current_time
                current_time = end_time
                print("Used: %d seconds" % int(elapsed_time))
if (total_count % MAX_SIZE) > 0:
    if len(data) > 0:
        API_URL = API_PUT_URL
        if len(RESOURCE_ID) > 0:
            API_URL = API_PUT_URL + "/" + RESOURCE_ID
        payload['data'] = data
        print("headers: %s\n" % headers)
        if FirstPut:
            FirstPut = False
            if IS_OVERWRITE:
                IS_OVERWRITE = False
                req = requests.put(API_URL, headers=headers, data=json.dumps(payload))
            else:
                req = requests.patch(API_URL, headers=headers, data=json.dumps(payload))
        else:
            req = requests.patch(API_URL, headers=headers, data=json.dumps(payload))
        print(req.url)
        print(req)
        print(req.content)
        print(req.text)
        ret_data = {}
        if req.status_code == 200:
            ret_data = req.json()
        if 'status' in ret_data:
            print(f"Uploading Status: {ret_data['status']}")
        end_time = time.time()
        elapsed_time = end_time - current_time
        current_time = end_time
        print(f"Used: {int(elapsed_time):d} seconds")
print(f"Uploading Total: {total_count} records")
print(f"Total Error Records: {total_error_count}")
# Use time.time() here so the summary also works when no upload ran.
print(f"Total Used: {int(time.time() - start_time)} seconds")