Skip to content

Instantly share code, notes, and snippets.

@pellicceama
Created September 5, 2024 18:36
Show Gist options
  • Save pellicceama/1ec95f1f2f1dc4d1265a54c55496ca8c to your computer and use it in GitHub Desktop.
Merge two parquet files in a cloudflare worker
import initWasm, { ParquetFile, transformParquetStream } from './parquet-wasm';
import binary from './parquet-wasm/parquet_wasm_bg.wasm';
// Bindings injected into this Worker by the runtime.
interface Env {
// R2 bucket the merged Parquet file is written to (see the put() call in fetch).
DEV_PARQUET_BUCKET: R2Bucket;
}
export default {
  /**
   * Fetches two remote Parquet files, merges their record batches into a
   * single Parquet byte stream, and uploads the result to the R2 bucket
   * bound as DEV_PARQUET_BUCKET.
   *
   * @param request - Incoming request (unused; any request triggers the merge).
   * @param env - Worker bindings; provides the target R2 bucket.
   * @param ctx - Execution context (unused).
   * @returns 200 text response on success, 500 with the error message on failure.
   */
  async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
    // Initialize the parquet-wasm module before any Parquet API is used.
    await initWasm(binary);

    const url1 = 'https://raw.githubusercontent.com/redapt/pyspark-s3-parquet-example/master/test-data/nation.plain.parquet';
    const url2 = 'https://raw.githubusercontent.com/redapt/pyspark-s3-parquet-example/master/test-data/nation.plain.parquet';

    try {
      // Stream record batches from a remote Parquet file, one file at a time.
      async function* createRecordBatchStream(url: string) {
        const fileInstance = await ParquetFile.fromUrl(url);
        yield* await fileInstance.stream();
      }

      // Concatenate the batch streams of both files.
      const combinedStream = combineStreams(createRecordBatchStream(url1), createRecordBatchStream(url2));

      // Adapt the async generator to a ReadableStream. Errors thrown while
      // pumping are forwarded via controller.error() so the downstream
      // consumer sees them instead of an unhandled rejection.
      const readableStream = new ReadableStream({
        async start(controller) {
          try {
            for await (const chunk of combinedStream) {
              controller.enqueue(chunk);
            }
            controller.close();
          } catch (err) {
            controller.error(err);
          }
        },
      });

      // Re-encode the combined record batches as a Parquet byte stream.
      const parquetByteStream = await transformParquetStream(readableStream);

      // Buffer the whole output to learn its byte length up front, which
      // FixedLengthStream requires (gives R2 a known Content-Length).
      let totalLength = 0;
      const chunks: Uint8Array[] = [];
      for await (const chunk of parquetByteStream) {
        totalLength += chunk.byteLength;
        chunks.push(chunk);
      }

      const fixedLengthStream = new FixedLengthStream(totalLength);
      const writer = fixedLengthStream.writable.getWriter();

      // Write concurrently with the R2 upload: the writable side only drains
      // as R2 reads the readable side, so the writes cannot simply be awaited
      // before put(). Capture the promise so write/close failures are
      // surfaced below instead of becoming an unhandled rejection (the
      // original fire-and-forget IIFE dropped them).
      const writePromise = (async () => {
        for (const chunk of chunks) {
          await writer.write(chunk);
        }
        await writer.close();
      })();

      // Fail the request if either the upload or the writer side errors.
      await Promise.all([
        env.DEV_PARQUET_BUCKET.put('merged_nation.parquet', fixedLengthStream.readable),
        writePromise,
      ]);

      return new Response('Files merged and uploaded successfully', {
        headers: { 'Content-Type': 'text/plain' },
      });
    } catch (error: unknown) {
      console.error('Error:', error);
      // Narrow before touching .message — the thrown value may not be an Error.
      const message = error instanceof Error ? error.message : String(error);
      return new Response('An error occurred: ' + message, {
        status: 500,
        headers: { 'Content-Type': 'text/plain' },
      });
    }
  },
} satisfies ExportedHandler<Env>;
/**
 * Sequentially drains each async iterable, yielding every item from one
 * source before moving on to the next.
 *
 * @param streams - Async iterables to concatenate, in order.
 * @returns An async generator over all items of all sources.
 */
async function* combineStreams<T>(...streams: AsyncIterable<T>[]) {
  for (const source of streams) {
    for await (const item of source) {
      yield item;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment