Skip to content

Instantly share code, notes, and snippets.

@kylebrandt
Created September 18, 2019 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylebrandt/0cfc1d7ad7436c7b9a7adb8fb1d036b3 to your computer and use it in GitHub Desktop.
Old DF (dataframe) to Arrow Diff
diff --git a/pkg/dataframe/csv.go b/pkg/dataframe/csv.go
new file mode 100644
index 0000000000..0838a7c413
--- /dev/null
+++ b/pkg/dataframe/csv.go
@@ -0,0 +1,48 @@
+package dataframe
+
+import (
+ "encoding/csv"
+ "io"
+)
+
+// FromCSV is a simple CSV loader, primarily for testing
+func FromCSV(reader io.Reader, hasHeader bool, schema Schema) (*DataFrame, error) {
+ df := new(DataFrame)
+ csvReader := csv.NewReader(reader)
+ i := 0
+ for {
+ record, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ if i == 0 && hasHeader {
+ for fieldIdx, header := range record {
+ if fieldIdx >= len(schema) {
+ break
+ }
+ schema[fieldIdx].SetName(header)
+ df.Schema = append(df.Schema, schema[fieldIdx])
+ }
+ i++
+ continue
+ }
+
+ row := []Field{}
+ for fieldIdx, fieldValue := range record {
+ if fieldIdx >= len(schema) {
+ break
+ }
+ v, err := schema[fieldIdx].Extract(fieldValue)
+ if err != nil {
+ return nil, err
+ }
+ row = append(row, v)
+ }
+ df.Records = append(df.Records, row)
+ }
+ return df, nil
+}
diff --git a/pkg/dataframe/dataframe.go b/pkg/dataframe/dataframe.go
new file mode 100644
index 0000000000..6402195d57
--- /dev/null
+++ b/pkg/dataframe/dataframe.go
@@ -0,0 +1,102 @@
+// Package dataframe provides the DataFrame type.
+// The DataFrame type is used to hold data returned from Grafana Datasources.
+// This type is meant to be tightly integrated with the DataFrame type in Grafana's Frontend.
+package dataframe
+
+import (
+ "fmt"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/memory"
+)
+
// DataFrame holds Table data.
//
// Schema describes each column, Type classifies the frame as a whole, and
// Records holds the row data: one Fields slice per row, whose entries
// positionally match Schema.
type DataFrame struct {
	Schema  Schema
	Type    FrameType
	Records []Fields
}
+
// FrameType indicates the type of data the Dataframe holds.
type FrameType int

const (
	// NumericFrame indicates the Dataframe holds numeric values.
	NumericFrame FrameType = iota

	// TimeSeriesFrame indicates the Dataframe holds timeseries data.
	TimeSeriesFrame

	// HistogramFrame indicates the Dataframe holds histogram data.
	HistogramFrame

	// OtherFrame indicates the DataFrame holds mixed or another data type.
	OtherFrame
)

// String returns the human-readable name of the FrameType.
// Unrecognized values report as "Other".
func (ft FrameType) String() string {
	switch ft {
	case NumericFrame:
		return "Number"
	case TimeSeriesFrame:
		return "TimeSeries"
	case HistogramFrame:
		return "Histogram"
	default:
		return "Other"
	}
}

// MarshalJSON encodes the FrameType as its quoted name, e.g. "TimeSeries".
func (ft FrameType) MarshalJSON() ([]byte, error) {
	// %q quotes (and escapes) the name, which is safer than hand-wrapping
	// the string in double quotes with %v.
	return []byte(fmt.Sprintf("%q", ft.String())), nil
}
+
// DataFrames is a slice of DataFrame.
// NOTE(review): an earlier comment said "uniquely identified by key", but
// this is a plain slice with no key — confirm whether a map was intended.
type DataFrames []DataFrame

// Fields is a slice of Field.
type Fields []Field

// Field represents a unique field within a dataframe identified by its
// column and record position. It is an empty interface; the Extract
// implementations in this package store typed pointers (*time.Time,
// *float64, *string) or nil for missing values.
type Field interface{}
+
+// ToArrow is an experiment to create an arrow Table from the dataframe
+func (d *DataFrame) ToArrow() *array.TableReader {
+ arrowFields := make([]arrow.Field, len(d.Schema))
+ for i, cs := range d.Schema {
+ arrowFields[i] = arrow.Field{Name: cs.GetName(), Type: cs.ArrowType()}
+ }
+ schema := arrow.NewSchema(arrowFields, nil)
+
+ pool := memory.NewGoAllocator()
+
+ rb := array.NewRecordBuilder(pool, schema)
+ defer rb.Release()
+
+ records := make([]array.Record, len(d.Records))
+ for rowIdx, row := range d.Records {
+ for fieldIdx, field := range row {
+ switch arrowFields[fieldIdx].Type.(type) {
+ case *arrow.StringType:
+ rb.Field(fieldIdx).(*array.StringBuilder).Append(*(field.(*string)))
+ //rb.Field(fieldIdx).(*array.StringBuilder).AppendValues([]string{*(field.(*string))}, []bool{})
+ case *arrow.Float64Type:
+ rb.Field(fieldIdx).(*array.Float64Builder).Append(*(field.(*float64)))
+ //rb.Field(fieldIdx).(*array.Float64Builder).AppendValues([]float64{*(field.(*float64))}, []bool{})
+ default:
+ fmt.Println("unmatched")
+ }
+ }
+ rec := rb.NewRecord()
+ defer rec.Release()
+ records[rowIdx] = rec
+ }
+ table := array.NewTableFromRecords(schema, records)
+ defer table.Release()
+ tableReader := array.NewTableReader(table, 3)
+ //tableReader.Retain()
+
+ return tableReader
+
+}
diff --git a/pkg/dataframe/dataframe_test.go b/pkg/dataframe/dataframe_test.go
new file mode 100644
index 0000000000..fcc459b799
--- /dev/null
+++ b/pkg/dataframe/dataframe_test.go
@@ -0,0 +1,103 @@
+package dataframe
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/apache/arrow/go/arrow/ipc"
+)
+
+// Not really tests ... just my repl/notebook for now
+
+func TestLoadingDataFrameFromCSV(t *testing.T) {
+ data, err := os.Open("./testdata/simpleTimeSeries.csv")
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ schema := Schema{
+ NewTimeColumn(time.RFC3339),
+ NewNumberColumn(),
+ NewStringColumn(),
+ }
+ df, err := FromCSV(
+ bufio.NewReader(data),
+ true,
+ schema)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ df.Type = TimeSeriesFrame
+ v, err := json.MarshalIndent(df, "", " ")
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ _ = v
+ fmt.Println(string(v))
+}
+
+func TestLoadingNumberDataFrameFromCSVAndWritingToArrow(t *testing.T) {
+ data, err := os.Open("./testdata/stringNumber.csv")
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ schema := Schema{
+ NewStringColumn(),
+ NewNumberColumn(),
+ }
+ df, err := FromCSV(
+ bufio.NewReader(data),
+ true,
+ schema)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ df.Type = TimeSeriesFrame
+ v, err := json.MarshalIndent(df, "", " ")
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ _ = v
+ tableReader := df.ToArrow()
+
+ outFile, err := os.OpenFile("/home/kbrandt/tmp/arrowstuff", os.O_APPEND|os.O_WRONLY|os.O_CREATE, os.FileMode(0644))
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ writer, err := ipc.NewFileWriter(outFile, ipc.WithSchema(tableReader.Schema()))
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ for tableReader.Next() {
+ rec := tableReader.Record()
+ err := writer.Write(rec)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ }
+ err = writer.Close()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ err = outFile.Close()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+}
diff --git a/pkg/dataframe/schema.go b/pkg/dataframe/schema.go
new file mode 100644
index 0000000000..bf5b42e17b
--- /dev/null
+++ b/pkg/dataframe/schema.go
@@ -0,0 +1,143 @@
+package dataframe
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/apache/arrow/go/arrow"
+)
+
// Schema is a slice of ColumnSchema.
type Schema []ColumnSchema

// ColumnSchema describes a single DataFrame column and knows how to parse
// string values (e.g. CSV cells) into that column's Go representation.
type ColumnSchema interface {
	// GetName returns the column name.
	GetName() string
	// SetName sets the column name (used when loading CSV headers).
	SetName(name string)
	// ColumnType returns the dataframe-level type of the column.
	ColumnType() ColumnType
	// Extract parses v into the column's value type. Implementations in
	// this package return a typed pointer, or nil (with no error) for
	// empty/missing input.
	Extract(v string) (interface{}, error)
	// ArrowType returns the Arrow data type used when converting to Arrow.
	ArrowType() arrow.DataType
}
+
// BaseSchema provides the common name/type bookkeeping shared by the
// concrete column schemas, which embed it and add their own Extract.
type BaseSchema struct {
	Name          string
	Type          ColumnType
	ArrowDataType arrow.DataType
}

// GetName returns the column name.
func (b *BaseSchema) GetName() string {
	return b.Name
}

// SetName sets the column name.
func (b *BaseSchema) SetName(name string) {
	b.Name = name
}

// ColumnType returns the column's ColumnType.
func (b *BaseSchema) ColumnType() ColumnType {
	return b.Type
}

// ArrowType returns the Arrow data type configured for the column.
func (b *BaseSchema) ArrowType() arrow.DataType {
	return b.ArrowDataType
}
+
// TimeColumnSchema describes a column of absolute time values parsed from
// strings with a Go reference-time layout.
type TimeColumnSchema struct {
	BaseSchema
	// Format is the Go time layout (e.g. time.RFC3339) used by Extract.
	Format string
}

// NewTimeColumn returns a TimeColumnSchema that parses values using the
// given Go time layout.
// NOTE(review): ArrowDataType is set to Date64, but ToArrow only has
// builder cases for String and Float64, so time columns currently fall
// into its "unmatched" branch — confirm the intended Arrow mapping.
func NewTimeColumn(format string) *TimeColumnSchema {
	t := new(TimeColumnSchema)
	t.Type = DateTime
	t.Format = format
	t.ArrowDataType = arrow.PrimitiveTypes.Date64
	return t
}

// ColumnType reports DateTime.
func (tcs *TimeColumnSchema) ColumnType() ColumnType {
	return DateTime
}

// Extract parses v with tcs.Format and returns a *time.Time, or nil (with
// no error) for the empty string.
func (tcs *TimeColumnSchema) Extract(v string) (interface{}, error) {
	if v == "" {
		return nil, nil
	}
	t, err := time.Parse(tcs.Format, v)
	if err != nil {
		return nil, err
	}
	return &t, nil
}
+
+type NumberColumnSchema struct{ BaseSchema }
+
+func NewNumberColumn() *NumberColumnSchema {
+ n := new(NumberColumnSchema)
+ n.Type = Number
+ n.ArrowDataType = &arrow.Float64Type{}
+
+ return n
+}
+
+func (ncs *NumberColumnSchema) Extract(v string) (interface{}, error) {
+ if v == "" {
+ return nil, nil
+ }
+ f, err := strconv.ParseFloat(v, 64)
+ if err != nil {
+ return nil, err
+ }
+ return &f, nil
+
+}
+
+type StringColumnSchema struct{ BaseSchema }
+
+func NewStringColumn() *StringColumnSchema {
+ s := new(StringColumnSchema)
+ s.Type = String
+ s.ArrowDataType = &arrow.StringType{}
+ return s
+}
+
+func (scs *StringColumnSchema) Extract(v string) (interface{}, error) {
+ return &v, nil
+
+}
+
// ColumnType is the type of Data that a DataFrame column holds.
type ColumnType int

const (
	// DateTime is the ColumnType that holds a value that is a representation of absolute time.
	DateTime ColumnType = iota

	// Number is the ColumnType that indicates the column will have integers and floats.
	Number

	// String is the ColumnType that indicates the column will have string values.
	String

	// Bool is the ColumnType that indicates the column will have boolean values.
	Bool

	// Other is the ColumnType that indicates the column has an unknown type or mix of value types.
	Other
)

// String returns the human-readable name of the ColumnType.
// Unrecognized values report as "Other".
func (c ColumnType) String() string {
	switch c {
	case DateTime:
		return "DateTime"
	case Number:
		return "Number"
	case String:
		return "String"
	case Bool:
		// Fix: Bool was missing from this switch, so Bool columns
		// reported (and JSON-marshaled) as "Other".
		return "Bool"
	default:
		return "Other"
	}
}

// MarshalJSON encodes the ColumnType as its quoted name, e.g. "Number".
func (c ColumnType) MarshalJSON() ([]byte, error) {
	// %q quotes (and escapes) the name, which is safer than hand-wrapping
	// the string in double quotes with %v.
	return []byte(fmt.Sprintf("%q", c.String())), nil
}
diff --git a/pkg/dataframe/testdata/simpleTimeSeries.csv b/pkg/dataframe/testdata/simpleTimeSeries.csv
new file mode 100644
index 0000000000..7f54305f26
--- /dev/null
+++ b/pkg/dataframe/testdata/simpleTimeSeries.csv
@@ -0,0 +1,9 @@
+time,value,color
+2006-01-02T15:04:05Z,3,blue
+2006-01-02T15:04:06Z,5,
+2006-01-02T15:04:07Z,6,yellow
+2006-01-02T15:04:08Z,1,blue
+2006-01-02T15:04:09Z,,blue
+,1,
+2006-01-02T15:04:11Z,2,blue
+,,
\ No newline at end of file
diff --git a/pkg/dataframe/testdata/stringNumber.csv b/pkg/dataframe/testdata/stringNumber.csv
new file mode 100644
index 0000000000..971968a914
--- /dev/null
+++ b/pkg/dataframe/testdata/stringNumber.csv
@@ -0,0 +1,4 @@
+thing,value
+foo,1
+bar,3
+why,7
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment