Created
May 1, 2019 05:19
-
-
Save lucasjellema/81d17d36f7d8c31f227bb2d42f9d1c53 to your computer and use it in GitHub Desktop.
ES 2018 Demonstration of Promises, Asynchronous Generators and Pipelining (requires Node 10 or higher) - Time Windowed Aggregates
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Pause for a number of milliseconds.
 *
 * @param {number} milliseconds - delay; fractional values are rounded down before scheduling
 * @returns {Promise<undefined>} promise that is resolved by a timer after the delay
 */
const sleep = (milliseconds) => {
  const delay = Math.floor(milliseconds);
  return new Promise((wake) => {
    setTimeout(wake, delay);
  });
};
/**
 * Write a message to the console, prefixed with the seconds and tenths of a
 * second of the current local time (format: `<seconds>.<tenths> - <msg>`).
 *
 * @param {*} msg - value to log; it is interpolated into a template string
 */
const lg = (msg) => {
  const now = new Date();
  // Truncate instead of rounding: rounding turned 950-999 ms into "10",
  // which printed e.g. "12.10" and sorted before "12.9".
  const tenths = Math.floor(now.getMilliseconds() / 100);
  console.log(`${now.getSeconds()}.${tenths} - ${msg}`);
};
// each sensor has a slightly randomized timeout period, simulating a different and somewhat varying production rate of measurements
// each sensor produces temperature measurements around an average value with a certain variation. | |
// base delay (ms) between two measurements of sensor-one
const sensorOneTimeOut = 600;
/**
 * Simulated sensor 'sensor-one'.
 *
 * Waits between 0.7 and 1.3 times sensorOneTimeOut milliseconds and then
 * produces one randomized reading with a temperature in [298.01, 300.59).
 * Declared with const: the original assigned to an undeclared identifier,
 * which creates an implicit global and throws in strict mode / ES modules.
 *
 * @returns {Promise<{sensor: string, temperature: number}>} the reading
 */
const sensorOne = async function () {
  await sleep((0.7 + 0.6 * Math.random()) * sensorOneTimeOut);
  return { sensor: 'sensor-one', temperature: 295 + 4.3 * (0.7 + 0.6 * Math.random()) };
};
// base delay (ms) between two measurements of sensor-two
const sensorTwoTimeOut = 750;
/**
 * Simulated sensor 'sensor-two'.
 *
 * Waits between 0.4 and 1.0 times sensorTwoTimeOut milliseconds and then
 * produces one randomized reading with a temperature in [295.31, 297.29).
 * Declared with const: the original assigned to an undeclared identifier,
 * which creates an implicit global and throws in strict mode / ES modules.
 *
 * @returns {Promise<{sensor: string, temperature: number}>} the reading
 */
const sensorTwo = async function () {
  await sleep((0.4 + 0.6 * Math.random()) * sensorTwoTimeOut);
  return { sensor: 'sensor-two', temperature: 293 + 3.3 * (0.7 + 0.6 * Math.random()) };
};
// base delay (ms) between two measurements of sensor-three
const sensorThreeTimeOut = 2250;
/**
 * Simulated sensor 'sensor-three'.
 *
 * Waits between 0.5 and 0.9 times sensorThreeTimeOut milliseconds and then
 * produces one randomized reading with a temperature in [300.78, 303.3).
 * Declared with const: the original assigned to an undeclared identifier,
 * which creates an implicit global and throws in strict mode / ES modules.
 *
 * @returns {Promise<{sensor: string, temperature: number}>} the reading
 */
const sensorThree = async function () {
  await sleep((0.5 + 0.4 * Math.random()) * sensorThreeTimeOut);
  return { sensor: 'sensor-three', temperature: 297 + 6.3 * (0.6 + 0.4 * Math.random()) };
};
// latestValue / latestSensor hold the reading most recently taken from the pool by
// sensorValues and the sensor function that produced it. They are informational only:
// readings are no longer handed over through these two variables, because a single
// shared slot loses a reading (and its sensor) when two sensors settle back to back.
let latestValue;
let latestSensor;
// the sensorPool contains one pending promise per sensor we are listening to;
// each promise resolves to a record { sensor, value, pending } (see addSensor)
const sensorPool = new Set();
/**
 * Ask sensor s for its next reading and put the resulting promise in sensorPool.
 *
 * The pooled promise resolves to { sensor, value, pending }: the sensor function,
 * the reading it produced, and the pooled promise itself, so that whoever wins the
 * race knows which pool entry to remove and which sensor to re-arm.
 *
 * @param {Function} s - sensor function; called without arguments, returns a promise of a reading
 * @returns {Promise<{sensor: Function, value: *, pending: Promise}>} the promise added to the pool
 */
const addSensor = function (s) {
  // `pending` is read inside the callback only after it has been assigned:
  // promise callbacks never run synchronously
  const pending = s().then((value) => ({ sensor: s, value, pending }));
  sensorPool.add(pending);
  return pending;
}; // addSensor
// the cache of recorded values used by the time windowed moving average function
// the values are injected in function runningSensorAverages
// and used in function timeWindowedAggregates
const recordedValues = {};
/**
 * Fan-in: bundle the measurement streams of all sensors into one endless stream.
 *
 * Every sensor has exactly one pending promise in sensorPool. Promise.race picks the
 * first one to settle; that entry is removed, its reading is yielded and, once the
 * consumer asks for the next reading, the same sensor is asked for a new measurement.
 * Entries that settled in the meantime stay in the pool and win the next race, so no
 * reading and no sensor is lost.
 *
 * @param {Function[]} [sensors] - sensor functions to listen to; defaults to
 *   sensorOne, sensorTwo and sensorThree
 * @yields the reading object produced by whichever sensor settled
 */
const sensorValues = async function* (sensors = [sensorOne, sensorTwo, sensorThree]) {
  for (const s of sensors) {
    addSensor(s);
  }
  while (true) {
    // racing an empty pool would never settle: end the stream instead of hanging
    if (sensorPool.size === 0) {
      return;
    }
    const winner = await Promise.race(sensorPool);
    sensorPool.delete(winner.pending);
    latestValue = winner.value;
    latestSensor = winner.sensor;
    yield winner.value;
    // add a promise for the sensor that just fired back to the pool
    addSensor(winner.sensor);
  } // neverending while
}; // sensorValues
/**
 * Endless stream of per-sensor averages over consecutive, non-overlapping time windows.
 *
 * After every timeWindow milliseconds the readings collected in recordedValues are
 * averaged per sensor, the sensor's list is reset for the next window, and one
 * aggregate per sensor is yielded. Sensors without readings in the window are
 * skipped: averaging an empty list would be 0 / 0 and report NaN.
 *
 * @param {number} timeWindow - length of one window in milliseconds
 * @yields {{sensor: string, timestamp: number, average: number}} one aggregate per
 *   sensor that produced readings in the window; timestamp is Date.now() at yield time
 */
const timeWindowedAggregates = async function* (timeWindow) {
  while (true) {
    await sleep(timeWindow);
    // wake up after timeWindow has elapsed; calculate the average per sensor
    for (const sensor of Object.keys(recordedValues)) {
      const readings = recordedValues[sensor];
      if (readings.length === 0) {
        continue;
      }
      const sum = readings.reduce((a, b) => a + b, 0);
      const average = sum / readings.length;
      // reset values array for next time window (for now assume non-overlapping time windows)
      recordedValues[sensor] = [];
      yield { sensor, timestamp: Date.now(), average };
    } // for
  } // while
}; // timeWindowedAggregates
/**
 * Per-sensor running average over the most recent readings.
 *
 * Every reading is also appended to recordedValues[sensor] (the cache that
 * timeWindowedAggregates works from). For each sensor, after every `period`
 * readings a result is produced, provided at least `windowSize` readings of that
 * sensor have been seen; the result covers the last `windowSize` readings.
 *
 * @param {AsyncIterable|Iterable} sensorReadings - stream of { sensor, temperature } readings
 * @param {number} windowSize - number of values used for the calculation of the average
 * @param {number} period - number of values (per sensor) after which a new value is produced
 * @yields {{sensor: string, average: number, max: number}} average and maximum of the window
 */
const runningSensorAverages = async function* (sensorReadings, windowSize, period) {
  // per sensor-id: ticks since last production and the most recent values
  const sensors = {};
  // coerce once so the strict comparison below accepts the same arguments as before
  const ticksPerResult = Number(period);
  // the loop variable is declared locally; it used to be an implicit global shared
  // with filterOutliersFromSensorReadings
  for await (const sensorReading of sensorReadings) {
    const { sensor, temperature } = sensorReading;
    // store sensorReading in recordedValues (for timeWindowedAggregates)
    if (!recordedValues[sensor]) {
      recordedValues[sensor] = [];
    }
    recordedValues[sensor].push(temperature);
    // retain latest sensor reading and calculate running aggregates whenever the period has passed
    if (!sensors[sensor]) {
      sensors[sensor] = { ticks: 0, values: [] };
    }
    const sensorRecord = sensors[sensor];
    sensorRecord.ticks++;
    sensorRecord.values.push(temperature);
    // only the last windowSize values are ever used, so drop older ones right away
    while (sensorRecord.values.length > windowSize) {
      sensorRecord.values.shift();
    }
    // if it is time again to deliver the goods...
    if (sensorRecord.ticks === ticksPerResult) {
      sensorRecord.ticks = 0;
      if (sensorRecord.values.length >= windowSize) {
        const windowValues = sensorRecord.values.slice();
        const sum = windowValues.reduce((a, b) => a + b, 0);
        yield { sensor, average: sum / windowSize, max: Math.max(...windowValues) };
      }
    }
  }
}; // runningSensorAverages
/**
 * Pass sensor readings through, dropping outliers.
 *
 * A reading is dropped (and logged through lg) when its temperature is below 273
 * or when it comes from the sensor named 'sensor-x'; all other readings are
 * yielded unchanged and in order.
 *
 * @param {AsyncIterable|Iterable} sensorReadings - stream of { sensor, temperature } readings
 * @yields the readings that passed the filter
 */
const filterOutliersFromSensorReadings = async function* (sensorReadings) {
  // the loop variable is declared locally; it used to be an implicit global shared
  // with runningSensorAverages
  for await (const sensorReading of sensorReadings) {
    const isOutlier = sensorReading.temperature < 273 || sensorReading.sensor === 'sensor-x';
    if (isOutlier) {
      lg(`Sensor Reading filtered ${JSON.stringify(sensorReading)}`); // filter this reading
    } else {
      yield sensorReading;
    }
  } // for sensorReadings
}; // filterOutliersFromSensorReadings
/**
 * Run the running-average pipeline and log every result.
 *
 * Pipeline: sensorValues() -> filterOutliersFromSensorReadings -> runningSensorAverages,
 * producing a running average once every 10 readings per sensor, using 15 readings
 * to calculate the average. The returned promise settles only when the pipeline
 * ends or throws.
 *
 * @returns {Promise<void>}
 */
const doIt = async function () {
  const readings = filterOutliersFromSensorReadings(sensorValues());
  // the loop variable is declared locally; it used to be an implicit global
  for await (const runningAverage of runningSensorAverages(readings, 15, 10)) {
    lg(`Running Average for Sensor received ${JSON.stringify(runningAverage)}`);
  }
};
/**
 * Log the time-windowed averages over 6 second time windows.
 *
 * The averages are calculated by timeWindowedAggregates from the readings that
 * runningSensorAverages collects in recordedValues. The returned promise settles
 * only when that stream ends or throws.
 *
 * @returns {Promise<void>}
 */
const doIt2 = async function () {
  // the loop variable is declared locally; it used to be an implicit global
  for await (const timedWindowAggregate of timeWindowedAggregates(6000)) {
    lg(`Timed Window Average for Sensor received ${JSON.stringify(timedWindowAggregate)}`);
  }
};
// Start both pipelines side by side. Neither promise is awaited (both run until the
// process is stopped), so a failure in either one is reported here instead of being
// left as an unhandled rejection.
const reportPipelineFailure = (err) => {
  console.error(err);
  process.exitCode = 1;
};
doIt().catch(reportPipelineFailure);
doIt2().catch(reportPipelineFailure);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment