@paruljain · Created February 3, 2021
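# Note: aws_request_signer comes from the third-party "aws-request-signer" package on PyPI
# (pip install aws-request-signer); everything else used below is from the standard library.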
from aws_request_signer import AwsRequestSigner, UNSIGNED_PAYLOAD
import http.client
import queue
from os import scandir
import os
from urllib.parse import quote
from os.path import join
import time
from threading import Thread
MAX_THREADS = 100
BLOCKSIZE = 1048576
AWS_REGION = 'us-east-1'
AWS_ACCESS_KEY_ID = '**************'
AWS_SECRET_ACCESS_KEY = '*****************************'
AWS_S3_HOST = '1.1.1.1'
AWS_S3_PORT = 20000
BUCKET = 'test'
FOLDER = 'c:\\python'
fileQ = queue.Queue(10000)
fileCount = 0
totalSize = 0
filesScanned = 0
shutdown = False
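# File-like wrapper passed to http.client as the request body. A background thread
# pre-reads the next chunks from disk (up to 2 queued) so disk reads and network sends overlap.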
class FileReader:
    def __init__(self, filePath, blocksize=1048576):
        self.fd = open(filePath, 'rb')
        self.blocksize = blocksize
        self.chunkQ = queue.Queue(2)

    def run(self):
        self.runWorker = True
        Thread(target=self.getFileChunk).start()

    def getFileChunk(self):
        # Read the file in blocks on the worker thread; an empty bytes object marks EOF
        while self.runWorker:
            chunk = self.fd.read(self.blocksize)
            if not chunk:
                self.fd.close()
                self.chunkQ.put(b'')
                break
            self.chunkQ.put(chunk)

    def read(self, size):
        # http.client passes its own block size; simply hand back the next queued chunk
        return self.chunkQ.get()

    def close(self):
        self.runWorker = False
        self.fd.close()
# Put files to copy to S3 on the queue.
# path is the root path from which to recursively list files to copy.
def scanDir(path):
    global filesScanned
    if shutdown:
        return
    try:
        for file in scandir(path):
            if shutdown:
                return
            fullPath = join(path, file.name)
            if file.is_file():
                fileQ.put(fullPath, True)
                filesScanned += 1
            elif file.is_dir():
                scanDir(fullPath)
    except OSError:
        pass  # Ignore folders we do not have permission to read
def upload():
    global fileCount
    global totalSize
    requestSigner = AwsRequestSigner(AWS_REGION, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, 's3')
    conn = http.client.HTTPConnection(host=AWS_S3_HOST, port=AWS_S3_PORT, blocksize=BLOCKSIZE)
    while True:
        if shutdown:
            break
        try:
            # Stop this worker once the queue has been empty for 5 seconds
            f = fileQ.get(True, 5)
        except Exception:
            conn.close()
            break
        # Map the Windows path to an S3 object key, e.g. c:\python\foo.txt -> /python/foo.txt
        k = f.replace('c:\\', '/').replace('\\', '/')
        URL = 'http://' + AWS_S3_HOST + ':' + str(AWS_S3_PORT) + '/' + BUCKET + quote(k)
        # The headers we'll provide and want to sign
        try:
            fileSize = os.stat(f).st_size
        except Exception:
            continue
        headers = {"Content-Type": "application/octet-stream", "Content-Length": str(fileSize)}
        # Add the authentication headers
        headers.update(requestSigner.sign_with_headers("PUT", URL, headers, content_hash=UNSIGNED_PAYLOAD))
        try:
            fh = FileReader(f, BLOCKSIZE)
        except Exception:
            continue
        fh.run()
        try:
            conn.request(method='PUT', url='/' + BUCKET + quote(k), headers=headers, body=fh)
            res = conn.getresponse()
            data = res.read()
        except Exception as err:
            print("s3 connection error:", err)
            conn.close()
            fh.close()
            break
        if res.status < 200 or res.status > 299:
            print('Error uploading to s3:', res.status, data)
            conn.close()
            fh.close()
            break
        fileQ.task_done()
        fileCount += 1
        totalSize += fileSize
startTime = time.time()
# Reports status of the copy job
def monitor():
    while runMonitor:
        print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize/1024/1024, 2), 'MB uploaded')
        time.sleep(5)
copyOps = []
for _ in range(MAX_THREADS):
    t = Thread(target=upload)
    copyOps.append(t)
    t.start()
print('Starting ...')
# Start the monitoring thread
runMonitor = True
Thread(target=monitor).start()
scanDir(FOLDER)
# Wait for all copy jobs to finish
for copyOp in copyOps:
    copyOp.join()
runMonitor = False
timeTakenSeconds = round(time.time() - startTime, 2)
print(filesScanned, 'files scanned;', fileCount, 'files uploaded;', round(totalSize/1024/1024, 2), 'MB uploaded;', timeTakenSeconds, 'seconds')
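# Note: each upload thread exits after the queue has been empty for 5 seconds, so the final
# summary prints shortly after the scan and the last uploads finish. The shared counters are
# updated by many threads without a lock, so the reported totals are approximate.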