Created
September 5, 2024 18:36
-
-
Save pellicceama/1ec95f1f2f1dc4d1265a54c55496ca8c to your computer and use it in GitHub Desktop.
Merge two Parquet files in a Cloudflare Worker
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import initWasm, { ParquetFile, transformParquetStream } from './parquet-wasm'; | |
import binary from './parquet-wasm/parquet_wasm_bg.wasm'; | |
/**
 * Worker environment bindings.
 * DEV_PARQUET_BUCKET: R2 bucket the merged Parquet file is written to —
 * presumably declared in wrangler.toml; confirm the binding name matches.
 */
interface Env {
  DEV_PARQUET_BUCKET: R2Bucket;
}
export default { | |
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> { | |
await initWasm(binary); | |
const url1 = 'https://raw.githubusercontent.com/redapt/pyspark-s3-parquet-example/master/test-data/nation.plain.parquet'; | |
const url2 = 'https://raw.githubusercontent.com/redapt/pyspark-s3-parquet-example/master/test-data/nation.plain.parquet'; | |
try { | |
// Function to create a stream of record batches from a URL | |
async function* createRecordBatchStream(url: string) { | |
const fileInstance = await ParquetFile.fromUrl(url); | |
yield* await fileInstance.stream(); | |
} | |
// Combine streams from both files | |
const combinedStream = combineStreams(createRecordBatchStream(url1), createRecordBatchStream(url2)); | |
// Convert AsyncGenerator to ReadableStream | |
const readableStream = new ReadableStream({ | |
async start(controller) { | |
for await (const chunk of combinedStream) { | |
controller.enqueue(chunk); | |
} | |
controller.close(); | |
}, | |
}); | |
// Transform the combined stream into a Parquet byte stream | |
const parquetByteStream = await transformParquetStream(readableStream); | |
// Count total length and create FixedLengthStream | |
let totalLength = 0; | |
const chunks: Uint8Array[] = []; | |
for await (const chunk of parquetByteStream) { | |
totalLength += chunk.byteLength; | |
chunks.push(chunk); | |
} | |
const fixedLengthStream = new FixedLengthStream(totalLength); | |
const writer = fixedLengthStream.writable.getWriter(); | |
// Write chunks to the FixedLengthStream | |
(async () => { | |
for (const chunk of chunks) { | |
await writer.write(chunk); | |
} | |
await writer.close(); | |
})(); | |
// Upload the merged Parquet file to R2 using the FixedLengthStream | |
await env.DEV_PARQUET_BUCKET.put('merged_nation.parquet', fixedLengthStream.readable); | |
return new Response('Files merged and uploaded successfully', { | |
headers: { 'Content-Type': 'text/plain' }, | |
}); | |
} catch (error: any) { | |
console.error('Error:', error); | |
return new Response('An error occurred: ' + error.message, { | |
status: 500, | |
headers: { 'Content-Type': 'text/plain' }, | |
}); | |
} | |
}, | |
} satisfies ExportedHandler<Env>; | |
// Helper function to combine multiple async iterables | |
async function* combineStreams<T>(...streams: AsyncIterable<T>[]) { | |
for (const stream of streams) { | |
yield* stream; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment