Created
November 15, 2020 13:57
-
-
Save amacal/ffb73d61ba93505b171be0ad9b080c7f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from boto3 import client | |
from ftplib import FTP | |
from gzip import GzipFile | |
from os import path | |
from time import time as now | |
from hashlib import md5, sha1 | |
from multiprocessing import Pool, Queue, Manager | |
class Parameters:
    """Thin wrapper around AWS SSM Parameter Store lookups."""

    def __init__(self):
        self.ssmClient = client('ssm')

    def value(self, name):
        """Return the string value of the SSM parameter called *name*."""
        response = self.ssmClient.get_parameter(Name=name)
        return response['Parameter']['Value']
class Metrics:
    """Minimal logger that prefixes every message with a pipeline name."""

    def __init__(self, name):
        self.name = name

    def log(self, data):
        # Plain stdout logging; the runtime environment is expected to
        # capture process output.
        print(self.name, data, sep=': ')
class Metadata:
    """String-keyed scratch space shared by all pipeline steps."""

    def __init__(self):
        self.data = {}

    def keys(self):
        """Return a view of all metadata keys set so far."""
        return self.data.keys()

    def set(self, name, value):
        """Store *value* under *name*, overwriting any previous value."""
        self.data[name] = value

    def get(self, name):
        """Return the value stored under *name* (KeyError if absent)."""
        return self.data[name]
class Pipeline:
    """Chains processing steps together through BinaryPipe buffers and runs them."""

    def __init__(self, name, steps):
        self.steps = steps
        self.metadata = Metadata()
        self.metrics = Metrics(name)

    def init(self):
        # Wire the chain: every step reads from the pipe produced by the
        # step before it and writes into a freshly created pipe.
        upstream = None
        for step in self.steps:
            downstream = BinaryPipe()
            step.bind(upstream, downstream, self.metrics, self.metadata)
            upstream = downstream

    def flush(self):
        # Give each step a chance to drain whatever it is still holding.
        for step in self.steps:
            step.flush()

    def run(self, input):
        # Only the first step is driven directly; downstream steps react
        # to pipe-change notifications.
        self.steps[0].start(input)

    def complete(self):
        # Report every metadata entry collected by the steps (e.g. checksums).
        for key in self.metadata.keys():
            self.metrics.log(f'{key} -> {self.metadata.get(key)}')

    def start(self, input=None):
        """Full lifecycle: wire, run, drain, then report metadata."""
        self.init()
        self.run(input)
        self.flush()
        self.complete()
class BinaryPipe:
    """In-memory FIFO byte buffer connecting two pipeline steps.

    A producer calls append(); a consumer drains it via length()/read().
    An optional subscriber callback fires after every append so the
    downstream step can process data incrementally.
    """

    def __init__(self):
        self.offset = 0           # read position within self.data
        self.data = bytearray()   # buffered bytes; consumed prefix trimmed lazily
        # Fix: the original never initialized callback, so append() before
        # subscribe() raised AttributeError.
        self.callback = None

    def append(self, chunk):
        """Append *chunk* and notify the subscriber, if any."""
        # Fix: extend in place instead of rebuilding the buffer on every
        # append (the original `data = data + chunk` copies the whole
        # buffer each time — quadratic on long streams).
        self.data.extend(chunk)
        if self.callback is not None:
            self.callback()

    def subscribe(self, callback):
        """Register a zero-argument callable invoked after each append()."""
        self.callback = callback

    def length(self):
        """Return the number of unread bytes."""
        return len(self.data) - self.offset

    def read(self, size):
        """Read up to *size* bytes; a negative size reads everything buffered."""
        size = size if size >= 0 else self.length()
        value = self.data[self.offset:self.offset + size]
        self.offset = self.offset + len(value)
        # Compact once the consumed prefix exceeds 1 MiB so memory does not
        # grow without bound on long transfers.
        if self.offset >= 1024 * 1024:
            self.data = self.data[self.offset:]
            self.offset = 0
        return value
class FtpDownload:
    """Pipeline source step: streams one file from an FTP mirror into the next pipe.

    Mirror slots are shared through a process-safe queue of
    {'Host': ..., 'Directory': ...} dicts; a slot is taken at start()
    and returned at flush(), bounding concurrent connections per mirror.
    """

    def __init__(self, pool):
        self.item = None        # mirror slot currently held, if any
        self.client = None      # active ftplib.FTP connection, if any
        self.pool = pool        # shared queue of mirror descriptors
        self.tick = int(now())  # last keep-alive timestamp (seconds)

    def bind(self, prev, next, metrics, metadata):
        # Source step: ignores prev/metadata, only writes into next.
        self.next = next
        self.metrics = metrics

    def start(self, input):
        self.request_item()
        self.start_client()
        self.start_download(input)

    def request_item(self):
        # get_nowait: fail fast if no mirror slot is free (the queue is
        # pre-seeded with one entry per allowed concurrent connection).
        self.item = self.pool.get_nowait()
        self.metrics.log(f'using {self.host()}')

    def host(self):
        return self.item['Host']

    def directory(self):
        return self.item['Directory']

    def start_client(self):
        self.client = FTP(self.host())
        self.client.login()  # anonymous login
        self.client.cwd(self.directory())

    def start_download(self, input):
        self.metrics.log(f'download started {self.directory()}/{input}')
        self.client.retrbinary(f'RETR {input}', self.append, blocksize=128*1024)
        self.metrics.log(f'download completed {self.directory()}/{input}')

    def append(self, chunk):
        # retrbinary callback: forward the chunk downstream and keep the
        # control connection alive on long transfers.
        self.next.append(chunk)
        self.touch(tick=int(now()))

    def touch(self, tick):
        # Send a NOOP at most once a minute so the server does not time out
        # the idle control connection while the data transfer runs.
        if tick - self.tick > 60:
            self.client.putcmd('NOOP')
            self.tick = tick

    def flush(self):
        # Fix: always attempt both releases — the original skipped
        # release_client() whenever release_item() raised.
        try:
            self.release_item()
        finally:
            self.release_client()

    def release_item(self):
        if self.item:
            self.pool.put(self.item)

    def release_client(self):
        if self.client:
            try:
                self.client.quit()
            except Exception:
                # Fix: quit() raises if the server already dropped the
                # control connection; fall back to closing the socket so a
                # successful download is not reported as a failure.
                self.client.close()
class Ungzip:
    """Pipeline step that decompresses a gzip stream flowing between two pipes."""

    def bind(self, prev, next, metrics, metadata):
        self.next = next
        self.prev = prev
        self.prev.subscribe(self.changed)
        # GzipFile pulls compressed bytes from prev through its read() method.
        self.file = GzipFile(mode='r', fileobj=prev)

    def changed(self):
        # Decompress incrementally, but only while more than 1 MiB of
        # compressed input remains buffered, so GzipFile never starves
        # mid-stream.
        while self.prev.length() > 1024 * 1024:
            self.next.append(self.file.read(128 * 1024))

    def flush(self):
        # Drain the remaining compressed tail in 128 KiB slices until EOF.
        while True:
            decompressed = self.file.read(128 * 1024)
            if not decompressed:
                break
            self.next.append(decompressed)
class S3Upload:
    """Pipeline sink step: streams the incoming pipe to S3 as a multipart upload."""

    def __init__(self, bucket, key, chunksize):
        self.chunksize = chunksize  # part size in bytes
        self.client = client('s3')
        self.bucket = bucket
        self.key = key
        self.part = 1               # next part number (S3 part numbers are 1-based)
        self.parts = list()         # completed {'ETag', 'PartNumber'} records
        self.upload_id = None       # created lazily on first data

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.metrics = metrics
        self.prev.subscribe(self.changed)

    def start_upload(self):
        # Lazily create the multipart upload the first time it is needed.
        if not self.upload_id:
            response = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            self.upload_id = response['UploadId']
            self.metrics.log(f'upload started {self.upload_id}')

    def changed(self):
        self.start_upload()
        # Upload only while more than one full part is buffered.
        self.upload(self.chunksize)

    def flush(self):
        self.start_upload()
        self.upload()  # size=0: drain everything, including a short final part
        self.complete()

    def upload(self, size = 0):
        while self.prev.length() > size:
            body = self.prev.read(self.chunksize)
            response = self.client.upload_part(Bucket=self.bucket, Key=self.key, UploadId=self.upload_id, PartNumber=self.part, Body=body)
            self.metrics.log(f'part {self.part} completed; {len(body)} bytes')
            self.parts.append({'ETag': response['ETag'], 'PartNumber': self.part})
            self.part = self.part + 1

    def complete(self):
        # Finalize the upload with the collected part manifest.
        self.client.complete_multipart_upload(Bucket=self.bucket, Key=self.key, UploadId=self.upload_id, MultipartUpload={'Parts': self.parts})
        self.metrics.log(f'upload completed {self.upload_id}')
class MD5Hash:
    """Pass-through pipeline step that records the MD5 digest of the stream."""

    def __init__(self, name):
        self.name = name  # metadata key under which the digest is published

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = md5()

    def changed(self):
        # Consume everything buffered, fold it into the digest, and forward
        # it downstream unchanged.
        buffered = self.prev.read(-1)
        self.instance.update(buffered)
        self.next.append(buffered)

    def flush(self):
        # Publish the final digest as lowercase hex under this step's name.
        self.metadata.set(self.name, self.instance.hexdigest())
class SHA1Hash:
    """Pass-through pipeline step that records the SHA-1 digest of the stream."""

    def __init__(self, name):
        self.name = name  # metadata key under which the digest is published

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = sha1()

    def changed(self):
        # Consume everything buffered, fold it into the digest, and forward
        # it downstream unchanged.
        buffered = self.prev.read(-1)
        self.instance.update(buffered)
        self.next.append(buffered)

    def flush(self):
        # Publish the final digest as lowercase hex under this step's name.
        self.metadata.set(self.name, self.instance.hexdigest())
def handle(filename, bucket, queue):
    """Worker entry point: stream one dump file from FTP, gunzip it, upload to S3."""
    steps = [
        FtpDownload(pool=queue),
        Ungzip(),
        # Strip the trailing .gz so the object key reflects the decompressed file.
        S3Upload(bucket=bucket, key=f'data/{path.splitext(filename)[0]}', chunksize=16*1024*1024),
    ]
    Pipeline(name=filename, steps=steps).start(input=filename)
if __name__ == '__main__':
    parameters = Parameters()
    manager = Manager()
    queue = manager.Queue()
    bucket = parameters.value('/wikipedia/bucket_name')
    # 27 dump files, processed by 9 workers sharing 9 mirror connection slots.
    items = [(f'enwiki-20201020-pages-logging{i}.xml.gz', bucket, queue) for i in range(1, 28)]
    # Seed the shared pool with three connection slots per mirror.
    mirrors = [
        ('ftpmirror.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
        ('ftp.acc.umu.se', 'mirror/wikimedia.org/dumps/enwiki/20201020/'),
        ('dumps.wikimedia.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
    ]
    for host, directory in mirrors:
        for _ in range(3):
            queue.put({'Host': host, 'Directory': directory})
    with Pool(9) as pool:
        pool.starmap(handle, items)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment