def worker_sort(name, tag, bucket, input, output):
    """Build and start a pipeline that sorts an NDJSON S3 object by ``tag``.

    The step list has the shape of an external merge sort:

    1. ``S3Download`` of the object ``input`` in ``bucket``, split into
       pieces by ``NDJsonChunk``.
    2. ``ForEachChunk``: each group of pieces is indexed with ``row[tag]``,
       ordered by ``QuickSort`` on ``row.key``, flushed with ``NDJsonFlush``
       and uploaded to ``<output>.tmp/<index>`` in ``bucket``.
    3. ``WaitAll``, then ``MergeSort`` on ``row.key``; for each merge input
       it runs ``S3Download`` and re-indexes rows with ``row[tag]``.
    4. The merged stream is flushed with ``NDJsonFlush`` and uploaded to
       ``output`` in ``bucket``.
    5. The ``<output>.tmp/`` prefix is listed, each listed object is passed
       to ``S3Delete``, and the stream ends in ``DictDebug``.

    Args:
        name: Name passed to ``Pipeline``.
        tag: Field looked up on every row (``row[tag]``) by ``NDJsonIndex``;
            presumably this is what becomes ``row.key`` for the sorts.
        bucket: Bucket used for the input, the temporary runs and the output.
        input: Key of the object to sort. Shadows the builtin ``input``; the
            name is kept because callers may pass it by keyword.
        output: Key of the sorted object. Temporary runs are written under
            ``<output>.tmp/``.

    Returns:
        None. NOTE(review): whether ``Pipeline.start`` blocks until the
        pipeline has finished is not visible here; confirm before relying
        on ``output`` existing when this function returns.
    """
    # Single source of truth for the temporary-run prefix. The per-chunk
    # upload keys and the cleanup prefix must agree, or cleanup misses runs.
    tmp_prefix = f'{output}.tmp/'
    mib = 1024 * 1024

    pipeline = Pipeline(name=name, steps=[
        S3Download(),
        NDJsonChunk(chunksize=1 * mib),
        # Phase 1: sort each group of chunks on its own and upload it as a
        # separate object named by the group index.
        ForEachChunk(chunksize=512 * mib, steps=lambda index: [
            NDJsonIndex(extract=lambda row: row[tag]),
            QuickSort(key=lambda row: row.key),
            NDJsonFlush(),
            S3Upload(
                bucket=bucket,
                key=f'{tmp_prefix}{index}',
                chunksize=128 * mib,
            ),
        ]),
        # Presumably waits for every phase-1 upload before merging; confirm
        # against WaitAll's implementation.
        WaitAll(),
        # Phase 2: merge the sorted runs. Each merge input is downloaded and
        # re-indexed the same way as in phase 1 so row.key is comparable.
        MergeSort(key=lambda row: row.key, steps=lambda index: [
            S3Download(),
            NDJsonIndex(extract=lambda row: row[tag]),
        ]),
        NDJsonFlush(),
        S3Upload(bucket=bucket, key=output, chunksize=256 * mib),
        # Cleanup: list and delete every temporary run.
        # NOTE(review): this runs as a pipeline step, so it presumably only
        # happens when the earlier steps succeed. Confirm whether a failed
        # run leaves objects behind under tmp_prefix.
        Singleton(value=S3Prefix(bucket=bucket, prefix=tmp_prefix)),
        S3List(),
        S3Delete(),
        # NOTE(review): DictDebug looks like a debugging aid; confirm it is
        # meant to stay in this pipeline.
        DictDebug(),
    ])
    pipeline.start(input=S3Object(bucket=bucket, key=input))
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.