-
-
Save KeKs0r/f44834378254b7a6a1a953759467648d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const fs = require('fs'); | |
const path = require('path'); | |
const { DataStream } = require('scramjet'); | |
const JSONStream = require('JSONStream'); | |
const jsonfile = require('jsonfile'); | |
const simulate = require('../../src/simulation/simulate'); | |
const cpus = require('os').cpus().length * 2; | |
const filePath = (num, type) => path.join(__dirname, `./prep-data/sim/mod-ay_${num}${type ? `-${type}` : ''}.json`); | |
// const fileIn = filePath('01', 'slice'); | |
// const fileOut = filePath('01', 'preview-res'); | |
const fileIn = filePath('01'); | |
const fileOut = filePath('01', 'res'); | |
const readStream = fs.createReadStream(fileIn, 'utf8'); | |
// const outStream = fs.createWriteStream(fileOut, 'utf8'); | |
let lastUser = '0'; | |
let lastDate = 0; | |
function validate(event) { | |
const isValid = lastUser < event.device_uuid | |
|| (lastUser === event.device_uuid | |
&& (lastDate < event.client_time || lastDate === event.client_time)); | |
if (!isValid) { | |
throw new Error('Ordering seems to be wrong'); | |
} | |
lastUser = event.device_uuid; | |
lastDate = event.client_time; | |
} | |
const makeRemap = () => { | |
let currentUser; | |
let currentUserEvents = []; | |
return (emit, event) => { | |
if (!currentUser) { | |
currentUser = event.device_uuid; | |
} | |
if (currentUser !== event.device_uuid) { | |
emit(currentUserEvents); | |
currentUserEvents = []; | |
currentUser = event.device_uuid; | |
} | |
currentUserEvents.push(event); | |
}; | |
}; | |
const reduceSum = (sum, user) => { | |
Object.keys(user).map((k) => { | |
if (!sum[k]) { | |
sum[k] = user[k]; | |
} else { | |
sum[k] += user[k]; | |
} | |
}); | |
return sum; | |
}; | |
const writeToFile = res => jsonfile.writeFile(fileOut, res); | |
const result = {}; | |
const subprocess = stream => stream | |
.each((e) => { | |
console.log(e); | |
}) | |
.map(userEvents => simulate(userEvents, true).reduce(reduceSum, {})); | |
console.log(fileIn, '->', fileOut); | |
console.time('process'); | |
const nodeStream = readStream.pipe(JSONStream.parse()); | |
DataStream.from(nodeStream) | |
.each(validate) | |
.catch(console.error) | |
.remap(makeRemap()) | |
.distribute(cpus, subprocess) | |
.reduce(reduceSum, result) | |
.then(writeToFile) | |
.then(() => console.log(result)) | |
.catch(console.error) | |
.then(() => console.timeEnd('process')); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment