Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os.path import splitext
from os import getenv
from time import time as now
from hashlib import md5, sha1
from multiprocessing import Pool, Queue, Manager
from lxml.etree import iterparse
from json import dumps
class Parameters:
    """Looks up configuration values through a boto3 ``ssm`` client."""

    def __init__(self):
        # One client is created up front and reused for every lookup.
        self.ssmClient = client('ssm')

    def value(self, name):
        """Return the ``Value`` field of the SSM parameter called *name*."""
        response = self.ssmClient.get_parameter(Name=name)
        parameter = response['Parameter']
        return parameter['Value']
class Metrics:
    """Minimal logger that prefixes every message with a fixed name."""

    def __init__(self, name):
        self.name = name

    def log(self, data):
        """Print ``"<name>: <data>"`` followed by a newline."""
        # The newline is part of the string and ``end`` is empty, so the
        # whole line is handed to print() as a single piece of text.
        line = f'{self.name}: {data}\n'
        print(line, end='')
class Metadata:
    """Name/value store shared by pipeline steps."""

    def __init__(self):
        self.data = {}

    def keys(self):
        """Return a view of the names stored so far."""
        return self.data.keys()

    def set(self, name, value):
        """Store *value* under *name*, replacing any earlier value."""
        self.data[name] = value

    def get(self, name):
        """Return the value stored under *name*; raises ``KeyError`` if absent."""
        return self.data[name]
class Pipeline:
    """Chains steps together with pipes and drives one input through them."""

    def __init__(self, name, steps):
        self.steps = steps
        self.metadata = Metadata()
        self.metrics = Metrics(name)

    def init(self):
        """Create the pipes and bind each step between two of them.

        The head pipe is always a ``DictPipe``; every step's output pipe is
        a ``BinaryPipe`` when the step reports ``'binary'`` output and a
        ``DictPipe`` otherwise.
        """
        self.pipe = DictPipe()
        upstream = self.pipe
        for step in self.steps:
            if step.output() == 'binary':
                downstream = BinaryPipe()
            else:
                downstream = DictPipe()
            step.bind(upstream, downstream, self.metrics, self.metadata)
            # This step's output pipe is the next step's input pipe.
            upstream = downstream

    def flush(self):
        """Flush every step, in pipeline order."""
        for step in self.steps:
            step.flush()

    def run(self, input):
        """Push *input* into the head pipe as a single item."""
        self.pipe.append([input])

    def complete(self):
        """Log every metadata entry as ``"<key> -> <value>"``."""
        for key in self.metadata.keys():
            value = self.metadata.get(key)
            self.metrics.log(f'{key} -> {value}')

    def start(self, input=None):
        """Run the whole lifecycle: init, run, flush, complete."""
        self.init()
        self.run(input)
        self.flush()
        self.complete()
class BinaryPipe:
    """In-memory byte buffer connecting two pipeline steps.

    A producer calls :meth:`append`; the optional subscriber callback is
    invoked after every append; a consumer pulls bytes with :meth:`read`.
    Consumed bytes are dropped from the buffer once the read offset reaches
    ``threshold``.
    """

    def __init__(self):
        self.offset = 0                 # index of the first unread byte in ``data``
        self.total = 0
        self.threshold = 1024 * 1024    # compact once this many bytes were consumed
        self.data = bytearray()
        # Must exist before the first append(): a pipe nobody subscribed to
        # would otherwise raise AttributeError when written to.
        self.callback = None

    def append(self, chunk):
        """Add *chunk* to the buffer and notify the subscriber, if any."""
        self.data += chunk
        if self.callback is not None:
            self.callback()

    def subscribe(self, callback):
        """Register the zero-argument *callback* invoked after each append."""
        self.callback = callback

    def length(self):
        """Return the number of unread bytes."""
        return len(self.data) - self.offset

    def read(self, size=-1):
        """Consume and return up to *size* unread bytes as ``bytes``.

        A negative or ``None`` *size* returns everything that is unread.
        Fewer bytes than requested (possibly none) are returned when the
        buffer holds less.
        """
        if size is None or size < 0:
            size = self.length()
        start = self.offset
        value = bytes(self.data[start:start + size])
        self.offset = start + len(value)
        if self.offset >= self.threshold:
            # Drop the consumed prefix in place instead of copying the tail.
            del self.data[:self.offset]
            self.offset = 0
        return value
class DictPipe:
    """In-memory item queue connecting two pipeline steps.

    A producer calls :meth:`append` with a list of items; the optional
    subscriber callback is invoked after every append; a consumer pulls
    items with :meth:`read`. Consumed items are dropped from the list once
    the read offset reaches ``threshold``.
    """

    def __init__(self):
        self.offset = 0                 # index of the first unread item in ``data``
        self.total = 0
        self.threshold = 1024 * 1024    # compact once this many items were consumed
        self.data = list()
        # Must exist before the first append(): a pipe nobody subscribed to
        # would otherwise raise AttributeError when written to.
        self.callback = None

    def append(self, chunk):
        """Extend the queue with the items of *chunk* and notify the subscriber."""
        self.data += chunk
        if self.callback is not None:
            self.callback()

    def subscribe(self, callback):
        """Register the zero-argument *callback* invoked after each append."""
        self.callback = callback

    def length(self):
        """Return the number of unread items."""
        return len(self.data) - self.offset

    def read(self, size=-1):
        """Consume and return up to *size* unread items as a list.

        A negative or ``None`` *size* returns everything that is unread.
        An empty list is returned when nothing is pending.
        """
        if size is None or size < 0:
            size = self.length()
        start = self.offset
        value = self.data[start:start + size]
        self.offset = start + len(value)
        if self.offset >= self.threshold:
            # Drop the consumed prefix in place.
            del self.data[:self.offset]
            self.offset = 0
        return value
class FtpDownload:
    """Pipeline step: take a file name from the input pipe and stream the
    file's bytes from an FTP server into the output pipe."""

    def __init__(self, host, directory):
        self.client = None
        self.host = host
        self.directory = directory
        self.tick = int(now())      # time (seconds) of the last NOOP bookkeeping

    def input(self):
        return 'dict'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Attach the step to its pipes; *metadata* is not used by this step."""
        self.prev = prev
        self.prev.subscribe(self.changed)
        self.next = next
        self.metrics = metrics

    def changed(self):
        """Download the next pending file name, if there is one."""
        if item := self.prev.read(1):
            self.start_client()
            self.start_download(item[0])

    def request_item(self):
        self.metrics.log(f'using {self.host}')

    def start_client(self):
        """Open a fresh connection, log in and change to the directory."""
        # Close a connection left over from an earlier item so it is not
        # leaked when the attribute is overwritten.
        self.release_client()
        self.client = FTP(self.host)
        self.client.login()
        self.client.cwd(self.directory)

    def start_download(self, input):
        """Retrieve *input* in 128 KiB blocks, forwarding each to ``append``."""
        self.metrics.log(f'download started {self.directory} {input}')
        self.client.retrbinary(f'RETR {input}', self.append, blocksize=128*1024)
        self.metrics.log(f'download completed {self.directory} {input}')

    def append(self, chunk):
        """Forward a downloaded block downstream and run the NOOP check."""
        self.next.append(chunk)
        self.touch(tick=int(now()))

    def touch(self, tick):
        """Send ``NOOP`` when more than 60 seconds passed since the last one.

        The reply is not read here; presumably the command only serves to
        keep the control connection from idling out during long transfers.
        """
        if tick - self.tick > 60:
            self.client.putcmd('NOOP')
            self.tick = tick

    def flush(self):
        self.release_client()

    def release_client(self):
        """Quit the current connection, if any; safe to call repeatedly."""
        # Clear the attribute first so a failing or repeated call can never
        # issue QUIT on an already closed connection.
        client, self.client = self.client, None
        if client is not None:
            client.quit()
class Ungzip:
    """Pipeline step: gunzip the bytes of the input pipe into the output pipe."""

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the pipes and wrap the input pipe in a ``GzipFile``.

        *metrics* and *metadata* are accepted but not used by this step.
        """
        self.next = next
        self.prev = prev
        self.prev.subscribe(self.changed)
        # The input pipe is handed to GzipFile as its file object.
        self.file = GzipFile(mode='r', fileobj=prev)

    def changed(self):
        """Decompress while more than 1 MiB of input is pending."""
        backlog = 1024 * 1024
        block = 128 * 1024
        while self.prev.length() > backlog:
            decompressed = self.file.read(block)
            self.next.append(decompressed)

    def flush(self):
        """Decompress everything that is left, 128 KiB of output at a time."""
        block = 128 * 1024
        while chunk := self.file.read(block):
            self.next.append(chunk)
class XmlToJson:
    """Pipeline step: turn every ``rowtag`` element of an XML stream into one
    line of JSON (UTF-8 bytes terminated by a newline)."""

    def __init__(self, rowtag):
        self.rowtag = rowtag
        self.iterator = None    # created lazily once input is available

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the pipes; *metrics* and *metadata* are not used."""
        self.prev = prev
        self.next = next
        self.prev.subscribe(self.changed)

    def changed(self):
        """Convert rows while more than 1 MiB of input is pending.

        Keeping a backlog means the parser is not asked to read from an
        empty pipe before the upstream steps have finished.
        """
        backlog = 1024 * 1024
        if self.iterator is None:
            if self.prev.length() <= backlog:
                return
            self.iterator = iterparse(source=self.prev, events=['start', 'end'])
        while self.prev.length() > backlog:
            data = self.tick()
            if data is None:
                # Parser exhausted: the backlog can no longer shrink, so
                # looping on would never terminate.
                break
            self.next.append(data)

    def flush(self):
        """Convert every remaining row.

        Inputs that never exceeded the 1 MiB backlog have no parser yet, so
        it is created here; completely empty input produces no output.
        """
        if self.iterator is None:
            if self.prev.length() == 0:
                return
            self.iterator = iterparse(source=self.prev, events=['start', 'end'])
        while (data := self.tick()) is not None:
            self.next.append(data)

    def tick(self):
        """Parse up to the end of the next row and return it as JSON bytes.

        Returns ``None`` when the parser is exhausted. Leaf elements become
        ``tag: text`` entries; elements with children become nested dicts.
        Namespaces are stripped from tag names, attributes are ignored and a
        repeated tag overwrites the earlier value.
        """
        row = None
        values = []     # per open element: None while it has no children, else its dict
        tags = []       # tag name of each open element, parallel to ``values``
        for event, node in self.iterator:
            _, _, node.tag = node.tag.rpartition('}')
            if node.tag == self.rowtag:
                if event == 'end':
                    return bytearray(dumps(row), 'utf8') + b'\n'
                row = dict()
                values.append(row)
                tags.append(node.tag)
                # Remove earlier siblings (already converted rows) from the
                # tree so they can be freed.
                while node.getprevious() is not None:
                    del node.getparent()[0]
            elif row is None:
                # Outside of any row element: nothing to collect.
                continue
            elif event == 'start':
                if values[-1] is None:
                    # The enclosing element turns out to have children:
                    # give it a dict and attach that dict to its parent.
                    values[-1] = dict()
                    values[-2][tags[-1]] = values[-1]
                values.append(None)
                tags.append(node.tag)
            else:
                if values[-1] is None:
                    # Leaf element: store its text under its tag.
                    values[-2][node.tag] = node.text
                values.pop()
                tags.pop()
        return None
class S3Upload:
    """Pipeline step: send the input pipe to S3 as a multipart upload."""

    def __init__(self, bucket, key, chunksize):
        self.chunksize = chunksize
        self.client = client('s3')
        self.bucket = bucket
        self.key = key
        self.part = 1           # number given to the next uploaded part
        self.parts = list()     # ETag / PartNumber records of finished parts
        self.upload_id = None   # assigned when the multipart upload is created

    def input(self):
        return 'binary'

    def output(self):
        return 'dict'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the input pipe; *next* and *metadata* are not used."""
        self.prev = prev
        self.metrics = metrics
        self.prev.subscribe(self.changed)

    def start_upload(self):
        """Create the multipart upload unless one already exists."""
        if self.upload_id:
            return
        created = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
        self.upload_id = created['UploadId']
        self.metrics.log(f'upload started {self.upload_id}')

    def changed(self):
        """Upload parts for as long as more than ``chunksize`` bytes are pending."""
        self.start_upload()
        self.upload(self.chunksize)

    def flush(self):
        """Upload whatever is still pending, then complete the upload."""
        self.start_upload()
        self.upload()
        self.complete()

    def upload(self, size = 0):
        """Upload parts of up to ``chunksize`` bytes while more than *size*
        bytes are pending in the input pipe."""
        while self.prev.length() > size:
            body = self.prev.read(self.chunksize)
            number = self.part
            response = self.client.upload_part(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=self.upload_id,
                PartNumber=number,
                Body=body,
            )
            self.metrics.log(f'part {number} completed; {len(body)} bytes')
            self.parts.append({'ETag': response['ETag'], 'PartNumber': number})
            self.part = number + 1

    def complete(self):
        """Complete the multipart upload with the recorded parts."""
        self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.upload_id,
            MultipartUpload={'Parts': self.parts},
        )
        self.metrics.log(f'upload completed {self.upload_id}')
class MD5Hash:
    """Pass-through step that records the MD5 digest of the bytes it forwards."""

    def __init__(self, name):
        self.name = name    # metadata key the hex digest is stored under

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the pipes and start a fresh hash; *metrics* is not used."""
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = md5()

    def changed(self):
        """Hash everything pending on the input pipe and forward it unchanged."""
        pending = self.prev.read(-1)
        self.instance.update(pending)
        self.next.append(pending)

    def flush(self):
        """Store the hex digest in the metadata under ``name``."""
        self.metadata.set(self.name, self.instance.hexdigest())
class SHA1Hash:
    """Pass-through step that records the SHA-1 digest of the bytes it forwards."""

    def __init__(self, name):
        self.name = name    # metadata key the hex digest is stored under

    def input(self):
        return 'binary'

    def output(self):
        return 'binary'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the pipes and start a fresh hash; *metrics* is not used."""
        self.prev = prev
        self.next = next
        self.metadata = metadata
        self.prev.subscribe(self.changed)
        self.instance = sha1()

    def changed(self):
        """Hash everything pending on the input pipe and forward it unchanged."""
        pending = self.prev.read(-1)
        self.instance.update(pending)
        self.next.append(pending)

    def flush(self):
        """Store the hex digest in the metadata under ``name``."""
        self.metadata.set(self.name, self.instance.hexdigest())
class EcsTask:
    """Pipeline step: run one Fargate task per input and wait until it stops.

    Each run borrows a token (a dict with at least a ``'Host'`` key) from
    *queue* and hands it to *environment*, which must return the container
    environment override list for that token.
    """

    def __init__(self, cluster, task, securityGroup, vpcSubnet, queue, environment):
        self.cluster = cluster
        self.task = task
        self.securityGroup = securityGroup
        self.vpcSubnet = vpcSubnet
        self.queue = queue
        self.environment = environment

    def input(self):
        return 'dict'

    def output(self):
        return 'dict'

    def bind(self, prev, next, metrics, metadata):
        """Attach to the pipes and create the ECS client; *metadata* is unused."""
        self.prev = prev
        self.next = next
        self.metrics = metrics
        self.ecs = client('ecs')
        self.prev.subscribe(self.changed)

    def changed(self):
        """Borrow a token, run the task, wait for it to stop, return the token.

        Raises:
            RuntimeError: when ECS reports that no task was started.
            Whatever ``get_nowait`` raises when no token is available.
        """
        item = self.queue.get_nowait()
        self.metrics.log(f'acquired {item["Host"]}')
        try:
            response = self.ecs.run_task(
                cluster=self.cluster,
                taskDefinition=self.task,
                launchType='FARGATE',
                platformVersion='1.4.0',
                networkConfiguration={
                    'awsvpcConfiguration': {
                        'assignPublicIp': 'ENABLED',
                        'securityGroups': [self.securityGroup],
                        'subnets': [self.vpcSubnet]
                    }
                },
                overrides={
                    'containerOverrides': [{
                        # Second-to-last segment of the task identifier once
                        # '/' is treated like ':'.
                        'name': self.task.replace('/', ':').split(':')[-2],
                        'environment': self.environment(item)
                    }]
                }
            )
            tasks = response.get('tasks') or []
            if not tasks:
                # Surface the reported failures instead of an opaque IndexError.
                raise RuntimeError(f'no task started: {response.get("failures")}')
            task_arn = tasks[0]['taskArn']
            self.metrics.log(f'waiting {task_arn}')
            self.ecs.get_waiter('tasks_stopped').wait(
                cluster=self.cluster,
                tasks=[task_arn],
                WaiterConfig={
                    'Delay': 6,
                    'MaxAttempts': 1200
                }
            )
        finally:
            # Always hand the token back; otherwise every failed run would
            # permanently shrink the pool shared with the other processes.
            self.queue.put(item)
            self.metrics.log(f'released {item["Host"]}')

    def flush(self):
        pass
def master(filename, bucket, cluster, task, securityGroup, vpcSubnet, queue):
    """Run a one-step pipeline that launches an ECS worker task for *filename*.

    The worker's container environment is derived from the token the
    ``EcsTask`` step takes from *queue*.
    """
    def environment(token):
        # Container environment overrides for the launched task.
        return [
            { 'name': 'TYPE', 'value': 'worker' },
            { 'name': 'FILENAME', 'value': filename },
            { 'name': 'BUCKET', 'value': bucket },
            { 'name': 'HOST', 'value': token['Host'] },
            { 'name': 'DIRECTORY', 'value': token['Directory'] },
        ]

    launcher = EcsTask(
        cluster=cluster,
        task=task,
        securityGroup=securityGroup,
        vpcSubnet=vpcSubnet,
        queue=queue,
        environment=environment,
    )
    pipeline = Pipeline(name=filename, steps=[launcher])
    pipeline.start(input=filename)
def worker(filename, bucket, host, directory):
    """Download *filename* over FTP, gunzip it, convert its ``logitem``
    elements to JSON lines and upload the result to *bucket*."""
    # Strip two extensions (e.g. ".xml.gz") to build the object key.
    stem, _ = splitext(filename)
    stem, _ = splitext(stem)
    steps = [
        FtpDownload(host=host, directory=directory),
        Ungzip(),
        XmlToJson(rowtag='logitem'),
        S3Upload(bucket=bucket, key=f'data/{stem}.json', chunksize=128*1024*1024),
    ]
    pipeline = Pipeline(name=filename, steps=steps)
    pipeline.start(input=filename)
# Worker entry point: everything the worker needs comes from the environment.
if __name__ == '__main__' and getenv('TYPE') == 'worker':
    worker(
        filename=getenv('FILENAME'),
        bucket=getenv('BUCKET'),
        host=getenv('HOST'),
        directory=getenv('DIRECTORY'),
    )
# Master entry point: read settings from SSM, seed the shared queue with
# mirror tokens and fan the dump files out over a pool of nine processes.
if __name__ == '__main__' and getenv('TYPE') == 'master':
    parameters = Parameters()
    manager = Manager()
    queue = manager.Queue()

    bucket = parameters.value('/wikipedia/bucket_name')
    securityGroup = parameters.value('/wikipedia/security_group')
    vpcSubnet = parameters.value('/wikipedia/vpc_subnet')
    taskArn = parameters.value('/wikipedia/task_arn')
    clusterArn = parameters.value('/wikipedia/cluster_arn')

    # One argument tuple per dump file, numbered 1 through 27.
    items = [
        (f'enwiki-20201020-pages-logging{i}.xml.gz', bucket, clusterArn, taskArn, securityGroup, vpcSubnet, queue)
        for i in range(1, 28)
    ]

    mirrors = [
        ('ftpmirror.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
        ('ftp.acc.umu.se', 'mirror/wikimedia.org/dumps/enwiki/20201020/'),
        ('dumps.wikimedia.your.org', 'pub/wikimedia/dumps/enwiki/20201020/'),
    ]
    # Three tokens per mirror, queued mirror by mirror.
    for host, directory in mirrors:
        for _ in range(3):
            queue.put({
                'Host': host,
                'Directory': directory
            })

    with Pool(9) as pool:
        pool.starmap(master, items)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment