Skip to content

Instantly share code, notes, and snippets.

@lucasjellema
Created May 1, 2019 05:19
Show Gist options
  • Save lucasjellema/81d17d36f7d8c31f227bb2d42f9d1c53 to your computer and use it in GitHub Desktop.
Save lucasjellema/81d17d36f7d8c31f227bb2d42f9d1c53 to your computer and use it in GitHub Desktop.
ES 2018 Demonstration of Promises, Asynchronous Generators and Pipelining (requires Node 10 or higher) - Time Windowed Aggregates
/**
 * Returns a promise that resolves (with no value) after the given delay.
 * Fractional delays are truncated down to whole milliseconds.
 * @param {number} milliseconds - how long to wait
 * @returns {Promise<void>}
 */
const sleep = (milliseconds) =>
  new Promise((wakeUp) => {
    const wholeMilliseconds = Math.floor(milliseconds);
    setTimeout(wakeUp, wholeMilliseconds);
  });
/**
 * Logs a message prefixed with a coarse timestamp: the seconds of the current
 * minute and the tenth of a second, formatted as `<seconds>.<tenths> - <msg>`.
 * @param {string} msg - text to log
 */
const lg = (msg) => {
  const now = new Date();
  // floor, not round: rounding turns 950..999 ms into "10", printing e.g. "5.10"
  const tenths = Math.floor(now.getMilliseconds() / 100);
  console.log(`${now.getSeconds()}.${tenths} - ${msg}`);
};
// each sensor has a slightly randomized timeout period, suggesting a different and somewhat varying production rate of measurements
// each sensor produces temperature measurements around an average value with a certain variation.
const sensorOneTimeOut = 600;
/**
 * Simulates a single measurement from sensor-one.
 * Waits between 0.7x and 1.3x sensorOneTimeOut milliseconds, then produces a
 * reading whose temperature is 295 plus 4.3 scaled by a random factor in [0.7, 1.3).
 * Declared with const: the original assigned to an undeclared name, which is an
 * implicit global and a ReferenceError in strict mode / ES modules.
 * @returns {Promise<{sensor: string, temperature: number}>}
 */
const sensorOne = async function () {
  await sleep((0.7 + 0.6 * Math.random()) * sensorOneTimeOut);
  return { sensor: 'sensor-one', temperature: 295 + 4.3 * (0.7 + 0.6 * Math.random()) };
};
const sensorTwoTimeOut = 750;
/**
 * Simulates a single measurement from sensor-two.
 * Waits between 0.4x and 1.0x sensorTwoTimeOut milliseconds, then produces a
 * reading whose temperature is 293 plus 3.3 scaled by a random factor in [0.7, 1.3).
 * Declared with const: the original assigned to an undeclared name, which is an
 * implicit global and a ReferenceError in strict mode / ES modules.
 * @returns {Promise<{sensor: string, temperature: number}>}
 */
const sensorTwo = async function () {
  await sleep((0.4 + 0.6 * Math.random()) * sensorTwoTimeOut);
  return { sensor: 'sensor-two', temperature: 293 + 3.3 * (0.7 + 0.6 * Math.random()) };
};
const sensorThreeTimeOut = 2250;
/**
 * Simulates a single measurement from sensor-three.
 * Waits between 0.5x and 0.9x sensorThreeTimeOut milliseconds, then produces a
 * reading whose temperature is 297 plus 6.3 scaled by a random factor in [0.6, 1.0).
 * Declared with const: the original assigned to an undeclared name, which is an
 * implicit global and a ReferenceError in strict mode / ES modules.
 * @returns {Promise<{sensor: string, temperature: number}>}
 */
const sensorThree = async function () {
  await sleep((0.5 + 0.4 * Math.random()) * sensorThreeTimeOut);
  return { sensor: 'sensor-three', temperature: 297 + 6.3 * (0.6 + 0.4 * Math.random()) };
};
// latestValue / latestSensor record the most recently resolved reading and the
// sensor function that produced it. They are still maintained by addSensor, but
// sensorValues does not rely on them: a shared "latest" slot is overwritten when
// a second sensor resolves before the consumer asks for the next value, which
// would drop a reading and leave one sensor never re-armed.
let latestValue;
let latestSensor;
// the sensorPool contains, for every sensor we are listening to, one promise that is
// either still pending or resolved but not yet consumed by sensorValues.
const sensorPool = new Set();
/**
 * Starts one measurement on the given sensor and registers it in sensorPool.
 * The pooled promise resolves to a record carrying the reading, the sensor
 * function and the pooled promise itself, so whoever wins Promise.race knows
 * exactly which pool entry to remove and which sensor to re-arm.
 * A rejection of the sensor propagates through the pooled promise.
 * @param {function(): Promise<object>} s - sensor function producing one reading
 * @returns {Promise<{value: object, sensor: Function, promise: Promise}>} the pooled promise
 */
const addSensor = function (s) {
  const sensorPromise = s().then((value) => {
    latestValue = value;
    latestSensor = s;
    return { value, sensor: s, promise: sensorPromise };
  });
  sensorPool.add(sensorPromise);
  return sensorPromise;
}; //addSensor
// the cache of recorded values used by the time windowed moving average function
// the values are injected in function runningSensorAverages
// and used in function timeWindowedAggregates
const recordedValues = {};
/**
 * Fan-in of all sensor measurement streams into a single never-ending stream.
 * Every sensor always has exactly one promise in sensorPool; Promise.race picks
 * the next available reading. A promise that resolved while the generator was
 * suspended stays in the pool and wins the next race, so no reading is lost.
 * @yields {object} the next sensor reading, from whichever sensor delivered first
 */
const sensorValues = async function* () {
  addSensor(sensorOne);
  addSensor(sensorTwo);
  addSensor(sensorThree);
  while (true) {
    const { value, sensor, promise } = await Promise.race([...sensorPool]);
    // only the consumed entry leaves the pool
    sensorPool.delete(promise);
    yield value;
    // add a promise for the sensor that just fired back to the pool
    addSensor(sensor);
  } // neverending while
}; // sensorValues
/**
 * Produces, once per time window, the average of the values collected per sensor
 * in recordedValues during that window (non-overlapping windows).
 * A sensor's collected values are dropped as soon as its average is computed.
 * Sensors without any value in the window are skipped, because averaging an
 * empty window would yield NaN (0 / 0).
 * @param {number} timeWindow - window length in milliseconds
 * @yields {{sensor: string, timestamp: number, average: number}}
 */
const timeWindowedAggregates = async function* (timeWindow) {
  while (true) {
    // wake up after timeWindow has elapsed
    await sleep(timeWindow);
    for (const sensor of Object.keys(recordedValues)) {
      const values = recordedValues[sensor];
      // reset values array for next time window (for now assume non-overlapping time windows)
      recordedValues[sensor] = [];
      if (values.length === 0) continue;
      const sum = values.reduce((a, b) => a + b, 0);
      yield { sensor, timestamp: Date.now(), average: sum / values.length };
    } // for
  } // while
}; // timeWindowedAggregates
/**
 * Calculates a running average (and maximum) per sensor over a stream of readings.
 * Every reading is also stored in recordedValues for use by timeWindowedAggregates.
 * Names are declared locally: the original assigned both the function and the loop
 * variable to undeclared identifiers, making them implicit globals.
 * @param {AsyncIterable<{sensor: string, temperature: number}>} sensorReadings - input stream
 * @param {number} windowSize - the number of values used for the calculation of the average
 * @param {number} period - the number of values (per sensor) after which a new value should be produced
 * @yields {{sensor: string, average: number, max: number}}
 */
const runningSensorAverages = async function* (sensorReadings, windowSize, period) {
  // sensors are objects (mapped with sensor-id) with these properties:
  // ticks since last production, values (array)
  const sensors = {};
  for await (const sensorReading of sensorReadings) {
    const { sensor, temperature } = sensorReading;
    // store sensorReading in recordedValues (for timeWindowedAggregates)
    if (!recordedValues[sensor]) recordedValues[sensor] = [];
    recordedValues[sensor].push(temperature);
    // retain latest sensor reading and calculate running aggregates whenever the period has passed
    if (!sensors[sensor]) sensors[sensor] = { ticks: 0, values: [] };
    const sensorRecord = sensors[sensor];
    sensorRecord.ticks++;
    sensorRecord.values.push(temperature);
    // not yet time to deliver the goods
    if (sensorRecord.ticks !== period) continue;
    sensorRecord.ticks = 0;
    // not enough values collected yet to fill a window
    if (sensorRecord.values.length < windowSize) continue;
    const windowValues = sensorRecord.values.slice(sensorRecord.values.length - windowSize);
    const sum = windowValues.reduce((a, b) => a + b, 0);
    // only the last windowSize values are needed for future windows
    sensorRecord.values = windowValues;
    yield { sensor, average: sum / windowSize, max: Math.max(...windowValues) };
  }
}; // runningSensorAverages
/**
 * Passes sensor readings through, dropping (and logging) readings with a
 * temperature below 273 and all readings from 'sensor-x'.
 * The function and its loop variable are declared locally: the original assigned
 * both to undeclared identifiers, so the loop variable was an implicit global
 * shared with runningSensorAverages.
 * @param {AsyncIterable<{sensor: string, temperature: number}>} sensorReadings - input stream
 * @yields {{sensor: string, temperature: number}} the readings that were not filtered
 */
const filterOutliersFromSensorReadings = async function* (sensorReadings) {
  for await (const sensorReading of sensorReadings) {
    const isOutlier = sensorReading.temperature < 273 || sensorReading.sensor === 'sensor-x';
    if (isOutlier) {
      lg(`Sensor Reading filtered ${JSON.stringify(sensorReading)}`); //filter this reading
    } else {
      yield sensorReading;
    }
  } // for sensorReadings
}; // filterOutliersFromSensorReadings
/**
 * Logs the running average over the filtered sensorValues() stream:
 * one result per sensor once every 10 readings, using 15 readings to calculate the average.
 * The function and its loop variable are declared locally instead of being
 * assigned to undeclared identifiers (implicit globals).
 * @returns {Promise<void>} settles only when the pipeline ends or fails
 */
const doIt = async function () {
  const readings = filterOutliersFromSensorReadings(sensorValues());
  for await (const runningAverage of runningSensorAverages(readings, 15, 10)) {
    lg(`Running Average for Sensor received ${JSON.stringify(runningAverage)}`);
  }
};
/**
 * Logs the time windowed averages over 6 second time windows
 * (using the readings collected by runningSensorAverages and stored in recordedValues).
 * The function and its loop variable are declared locally instead of being
 * assigned to undeclared identifiers (implicit globals).
 * @returns {Promise<void>} settles only when the pipeline ends or fails
 */
const doIt2 = async function () {
  for await (const timedWindowAggregate of timeWindowedAggregates(6000)) {
    lg(`Timed Window Average for Sensor received ${JSON.stringify(timedWindowAggregate)}`);
  }
};
// Start both pipelines concurrently. Their promises are not awaited, so attach a
// rejection handler to each: otherwise a failing sensor or generator surfaces only
// as an unhandled promise rejection.
doIt().catch((err) => console.error('Running average pipeline failed:', err));
doIt2().catch((err) => console.error('Time windowed aggregate pipeline failed:', err));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment