Last active
September 11, 2019 04:13
-
-
Save TimmyCarbone/d244c54dcaaddcc7a3a59965b8e9c5f2 to your computer and use it in GitHub Desktop.
Distribute rows from a readable stream to multiple streams depending on the row content
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { PassThrough } = require('stream'); | |
// FUNCTIONS | |
// -------- | |
// Function: For each partition, create an object holding a PassThrough
// stream plus the id limits for that partition.
//
// @param {Array<{min: number, max: number}>} partitions - id ranges, one
//   entry per table partition (max is exclusive — see sendToCorrectStream).
// @returns {Array<{min?: number, max?: number, name?: string, stream: PassThrough}>}
//   one branch per partition, plus a trailing catch-all branch named
//   'default' for rows that match no partition.
const createBranchingStreams = (partitions) => {
  // const (never reassigned) and no unused map index parameter.
  const streams = partitions.map((p) => ({
    min: p.min,
    max: p.max,
    stream: new PassThrough()
  }));
  // Catch-all branch: rows that fall outside every [min, max) range
  // end up here (see sendToCorrectStream).
  streams.push({
    name: 'default',
    stream: new PassThrough()
  });
  return streams;
};
// Function: Route one row into the branch stream whose [min, max) id range
// contains it. Rows that carry a `name` property — or whose id matches no
// partition — fall through to the 'default' branch.
const sendToCorrectStream = (streams, row) => {
  const serialized = JSON.stringify(row);
  // A branch only matches when the row has no name and its id sits
  // inside the branch's half-open [min, max) interval.
  const matchesPartition = (branch) =>
    !row.name && row.id >= branch.min && row.id < branch.max;
  let target = streams.find(matchesPartition);
  if (!target) {
    // Nothing matched: fall back to the catch-all branch.
    target = streams.find((branch) => branch.name === 'default');
  }
  target.stream.write(serialized);
};
// MAIN
// ---------
// NOTE(review): the `** ... **` spans below are author placeholders, not
// valid JavaScript — fill them in before running. `client`, `pgCopy`,
// `JSONStream`, `transformRowToCSV` and `CSV_DELIMITER` are also not
// defined in this gist; presumably they come from pg / pg-copy-streams /
// JSONStream setup elsewhere — confirm before use.
const partitions = ** Pull list of partitions however we want **;
// Create distribution streams
const branches = createBranchingStreams(partitions);
// Connect each distribution stream to its corresponding table partition
branches.forEach((b) => {
  // Paused until the COPY pipeline below is wired up, so no rows are lost.
  b.stream.pause();
  // We define the partition name
  // ('default' branch has no min/max, so it gets a fixed suffix).
  const partitionSuffix = b.name === 'default' ? 'default' : [b.min, b.max].join('_');
  const SQLoptions = ['FORMAT CSV', `DELIMITER '${CSV_DELIMITER}'`].join(',');
  const copyQuery = `COPY table_name_part_${partitionSuffix} FROM STDIN WITH (${SQLoptions})`;
  // "insert" is a stream that will store the content it receives into the defined partition
  const insert = client.query(pgCopy.from(copyQuery));
  // We plug our branching stream to our inserting stream to connect
  // the branching stream to the partition.
  // We defined that the COPY query would match a CSV format so we also
  // need to transform the row to a CSV matching the table schema.
  // NOTE(review): sendToCorrectStream writes bare JSON.stringify output with
  // no record delimiter — verify JSONStream.parse() can split concatenated
  // objects, or add a newline between records.
  b.stream.pipe(JSONStream.parse()).pipe(transformRowToCSV).pipe(insert);
  b.stream.resume();
});
// Stream query results from data source
const stream = ** Query results stream **;
// For each row, distribute it to the correct stream
// The destination stream will store the data in its attached partition
stream.on('data', (row) => sendToCorrectStream(branches, row));
// When there's nothing left coming from the source stream,
// we can close all the distribution scripts.
// NOTE(review): ending the branch streams does not wait for the COPY
// inserts to flush — listen for 'finish' on each `insert` stream if the
// process exits right after this.
stream.on('end', () => branches.forEach((b) => b.stream.end()));
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment