Skip to content

Instantly share code, notes, and snippets.

@mark-rushakoff
Created September 1, 2016 16:28
Show Gist options
  • Save mark-rushakoff/2e1c160a15d0c7500548d9aae1773c6d to your computer and use it in GitHub Desktop.
Save mark-rushakoff/2e1c160a15d0c7500548d9aae1773c6d to your computer and use it in GitHub Desktop.
Result parsing for InfluxDB
package resultparser
import (
"encoding/json"
"fmt"
)
// Field is implemented by TimeField, IntField, FloatField, and StringField.
// A Field knows which response column it reads from and how to store a raw
// decoded value from that column. Both methods are unexported.
type Field interface {
// columnIndex returns the position of this field's column within the given
// column names, or an error when the column is not present.
columnIndex([]string) (int, error)
// set stores one raw decoded value into the field.
// The implementations in this file panic when the value has an unexpected type.
set(interface{})
}
// Compile-time assertions that the pointer form of every field type satisfies Field.
var (
	_ Field = (*TimeField)(nil)
	_ Field = (*IntField)(nil)
	_ Field = (*FloatField)(nil)
	_ Field = (*StringField)(nil)
)
// TimeField represents the "time" field in a row in an InfluxDB response.
type TimeField struct {
// Nanoseconds holds the integer decoded from the "time" column of the current row.
// NOTE(review): the name implies nanosecond epoch precision; confirm the query requests it.
Nanoseconds int64
}
// columnIndex reports where the "time" column sits in columns,
// or an error when no such column exists.
func (f *TimeField) columnIndex(columns []string) (int, error) {
	const timeColumn = "time"
	return findStringIndex(columns, timeColumn)
}
// set stores v into Nanoseconds.
// v must be a json.Number (i.e. the decoder had UseNumber enabled); set panics
// if it is any other type or if the number cannot be parsed as an int64.
func (f *TimeField) set(v interface{}) {
	num := v.(json.Number)
	ns, err := num.Int64()
	// The field is written even when parsing failed; the panic follows immediately.
	f.Nanoseconds = ns
	if err != nil {
		panic(err)
	}
}
// IntField represents an integer field with the given Name in a row in an InfluxDB response.
type IntField struct {
// Name is the column name this field is matched against.
Name string
// Value holds the integer decoded from that column for the current row.
Value int64
}
// columnIndex reports where the column called f.Name sits in columns,
// or an error when no such column exists.
func (f *IntField) columnIndex(columns []string) (int, error) {
	name := f.Name
	return findStringIndex(columns, name)
}
// set stores v into Value.
// v must be a json.Number (i.e. the decoder had UseNumber enabled); set panics
// if it is any other type or if the number cannot be parsed as an int64.
func (f *IntField) set(v interface{}) {
	num := v.(json.Number)
	n, err := num.Int64()
	// The field is written even when parsing failed; the panic follows immediately.
	f.Value = n
	if err != nil {
		panic(err)
	}
}
// FloatField represents a float field with the given Name in a row in an InfluxDB response.
type FloatField struct {
// Name is the column name this field is matched against.
Name string
// Value holds the float decoded from that column for the current row.
Value float64
}
// columnIndex reports where the column called f.Name sits in columns,
// or an error when no such column exists.
func (f *FloatField) columnIndex(columns []string) (int, error) {
	name := f.Name
	return findStringIndex(columns, name)
}
// set stores v into Value.
// v must be a json.Number (i.e. the decoder had UseNumber enabled); set panics
// if it is any other type or if the number cannot be parsed as a float64.
func (f *FloatField) set(v interface{}) {
	num := v.(json.Number)
	val, err := num.Float64()
	// The field is written even when parsing failed; the panic follows immediately.
	f.Value = val
	if err != nil {
		panic(err)
	}
}
// StringField represents a string field with the given Name in a row in an InfluxDB response.
type StringField struct {
// Name is the column name this field is matched against.
Name string
// Value holds the string decoded from that column for the current row.
Value string
}
// columnIndex reports where the column called f.Name sits in columns,
// or an error when no such column exists.
func (f *StringField) columnIndex(columns []string) (int, error) {
	name := f.Name
	return findStringIndex(columns, name)
}
// set stores v into Value. It panics if v is not a string.
func (f *StringField) set(v interface{}) {
	s := v.(string)
	f.Value = s
}
// findStringIndex returns the index of the first element of columns equal to
// name. When no element matches it returns -1 and a non-nil error.
func findStringIndex(columns []string, name string) (int, error) {
	for i := 0; i < len(columns); i++ {
		if columns[i] != name {
			continue
		}
		return i, nil
	}
	return -1, fmt.Errorf("Field not found: %s", name)
}
// Package resultparser contains utilities to simplify handling of InfluxDB query results.
package resultparser
import (
"bytes"
"encoding/json"
"fmt"
)
// ResultParser manages mapping named fields to actual values from InfluxDB query results.
// It works as a cursor over the rows of one series: Next advances the cursor and
// ParseFields copies the current row's values into the registered Fields.
type ResultParser struct {
// values holds the rows extracted from the response JSON, one slice of column values per row.
values [][]interface{}
// rowIndex is the row the parser currently points at.
// NewResultParser starts it at -1 so that the first call to Next moves it to 0.
rowIndex int
// columnsToFields maps a column index to the Field that receives that column's value.
columnsToFields map[int]Field
}
// NewResultParser returns a new ResultParser based on the given raw response and the given fields.
//
// rawResp must be a JSON document containing exactly one result that in turn
// contains exactly one series. JSON numbers are decoded as json.Number so that
// each Field can convert them to the type it needs.
//
// An error is returned when:
//   - rawResp cannot be decoded,
//   - the response does not hold exactly one result with exactly one series,
//   - an entry of fields is a nil interface value,
//   - a field's column is not present in the series, or
//   - a row has fewer values than the highest column index any field maps to.
//
// The returned parser is positioned before the first row; call Next before ParseFields.
func NewResultParser(rawResp []byte, fields []Field) (*ResultParser, error) {
	// Named "decoded" rather than "resp" so the local does not shadow the resp type.
	decoded := new(resp)
	dec := json.NewDecoder(bytes.NewReader(rawResp))
	// Keep numbers as json.Number; the Field setters rely on that representation.
	dec.UseNumber()
	if err := dec.Decode(decoded); err != nil {
		return nil, err
	}

	if len(decoded.Results) != 1 {
		return nil, fmt.Errorf("Expected exactly one result in response, got %d", len(decoded.Results))
	}
	if len(decoded.Results[0].Series) != 1 {
		return nil, fmt.Errorf("Expected exactly one series in result, got %d", len(decoded.Results[0].Series))
	}
	series := decoded.Results[0].Series[0]

	columnsToFields := make(map[int]Field, len(fields))
	maxIdx := -1 // highest column index any field reads from
	for i, f := range fields {
		// A nil interface value would otherwise crash on the method call below.
		if f == nil {
			return nil, fmt.Errorf("field at position %d is nil", i)
		}
		idx, err := f.columnIndex(series.Columns)
		if err != nil {
			return nil, err
		}
		columnsToFields[idx] = f
		if idx > maxIdx {
			maxIdx = idx
		}
	}

	// Reject short rows up front so that reading a mapped column can never
	// index past the end of a row later on.
	for i, row := range series.Values {
		if len(row) <= maxIdx {
			return nil, fmt.Errorf("row %d has %d values, need at least %d", i, len(row), maxIdx+1)
		}
	}

	return &ResultParser{
		values:          series.Values,
		rowIndex:        -1, // so that the first call to Next() sets to 0
		columnsToFields: columnsToFields,
	}, nil
}
// Next advances the parser to the following row and reports whether that row exists.
// It must be called before the first ParseFields, and again before each later one.
func (p *ResultParser) Next() bool {
	p.rowIndex++
	hasRow := len(p.values) > p.rowIndex
	return hasRow
}
// ParseFields copies the values of the current row into the Fields that were
// handed to the parser, each Field receiving the value of its own column.
// ParseFields currently will panic if a type conversion fails.
func (p *ResultParser) ParseFields() {
	for idx, f := range p.columnsToFields {
		row := p.values[p.rowIndex]
		// set panics when the value has an unexpected type.
		// If that starts showing up in experiments, revisit returning an error from set().
		f.set(row[idx])
	}
}
// resp is a container for the raw InfluxDB responses.
// Only the keys needed for parsing are declared here.
type resp struct {
// Results is the top-level "results" array.
Results []struct {
// Series is the "series" array within one result.
Series []struct {
// Columns lists the column names; a value's position in a row matches its column's position here.
Columns []string `json:"columns"`
// Values holds the rows, one slice of decoded column values per row.
Values [][]interface{} `json:"values"`
} `json:"series"`
} `json:"results"`
}
package resultparser_test
import (
"testing"
"github.com/influxdata/laboratory/resultparser"
)
// TestResultParser_Fields decodes a two-row response and checks that a time,
// an integer, a string, and a float field each receive the value of their own
// column on every row, and that exactly two rows are produced.
func TestResultParser_Fields(t *testing.T) {
	rawResp := []byte(`{
"results": [
{
"series": [
{
"name": "timetonpoints",
"columns": [
"time",
"expid",
"latNs",
"linesperbatch",
"numpoints",
"numwriters",
"reqBytes",
"someFloat",
"status"
],
"values": [
[
1459985225813556804,
"b3blkgssusuw_1qn5vnc",
14917972,
"100",
"10000",
"1",
5690,
1.5,
204
],
[
1459985225815570190,
"b3c81v86pkqu_1nakltx",
1993293,
"100",
"10000",
"1",
5800,
2.75,
204
]
]
}
]
}
]
}`)

	timeF := &resultparser.TimeField{}
	reqBytesF := &resultparser.IntField{Name: "reqBytes"}
	expIDF := &resultparser.StringField{Name: "expid"}
	floatF := &resultparser.FloatField{Name: "someFloat"}

	p, err := resultparser.NewResultParser(rawResp, []resultparser.Field{
		timeF,
		reqBytesF,
		expIDF,
		floatF,
	})
	if err != nil {
		t.Fatalf("unexpected error: %s", err.Error())
	}

	// Expected values per row, in response order.
	want := []struct {
		time      int64
		reqBytes  int64
		expID     string
		someFloat float64
	}{
		{time: 1459985225813556804, reqBytes: 5690, expID: "b3blkgssusuw_1qn5vnc", someFloat: 1.5},
		{time: 1459985225815570190, reqBytes: 5800, expID: "b3c81v86pkqu_1nakltx", someFloat: 2.75},
	}

	rowCount := 0
	for p.Next() {
		// Fail on an unexpected extra row instead of comparing it against
		// another row's expectations.
		if rowCount >= len(want) {
			t.Fatalf("exp %d rows, got at least %d", len(want), rowCount+1)
		}
		exp := want[rowCount]
		rowCount++

		p.ParseFields()

		if timeF.Nanoseconds != exp.time {
			t.Fatalf("time field: exp %d, got %d", exp.time, timeF.Nanoseconds)
		}
		if reqBytesF.Value != exp.reqBytes {
			t.Fatalf("req bytes field: exp %d, got %d", exp.reqBytes, reqBytesF.Value)
		}
		// %q for strings and %v for floats: %d is invalid for both and
		// produces a garbled failure message.
		if expIDF.Value != exp.expID {
			t.Fatalf("id field: exp %q, got %q", exp.expID, expIDF.Value)
		}
		if floatF.Value != exp.someFloat {
			t.Fatalf("float field: exp %v, got %v", exp.someFloat, floatF.Value)
		}
	}

	if rowCount != len(want) {
		t.Fatalf("exp %d rows, got: %d", len(want), rowCount)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment