Adrian Macal (amacal)
  • FlixBus
  • Munich, Germany
python-asyncio-new.py

from queue import Queue
from asyncio import get_running_loop, wait, run
from concurrent.futures import ThreadPoolExecutor

async def main():
    tasks = []
    loop = get_running_loop()
    ftpQueue = Queue()
    jsonQueue = Queue()
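The preview stops here; a minimal sketch of how these pieces plausibly fit together, assuming the blocking queue consumers are meant to run on the thread pool (the consume function and the shutdown signalling are illustrative assumptions):

from queue import Queue
from asyncio import get_running_loop, wait, run
from concurrent.futures import ThreadPoolExecutor

def consume(queue):
    # blocking worker; the body is a placeholder assumption
    while True:
        item = queue.get()
        if item is None:
            break

async def main():
    loop = get_running_loop()
    ftpQueue, jsonQueue = Queue(), Queue()
    with ThreadPoolExecutor() as executor:
        # run_in_executor bridges blocking code into the event loop
        tasks = [
            loop.run_in_executor(executor, consume, ftpQueue),
            loop.run_in_executor(executor, consume, jsonQueue),
        ]
        ftpQueue.put(None)   # poison pills so the sketch terminates
        jsonQueue.put(None)
        await wait(tasks)

run(main())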
clean-s3-multipart-upload.tf

resource "aws_s3_bucket" "data" {
  bucket        = "wikipedia-${var.account_id}"
  force_destroy = true

  lifecycle_rule {
    id      = "abort-multipart-upload"
    enabled = true

    abort_incomplete_multipart_upload_days = 1
  }
}
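The lifecycle rule makes S3 abort any multipart upload still incomplete after a day, so abandoned parts stop silently accruing storage costs. The same cleanup can be done by hand with boto3 (the bucket name is a placeholder):

from boto3 import client

s3 = client('s3')
bucket = 'wikipedia-123456789012'  # placeholder name

# each entry in Uploads is an in-progress multipart upload
response = s3.list_multipart_uploads(Bucket=bucket)
for upload in response.get('Uploads', []):
    s3.abort_multipart_upload(
        Bucket=bucket,
        Key=upload['Key'],
        UploadId=upload['UploadId'],
    )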
ecs-spots-start.py

def start(self, token):
    response = self.ecs.run_task(
        cluster=self.cluster,
        taskDefinition=self.task,
        platformVersion='1.4.0',
        networkConfiguration={
            'awsvpcConfiguration': {
                'assignPublicIp': 'ENABLED',
                'securityGroups': [self.securityGroup],
                'subnets': [self.vpcSubnet]
            }
        }
    )
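run_task launches the container asynchronously, so a coordinator typically needs to watch for the task stopping, not least because FARGATE_SPOT tasks can be interrupted. A sketch of such a poll loop using the real describe_tasks API (the function name and interval are assumptions):

from time import sleep
from boto3 import client

def wait_for_stop(cluster, taskArn):
    ecs = client('ecs')
    while True:
        response = ecs.describe_tasks(cluster=cluster, tasks=[taskArn])
        task = response['tasks'][0]
        if task['lastStatus'] == 'STOPPED':
            # stoppedReason distinguishes normal exit from spot interruption
            return task.get('stoppedReason')
        sleep(15)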
ecs-spots-infra.tf

resource "aws_ecs_cluster" "cluster" {
  name               = "wikipedia"
  capacity_providers = ["FARGATE_SPOT"]

  default_capacity_provider_strategy {
    capacity_provider = "FARGATE_SPOT"
    weight            = 100
  }

  setting {
    # assumed completion: containerInsights is the only setting name
    # aws_ecs_cluster accepts; the value is a guess
    name  = "containerInsights"
    value = "enabled"
  }
}
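With FARGATE_SPOT as the cluster's default capacity provider, tasks started without an explicit launchType land on spot capacity automatically. The strategy can also be pinned per call; a boto3 sketch (the task definition name is a placeholder, and an awsvpc task would also need networkConfiguration as in ecs-spots-start.py above):

from boto3 import client

ecs = client('ecs')
ecs.run_task(
    cluster='wikipedia',
    taskDefinition='wikipedia-worker',  # placeholder family name
    capacityProviderStrategy=[
        {'capacityProvider': 'FARGATE_SPOT', 'weight': 100},
    ],
)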
coordinating-ecs-all-v2.py

from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os.path import splitext
from os import getenv
from time import time as now
from hashlib import md5, sha1
from multiprocessing import Pool, Queue, Manager
from lxml.etree import iterparse
from json import dumps
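These imports outline the whole pipeline: FTP download, gzip decompression, streaming XML parsing, and JSON output. The lxml piece in particular suggests a pattern like the following, which walks a gzipped dump incrementally instead of loading it whole (the file name and fields are assumptions):

from gzip import GzipFile
from json import dumps
from lxml.etree import iterparse

with GzipFile('dump.xml.gz') as stream:
    # lxml's iterparse can filter by tag, with '{*}' matching any namespace
    for _, page in iterparse(stream, tag='{*}page'):
        title = next((child.text for child in page if child.tag.endswith('title')), None)
        print(dumps({'title': title}))
        page.clear()  # release parsed elements to keep memory flat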
coordinating-ecs-worker-v2.py

def worker_ftp(name, host, directory, bucket, input, output):
    pipeline = Pipeline(name=name, steps=[
        FtpDownload(host=host, directory=directory),
        S3Upload(bucket=bucket, key=output, chunksize=128*1024*1024)
    ])
    pipeline.start(input=input)

def worker_json(name, bucket, input, output):
    pipeline = Pipeline(name=name, steps=[
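The preview cuts off before worker_json's steps, and the Pipeline, FtpDownload, and S3Upload classes aren't shown. A minimal sketch of the chaining contract the calls imply, where everything below other than the Pipeline name and signature is an assumption:

class Pipeline:
    def __init__(self, name, steps):
        self.name = name
        self.steps = steps

    def start(self, input):
        # feed each step the previous step's output
        data = input
        for step in self.steps:
            data = step.run(data)
        return data

class Upper:
    # trivial stand-in step, purely for demonstration
    def run(self, data):
        return data.upper()

print(Pipeline(name='demo', steps=[Upper()]).start(input='enwiki'))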
coordinating-ecs-master-v2.py

def master(filename, bucket, cluster, task, securityGroup, vpcSubnet, ftpQueue, jsonQueue):
    pipeline = Pipeline(name=filename, steps=[
        Throttling(queue=ftpQueue),
        EcsTask(cluster=cluster, task=task, securityGroup=securityGroup, vpcSubnet=vpcSubnet, environment=lambda token: [
            { 'name': 'TYPE', 'value': 'worker-ftp' },
            { 'name': 'NAME', 'value': token.value },
            { 'name': 'BUCKET', 'value': bucket },
            { 'name': 'INPUT', 'value': token.value },
            { 'name': 'OUTPUT', 'value': f'raw/{token.value}' },
            { 'name': 'HOST', 'value': token.item['Host'] },
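On the container side, a small entrypoint presumably reads these variables back and dispatches on TYPE, which would explain the getenv import in the v2 gist. A sketch with the variable names copied from the environment lists in these previews (the entrypoint itself is an assumption):

from os import getenv

def entrypoint():
    kind = getenv('TYPE')
    if kind == 'worker-ftp':
        worker_ftp(
            name=getenv('NAME'),
            host=getenv('HOST'),
            directory=getenv('DIRECTORY'),
            bucket=getenv('BUCKET'),
            input=getenv('INPUT'),
            output=getenv('OUTPUT'),
        )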
coordinating-ecs-all-v1.py

from boto3 import client
from ftplib import FTP
from gzip import GzipFile
from os.path import splitext
from os import getenv
from time import time as now
from hashlib import md5, sha1
from multiprocessing import Pool, Queue, Manager
from lxml.etree import iterparse
from json import dumps
coordinating-ecs-task-v1.py

class EcsTask:
    def __init__(self, cluster, task, securityGroup, vpcSubnet, queue, environment):
        self.cluster = cluster
        self.task = task
        self.securityGroup = securityGroup
        self.vpcSubnet = vpcSubnet
        self.queue = queue
        self.environment = environment

    def input(self):
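The environment callable pairs naturally with ECS's per-container overrides. A sketch of how the generated list plausibly reaches run_task, using the real overrides parameter (the helper function and the container name 'worker' are assumptions):

from boto3 import client

def run_with_environment(cluster, task, environment):
    ecs = client('ecs')
    return ecs.run_task(
        cluster=cluster,
        taskDefinition=task,
        overrides={
            'containerOverrides': [
                # environment is the [{'name': ..., 'value': ...}] list
                # produced by the lambda passed to EcsTask
                {'name': 'worker', 'environment': environment},
            ],
        },
    )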
coordinating-ecs-master-v1.py

def master(filename, bucket, cluster, task, securityGroup, vpcSubnet, queue):
    pipeline = Pipeline(name=filename, steps=[
        EcsTask(cluster=cluster, task=task, securityGroup=securityGroup, vpcSubnet=vpcSubnet, queue=queue, environment=lambda token: [
            { 'name': 'TYPE', 'value': 'worker' },
            { 'name': 'FILENAME', 'value': filename },
            { 'name': 'BUCKET', 'value': bucket },
            { 'name': 'HOST', 'value': token['Host'] },
            { 'name': 'DIRECTORY', 'value': token['Directory'] },
        ])
    ])
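The multiprocessing imports in the v1 gist suggest master runs in a local process pool with a Manager-backed queue feeding it work items; a minimal wiring sketch in which every concrete argument is a placeholder:

from multiprocessing import Manager, Pool

if __name__ == '__main__':
    manager = Manager()
    queue = manager.Queue()
    with Pool(processes=4) as pool:
        # placeholder arguments, in master's parameter order
        pool.apply(master, ('enwiki-latest', 'wikipedia-data',
                            'wikipedia', 'wikipedia-worker',
                            'sg-0123456789', 'subnet-0123456789', queue))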