Skip to content

Instantly share code, notes, and snippets.

@hyteckit
Forked from bemasher/README.md
Last active October 5, 2017 01:05
Show Gist options
  • Save hyteckit/b02842933acb48c7d002d9b6927f4f59 to your computer and use it in GitHub Desktop.
Save hyteckit/b02842933acb48c7d002d9b6927f4f59 to your computer and use it in GitHub Desktop.
A simple system for collecting and writing differential consumption values to an influxdb database.
[Unit]
Description=Software Defined Radio TCP Server
Wants=network.target
After=network.target
[Service]
ExecStart=/usr/bin/rtl_tcp
Restart=always
[Install]
WantedBy=default.target
RTLAMR_SYMBOLLENGTH=8
RTLAMR_CENTERFREQ=909586111
RTLAMR_FORMAT=json
RTLAMR_MSGTYPE=idm
#RTLAMR_FILTERID=00000000
package main
import (
"bufio"
"encoding/json"
"log"
"os"
"strconv"
"time"
"github.com/influxdata/influxdb/client/v2"
)
const (
dbName = "meter"
host = "http://localhost:8086"
username = "username"
password = "password"
)
type Message struct {
IDM IDM `json:"Message"`
}
// IDM embedded in rtlamr JSON output.
type IDM struct {
EndPointType byte `json:"ERTType"`
EndPointID uint32 `json:"ERTSerialNumber"`
Consumption uint32 `json:"LastConsumptionCount"`
IntervalCount byte `json:"ConsumptionIntervalCount"`
Intervals []uint16 `json:"DifferentialConsumptionIntervals"`
}
type LSTATE struct {
lastIntervalSet bool
lastInterval byte
prevTime time.Time
}
// Generate tags.
func (idm IDM) Tags() map[string]string {
return map[string]string{
"endpoint_type": strconv.Itoa(int(idm.EndPointType)),
"endpoint_id": strconv.Itoa(int(idm.EndPointID)),
}
}
// Output the consumption in Wh (output is expected to be 100ths of kWh).
func (idm IDM) Fields(idx int) map[string]interface{} {
return map[string]interface{}{
"consumption": float64(idm.Intervals[idx]) * 10,
}
}
func init() {
log.SetFlags(log.Lshortfile | log.Lmicroseconds)
}
func main() {
c, err := client.NewHTTPClient(client.HTTPConfig{
Addr: host,
Username: username,
Password: password,
})
if err != nil {
log.Fatalln(err)
}
stdinBuf := bufio.NewScanner(os.Stdin)
var meter = make(map[uint32]LSTATE);
// For each message rtlamr outputs, one per line.
for stdinBuf.Scan() {
// Parse the message.
var msg Message
err := json.Unmarshal(stdinBuf.Bytes(), &msg)
if err != nil {
log.Println(err)
continue
}
idm := msg.IDM
// If IntervalCount is not set for meter with EndPointID,
// initialize that meter with LSTATE from idm data
if !meter[idm.EndPointID].lastIntervalSet {
meter[idm.EndPointID] = LSTATE{true, idm.IntervalCount, time.Now()}
}
log.Printf("%+v\n", idm)
// If IntervalCount has changed for meter with EndPointID,
// write consumption data for that meter to influxdb
if meter[idm.EndPointID].lastInterval != idm.IntervalCount {
nowTime := time.Now()
prevTime := meter[idm.EndPointID].prevTime
meter[idm.EndPointID] = LSTATE{true, idm.IntervalCount, nowTime}
d := nowTime.Sub(prevTime);
// Create a new batch of points to send to influxdb.
bp, err := client.NewBatchPoints(client.BatchPointsConfig{
Database: dbName,
Precision: "ms",
})
pt, err := client.NewPoint("power", idm.Tags(), idm.Fields(0), nowTime)
if err != nil {
log.Println(err)
} else {
bp.AddPoint(pt)
}
// Write batch of points to influxdb.
err = c.Write(bp)
if err != nil {
log.Println(err)
}
// Log the received message.
log.Printf("Meter: %+v\n", meter[idm.EndPointID])
log.Printf("%d: CHANGED. Old Count: %d, New Count: %d. Time between change: %s\n\n", idm.EndPointID, meter[idm.EndPointID].lastInterval, idm.IntervalCount, d)
} else {
log.Printf("No Change. Interval Count: %d. Prev Time: %s\n\n", idm.IntervalCount, meter[idm.EndPointID].prevTime)
}
}
}
[Unit]
Description=rtlamrrd
BindsTo=rtl_tcp.service
After=rtl_tcp.service
[Service]
EnvironmentFile=/home/username/.config/systemd/user/rtlamr.env
ExecStart=/bin/sh -c '/home/username/bin/rtlamr | /home/username/bin/rtlamrrd'
Restart=always
RestartSec=30
[Install]
WantedBy=default.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment