@paruljain · Created February 2, 2021 23:00
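
# Multi-threaded uploader: recursively scans a local folder tree and PUTs each
# file to an S3-compatible endpoint, signing every request with AWS Signature V4
# via aws_request_signer. Bodies are sent as UNSIGNED_PAYLOAD so files can be
# streamed without hashing them first.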
from aws_request_signer import AwsRequestSigner, UNSIGNED_PAYLOAD
import http.client
import queue
from os import scandir
import os
from urllib.parse import quote
from os.path import join
import time
from threading import Thread
import threading
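
# Configuration: replace the masked credentials, endpoint, bucket, and source
# folder with real values before running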
MAX_THREADS = 100
AWS_REGION = 'us-east-1'
AWS_ACCESS_KEY_ID = '*****************'
AWS_SECRET_ACCESS_KEY = '*****************************'
AWS_S3_HOST = '1.1.1.1'
AWS_S3_PORT = 20000
BUCKET = 'test'
FOLDER = 'c:\\python'
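
# Shared state: the work queue plus counters updated by the worker threads;
# fileCount and totalSize are protected by threadLock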
fileQ = queue.Queue(1000)
fileCount = 0
totalSize = 0
filesScanned = 0
threadLock = threading.Lock()
# Put files to copy to S3 on the queue.
# path is the root path from which to recursively list files to copy.
def scanDir(path):
    global filesScanned
    try:
        for entry in scandir(path):
            fullPath = join(path, entry.name)
            if entry.is_file():
                fileQ.put(fullPath, True)
                filesScanned += 1
            elif entry.is_dir():
                scanDir(fullPath)
    except OSError:
        pass  # Ignore folders we are not permitted to access
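
# Upload worker: each thread keeps its own signer and HTTP connection and
# drains the queue until it has been empty for 5 seconds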
def upload():
    global fileCount
    global totalSize
    requestSigner = AwsRequestSigner(AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, 's3')
    conn = http.client.HTTPConnection(host=AWS_S3_HOST, port=AWS_S3_PORT, blocksize=1048576)
    while True:
        try:
            f = fileQ.get(True, 5)
        except queue.Empty:
            # No work for 5 seconds; assume the scan has finished and exit
            conn.close()
            break
        # Derive the object key from the local path: c:\python\a\b -> /python/a/b
        k = f.replace('c:\\', '/').replace('\\', '/')
        URL = 'http://' + AWS_S3_HOST + ':' + str(AWS_S3_PORT) + '/' + BUCKET + quote(k)
        try:
            fileSize = os.stat(f).st_size
        except OSError:
            continue
        # The headers we'll provide and want to sign
        headers = {'Content-Type': 'application/octet-stream', 'Content-Length': str(fileSize)}
        # Add the authentication headers
        headers.update(requestSigner.sign_with_headers('PUT', URL, headers, content_hash=UNSIGNED_PAYLOAD))
        # Stream the file as the request body instead of reading it into memory
        with open(f, 'rb') as fh:
            conn.request(method='PUT', url='/' + BUCKET + quote(k), headers=headers, body=fh)
            res = conn.getresponse()
            data = res.read()
        if res.status < 200 or res.status > 299:
            print('Error uploading to S3:', res.status, data)
            conn.close()
            break
        fileQ.task_done()
        with threadLock:
            fileCount += 1
            totalSize += fileSize
startTime = time.time()

# Reports status of the copy job every 5 seconds
def monitor():
    while runMonitor:
        print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize / 1024 / 1024, 2), 'MB uploaded')
        time.sleep(5)
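
# Start the upload workers first; they begin draining the queue as soon as
# the directory scan starts filling it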
copyOps = []
for i in range(MAX_THREADS):
    t = Thread(target=upload)
    copyOps.append(t)
    t.start()
print('Starting ...')

# Start the monitoring thread as a daemon so the main thread
# will not wait for it to complete
runMonitor = True
Thread(target=monitor, daemon=True).start()

scanDir(FOLDER)

# Wait for all copy jobs to finish
for copyOp in copyOps:
    copyOp.join()
runMonitor = False

timeTakenSeconds = round(time.time() - startTime, 2)
print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize / 1024 / 1024, 2), 'MB uploaded;', timeTakenSeconds, 'seconds')
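
To sanity-check the result afterwards, listing the bucket is enough. A minimal
sketch using boto3 (not used by the script itself; the endpoint, bucket, prefix,
and placeholder credentials below simply mirror the constants above):

import boto3

# Assumes the same S3-compatible endpoint and placeholder credentials as the script
s3 = boto3.client(
    's3',
    endpoint_url='http://1.1.1.1:20000',
    aws_access_key_id='*****************',
    aws_secret_access_key='*****************************',
    region_name='us-east-1',
)

# List a few objects under the uploaded prefix to confirm the copy worked
resp = s3.list_objects_v2(Bucket='test', Prefix='python/', MaxKeys=10)
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])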