Skip to content

Instantly share code, notes, and snippets.

@TerryE
Last active December 7, 2023 15:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TerryE/523c5efd77b2e942644ea7486a650201 to your computer and use it in GitHub Desktop.
Save TerryE/523c5efd77b2e942644ea7486a650201 to your computer and use it in GitHub Desktop.
Node-RED Function to collect Octopus meter readings and day-ahead half-hourly rates
/*
* This function is more of a classical batch style "all-in" process to Query the Developer
* REST API and load Octopus data into MySQL tables. The mysql2 and got modules are used in
* async mode to implement this.
*/
//------------ Functions
// Functions to convert ISOdate string 'YYYY-MM-DDTHH:mm:ss.000Z' <=>
// MySQL 'YYYY-MM-DD HH:mm:ss' datetime format
// Render any value accepted by the Date constructor as an ISO-8601 UTC string,
// dropping the '.000' fraction when the millisecond part is zero.
const toISOdate = (d) => {
  const iso = new Date(d).toISOString();
  return iso.replace('.000', '');
};
// Convert an ISO-8601 timestamp string into MySQL 'YYYY-MM-DD HH:mm:ss' form.
// A timestamp carrying an explicit numeric offset (e.g. '...T00:30:00+01:00') is
// first normalised to UTC, so the offset is honoured rather than truncated away.
// Strings ending in 'Z', or with no zone designator, are reformatted unchanged.
const toSQLdate = (d) => {
  const hasNumericOffset = /T\d{2}:\d{2}(:\d{2}(\.\d+)?)?[+-]\d{2}:\d{2}$/.test(d);
  const iso = hasNumericOffset ? new Date(d).toISOString() : d;
  return iso.slice(0, 19).replace('T', ' ');
};
// Return the start of the half-hour slot following `ds`, as an ISO-8601 UTC
// string (the '.000' millisecond fraction is dropped when zero).
//   ds - anything the Date constructor accepts, or a zoneless MySQL / ISO
//        datetime string ('YYYY-MM-DD HH:mm:ss' or 'YYYY-MM-DDTHH:mm:ss').
// Zoneless datetime strings are read as UTC. Left to itself, the Date constructor
// would read them in the host's local time zone, shifting the result by the
// zone offset on any non-UTC host.
// Throws RangeError (from toISOString) when `ds` cannot be parsed.
const nextSlot = (ds) => {
  const SLOT_MS = 30 * 60 * 1000;
  const isZoneless = typeof ds === 'string'
    && /^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(\.\d+)?$/.test(ds);
  const start = new Date(isZoneless ? `${ds.replace(' ', 'T')}Z` : ds);
  return new Date(start.getTime() + SLOT_MS).toISOString().replace('.000', '');
};
//------------
// Open a promise-based MySQL connection using the database name, user and
// password held in the Node-RED global context (DBname, DBuser, DBpassword).
// Resolves to whatever mysql2.createConnectionPromise() yields.
const connectToDB = async () => {
  const settings = {
    database: global.get('DBname'),
    user: global.get('DBuser'),
    password: global.get('DBpassword'),
    multipleStatements: true,
    // NOTE(review): presumably makes the driver hand back DATE/DATETIME columns
    // as strings, which the rest of this script relies on - confirm in mysql2 docs
    dateStrings: true,
  };
  const connection = await mysql2.createConnectionPromise(settings);
  return connection;
};
// Read every config_settings row in the 'octopus' category and fold the rows
// into a plain { item: value } object.
//   db - connection whose query() resolves to an array with the rows at index 0
const loadConfigSettings = async (db) => {
  const [rows] = await db.query(
    "SELECT item, value FROM config_settings WHERE category='octopus'",
  );
  const settings = {};
  for (const { item, value } of rows) {
    settings[item] = value;
  }
  return settings;
};
// Download every unit rate published after the newest slot already stored in
// agile_rates and append the new rows to that table.
//   db  - connection exposing a promise-based query()
//   url - standard-unit-rates endpoint; the period_from query is appended here
// Resolves to undefined. Nothing is inserted when the API returns no results.
const getLatestAgileRates = async (db, url) => {
  const [[newest]] = await db.query('SELECT MAX(dts) AS last FROM agile_rates');
  const request = { headers: {}, url: `${url}?period_from=${nextSlot(newest.last)}` };
  const collected = [];
  // The reply may be empty or span several pages, so keep following `next`
  do {
    const page = await got(request).json();
    request.url = page.next;
    for (const { valid_from, value_exc_vat } of page.results) {
      collected.push([valid_from, value_exc_vat]);
    }
  } while (request.url);
  if (collected.length === 0) { return; }
  // Order by timestamp, then swap the ISO timestamps for MySQL datetime strings
  collected.sort(([a], [b]) => a.localeCompare(b));
  const rows = collected.map(([from, price]) => [toSQLdate(from), price]);
  await db.query('INSERT INTO agile_rates VALUES ?;', [rows]);
};
// Pull new half-hourly consumption readings from the Octopus API and store them
// in meter_readings.
//
// Per the original author's notes, the upstream DCC => Octopus data pull can
// leave gaps (usually whole days) that only fill in on a later pull. To keep
// things simple, readings are requested one whole day at a time and a day is
// saved only when all 48 half-hour slots are present. Candidate days are those
// from the last 28 days that have rates but no readings yet. Asking for
// date-aligned 48-slot chunks avoids gap handling and multi-page replies.
//
//   db  - connection exposing a promise-based query()
//   url - consumption endpoint; the period query is appended here
//   key - API key, sent as the user name of an HTTP Basic Authorization header
// Resolves to [savedDays, readings]: the days stored and their
// [datetime, consumption] rows in ascending time order ([[], []] when none).
const getLatestMeterReadings = async (db, url, key) => {
  // Days that have at least one rate slot with no matching reading
  const missingDaysSQL = [
    'SELECT DISTINCT DATE(a.dts) AS "date"',
    'FROM agile_rates a LEFT JOIN meter_readings m ON a.dts = m.dts',
    'WHERE m.dts IS NULL and a.dts > DATE_SUB(CURDATE(), INTERVAL 28 DAY);',
  ].join('\n');
  const [dayRows] = await db.query(missingDaysSQL);
  const candidates = dayRows.map(({ date }) => date);
  candidates.pop(); // the final day never has readings yet, so drop it

  const basicAuth = Buffer.from(`${key}:`).toString('base64');
  const request = { headers: { Authorization: 'Basic ' + basicAuth } };
  const readings = [];
  const savedDays = [];
  for (const day of candidates) {
    request.url = `${url}?period_from=${day}T00:00Z&period_to=${day}T23:30Z`;
    const reply = await got(request).json();
    if (reply.count < 48) { continue; } // skip days that are only partly available
    reply.results.forEach(({ interval_start, consumption }) => {
      readings.push([toSQLdate(interval_start), consumption]);
    });
    savedDays.push(day);
  }
  if (readings.length === 0) { return [[], []]; }

  // Store the readings in ascending time order
  readings.sort(([a], [b]) => a.localeCompare(b));
  await db.query('INSERT INTO meter_readings VALUES ?;', [readings]);
  return [savedDays, readings];
};
// Work out the cost and total consumption for each newly stored day and insert
// one row per day into daily_readings.
//   db       - connection exposing a promise-based query()
//   days     - the dates ('YYYY-MM-DD') whose readings were just saved
//   readings - [datetime, consumption] rows belonging to those days
// Resolves to undefined; does nothing when there are no readings.
const computeDailyPrices = async (db, days, readings) => {
  if (readings.length === 0) { return; }

  // Look-up table of unit price keyed by slot datetime, for the days concerned
  const [rateRows] = await db.query('SELECT * FROM agile_rates WHERE date(dts) in (?);', [days]);
  const priceAt = {};
  rateRows.forEach(({ dts, price }) => { priceAt[dts] = price; });

  // Running totals per day
  const totals = {};
  for (const d of days) {
    totals[d] = { cost: 0, use: 0 };
  }
  for (const [dts, consumption] of readings) {
    const bucket = totals[dts.substring(0, 10)];
    // Each slot's consumption x price is rounded to a whole unit, then divided by 100
    bucket.cost += Math.round(consumption * priceAt[dts]) / 100;
    bucket.use += consumption;
  }

  const rows = days.map((d) => [d, totals[d].cost, totals[d].use]);
  await db.query("INSERT INTO daily_readings(dts, cost, Duse) VALUES ?", [rows]);
};
//------------ Main flow
// Connect, load the 'octopus' settings (HOST, MPAN, MSN, TARIFF, GSP, API_KEY),
// refresh the Agile rates, pull any newly available meter readings, price them
// by day, and report how many readings were processed.
//
// Any failure is reported with node.warn() and no message is sent. Whether the
// run succeeds or fails, the DB connection is closed and node.done() is called,
// so an error no longer leaks the connection or leaves the message unfinished.
let db;
try {
  db = await connectToDB();
  const s = await loadConfigSettings(db);
  const consumptionURL = `https://${s.HOST}/v1/electricity-meter-points/${s.MPAN}/meters/${s.MSN}/consumption/`;
  // Single slash after "products" (the original had a stray "//" here)
  const agileRateURL = `https://${s.HOST}/v1/products/${s.TARIFF}/electricity-tariffs/E-1R-${s.TARIFF}-${s.GSP}/standard-unit-rates/`;
  // Rates go first: the missing-readings query works from the dates in agile_rates
  await getLatestAgileRates(db, agileRateURL);
  const [days, readings] = await getLatestMeterReadings(db, consumptionURL, s.API_KEY);
  await computeDailyPrices(db, days, readings);
  node.send({ payload: `${readings.length} readings processed` });
} catch (e) {
  // Best effort: log and carry on to the clean-up; nothing is sent downstream
  node.warn((e && e.stack) || e);
} finally {
  if (db) {
    try {
      await db.end(); // wait for the close so a failure surfaces here
    } catch (e) {
      node.warn(`Failed to close DB connection: ${(e && e.message) || e}`);
    }
  }
  node.done();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment