Skip to content

Instantly share code, notes, and snippets.

@peterpeerdeman
Last active February 26, 2024 15:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save peterpeerdeman/510232905d333c970947f88bbc41b964 to your computer and use it in GitHub Desktop.
Save peterpeerdeman/510232905d333c970947f88bbc41b964 to your computer and use it in GitHub Desktop.
influx-to-prophet-workinprogress.js
"use strict";
const Influx = require("influxdb-nodejs");
const axios = require("axios").default;
const INPUT_MEASUREMENT = "pvstatus";
const OUTPUT_MEASUREMENT = "pvstatus_predictions";
function writeOriginalsToInflux(influxClient, pair) {
if (!pair[1]) return;
const fields = {
powerGeneration: pair[1],
};
influxClient
.write(OUTPUT_MEASUREMENT)
.tag("origin", "originaldata")
.field(fields)
.time(new Date(pair[0]).getTime() * 1000 * 1000)
.queue();
}
async function predict(event, context) {
console.log("openfaas influx-to-prophet running");
const influxClient = new Influx("http://localhost:8086/pv");
// let query = influxClient
// .query('pvstatus')
// .addFunction('mean(energyGeneration)')
// .addGroup('time', '1d')
// .addGroup('time', '1d')
const result = await influxClient.queryRaw(
`SELECT mean("powerGeneration") FROM "${INPUT_MEASUREMENT}" WHERE time > now() - 104w GROUP BY time(1d) fill(linear) tz('Europe/Amsterdam')`
);
const values = result.results[0].series[0].values;
let ds = [];
let y = [];
let p = 365;
values.forEach((pair) => {
ds.push(pair[0].split("T")[0]);
y.push(pair[1]);
writeOriginalsToInflux(influxClient, pair);
});
const data = {
ds: ds,
y: y,
p: p,
};
// send data to https://github.com/peterpeerdeman/rasprophet-prophet-rest-service
const prediction = await axios.post("http://localhost:5000", data);
for (const key in prediction.data.ds) {
const fields = {
additive_terms: prediction.data.additive_terms[key],
additive_terms_lower: prediction.data.additive_terms_lower[key],
additive_terms_upper: prediction.data.additive_terms_upper[key],
multiplicative_terms: prediction.data.multiplicative_terms[key],
multiplicative_terms_lower:
prediction.data.multiplicative_terms_lower[key],
multiplicative_terms_upper:
prediction.data.multiplicative_terms_upper[key],
trend: prediction.data.trend[key],
trend_lower: prediction.data.trend_lower[key],
trend_upper: prediction.data.trend_upper[key],
weekly: prediction.data.weekly[key],
weekly_lower: prediction.data.weekly_lower[key],
weekly_upper: prediction.data.weekly_upper[key],
yhat: prediction.data.yhat[key],
yhat_lower: prediction.data.yhat_lower[key],
yhat_upper: prediction.data.yhat_upper[key],
};
influxClient
.write(OUTPUT_MEASUREMENT)
.tag("origin", "prophetprediction")
.field(fields)
.time(new Date(prediction.data.ds[key]).getTime() * 1000 * 1000)
.queue();
}
influxClient
.syncWrite()
.then(() =>
console.debug(
`${Date.now()} influx-to-prophet: influx write point success`
)
)
.catch((error) =>
console.debug(`${Date.now()} influx-to-prophet: write failed ${error}`)
);
}
predict();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment