Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
def worker_sort(name, tag, bucket, input, output):
    """Assemble the sort pipeline for one S3 object and start it.

    A ``Pipeline`` called ``name`` is built from a fixed list of steps and
    started with ``S3Object(bucket=bucket, key=input)`` as its input.
    Nothing is returned.

    NOTE(review): judging by the step names only, this looks like an
    external sort of newline-delimited JSON: sorted runs are uploaded
    under ``{output}.tmp/``, merged into ``output``, and the temporary
    prefix is then listed and deleted. The step classes are defined
    elsewhere -- confirm against their implementations.

    Args:
        name: Passed to ``Pipeline`` as its ``name``.
        tag: Key looked up on each row (``row[tag]``) by the
            ``NDJsonIndex`` steps.
        bucket: Bucket used for the input object, both uploads and the
            temporary prefix.
        input: Key of the input object inside ``bucket``.
        output: Key of the final upload; ``{output}.tmp/`` is the prefix
            used for the per-chunk uploads.
    """
    mib = 1024 * 1024

    def tag_of(row):
        # Value extracted from a row by the NDJsonIndex steps.
        return row[tag]

    def sort_key(row):
        # Ordering key shared by the QuickSort and MergeSort steps.
        return row.key

    def chunk_steps(index):
        # Steps built for one chunk; `index` names that chunk's temp object.
        return [
            NDJsonIndex(extract=tag_of),
            QuickSort(key=sort_key),
            NDJsonFlush(),
            S3Upload(
                bucket=bucket,
                key=f'{output}.tmp/{index}',
                chunksize=128 * mib,
            ),
        ]

    def merge_steps(index):
        # Steps built per merge input; `index` is accepted but not used.
        return [
            S3Download(),
            NDJsonIndex(extract=tag_of),
        ]

    steps = [
        S3Download(),
        NDJsonChunk(chunksize=mib),
        ForEachChunk(chunksize=512 * mib, steps=chunk_steps),
        WaitAll(),
        MergeSort(key=sort_key, steps=merge_steps),
        NDJsonFlush(),
        S3Upload(bucket=bucket, key=f'{output}', chunksize=256 * mib),
        # Re-seed the pipeline with the temporary prefix for the
        # S3List / S3Delete steps that follow.
        Singleton(value=S3Prefix(bucket=bucket, prefix=f'{output}.tmp/')),
        S3List(),
        S3Delete(),
        DictDebug(),
    ]

    pipeline = Pipeline(name=name, steps=steps)
    pipeline.start(input=S3Object(bucket=bucket, key=input))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment