Skip to content

Instantly share code, notes, and snippets.

@khun84
Last active August 20, 2022 01:44
Show Gist options
  • Save khun84/15c6e16558602801d28b31ff29caff0b to your computer and use it in GitHub Desktop.
Save khun84/15c6e16558602801d28b31ff29caff0b to your computer and use it in GitHub Desktop.
Nodejs snippet
- sleep
- pipeline
- parallel task
class TaskQueuePC {
constructor(maxConcurrency) {
this.maxConcurrency = maxConcurrency;
this.consumerQueue = [];
this.taskQueue = [];
for (let i = 0; i < maxConcurrency; i++) {
this.consumer()
}
}
async consumer() {
while (true) {
try {
const task = await this.getNextTask()
await task();
} catch (e) {
console.error(e);
}
}
}
async getNextTask() {
// The purpose of introducing a Promise here is that we need a "resolve" callback to achieve a sleep effect where there is not task
// The resolve callback will be pushed into the consumerQueue and this method caller will keep waiting until the resolve callback is being "wake up" to perform a task
return new Promise((resolve, reject) => {
if (this.taskQueue.length === 0) {
this.consumerQueue.push(resolve);
} else {
return resolve(this.taskQueue.shift());
}
});
}
// When producer submit task, we should not block the event loop.
// In fact, we'll just return a promise and allow the caller to register callback on the promise.
// So from here we can see can the caller can create a clean task and express interest on the task result by registering callback on the promise returned by #runTask.
runTask(task) {
return new Promise((resolve, reject) => {
const taskWrapper = () => {
const taskPromise = task();
taskPromise.then(resolve, reject);
return taskPromise;
}
if (this.consumerQueue.length !== 0) {
const consumer = this.consumerQueue.shift();
consumer(taskWrapper);
} else {
this.taskQueue.push(taskWrapper);
}
})
}
}
module.paths.push('/Users/Daniel/.nvm/versions/node/v14.17.1/lib/node_modules');
const { createReadStream, createWriteStream } = require('fs');
const { parse } = require('csv-parse');
const { Transform, pipeline } = require('stream');
const { format: csvFormat, writeToString } = require('fast-csv');
const myOutputStream = createWriteStream('my-free-credits-2.csv');
const phOutputStream = createWriteStream('ph-free-credits-2.csv');
const idOutputStream = createWriteStream('id-free-credits-2.csv');
const sgOutputStream = createWriteStream('sg-free-credits-2.csv');
const streamMapper = {
my: myOutputStream,
ph: phOutputStream,
id: idOutputStream,
sg: sgOutputStream
};
// headers = ['country', 'vendor_id', 'name', 'paid_credit_balance', 'free', 'latest_quoted_at', 'latest_booked_at', 'inactive_since_dec'];
class CsvWriter extends Transform {
constructor(opts = {}) {
opts.objectMode = true;
super(opts);
this.headersWritten = {
my: false,
id: false,
ph: false,
sg: false
};
}
async _transform(row, enc, done) {
let csvString = await writeToString([row]);
csvString = csvString + "\n";
if (!this.headersWritten[row.country.toLowerCase()]) {
const headers = [];
Object.entries(row).forEach((pair) => {
const [colname, _] = pair;
headers.push(colname);
});
csvString = [headers.join(','), csvString].join("\n");
this.headersWritten[row.country.toLowerCase()] = true;
}
streamMapper[row.country.toLowerCase()].write(Buffer.from(csvString, enc));
done();
}
}
const inputStream = createReadStream('source-file-path');
const csvParser = parse({ columns: true });
pipeline(
inputStream,
csvParser,
(new CsvWriter()).on('end', () => {
for (const [_, streamer] of Object.entries(streamMapper)) {
streamer.close();
}
}),
(err) => {
if (err) {
console.log(err);
for (const [_, streamer] of Object.entries(streamMapper)) {
streamer.close();
}
}
}
);
async function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
console.log('start');
await sleep(1000).then(() => {
console.log('after sleep');});
console.log('end');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment