Skip to content

Instantly share code, notes, and snippets.

@KeKs0r
Created September 20, 2018 11:04
Show Gist options
  • Save KeKs0r/f44834378254b7a6a1a953759467648d to your computer and use it in GitHub Desktop.
Save KeKs0r/f44834378254b7a6a1a953759467648d to your computer and use it in GitHub Desktop.
const fs = require('fs');
const path = require('path');
const { DataStream } = require('scramjet');
const JSONStream = require('JSONStream');
const jsonfile = require('jsonfile');
const simulate = require('../../src/simulation/simulate');
const cpus = require('os').cpus().length * 2;
const filePath = (num, type) => path.join(__dirname, `./prep-data/sim/mod-ay_${num}${type ? `-${type}` : ''}.json`);
// const fileIn = filePath('01', 'slice');
// const fileOut = filePath('01', 'preview-res');
const fileIn = filePath('01');
const fileOut = filePath('01', 'res');
const readStream = fs.createReadStream(fileIn, 'utf8');
// const outStream = fs.createWriteStream(fileOut, 'utf8');
let lastUser = '0';
let lastDate = 0;
function validate(event) {
const isValid = lastUser < event.device_uuid
|| (lastUser === event.device_uuid
&& (lastDate < event.client_time || lastDate === event.client_time));
if (!isValid) {
throw new Error('Ordering seems to be wrong');
}
lastUser = event.device_uuid;
lastDate = event.client_time;
}
const makeRemap = () => {
let currentUser;
let currentUserEvents = [];
return (emit, event) => {
if (!currentUser) {
currentUser = event.device_uuid;
}
if (currentUser !== event.device_uuid) {
emit(currentUserEvents);
currentUserEvents = [];
currentUser = event.device_uuid;
}
currentUserEvents.push(event);
};
};
const reduceSum = (sum, user) => {
Object.keys(user).map((k) => {
if (!sum[k]) {
sum[k] = user[k];
} else {
sum[k] += user[k];
}
});
return sum;
};
const writeToFile = res => jsonfile.writeFile(fileOut, res);
const result = {};
const subprocess = stream => stream
.each((e) => {
console.log(e);
})
.map(userEvents => simulate(userEvents, true).reduce(reduceSum, {}));
console.log(fileIn, '->', fileOut);
console.time('process');
const nodeStream = readStream.pipe(JSONStream.parse());
DataStream.from(nodeStream)
.each(validate)
.catch(console.error)
.remap(makeRemap())
.distribute(cpus, subprocess)
.reduce(reduceSum, result)
.then(writeToFile)
.then(() => console.log(result))
.catch(console.error)
.then(() => console.timeEnd('process'));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment