Skip to content

Instantly share code, notes, and snippets.

View amacal's full-sized avatar

Adrian Macal amacal

View GitHub Profile
def start(self, token):
response = self.ecs.run_task(
cluster=self.cluster,
taskDefinition=self.task,
platformVersion='1.4.0',
networkConfiguration={
'awsvpcConfiguration': {
'assignPublicIp': 'ENABLED',
'securityGroups': [self.securityGroup],
'subnets': [self.vpcSubnet]
resource "aws_ecs_cluster" "cluster" {
name = "wikipedia"
capacity_providers = ["FARGATE_SPOT"]
default_capacity_provider_strategy {
capacity_provider = "FARGATE_SPOT"
weight = 100
}
setting {
from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os.path import splitext
from os import getenv
from time import time as now
from hashlib import md5, sha1
from multiprocessing import Pool, Queue, Manager
from lxml.etree import iterparse
from json import dumps
def worker_ftp(name, host, directory, bucket, input, output):
    """Run a two-step pipeline: FtpDownload followed by S3Upload.

    The pipeline is named `name` and started with `input`. The download
    step is configured with `host` and `directory`; the upload step writes
    to `bucket` under the key `output`.
    """
    # Steps are created in pipeline order: download first, then upload.
    download = FtpDownload(host=host, directory=directory)
    # chunksize is 128*1024*1024 (128 MiB if the unit is bytes -- confirm
    # against S3Upload).
    upload = S3Upload(bucket=bucket, key=output, chunksize=128*1024*1024)

    Pipeline(name=name, steps=[download, upload]).start(input=input)
def worker_json(name, bucket, input, output):
pipeline = Pipeline(name=name, steps=[
def master(filename, bucket, cluster, task, securityGroup, vpcSubnet, ftpQueue, jsonQueue):
pipeline = Pipeline(name=filename, steps=[
Throttling(queue=ftpQueue),
EcsTask(cluster=cluster, task=task, securityGroup=securityGroup, vpcSubnet=vpcSubnet, environment=lambda token: [
{ 'name': 'TYPE', 'value': 'worker-ftp' },
{ 'name': 'NAME', 'value': token.value },
{ 'name': 'BUCKET', 'value': bucket },
{ 'name': 'INPUT', 'value': token.value },
{ 'name': 'OUTPUT', 'value': f'raw/{token.value}' },
{ 'name': 'HOST', 'value': token.item['Host'] },
from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os.path import splitext
from os import getenv
from time import time as now
from hashlib import md5, sha1
from multiprocessing import Pool, Queue, Manager
from lxml.etree import iterparse
from json import dumps
class EcsTask:
def __init__(self, cluster, task, securityGroup, vpcSubnet, queue, environment):
    """Remember the settings this step was configured with.

    All six arguments are stored unchanged on the instance under the same
    names. The `master` function in this listing passes `environment` as a
    callable that takes a token and returns a list of
    ``{'name': ..., 'value': ...}`` dicts.
    """
    # Paired assignments; evaluation and assignment order match the
    # parameter order.
    self.cluster, self.task = cluster, task
    self.securityGroup, self.vpcSubnet = securityGroup, vpcSubnet
    self.queue, self.environment = queue, environment
def input(self):
# Build the master-side pipeline for `filename`.
#
# The pipeline has a single EcsTask step configured with `cluster`, `task`,
# `securityGroup`, `vpcSubnet` and `queue`. Its `environment` callback turns
# one token into a list of {'name': ..., 'value': ...} entries.
#
# NOTE(review): the entry names FILENAME/BUCKET/HOST/DIRECTORY mirror the
# parameters of `worker(filename, bucket, host, directory)`; presumably the
# worker container reads them back from its environment -- confirm.
#
# NOTE(review): in the visible code `pipeline` is only constructed; it is
# neither started nor returned. This listing looks like a truncated snippet
# preview, so a trailing `pipeline.start(...)` may be missing -- confirm
# against the full file before relying on this function.
def master(filename, bucket, cluster, task, securityGroup, vpcSubnet, queue):
pipeline = Pipeline(name=filename, steps=[
EcsTask(cluster=cluster, task=task, securityGroup=securityGroup, vpcSubnet=vpcSubnet, queue=queue, environment=lambda token: [
# Fixed value 'worker' for the TYPE entry.
{ 'name': 'TYPE', 'value': 'worker' },
# Taken from this function's own arguments.
{ 'name': 'FILENAME', 'value': filename },
{ 'name': 'BUCKET', 'value': bucket },
# Taken from the token, which is indexed like a mapping with 'Host' and
# 'Directory' keys.
{ 'name': 'HOST', 'value': token['Host'] },
{ 'name': 'DIRECTORY', 'value': token['Directory'] },
])
])
def worker(filename, bucket, host, directory):
    """Run the four-step pipeline FtpDownload -> Ungzip -> XmlToJson -> S3Upload.

    The pipeline is named after `filename` and started with `filename` as
    its input. The upload key is ``data/<stem>.json``, where ``<stem>`` is
    `filename` with its last two extensions removed (``a.xml.gz`` gives
    ``data/a.json``).
    """
    # Steps are created in pipeline order, matching the order in which the
    # inline list literal would evaluate them.
    download = FtpDownload(host=host, directory=directory)
    gunzip = Ungzip()
    to_json = XmlToJson(rowtag='logitem')

    # Two splitext passes strip two suffixes, e.g. '.gz' and then '.xml'.
    without_last_ext = splitext(filename)[0]
    stem = splitext(without_last_ext)[0]

    # chunksize is 128*1024*1024 (128 MiB if the unit is bytes -- confirm
    # against S3Upload).
    upload = S3Upload(bucket=bucket, key=f'data/{stem}.json', chunksize=128*1024*1024)

    Pipeline(name=filename, steps=[download, gunzip, to_json, upload]).start(input=filename)
# Base image: the python image, tag 3.9.0-buster.
FROM python:3.9.0-buster
# Copy every build-context file matching wikipedia-*.zip into /app/wikipedia/.
COPY wikipedia-*.zip /app/wikipedia/
# List the copied files (visible in the build output), unpack the app and
# libs archives inside /app/wikipedia/, then delete the zip files.
RUN ls -l /app/wikipedia/ && cd /app/wikipedia/ \
&& unzip wikipedia-app.zip && unzip wikipedia-libs.zip \
&& rm wikipedia-*.zip
# Start the entry script; -u forces unbuffered stdout/stderr.
# NOTE(review): index.py is presumably extracted from wikipedia-app.zip -- confirm.
CMD [ "python", "-u", "/app/wikipedia/index.py" ]