Last active
February 26, 2024 15:03
-
-
Save peterpeerdeman/510232905d333c970947f88bbc41b964 to your computer and use it in GitHub Desktop.
influx-to-prophet-workinprogress.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const Influx = require("influxdb-nodejs"); | |
const axios = require("axios").default; | |
const INPUT_MEASUREMENT = "pvstatus"; | |
const OUTPUT_MEASUREMENT = "pvstatus_predictions"; | |
function writeOriginalsToInflux(influxClient, pair) { | |
if (!pair[1]) return; | |
const fields = { | |
powerGeneration: pair[1], | |
}; | |
influxClient | |
.write(OUTPUT_MEASUREMENT) | |
.tag("origin", "originaldata") | |
.field(fields) | |
.time(new Date(pair[0]).getTime() * 1000 * 1000) | |
.queue(); | |
} | |
async function predict(event, context) { | |
console.log("openfaas influx-to-prophet running"); | |
const influxClient = new Influx("http://localhost:8086/pv"); | |
// let query = influxClient | |
// .query('pvstatus') | |
// .addFunction('mean(energyGeneration)') | |
// .addGroup('time', '1d') | |
// .addGroup('time', '1d') | |
const result = await influxClient.queryRaw( | |
`SELECT mean("powerGeneration") FROM "${INPUT_MEASUREMENT}" WHERE time > now() - 104w GROUP BY time(1d) fill(linear) tz('Europe/Amsterdam')` | |
); | |
const values = result.results[0].series[0].values; | |
let ds = []; | |
let y = []; | |
let p = 365; | |
values.forEach((pair) => { | |
ds.push(pair[0].split("T")[0]); | |
y.push(pair[1]); | |
writeOriginalsToInflux(influxClient, pair); | |
}); | |
const data = { | |
ds: ds, | |
y: y, | |
p: p, | |
}; | |
// send data to https://github.com/peterpeerdeman/rasprophet-prophet-rest-service | |
const prediction = await axios.post("http://localhost:5000", data); | |
for (const key in prediction.data.ds) { | |
const fields = { | |
additive_terms: prediction.data.additive_terms[key], | |
additive_terms_lower: prediction.data.additive_terms_lower[key], | |
additive_terms_upper: prediction.data.additive_terms_upper[key], | |
multiplicative_terms: prediction.data.multiplicative_terms[key], | |
multiplicative_terms_lower: | |
prediction.data.multiplicative_terms_lower[key], | |
multiplicative_terms_upper: | |
prediction.data.multiplicative_terms_upper[key], | |
trend: prediction.data.trend[key], | |
trend_lower: prediction.data.trend_lower[key], | |
trend_upper: prediction.data.trend_upper[key], | |
weekly: prediction.data.weekly[key], | |
weekly_lower: prediction.data.weekly_lower[key], | |
weekly_upper: prediction.data.weekly_upper[key], | |
yhat: prediction.data.yhat[key], | |
yhat_lower: prediction.data.yhat_lower[key], | |
yhat_upper: prediction.data.yhat_upper[key], | |
}; | |
influxClient | |
.write(OUTPUT_MEASUREMENT) | |
.tag("origin", "prophetprediction") | |
.field(fields) | |
.time(new Date(prediction.data.ds[key]).getTime() * 1000 * 1000) | |
.queue(); | |
} | |
influxClient | |
.syncWrite() | |
.then(() => | |
console.debug( | |
`${Date.now()} influx-to-prophet: influx write point success` | |
) | |
) | |
.catch((error) => | |
console.debug(`${Date.now()} influx-to-prophet: write failed ${error}`) | |
); | |
} | |
predict(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment