Created
November 21, 2020 09:08
-
-
Save amacal/54258bd0c8adf882ba2ad2367785e0eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from boto3 import client | |
from ftplib import FTP | |
from gzip import GzipFile | |
from os.path import splitext | |
from os import getenv | |
from time import time as now | |
from hashlib import md5, sha1 | |
from multiprocessing import Pool, Queue, Manager | |
from lxml.etree import iterparse | |
from json import dumps | |
class Parameters:
    """Thin wrapper over AWS SSM Parameter Store lookups."""

    def __init__(self):
        self.ssmClient = client('ssm')

    def value(self, name):
        """Return the stored value of the named SSM parameter."""
        response = self.ssmClient.get_parameter(Name=name)
        return response['Parameter']['Value']
class Metrics:
    """Logs progress lines prefixed with the pipeline's name.

    The newline is embedded in the single print call (with end='') so
    that output from concurrently running workers does not interleave
    mid-line.
    """

    def __init__(self, name):
        self.name = name

    def log(self, data):
        message = f'{self.name}: {data}\n'
        print(message, end='')
class Metadata:
    """A tiny key/value store shared by every step of a pipeline.

    Steps record results here (e.g. checksums); the pipeline logs all
    entries when it completes.
    """

    def __init__(self):
        self.data = {}

    def keys(self):
        """All recorded keys."""
        return self.data.keys()

    def set(self, name, value):
        """Store (or overwrite) a value under the given key."""
        self.data[name] = value

    def get(self, name):
        """Look up a previously stored value; raises KeyError if absent."""
        return self.data[name]
class Pipeline:
    """Chains processing steps together via in-memory pipes.

    Each step is bound to an upstream and a downstream pipe; a step's
    output pipe becomes the next step's input pipe.  Data is pushed
    forward: appending to a pipe notifies the step subscribed to it.
    """

    def __init__(self, name, steps):
        self.steps = steps
        self.metadata = Metadata()
        self.metrics = Metrics(name)

    def init(self):
        """Wire every step to its input/output pipes."""
        # The first pipe always carries dict items (the seed input).
        prev = DictPipe()
        self.pipe = prev
        for step in self.steps:
            # FIX: renamed local from `next` to avoid shadowing the builtin.
            nxt = BinaryPipe() if step.output() == 'binary' else DictPipe()
            step.bind(prev, nxt, self.metrics, self.metadata)
            prev = nxt

    def flush(self):
        """Ask every step, in order, to drain whatever it has buffered."""
        for step in self.steps:
            step.flush()

    def run(self, input):
        # Seed the first pipe; the subscription callback chain does the rest.
        self.pipe.append([input])

    def complete(self):
        # Report everything the steps recorded (e.g. checksums).
        for key in self.metadata.keys():
            self.metrics.log(f'{key} -> {self.metadata.get(key)}')

    def start(self, input=None):
        """Run the full lifecycle: wire, feed, drain, report."""
        self.init()
        self.run(input)
        self.flush()
        self.complete()
class BinaryPipe:
    """A growable byte buffer connecting two pipeline steps.

    A producer appends chunks; the (single, optional) subscriber is
    notified after every append and pulls data out with read().  Bytes
    already consumed are compacted away once `offset` passes
    `threshold`, keeping memory bounded on long streams.
    """

    def __init__(self):
        self.offset = 0                  # read position within `data`
        self.total = 0                   # kept for interface compatibility (unused here)
        self.threshold = 1024 * 1024     # compact after this many consumed bytes
        self.data = bytearray()
        # BUG FIX: callback must exist even when nobody subscribes,
        # otherwise append() raised AttributeError.
        self.callback = None

    def append(self, chunk):
        """Add bytes to the buffer and notify the subscriber, if any."""
        self.data += chunk
        if self.callback is not None:
            self.callback()

    def subscribe(self, callback):
        """Register the single consumer notified on every append."""
        self.callback = callback

    def length(self):
        """Number of unread bytes currently buffered."""
        return len(self.data) - self.offset

    def read(self, size):
        """Read up to `size` bytes; a negative size means 'everything'."""
        if size < 0:
            size = self.length()
        value = self.data[self.offset:self.offset + size]
        self.offset += len(value)
        if self.offset >= self.threshold:
            # Drop already-consumed bytes so the buffer cannot grow forever.
            self.data = self.data[self.offset:]
            self.offset = 0
        return bytes(value)
class DictPipe:
    """A growable list buffer connecting two pipeline steps.

    Same contract as BinaryPipe but for Python objects: a producer
    appends a list of items, the subscriber (if any) is notified and
    pulls items out with read().  Consumed items are compacted away
    once `offset` passes `threshold`.
    """

    def __init__(self):
        self.offset = 0                  # read position within `data`
        self.total = 0                   # kept for interface compatibility (unused here)
        self.threshold = 1024 * 1024     # compact after this many consumed items
        self.data = list()
        # BUG FIX: callback must exist even when nobody subscribes,
        # otherwise append() raised AttributeError.
        self.callback = None

    def append(self, chunk):
        """Extend the buffer with a list of items and notify the subscriber."""
        self.data += chunk
        if self.callback is not None:
            self.callback()

    def subscribe(self, callback):
        """Register the single consumer notified on every append."""
        self.callback = callback

    def length(self):
        """Number of unread items currently buffered."""
        return len(self.data) - self.offset

    def read(self, size):
        """Read up to `size` items; a negative size means 'everything'."""
        if size < 0:
            size = self.length()
        value = self.data[self.offset:self.offset + size]
        self.offset += len(value)
        if self.offset >= self.threshold:
            # Drop already-consumed items so the buffer cannot grow forever.
            self.data = self.data[self.offset:]
            self.offset = 0
        return value
class FtpDownload:
    """Pipeline source step: downloads a single file over FTP.

    Reads one filename from the upstream dict pipe, connects to the
    configured host/directory anonymously, and streams the file in
    128 KiB chunks into the downstream binary pipe.
    """

    def __init__(self, host, directory):
        self.client = None            # ftplib.FTP connection, opened lazily
        self.host = host              # FTP hostname
        self.directory = directory    # remote directory to cwd into
        self.tick = int(now())        # timestamp (seconds) of last keep-alive

    def input(self):
        # Consumes a dict-pipe item: the filename to fetch.
        return 'dict'

    def output(self):
        # Produces raw bytes.
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the upstream/downstream pipes and the metrics sink."""
        self.prev = prev
        self.prev.subscribe(self.changed)
        self.next = next
        self.metrics = metrics

    def changed(self):
        # Fires when the upstream pipe receives data; a single item
        # (the filename) triggers the whole blocking download.
        if item := self.prev.read(1):
            self.start_client()
            self.start_download(item[0])

    def request_item(self):
        # NOTE(review): appears to be dead code — nothing in this file calls it.
        self.metrics.log(f'using {self.host}')

    def start_client(self):
        """Open an anonymous FTP session and enter the target directory."""
        self.client = FTP(self.host)
        self.client.login()
        self.client.cwd(self.directory)

    def start_download(self, input):
        """Blockingly retrieve the file; retrbinary calls append() per chunk."""
        self.metrics.log(f'download started {self.directory} {input}')
        self.client.retrbinary(f'RETR {input}', self.append, blocksize=128*1024)
        self.metrics.log(f'download completed {self.directory} {input}')

    def append(self, chunk):
        # Data callback from retrbinary: forward downstream, then keep-alive.
        self.next.append(chunk)
        self.touch(tick=int(now()))

    def touch(self, tick):
        # Send a NOOP on the control channel at most once a minute so the
        # server does not time out the session during long transfers.
        # NOTE(review): putcmd() does not read the server's reply, so NOOP
        # responses accumulate on the control channel until the transfer
        # ends — confirm the servers in use tolerate this.
        if tick - self.tick > 60:
            self.client.putcmd('NOOP')
            self.tick = tick

    def flush(self):
        self.release_client()

    def release_client(self):
        # Politely close the session, but only if one was ever opened.
        if self.client:
            self.client.quit()
class Ungzip:
    """Pipeline step: streams gzip decompression between two binary pipes."""

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Wire up the pipes and wrap the upstream one in a GzipFile.

        The upstream pipe exposes the read(size) method GzipFile expects
        from a file object, so compressed bytes are pulled straight out
        of it on demand.
        """
        self.next = next
        self.prev = prev
        self.prev.subscribe(self.changed)
        self.file = GzipFile(mode='r', fileobj=prev)

    def changed(self):
        # Decompress only while a healthy backlog (> 1 MiB) of compressed
        # input remains, so GzipFile never starves mid-stream.
        while self.prev.length() > 1024 * 1024:
            self.next.append(self.file.read(128 * 1024))

    def flush(self):
        # Drain whatever is left once the upstream source is finished.
        while chunk := self.file.read(128 * 1024):
            self.next.append(chunk)
class XmlToJson:
    """Pipeline step: converts a stream of XML rows into JSON lines.

    Incrementally parses the upstream binary pipe with lxml's iterparse
    and, for every element whose local tag equals `rowtag`, emits one
    JSON object (child-tag -> text) followed by a newline.
    """

    def __init__(self, rowtag):
        self.rowtag = rowtag      # local name of the repeating row element
        self.iterator = None      # lazily created iterparse iterator

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.prev.subscribe(self.changed)

    def changed(self):
        # Start parsing only once at least 1 MiB is buffered, so iterparse
        # does not run dry immediately after creation.
        if self.iterator is None and self.prev.length() > 1024 * 1024:
            self.iterator = iterparse(source=self.prev, events=['start', 'end'])
        # Keep emitting rows while a comfortable backlog remains upstream.
        while self.iterator is not None and self.prev.length() > 1024 * 1024:
            data = self.tick()
            if data is not None:
                self.next.append(data)

    def flush(self):
        # Drain every remaining row once upstream input is exhausted.
        while True:
            data = self.tick()
            if data is not None:
                self.next.append(data)
            else:
                break

    def tick(self):
        """Parse until one complete row is available.

        Returns the row serialized as JSON bytes with a trailing newline,
        or None when the parser is exhausted.
        """
        container = None   # dict being filled for the current row, if inside one
        previous = None    # tag of the most recently opened child element
        path = []          # stack mirroring the open-element nesting
        item = None        # NOTE(review): unused local
        while True:
            try:
                event, node = next(self.iterator)
                # Strip any namespace prefix: '{ns}tag' -> 'tag'.
                _, _, node.tag = node.tag.rpartition('}')
                if event == 'start' and node.tag == self.rowtag:
                    container = dict()
                    path.append(container)
                    # Free memory: delete already-processed siblings from the
                    # lxml tree so it does not grow with the document.
                    while node.getprevious() is not None:
                        del node.getparent()[0]
                elif event == 'end' and node.tag == self.rowtag:
                    # Row complete: serialize and reset state for the next one.
                    data = bytearray(dumps(container), 'utf8')
                    container = None
                    path = []
                    return data + b'\n'
                elif container is None:
                    # Outside any row element: ignore.
                    pass
                elif event == 'start':
                    if path[-1] is None:
                        # Parent turned out to have element children.
                        # NOTE(review): this replacement dict is never linked
                        # into its parent, so grandchild values appear to be
                        # dropped — fine for flat rows, confirm for nested XML.
                        path[-1] = dict()
                    previous = node.tag
                    path.append(None)
                elif event == 'end' and path[-1] is None:
                    # Leaf child closed: record its text under its tag.
                    path[-2][previous] = node.text
                    path.pop()
                else:
                    # A non-leaf child closed: just unwind the stack.
                    path.pop()
            except StopIteration:
                # Parser exhausted without completing another row.
                return None
class S3Upload:
    """Pipeline sink step: streams binary data into S3 as a multipart upload.

    Parts of `chunksize` bytes are uploaded as soon as a full chunk is
    buffered; flush() uploads the remainder and completes the upload.
    """

    def __init__(self, bucket, key, chunksize):
        self.chunksize = chunksize      # bytes per uploaded part
        self.client = client('s3')
        self.bucket = bucket
        self.key = key
        self.part = 1                   # next part number (S3 parts are 1-based)
        self.parts = []                 # {'ETag', 'PartNumber'} records for completion
        self.upload_id = None           # assigned lazily by start_upload()

    def input(self):
        return 'binary'

    def output(self):
        return 'dict'

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.metrics = metrics
        self.prev.subscribe(self.changed)

    def start_upload(self):
        """Create the multipart upload on first use; later calls are no-ops."""
        if not self.upload_id:
            response = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            self.upload_id = response['UploadId']
            self.metrics.log(f'upload started {self.upload_id}')

    def changed(self):
        # Upload parts whenever more than one full chunk is buffered.
        self.start_upload()
        self.upload(self.chunksize)

    def flush(self):
        # Push everything that remains, then seal the multipart upload.
        self.start_upload()
        self.upload()
        self.complete()

    def upload(self, size=0):
        """Drain the pipe down to `size` leftover bytes, one S3 part per chunk."""
        while self.prev.length() > size:
            chunk = self.prev.read(self.chunksize)
            response = self.client.upload_part(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=self.upload_id,
                PartNumber=self.part,
                Body=chunk,
            )
            self.metrics.log(f'part {self.part} completed; {len(chunk)} bytes')
            self.parts.append({'ETag': response['ETag'], 'PartNumber': self.part})
            self.part += 1

    def complete(self):
        """Finalize the multipart upload from the accumulated part records."""
        self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.upload_id,
            MultipartUpload={'Parts': self.parts},
        )
        self.metrics.log(f'upload completed {self.upload_id}')
class MD5Hash:
    """Pass-through step that accumulates an MD5 digest of the stream.

    Forwards every chunk unchanged; on flush, stores the hex digest in
    the pipeline metadata under the configured name.
    """

    def __init__(self, name):
        # Metadata key under which the final digest is recorded.
        self.name = name

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = md5()

    def changed(self):
        # Read everything available, feed the digest, forward unchanged.
        data = self.prev.read(-1)
        self.instance.update(data)
        self.next.append(data)

    def flush(self):
        # hexdigest() is equivalent to digest().hex().
        self.metadata.set(self.name, self.instance.hexdigest())
class SHA1Hash:
    """Pass-through step that accumulates a SHA-1 digest of the stream.

    Forwards every chunk unchanged; on flush, stores the hex digest in
    the pipeline metadata under the configured name.
    """

    def __init__(self, name):
        # Metadata key under which the final digest is recorded.
        self.name = name

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = sha1()

    def changed(self):
        # Read everything available, feed the digest, forward unchanged.
        data = self.prev.read(-1)
        self.instance.update(data)
        self.next.append(data)

    def flush(self):
        # hexdigest() is equivalent to digest().hex().
        self.metadata.set(self.name, self.instance.hexdigest())
class EcsTask:
    """Pipeline step: launches one Fargate task and waits for it to stop.

    A shared queue of mirror "slots" throttles concurrency: a slot item
    (host + directory) is taken before launching the task and returned
    to the queue once the task stops.
    """

    def __init__(self, cluster, task, securityGroup, vpcSubnet, queue, environment):
        self.cluster = cluster            # ECS cluster name/ARN
        self.task = task                  # task definition ARN
        self.securityGroup = securityGroup
        self.vpcSubnet = vpcSubnet
        self.queue = queue                # shared slot queue (from multiprocessing.Manager)
        self.environment = environment    # callable: slot item -> list of env-var dicts

    def input(self):
        return 'dict'

    def output(self):
        return 'dict'

    def bind(self, prev, next, metrics, metadata):
        self.prev = prev
        self.next = next
        self.metrics = metrics
        self.ecs = client('ecs')
        self.prev.subscribe(self.changed)

    def changed(self):
        # NOTE(review): get_nowait raises queue.Empty when no slot is free;
        # this relies on the process-pool size not exceeding the number of
        # seeded slots — confirm against the driver code.
        item = self.queue.get_nowait()
        self.metrics.log(f'acquired {item["Host"]}')
        response = self.ecs.run_task(
            cluster=self.cluster,
            taskDefinition=self.task,
            launchType='FARGATE',
            platformVersion='1.4.0',
            networkConfiguration={
                'awsvpcConfiguration': {
                    'assignPublicIp': 'ENABLED',
                    'securityGroups': [self.securityGroup],
                    'subnets': [self.vpcSubnet]
                }
            },
            overrides={
                'containerOverrides': [{
                    # Derives the container name from a segment of the task
                    # definition ARN — assumes the container is named after
                    # that segment; TODO confirm against the task definition.
                    'name': self.task.replace('/', ':').split(':')[-2],
                    'environment': self.environment(item)
                }]
            }
        )
        self.metrics.log(f'waiting {response["tasks"][0]["taskArn"]}')
        # Poll every 6 s for up to 1200 attempts (= 2 hours max wait).
        self.ecs.get_waiter('tasks_stopped').wait(
            cluster=self.cluster,
            tasks=[response['tasks'][0]['taskArn']],
            WaiterConfig={
                'Delay': 6,
                'MaxAttempts': 1200
            }
        )
        # Return the slot so another process may reuse this mirror.
        self.queue.put(item)
        self.metrics.log(f'released {item["Host"]}')

    def flush(self):
        # Nothing is buffered by this step.
        pass
def master(filename, bucket, cluster, task, securityGroup, vpcSubnet, queue):
    """Run a one-step pipeline that launches a single worker ECS task.

    The worker receives everything it needs (file, bucket, mirror host
    and directory) through container environment-variable overrides.
    """
    def build_environment(token):
        # Maps an acquired mirror slot to the worker's environment.
        return [
            { 'name': 'TYPE', 'value': 'worker' },
            { 'name': 'FILENAME', 'value': filename },
            { 'name': 'BUCKET', 'value': bucket },
            { 'name': 'HOST', 'value': token['Host'] },
            { 'name': 'DIRECTORY', 'value': token['Directory'] },
        ]

    steps = [
        EcsTask(cluster=cluster, task=task, securityGroup=securityGroup,
                vpcSubnet=vpcSubnet, queue=queue, environment=build_environment),
    ]
    Pipeline(name=filename, steps=steps).start(input=filename)
def worker(filename, bucket, host, directory):
    """Stream one gzipped XML dump from an FTP mirror into S3 as JSON lines."""
    # 'name.xml.gz' -> 'name' (two splitext passes strip both extensions).
    basename = splitext(splitext(filename)[0])[0]
    steps = [
        FtpDownload(host=host, directory=directory),
        Ungzip(),
        XmlToJson(rowtag='logitem'),
        S3Upload(bucket=bucket, key=f'data/{basename}.json', chunksize=128*1024*1024),
    ]
    Pipeline(name=filename, steps=steps).start(input=filename)
if __name__ == '__main__' and getenv('TYPE') == 'worker':
    # Worker container: process exactly one dump file, parameters via env.
    worker(getenv('FILENAME'), getenv('BUCKET'), getenv('HOST'), getenv('DIRECTORY'))

if __name__ == '__main__' and getenv('TYPE') == 'master':
    # Master container: fan out one ECS worker task per dump file, with
    # concurrency throttled by a shared queue of mirror slots.
    parameters = Parameters()
    manager = Manager()
    queue = manager.Queue()
    bucket = parameters.value('/wikipedia/bucket_name')
    securityGroup = parameters.value('/wikipedia/security_group')
    vpcSubnet = parameters.value('/wikipedia/vpc_subnet')
    taskArn = parameters.value('/wikipedia/task_arn')
    clusterArn = parameters.value('/wikipedia/cluster_arn')
    # One work item per dump file (27 files in this dump).
    items = [
        (f'enwiki-20201020-pages-logging{index}.xml.gz',
         bucket, clusterArn, taskArn, securityGroup, vpcSubnet, queue)
        for index in range(1, 28)
    ]
    # Seed three concurrent slots per mirror (9 total, matching the pool).
    mirrors = [
        ('ftpmirror.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
        ('ftp.acc.umu.se', 'mirror/wikimedia.org/dumps/enwiki/20201020/'),
        ('dumps.wikimedia.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
    ]
    for host, directory in mirrors:
        for _ in range(3):
            queue.put({
                'Host': host,
                'Directory': directory
            })
    with Pool(9) as pool:
        pool.starmap(master, items)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment