Skip to content

Instantly share code, notes, and snippets.

@TimmyCarbone
Last active September 11, 2019 04:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TimmyCarbone/d244c54dcaaddcc7a3a59965b8e9c5f2 to your computer and use it in GitHub Desktop.
Distribute rows from readable stream to multiple streams depending on the row content
const { PassThrough } = require('stream');
// FUNCTIONS
// --------
// Function: For each partition, we create an object containing a stream and
// limits for that partition.
// Build one PassThrough stream per partition, each tagged with that
// partition's id range, plus a catch-all 'default' stream appended at the
// end for rows that match no range (see sendToCorrectStream).
//
// @param {Array<{min: number, max: number}>} partitions - partition bounds;
//   min is inclusive, max exclusive (enforced by sendToCorrectStream).
// @returns {Array<{min?: number, max?: number, name?: string, stream: PassThrough}>}
const createBranchingStreams = (partitions) => {
  const streams = partitions.map(({ min, max }) => ({
    min,
    max,
    stream: new PassThrough()
  }));
  // Catch-all stream: rows that fall outside every partition's range land here.
  streams.push({
    name: 'default',
    stream: new PassThrough()
  });
  return streams;
};
// Function: For each row we read, we decide in which stream to send the data
// Route one row to the partition stream whose [min, max) range contains
// row.id. Rows that carry a `name` property — or whose id falls outside
// every range — go to the catch-all 'default' stream instead. The row is
// serialized to JSON before being written.
const sendToCorrectStream = (streams, row) => {
  const payload = JSON.stringify(row);
  // Only anonymous rows are range-matched; a named row never matches here
  // and always falls through to the default stream below.
  const match = streams.find(
    (s) => !row.name && row.id >= s.min && row.id < s.max
  );
  if (match) {
    match.stream.write(payload);
    return;
  }
  // No partition claimed the row: hand it to the catch-all stream.
  const fallback = streams.find((s) => s.name === 'default');
  fallback.stream.write(payload);
};
// MAIN
// ---------
// NOTE(review): the `** ... **` lines below are author placeholders, not
// runnable JavaScript — substitute real code before using this script.
// `client`, `pgCopy`, `JSONStream`, `transformRowToCSV` and `CSV_DELIMITER`
// are presumed to be defined/imported elsewhere in the full program — verify.
const partitions = ** Pull list of partitions however we want **;
// Create distribution streams (one per partition plus a 'default' catch-all)
const branches = createBranchingStreams(partitions);
// Connect each distribution stream to its corresponding table partition.
// Streams are paused while wiring so no data is lost before pipes attach.
branches.forEach((b) => {
b.stream.pause();
// We define the partition name: 'default' for the catch-all branch,
// otherwise the 'min_max' range (e.g. table_name_part_0_100).
const partitionSuffix = b.name === 'default' ? 'default' : [b.min, b.max].join('_');
const SQLoptions = ['FORMAT CSV', `DELIMITER '${CSV_DELIMITER}'`].join(',');
const copyQuery = `COPY table_name_part_${partitionSuffix} FROM STDIN WITH (${SQLoptions})`;
// "insert" is a stream that will store the content it receives into the defined partition
// (pgCopy.from presumably comes from pg-copy-streams — confirm).
const insert = client.query(pgCopy.from(copyQuery));
// We plug our branching stream to our inserting stream to connect
// the branching stream to the partition.
// We defined that the COPY query would match a CSV format so we also
// need to transform the row to a CSV matching the table schema.
b.stream.pipe(JSONStream.parse()).pipe(transformRowToCSV).pipe(insert);
b.stream.resume();
});
// Stream query results from data source
const stream = ** Query results stream **;
// For each row, distribute it to the correct stream.
// The destination stream will store the data in its attached partition.
// NOTE(review): writes in sendToCorrectStream ignore backpressure (the
// return value of write() is unchecked) — acceptable for small result
// sets, but confirm for large exports.
stream.on('data', (row) => sendToCorrectStream(branches, row));
// When there's nothing left coming from the source stream,
// we can close all the distribution scripts.
// Ending each branch propagates end-of-stream through the pipes so each
// COPY command finishes.
stream.on('end', () => branches.forEach((b) => b.stream.end()));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment