Skip to content

Instantly share code, notes, and snippets.

@amacal
Created November 15, 2020 13:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Save amacal/ffb73d61ba93505b171be0ad9b080c7f to your computer and use it in GitHub Desktop.
from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os import path
from time import time as now
from hashlib import md5, sha1
from multiprocessing import Pool, Queue, Manager
class Parameters:
    """Thin wrapper around AWS SSM Parameter Store lookups."""

    def __init__(self):
        # One SSM client is reused for every lookup.
        self.ssmClient = client('ssm')

    def value(self, name):
        """Return the string value stored under *name* in SSM."""
        response = self.ssmClient.get_parameter(Name=name)
        return response['Parameter']['Value']
class Metrics:
    """Minimal logger that tags every message with a pipeline name."""

    def __init__(self, name):
        # Prefix used on every emitted line.
        self.name = name

    def log(self, data):
        """Print *data* prefixed with this instance's name."""
        message = f'{self.name}: {data}'
        print(message)
class Metadata:
    """String-keyed store for facts collected while a pipeline runs."""

    def __init__(self):
        # Backing dictionary for all recorded entries.
        self.data = {}

    def keys(self):
        """Return a view over every metadata name recorded so far."""
        return self.data.keys()

    def set(self, name, value):
        """Record *value* under *name*, replacing any previous entry."""
        self.data[name] = value

    def get(self, name):
        """Return the value stored under *name* (KeyError if absent)."""
        return self.data[name]
class Pipeline:
    """Chains processing steps together through BinaryPipe buffers.

    Each step is bound to the pipe filled by the previous step plus a
    fresh downstream pipe; the first step has no upstream pipe and is
    driven directly with the caller's input.
    """

    def __init__(self, name, steps):
        self.steps = steps
        self.metadata = Metadata()
        self.metrics = Metrics(name)

    def init(self):
        """Wire every step to its upstream and a fresh downstream pipe."""
        upstream = None
        for step in self.steps:
            downstream = BinaryPipe()
            step.bind(upstream, downstream, self.metrics, self.metadata)
            upstream = downstream

    def flush(self):
        """Ask each step, in order, to drain whatever it still buffers."""
        for step in self.steps:
            step.flush()

    def run(self, input):
        # Only the first step is started directly; downstream steps
        # react to pipe-change callbacks.
        self.steps[0].start(input)

    def complete(self):
        """Log every metadata entry collected during the run."""
        for key in self.metadata.keys():
            self.metrics.log(f'{key} -> {self.metadata.get(key)}')

    def start(self, input=None):
        """Execute the whole pipeline end to end for *input*."""
        self.init()
        self.run(input)
        self.flush()
        self.complete()
class BinaryPipe:
    """In-memory byte buffer connecting two pipeline steps.

    The upstream step appends chunks; the downstream step reads them.
    An optional callback fires after every append so the consumer can
    react to newly available data.
    """

    def __init__(self):
        # Read position inside self.data.
        self.offset = 0
        self.data = bytearray()
        # Fix: initialize the subscriber slot so append() called before
        # subscribe() no longer raises AttributeError.
        self.callback = None

    def append(self, chunk):
        """Add *chunk* to the buffer and notify the subscriber, if any."""
        # Fix: extend in place instead of rebuilding the buffer on each
        # append — the original copy made total cost quadratic.
        self.data.extend(chunk)
        if self.callback is not None:
            self.callback()

    def subscribe(self, callback):
        """Register *callback* to be invoked after every append."""
        self.callback = callback

    def length(self):
        """Return the number of unread bytes currently buffered."""
        return len(self.data) - self.offset

    def read(self, size):
        """Return up to *size* unread bytes; a negative size reads all."""
        size = size if size >= 0 else self.length()
        value = self.data[self.offset:self.offset + size]
        self.offset = self.offset + len(value)
        # Compact once at least 1 MiB has been consumed so memory does
        # not grow without bound during long streams.
        if self.offset >= 1024 * 1024:
            self.data = self.data[self.offset:]
            self.offset = 0
        return value
class FtpDownload:
    """Pipeline source step: streams one FTP file into the next pipe."""

    def __init__(self, pool):
        self.item = None
        self.client = None
        # Shared pool of mirror descriptors ({'Host': ..., 'Directory': ...}).
        self.pool = pool
        self.tick = int(now())

    def bind(self, prev, next, metrics, metadata):
        # Source step: only the downstream pipe and metrics are used.
        self.next = next
        self.metrics = metrics

    def start(self, input):
        """Borrow a mirror, connect to it, and stream *input* downstream."""
        self.request_item()
        self.start_client()
        self.start_download(input)

    def request_item(self):
        # Borrow one mirror descriptor; raises if none is free.
        self.item = self.pool.get_nowait()
        self.metrics.log(f'using {self.host()}')

    def host(self):
        return self.item['Host']

    def directory(self):
        return self.item['Directory']

    def start_client(self):
        ftp = FTP(self.host())
        ftp.login()
        ftp.cwd(self.directory())
        self.client = ftp

    def start_download(self, input):
        self.metrics.log(f'download started {self.directory()}/{input}')
        self.client.retrbinary(f'RETR {input}', self.append, blocksize=128*1024)
        self.metrics.log(f'download completed {self.directory()}/{input}')

    def append(self, chunk):
        # Forward each received chunk and keep the control channel alive.
        self.next.append(chunk)
        self.touch(tick=int(now()))

    def touch(self, tick):
        # Send a NOOP at most once a minute so the server does not time
        # out the idle control connection during a long transfer.
        if tick - self.tick > 60:
            self.client.putcmd('NOOP')
            self.tick = tick

    def flush(self):
        self.release_item()
        self.release_client()

    def release_item(self):
        # Return the mirror descriptor so other workers may reuse it.
        if self.item:
            self.pool.put(self.item)

    def release_client(self):
        if self.client:
            self.client.quit()
class Ungzip:
    """Pipeline step decompressing a gzip stream from prev into next."""

    def bind(self, prev, next, metrics, metadata):
        self.next = next
        self.prev = prev
        self.prev.subscribe(self.changed)
        # GzipFile pulls compressed bytes from the upstream pipe on demand.
        self.file = GzipFile(mode='r', fileobj=prev)

    def changed(self):
        # Decompress eagerly only while plenty of compressed input is
        # buffered, so GzipFile never stalls on a short read mid-stream.
        while self.prev.length() > 1024 * 1024:
            self.next.append(self.file.read(128 * 1024))

    def flush(self):
        # Drain whatever compressed data remains at end of stream.
        for chunk in iter(lambda: self.file.read(128 * 1024), b''):
            self.next.append(chunk)
class S3Upload:
    """Pipeline sink step: multipart-uploads the stream to S3."""

    def __init__(self, bucket, key, chunksize):
        # Minimum part size pulled from the pipe per upload_part call.
        self.chunksize = chunksize
        self.client = client('s3')
        self.bucket = bucket
        self.key = key
        # S3 part numbers start at 1.
        self.part = 1
        self.parts = []
        self.upload_id = None

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.metrics = metrics
        self.prev.subscribe(self.changed)

    def start_upload(self):
        """Lazily open the multipart upload on first use."""
        if not self.upload_id:
            response = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            self.upload_id = response['UploadId']
            self.metrics.log(f'upload started {self.upload_id}')

    def changed(self):
        # Ship full-size parts whenever enough data has accumulated.
        self.start_upload()
        self.upload(self.chunksize)

    def flush(self):
        # Ship everything left (possibly a final short part) and close.
        self.start_upload()
        self.upload()
        self.complete()

    def upload(self, size = 0):
        """Upload chunksize-sized parts while more than *size* bytes remain."""
        while self.prev.length() > size:
            chunk = self.prev.read(self.chunksize)
            response = self.client.upload_part(Bucket=self.bucket, Key=self.key, UploadId=self.upload_id, PartNumber=self.part, Body=chunk)
            self.metrics.log(f'part {self.part} completed; {len(chunk)} bytes')
            self.parts.append({'ETag': response['ETag'], 'PartNumber': self.part})
            self.part = self.part + 1

    def complete(self):
        """Finalize the multipart upload from the collected part ETags."""
        self.client.complete_multipart_upload(Bucket=self.bucket, Key=self.key, UploadId=self.upload_id, MultipartUpload={'Parts': self.parts})
        self.metrics.log(f'upload completed {self.upload_id}')
class MD5Hash:
    """Pass-through step that records the MD5 digest of the stream."""

    def __init__(self, name):
        # Metadata key under which the final hex digest is stored.
        self.name = name

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = md5()

    def changed(self):
        # Consume everything available, fold it into the hash, and
        # forward the bytes unchanged.
        data = self.prev.read(-1)
        self.instance.update(data)
        self.next.append(data)

    def flush(self):
        # Publish the hex digest once the stream is complete.
        self.metadata.set(self.name, self.instance.digest().hex())
class SHA1Hash:
    """Pass-through step that records the SHA-1 digest of the stream."""

    def __init__(self, name):
        # Metadata key under which the final hex digest is stored.
        self.name = name

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = sha1()

    def changed(self):
        # Consume everything available, fold it into the hash, and
        # forward the bytes unchanged.
        data = self.prev.read(-1)
        self.instance.update(data)
        self.next.append(data)

    def flush(self):
        # Publish the hex digest once the stream is complete.
        self.metadata.set(self.name, self.instance.digest().hex())
def handle(filename, bucket, queue):
    """Worker entry point: stream one dump file from FTP, gunzip it, and
    upload the result to S3.

    The S3 key is the source filename with its .gz extension dropped,
    placed under the data/ prefix.
    """
    key = f'data/{path.splitext(filename)[0]}'
    steps = [
        FtpDownload(pool=queue),
        Ungzip(),
        S3Upload(bucket=bucket, key=key, chunksize=16 * 1024 * 1024),
    ]
    Pipeline(name=filename, steps=steps).start(input=filename)
if __name__ == '__main__':
    parameters = Parameters()
    manager = Manager()
    queue = manager.Queue()
    bucket = parameters.value('/wikipedia/bucket_name')
    # One task per dump part; the 2020-10-20 logging dump has 27 files.
    items = [(f'enwiki-20201020-pages-logging{i}.xml.gz', bucket, queue)
             for i in range(1, 28)]
    # Seed the shared pool with three connection slots per mirror, so
    # the nine workers are spread evenly across the three mirrors.
    mirrors = [
        ('ftpmirror.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
        ('ftp.acc.umu.se', 'mirror/wikimedia.org/dumps/enwiki/20201020/'),
        ('dumps.wikimedia.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
    ]
    for host, directory in mirrors:
        for _ in range(3):
            queue.put({'Host': host, 'Directory': directory})
    with Pool(9) as pool:
        pool.starmap(handle, items)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment