@amacal
Created November 29, 2020 10:35
def worker_sort(name, tag, bucket, input, output):
    pipeline = Pipeline(name=name, steps=[
        # Phase 1: download the input, sort it chunk by chunk, and upload
        # each sorted chunk as a temporary object under '{output}.tmp/'.
        S3Download(),
        NDJsonChunk(chunksize=1024*1024),
        ForEachChunk(chunksize=512*1024*1024, steps=lambda index: [
            NDJsonIndex(extract=lambda row: row[tag]),
            QuickSort(key=lambda row: row.key),
            NDJsonFlush(),
            S3Upload(bucket=bucket, key=f'{output}.tmp/{index}', chunksize=128*1024*1024)
        ]),
        WaitAll(),
        # Phase 2: merge the sorted temporary chunks into the final object.
        MergeSort(key=lambda row: row.key, steps=lambda index: [
            S3Download(),
            NDJsonIndex(extract=lambda row: row[tag]),
        ]),
        NDJsonFlush(),
        S3Upload(bucket=bucket, key=f'{output}', chunksize=256*1024*1024),
        # Phase 3: list and delete the temporary chunks.
        Singleton(value=S3Prefix(bucket=bucket, prefix=f'{output}.tmp/')),
        S3List(),
        S3Delete(),
        DictDebug(),
    ])

    pipeline.start(input=S3Object(bucket=bucket, key=input))
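
The pipeline above reads like an external merge sort: sort chunks that fit in memory, store them as temporary runs, then merge the runs and delete them. The sketch below shows that same idea using only the Python standard library and local files in place of S3. It is an illustration under stated assumptions, not the implementation behind `Pipeline`, `QuickSort` or `MergeSort`; the names `external_sort`, `chunk_rows` and `workdir` are hypothetical.

```python
import heapq
import json
import os
import tempfile


def external_sort(lines, tag, chunk_rows, workdir):
    """Sort NDJSON lines by row[tag] using sorted runs and a k-way merge."""
    run_paths = []
    chunk = []

    def flush():
        # Sort one in-memory chunk and write it out as a temporary run
        # (the local-file stand-in for the '{output}.tmp/{index}' objects).
        chunk.sort(key=lambda row: row[tag])
        path = os.path.join(workdir, f"run-{len(run_paths)}.ndjson")
        with open(path, "w", encoding="utf-8") as handle:
            for row in chunk:
                handle.write(json.dumps(row) + "\n")
        run_paths.append(path)
        chunk.clear()

    for line in lines:
        if line.strip():
            chunk.append(json.loads(line))
            if len(chunk) >= chunk_rows:
                flush()
    if chunk:
        flush()

    handles = [open(path, encoding="utf-8") for path in run_paths]
    try:
        # Each run is already sorted, so heapq.merge can combine them lazily.
        streams = [(json.loads(line) for line in handle) for handle in handles]
        # A real pipeline would stream this to the output; a list keeps the sketch short.
        merged = list(heapq.merge(*streams, key=lambda row: row[tag]))
    finally:
        # Close and delete the temporary runs, mirroring the cleanup phase.
        for handle in handles:
            handle.close()
        for path in run_paths:
            os.remove(path)
    return merged


if __name__ == "__main__":
    sample = ['{"id": 3}', '{"id": 1}', '{"id": 2}']
    with tempfile.TemporaryDirectory() as workdir:
        print(external_sort(sample, tag="id", chunk_rows=2, workdir=workdir))
```

Here `chunk_rows` is counted in rows for simplicity, whereas the gist sizes its chunks in bytes (`chunksize=512*1024*1024`).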