Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save filipecosta90/4325150c346e31365938d863c11d7fd0 to your computer and use it in GitHub Desktop.
Save filipecosta90/4325150c346e31365938d863c11d7fd0 to your computer and use it in GitHub Desktop.
Code snippet addressing question: https://github.com/RedisTimeSeries/redistimeseries-go/issues/63. Usage example (for cluster use --cluster-mode): ./radix-redistimeseries-example --host localhost:20002
package main
import (
"flag"
"fmt"
"github.com/mediocregopher/radix/v3"
"log"
"strconv"
"time"
)
// Code snippet addressing question: https://github.com/RedisTimeSeries/redistimeseries-go/issues/63
// Usage example (for cluster use --cluster-mode): ./radix-redistimeseries-example --host localhost:20002
// Expected output (one TS.MADD reply per data point, then each series returned by TS.MRANGE):
// [1 1]
// (...)
// [10 10]
// temperature:{area_32}:1
// map[area_id:32 sensor_id:1]
// map[1:2 2:4]
// temperature:{area_32}:2
// map[area_id:32 sensor_id:2]
// map[1:2 2:4]
// Program option vars, each bound to a command-line flag in init.
var (
// host is the host:port used for the Redis connection (flag -host).
host string
// poolPipelineConcurrency is the limit passed to radix.PoolPipelineWindow
// (flag -pipeline-max-size); zero means no limit.
poolPipelineConcurrency int
// poolPipelineWindow is the window passed to radix.PoolPipelineWindow
// (flag -pipeline-window-ms); zero disables implicit pipelining.
poolPipelineWindow time.Duration
// clusterMode selects the cluster example instead of the standalone one
// (flag -cluster-mode).
clusterMode bool
)
// init registers the command-line options. Parsing is intentionally left to
// main: calling flag.Parse from init would run before flags registered by
// other packages (for example the testing package) have been defined.
func init() {
	flag.StringVar(&host, "host", "localhost:6379", "The host:port for Redis connection")
	// NOTE: despite the "-ms" suffix this is a duration flag, so it takes
	// values such as "150us" or "2ms".
	flag.DurationVar(&poolPipelineWindow, "pipeline-window-ms", time.Millisecond*0, "If window is zero then implicit pipelining will be disabled")
	flag.IntVar(&poolPipelineConcurrency, "pipeline-max-size", 0, "If limit is zero then no limit will be used and pipelines will only be limited by the specified time window")
	flag.BoolVar(&clusterMode, "cluster-mode", false, "If set to true, it will run the client in cluster mode.")
}

// main parses the command-line flags and then runs the example that shows how
// to connect to RedisTimeSeries (cluster or standalone) and issue write/read
// commands.
func main() {
	flag.Parse()

	if clusterMode {
		clusterClientExample()
		return
	}
	vanillaClientExample()
}
// clusterClientExample connects to a Redis Cluster reachable through host,
// recreates two time series, populates them with TS.MADD and prints the
// TS.MRANGE reply of every primary node. Any failure terminates the program
// through log.Fatalf.
func clusterClientExample() {
	// Both keys carry the same {area_32} hash tag so that they hash to the
	// same cluster slot, which multi-key commands (DEL, TS.MADD) require.
	const (
		key1     = "temperature:{area_32}:1"
		key2     = "temperature:{area_32}:2"
		poolSize = 1
	)

	poolOptions := radix.PoolPipelineWindow(poolPipelineWindow, poolPipelineConcurrency)
	poolFunc := func(network, addr string) (radix.Client, error) {
		return radix.NewPool(network, addr, poolSize, poolOptions)
	}
	vanillaCluster, err := radix.NewCluster([]string{host}, radix.ClusterPoolFunc(poolFunc))
	if err != nil {
		log.Fatalf("Error preparing for example, while creating new connection. error = %v", err)
	}
	// Release the cluster connections on a normal return. log.Fatalf exits the
	// process directly, in which case the OS reclaims the sockets.
	defer vanillaCluster.Close()

	// %v (not %s): the topology holds numeric slot ranges next to the strings.
	fmt.Printf("Cluster topology: %v\n", vanillaCluster.Topo())

	// ensure keys do not exist to avoid Error replies
	if err = vanillaCluster.Do(radix.FlatCmd(nil, "DEL", key1, key2)); err != nil {
		log.Fatalf("Error preparing for example, while issuing DEL. error = %v", err)
	}

	// Create the new time-series with TS.CREATE
	// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tscreate
	createCmd1 := radix.FlatCmd(nil, "TS.CREATE", key1, "LABELS", "sensor_id", 1, "area_id", 32)
	createCmd2 := radix.FlatCmd(nil, "TS.CREATE", key2, "LABELS", "sensor_id", 2, "area_id", 32)
	if err = vanillaCluster.Do(createCmd1); err != nil {
		log.Fatalf("Error preparing for example, while creating new time series. error = %v", err)
	}
	if err = vanillaCluster.Do(createCmd2); err != nil {
		log.Fatalf("Error preparing for example, while creating new time series. error = %v", err)
	}

	// Populate the time-series with TS.MADD
	datapointTimestamps := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	datapointValues := []float64{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
	for idx, timestamp := range datapointTimestamps {
		value := datapointValues[idx]
		var rcv []interface{}
		// Append new samples to a list of series.
		// https://oss.redislabs.com/redistimeseries/commands/#tsmadd
		maddCmd := radix.FlatCmd(&rcv, "TS.MADD", key1, timestamp, value, key2, timestamp, value)
		if err = vanillaCluster.Do(maddCmd); err != nil {
			log.Fatalf("Error while adding data points. error = %v", err)
		}
		fmt.Println(rcv)
		//Output:
		// <timestamp>
		// <timestamp>
	}

	// Query a timestamp range across multiple time-series by filters.
	// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tsmrange
	//
	// TS.MRANGE carries no key, so it is sent to each shard explicitly. Only
	// primaries are queried: asking replicas as well would print every series
	// once per replica in addition to its primary.
	for _, node := range vanillaCluster.Topo().Primaries() {
		nodeClient, err := vanillaCluster.Client(node.Addr)
		if err != nil {
			log.Fatalf("Error while retrieving the client for node %s. error = %v", node.Addr, err)
		}
		var mrangeReply []interface{}
		mrangeCmd := radix.Cmd(&mrangeReply, "TS.MRANGE", "-", "+", "COUNT", "2", "WITHLABELS", "FILTER", "area_id=32")
		if err = nodeClient.Do(mrangeCmd); err != nil {
			log.Fatalf("Error while querying data points with TS.MRANGE. error = %v", err)
		}
		processMrangeReply(mrangeReply)
	}
}
// vanillaClientExample connects to a standalone Redis at host, recreates two
// time series, populates them with TS.MADD and prints the TS.MRANGE reply.
// Any failure terminates the program through log.Fatalf.
func vanillaClientExample() {
	const (
		key1     = "temperature:{area_32}:1"
		key2     = "temperature:{area_32}:2"
		poolSize = 1
	)

	poolOptions := radix.PoolPipelineWindow(poolPipelineWindow, poolPipelineConcurrency)
	vanillaClient, err := radix.NewPool("tcp", host, poolSize, poolOptions)
	if err != nil {
		log.Fatalf("Error preparing for example, while creating new connection. error = %v", err)
	}
	// Release the pooled connections on a normal return. log.Fatalf exits the
	// process directly, in which case the OS reclaims the sockets.
	defer vanillaClient.Close()

	// ensure keys do not exist to avoid Error replies
	if err = vanillaClient.Do(radix.FlatCmd(nil, "DEL", key1, key2)); err != nil {
		log.Fatalf("Error preparing for example, while issuing DEL. error = %v", err)
	}

	// Create the new time-series with TS.CREATE
	// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tscreate
	createCmd1 := radix.FlatCmd(nil, "TS.CREATE", key1, "LABELS", "sensor_id", 1, "area_id", 32)
	createCmd2 := radix.FlatCmd(nil, "TS.CREATE", key2, "LABELS", "sensor_id", 2, "area_id", 32)
	if err = vanillaClient.Do(createCmd1); err != nil {
		log.Fatalf("Error preparing for example, while creating new time series. error = %v", err)
	}
	if err = vanillaClient.Do(createCmd2); err != nil {
		log.Fatalf("Error preparing for example, while creating new time series. error = %v", err)
	}

	// Populate the time-series with TS.MADD
	datapointTimestamps := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	datapointValues := []float64{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
	for idx, timestamp := range datapointTimestamps {
		value := datapointValues[idx]
		var rcv []interface{}
		// Append new samples to a list of series.
		// https://oss.redislabs.com/redistimeseries/commands/#tsmadd
		maddCmd := radix.FlatCmd(&rcv, "TS.MADD", key1, timestamp, value, key2, timestamp, value)
		if err = vanillaClient.Do(maddCmd); err != nil {
			log.Fatalf("Error while adding data points. error = %v", err)
		}
		fmt.Println(rcv)
		//Output:
		// <timestamp>
		// <timestamp>
	}

	// Query a timestamp range across multiple time-series by filters.
	// Further reference here: https://oss.redislabs.com/redistimeseries/commands/#tsmrange
	var mrangeReply []interface{}
	mrangeCmd := radix.Cmd(&mrangeReply, "TS.MRANGE", "-", "+", "COUNT", "2", "WITHLABELS", "FILTER", "area_id=32")
	if err = vanillaClient.Do(mrangeCmd); err != nil {
		log.Fatalf("Error while querying data points with TS.MRANGE. error = %v", err)
	}
	processMrangeReply(mrangeReply)
}
// processMrangeReply prints every series contained in a decoded TS.MRANGE
// reply (queried WITHLABELS): the series name, its labels as a map and its
// samples as a timestamp -> value map, one per line.
//
// Each element of mrangeReply is expected to be a three-element array
// [name, [[label, value], ...], [[timestamp, value], ...]]. Elements that do
// not match this shape are reported through the log package and skipped, so a
// malformed reply can no longer panic the program.
func processMrangeReply(mrangeReply []interface{}) {
	for idx, rawSeries := range mrangeReply {
		name, labels, values, err := parseMrangeSeries(rawSeries)
		if err != nil {
			log.Printf("skipping TS.MRANGE reply element %d: %v", idx, err)
			continue
		}
		fmt.Println(name)
		fmt.Println(labels)
		fmt.Println(values)
		//Output: <name>
		// map[labels]value
		// map[ts]value
	}
}

// parseMrangeSeries validates and converts one series entry of a TS.MRANGE
// reply into its name, label map and sample map.
func parseMrangeSeries(rawSeries interface{}) (string, map[string]string, map[int64]float64, error) {
	series, ok := rawSeries.([]interface{})
	if !ok {
		return "", nil, nil, fmt.Errorf("expected an array, got %T", rawSeries)
	}
	if len(series) < 3 {
		return "", nil, nil, fmt.Errorf("expected [name, labels, samples], got %d element(s)", len(series))
	}

	name, ok := respText(series[0])
	if !ok {
		return "", nil, nil, fmt.Errorf("series name has unexpected type %T", series[0])
	}

	labelsRaw, ok := series[1].([]interface{})
	if !ok {
		return "", nil, nil, fmt.Errorf("labels of %q have unexpected type %T", name, series[1])
	}
	labels := make(map[string]string, len(labelsRaw))
	for _, labelRaw := range labelsRaw {
		pair, ok := labelRaw.([]interface{})
		if !ok || len(pair) < 2 {
			return "", nil, nil, fmt.Errorf("malformed label entry in %q: %v", name, labelRaw)
		}
		k, keyOK := respText(pair[0])
		v, valOK := respText(pair[1])
		if !keyOK || !valOK {
			return "", nil, nil, fmt.Errorf("malformed label entry in %q: %v", name, labelRaw)
		}
		labels[k] = v
	}

	samplesRaw, ok := series[2].([]interface{})
	if !ok {
		return "", nil, nil, fmt.Errorf("samples of %q have unexpected type %T", name, series[2])
	}
	values := make(map[int64]float64, len(samplesRaw))
	for _, sampleRaw := range samplesRaw {
		pair, ok := sampleRaw.([]interface{})
		if !ok || len(pair) < 2 {
			return "", nil, nil, fmt.Errorf("malformed sample in %q: %v", name, sampleRaw)
		}
		ts, ok := pair[0].(int64)
		if !ok {
			return "", nil, nil, fmt.Errorf("sample timestamp in %q has unexpected type %T", name, pair[0])
		}
		text, ok := respText(pair[1])
		if !ok {
			return "", nil, nil, fmt.Errorf("sample value in %q has unexpected type %T", name, pair[1])
		}
		v, err := strconv.ParseFloat(text, 64)
		if err != nil {
			return "", nil, nil, fmt.Errorf("sample value in %q: %w", name, err)
		}
		values[ts] = v
	}
	return name, labels, values, nil
}

// respText returns the textual content of a decoded reply element. Both string
// and []byte are accepted because a textual element may be decoded as either
// (presumably depending on the reply type the server used — confirm against
// the client's decoding rules).
func respText(v interface{}) (string, bool) {
	switch s := v.(type) {
	case string:
		return s, true
	case []byte:
		return string(s), true
	default:
		return "", false
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment