Skip to content

Instantly share code, notes, and snippets.

View amacal's full-sized avatar

Adrian Macal amacal

View GitHub Profile
294808988 function calls (289527877 primitive calls) in 1308.849 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
560862 780.566 0.001 780.566 0.001 {method 'write' of '_ssl._SSLSocket' objects}
423504 260.731 0.001 260.731 0.001 {method 'read' of '_ssl._SSLSocket' objects}
11175660 39.737 0.000 39.737 0.000 {orjson.loads}
19706 27.423 0.001 113.160 0.006 json.py:45(process)
93 18.475 0.199 18.475 0.199 {method 'connect' of '_socket.socket' objects}
463723114 function calls (458443034 primitive calls) in 1209.540 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
90969949/90968756 82.382 0.000 82.420 0.000 {built-in method builtins.len}
11175664 81.461 0.000 201.197 0.000 decoder.py:332(decode)
19706 77.013 0.004 670.440 0.034 json.py:46(process)
11175664 71.578 0.000 412.260 0.000 __init__.py:299(loads)
11175664 60.933 0.000 60.933 0.000 decoder.py:343(raw_decode)
462476698 function calls (457195595 primitive calls) in 1256.186 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
560863 762.970 0.001 762.970 0.001 {method 'write' of '_ssl._SSLSocket' objects}
426597 205.799 0.000 205.799 0.000 {method 'read' of '_ssl._SSLSocket' objects}
11175669 36.491 0.000 36.491 0.000 decoder.py:343(raw_decode)
19706 22.167 0.001 161.075 0.008 json.py:46(process)
11175669 14.730 0.000 61.480 0.000 decoder.py:332(decode)
def worker_sort(name, tag, bucket, input, output):
pipeline = Pipeline(name=name, steps=[
S3Download(),
NDJsonChunk(chunksize=1024*1024),
ForEachChunk(chunksize=512*1024*1024, steps=lambda index: [
NDJsonIndex(extract=lambda row: row[tag]),
QuickSort(key=lambda row: row.key),
NDJsonFlush(),
S3Upload(bucket=bucket, key=f'{output}.tmp/{index}', chunksize=128*1024*1024)
]),
with source as
(
select * from ecds_db.playground.time11M
),
timeline as
(
select id, start_at as happened_at, 1 as balance
from source
union all
select id, end_at as happened_at, -1 as balance
with source as
(
select * from ecds_db.playground.time11M
),
timeline as
(
select start_at as moment
from source
union
select end_at as moment
from multiprocessing import Pool, Queue, Manager
# Create the queues through a Manager so that what gets passed around are
# proxy objects; these can be pickled into the starmap argument tuples and
# handed to the pool's worker processes.
manager = Manager()
ftpQueue = manager.Queue()
jsonQueue = manager.Queue()

# Fan out one `master` call per name over a pool of 20 worker processes.
# Every call receives the same two shared queues.
# NOTE(review): the `...` is kept verbatim from the original snippet —
# presumably a placeholder for arguments that were elided; confirm.
with Pool(20) as pool:
    pool.starmap(master, [(name, ..., ftpQueue, jsonQueue) for name in fetch_names()])
from boto3 import client
from botocore.exceptions import ClientError
from ftplib import FTP
from gzip import GzipFile
from os.path import splitext
from os import getenv
from time import time as now
from hashlib import md5, sha1
from queue import Queue
from lxml.etree import iterparse
from queue import Queue
from asyncio import get_running_loop, wait, run
from concurrent.futures import ThreadPoolExecutor
async def main():
tasks = []
loop = get_running_loop()
ftpQueue = Queue()
jsonQueue = Queue()
# S3 bucket whose name is "wikipedia-" followed by var.account_id.
resource "aws_s3_bucket" "data" {
  bucket        = "wikipedia-${var.account_id}"
  force_destroy = true # allow the bucket to be destroyed even if it still contains objects

  # Abort multipart uploads that are still incomplete after 1 day.
  lifecycle_rule {
    enabled = true
    id      = "abort-multipart-upload"

    abort_incomplete_multipart_upload_days = 1
  }
}