Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Old DF (dataframe) to Arrow Diff
diff --git a/pkg/dataframe/csv.go b/pkg/dataframe/csv.go
new file mode 100644
index 0000000000..0838a7c413
--- /dev/null
+++ b/pkg/dataframe/csv.go
@@ -0,0 +1,48 @@
+package dataframe
+
+import (
+ "encoding/csv"
+ "io"
+)
+
+// FromCSV is a simple CSV loader, primarily for testing.
+//
+// Each CSV column is parsed with the ColumnSchema found at the same position
+// in schema; columns beyond len(schema) are ignored. When hasHeader is true
+// the first record is consumed as the header and its values are written into
+// the caller's schema entries through SetName.
+//
+// The schema entries matched against the first record become df.Schema,
+// whether or not a header is present, so header-less input also yields a
+// frame that knows its columns.
+//
+// It returns the first error reported by the CSV reader or by Extract.
+func FromCSV(reader io.Reader, hasHeader bool, schema Schema) (*DataFrame, error) {
+	df := new(DataFrame)
+	csvReader := csv.NewReader(reader)
+	firstRecord := true
+	for {
+		record, err := csvReader.Read()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+
+		// Only as many columns as the schema describes can be parsed.
+		width := len(record)
+		if width > len(schema) {
+			width = len(schema)
+		}
+
+		if firstRecord {
+			firstRecord = false
+			df.Schema = append(df.Schema, schema[:width]...)
+			if hasHeader {
+				for fieldIdx, header := range record[:width] {
+					schema[fieldIdx].SetName(header)
+				}
+				continue
+			}
+		}
+
+		// Kept non-nil, like the original empty literal, so a row without
+		// parsed cells is still an empty slice rather than nil.
+		row := make([]Field, 0, width)
+		for fieldIdx, fieldValue := range record[:width] {
+			v, err := schema[fieldIdx].Extract(fieldValue)
+			if err != nil {
+				return nil, err
+			}
+			row = append(row, v)
+		}
+		df.Records = append(df.Records, row)
+	}
+	return df, nil
+}
diff --git a/pkg/dataframe/dataframe.go b/pkg/dataframe/dataframe.go
new file mode 100644
index 0000000000..6402195d57
--- /dev/null
+++ b/pkg/dataframe/dataframe.go
@@ -0,0 +1,102 @@
+// Package dataframe provides the DataFrame type.
+// The DataFrame type is used to hold data returned from Grafana Datasources.
+// This type is meant to be tightly integrated with the DataFrame type in Grafana's Frontend.
+package dataframe
+
+import (
+ "fmt"
+
+ "github.com/apache/arrow/go/arrow"
+ "github.com/apache/arrow/go/arrow/array"
+ "github.com/apache/arrow/go/arrow/memory"
+)
+
+// DataFrame holds Table data.
+type DataFrame struct {
+ Schema Schema // Schema describes the columns, one ColumnSchema per column.
+ Type FrameType // Type indicates the kind of data the frame holds.
+ Records []Fields // Records holds the rows; each Fields value is one row of cells.
+}
+
+// FrameType indicates the type of data the DataFrame holds.
+type FrameType int
+
+const (
+ // NumericFrame indicates the DataFrame holds numeric values.
+ NumericFrame FrameType = iota
+
+ // TimeSeriesFrame indicates the DataFrame holds time series data.
+ TimeSeriesFrame
+
+ // HistogramFrame indicates the DataFrame holds histogram data.
+ HistogramFrame
+
+ // OtherFrame indicates the DataFrame holds mixed or another data type.
+ OtherFrame
+)
+
+// String returns the display name of the frame type: "Number", "TimeSeries"
+// or "Histogram" for the corresponding constants, and "Other" for every
+// other value.
+func (ft FrameType) String() string {
+	if ft == NumericFrame {
+		return "Number"
+	}
+	if ft == TimeSeriesFrame {
+		return "TimeSeries"
+	}
+	if ft == HistogramFrame {
+		return "Histogram"
+	}
+	return "Other"
+}
+
+// MarshalJSON encodes the frame type as its String name wrapped in double
+// quotes. It never returns an error.
+func (ft FrameType) MarshalJSON() ([]byte, error) {
+	quoted := fmt.Sprintf(`"%v"`, ft.String())
+	return []byte(quoted), nil
+}
+
+// DataFrames is a slice of DataFrame values.
+type DataFrames []DataFrame
+
+// Fields is a slice of Field; DataFrame.Records stores one Fields value per row.
+type Fields []Field
+
+// Field represents a unique field within a dataframe identified by its column and record position.
+// FromCSV stores the value returned by ColumnSchema.Extract here, which is nil for cells the column treats as empty.
+type Field interface{}
+
+// ToArrow is an experiment to create an arrow Table from the dataframe.
+//
+// Every row of d.Records becomes its own single-row arrow record. The records
+// are combined into a table, and the table is wrapped in a TableReader created
+// with a chunk size argument of 3. The caller owns the returned reader and
+// should Release it when done.
+//
+// Only string and float64 columns are converted. A cell is appended as null
+// when it is nil, when it is missing because the row is shorter than the
+// schema, when it does not hold the pointer type expected for its column, or
+// when its column has any other arrow type. This keeps every column of a
+// record at the same length. Cells beyond the schema width are ignored.
+func (d *DataFrame) ToArrow() *array.TableReader {
+	arrowFields := make([]arrow.Field, len(d.Schema))
+	for i, cs := range d.Schema {
+		// Nullable because Extract yields nil for empty cells.
+		arrowFields[i] = arrow.Field{Name: cs.GetName(), Type: cs.ArrowType(), Nullable: true}
+	}
+	schema := arrow.NewSchema(arrowFields, nil)
+
+	pool := memory.NewGoAllocator()
+
+	rb := array.NewRecordBuilder(pool, schema)
+	defer rb.Release()
+
+	records := make([]array.Record, 0, len(d.Records))
+	// Release the per-row records when the function returns, as the original
+	// per-iteration deferred calls did.
+	defer func() {
+		for _, rec := range records {
+			rec.Release()
+		}
+	}()
+
+	for _, row := range d.Records {
+		// Walk the schema, not the row, so short rows are padded with nulls
+		// and over-long rows cannot index past arrowFields.
+		for fieldIdx := range arrowFields {
+			var field Field
+			if fieldIdx < len(row) {
+				field = row[fieldIdx]
+			}
+			switch arrowFields[fieldIdx].Type.(type) {
+			case *arrow.StringType:
+				if s, ok := field.(*string); ok && s != nil {
+					rb.Field(fieldIdx).(*array.StringBuilder).Append(*s)
+					continue
+				}
+			case *arrow.Float64Type:
+				if f, ok := field.(*float64); ok && f != nil {
+					rb.Field(fieldIdx).(*array.Float64Builder).Append(*f)
+					continue
+				}
+			default:
+				fmt.Println("unmatched")
+			}
+			// Nothing usable was appended for this cell.
+			rb.Field(fieldIdx).AppendNull()
+		}
+		records = append(records, rb.NewRecord())
+	}
+
+	table := array.NewTableFromRecords(schema, records)
+	defer table.Release()
+
+	return array.NewTableReader(table, 3)
+}
diff --git a/pkg/dataframe/dataframe_test.go b/pkg/dataframe/dataframe_test.go
new file mode 100644
index 0000000000..fcc459b799
--- /dev/null
+++ b/pkg/dataframe/dataframe_test.go
@@ -0,0 +1,103 @@
+package dataframe
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/apache/arrow/go/arrow/ipc"
+)
+
+// Not really tests ... just my repl/notebook for now
+
+// TestLoadingDataFrameFromCSV loads the time series fixture through FromCSV
+// and prints the resulting frame as indented JSON for manual inspection.
+func TestLoadingDataFrameFromCSV(t *testing.T) {
+	data, err := os.Open("./testdata/simpleTimeSeries.csv")
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Close the fixture once the test is done; it is only read from.
+	defer data.Close()
+
+	schema := Schema{
+		NewTimeColumn(time.RFC3339),
+		NewNumberColumn(),
+		NewStringColumn(),
+	}
+	df, err := FromCSV(bufio.NewReader(data), true, schema)
+	if err != nil {
+		t.Fatal(err)
+	}
+	df.Type = TimeSeriesFrame
+
+	v, err := json.MarshalIndent(df, "", " ")
+	if err != nil {
+		t.Fatal(err)
+	}
+	fmt.Println(string(v))
+}
+
+// TestLoadingNumberDataFrameFromCSVAndWritingToArrow loads the string/number
+// fixture, converts it with ToArrow and writes the records to an arrow IPC
+// file for manual inspection.
+//
+// The file is written into the OS temp directory, not a developer-specific
+// home path, and is truncated on every run so repeated runs do not append a
+// second stream to an existing file. Its location is logged.
+func TestLoadingNumberDataFrameFromCSVAndWritingToArrow(t *testing.T) {
+	data, err := os.Open("./testdata/stringNumber.csv")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer data.Close()
+
+	schema := Schema{
+		NewStringColumn(),
+		NewNumberColumn(),
+	}
+	df, err := FromCSV(bufio.NewReader(data), true, schema)
+	if err != nil {
+		t.Fatal(err)
+	}
+	df.Type = TimeSeriesFrame
+	if _, err := json.MarshalIndent(df, "", " "); err != nil {
+		t.Fatal(err)
+	}
+
+	tableReader := df.ToArrow()
+	defer tableReader.Release()
+
+	outPath := os.TempDir() + string(os.PathSeparator) + "arrowstuff"
+	outFile, err := os.OpenFile(outPath, os.O_TRUNC|os.O_WRONLY|os.O_CREATE, os.FileMode(0644))
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Safety net for the early-exit paths below; the success path closes the
+	// file explicitly and checks the error, making this second Close a no-op
+	// whose error is deliberately ignored.
+	defer outFile.Close()
+
+	writer, err := ipc.NewFileWriter(outFile, ipc.WithSchema(tableReader.Schema()))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for tableReader.Next() {
+		if err := writer.Write(tableReader.Record()); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := writer.Close(); err != nil {
+		t.Fatal(err)
+	}
+	if err := outFile.Close(); err != nil {
+		t.Fatal(err)
+	}
+	t.Logf("arrow file written to %s", outPath)
+}
diff --git a/pkg/dataframe/schema.go b/pkg/dataframe/schema.go
new file mode 100644
index 0000000000..bf5b42e17b
--- /dev/null
+++ b/pkg/dataframe/schema.go
@@ -0,0 +1,143 @@
+package dataframe
+
+import (
+ "fmt"
+ "strconv"
+ "time"
+
+ "github.com/apache/arrow/go/arrow"
+)
+
+// Schema is a slice of ColumnSchema.
+type Schema []ColumnSchema
+
+// ColumnSchema describes a single DataFrame column: its name, its column
+// type, how to parse a raw string cell, and the matching arrow data type.
+type ColumnSchema interface {
+ GetName() string // GetName returns the column name.
+ SetName(name string) // SetName replaces the column name.
+ ColumnType() ColumnType // ColumnType reports the kind of data the column holds.
+ Extract(v string) (interface{}, error) // Extract parses the raw string cell v into a column value.
+ ArrowType() arrow.DataType // ArrowType returns the arrow data type used for the column.
+}
+
+// BaseSchema carries the state shared by the concrete column schemas: the
+// column name, its ColumnType and its arrow data type. The concrete schemas
+// embed it and add their own Extract method.
+type BaseSchema struct {
+	Name          string
+	Type          ColumnType
+	ArrowDataType arrow.DataType
+}
+
+// GetName returns the column name.
+func (bs *BaseSchema) GetName() string { return bs.Name }
+
+// SetName replaces the column name.
+func (bs *BaseSchema) SetName(name string) { bs.Name = name }
+
+// ColumnType returns the stored column type.
+func (bs *BaseSchema) ColumnType() ColumnType { return bs.Type }
+
+// ArrowType returns the stored arrow data type.
+func (bs *BaseSchema) ArrowType() arrow.DataType { return bs.ArrowDataType }
+
+// TimeColumnSchema is the schema of a DateTime column whose cells are parsed
+// with time.Parse using Format as the layout.
+type TimeColumnSchema struct {
+	BaseSchema
+	Format string
+}
+
+// NewTimeColumn returns a DateTime column schema that parses cells with the
+// given time layout and maps to the arrow Date64 type.
+func NewTimeColumn(format string) *TimeColumnSchema {
+	return &TimeColumnSchema{
+		BaseSchema: BaseSchema{
+			Type:          DateTime,
+			ArrowDataType: arrow.PrimitiveTypes.Date64,
+		},
+		Format: format,
+	}
+}
+
+// ColumnType always reports DateTime, regardless of the embedded Type field.
+func (tcs *TimeColumnSchema) ColumnType() ColumnType {
+	return DateTime
+}
+
+// Extract parses v with the schema's Format. An empty string yields a nil
+// value and no error; otherwise it returns a *time.Time or the parse error.
+func (tcs *TimeColumnSchema) Extract(v string) (interface{}, error) {
+	if len(v) == 0 {
+		return nil, nil
+	}
+	parsed, err := time.Parse(tcs.Format, v)
+	if err != nil {
+		return nil, err
+	}
+	return &parsed, nil
+}
+
+// NumberColumnSchema is the schema of a Number column; cells are parsed as
+// 64-bit floats.
+type NumberColumnSchema struct {
+	BaseSchema
+}
+
+// NewNumberColumn returns a Number column schema mapped to the arrow Float64
+// type.
+func NewNumberColumn() *NumberColumnSchema {
+	return &NumberColumnSchema{
+		BaseSchema: BaseSchema{
+			Type:          Number,
+			ArrowDataType: &arrow.Float64Type{},
+		},
+	}
+}
+
+// Extract parses v as a float64. An empty string yields a nil value and no
+// error; otherwise it returns a *float64 or the strconv parse error.
+func (ncs *NumberColumnSchema) Extract(v string) (interface{}, error) {
+	if len(v) == 0 {
+		return nil, nil
+	}
+	parsed, err := strconv.ParseFloat(v, 64)
+	if err != nil {
+		return nil, err
+	}
+	return &parsed, nil
+}
+
+// StringColumnSchema is the schema of a String column; cells are kept as-is.
+type StringColumnSchema struct {
+	BaseSchema
+}
+
+// NewStringColumn returns a String column schema mapped to the arrow String
+// type.
+func NewStringColumn() *StringColumnSchema {
+	return &StringColumnSchema{
+		BaseSchema: BaseSchema{
+			Type:          String,
+			ArrowDataType: &arrow.StringType{},
+		},
+	}
+}
+
+// Extract returns a pointer to a copy of v. Unlike the time and number
+// columns, an empty string is returned as a pointer to "" and not as nil.
+// It never returns an error.
+func (scs *StringColumnSchema) Extract(v string) (interface{}, error) {
+	value := v
+	return &value, nil
+}
+
+// ColumnType is the type of data that a DataFrame column holds.
+type ColumnType int
+
+const (
+ // DateTime is the ColumnType that indicates the column holds representations of absolute time.
+ DateTime ColumnType = iota
+
+ // Number is the ColumnType that indicates the column will have integers and floats.
+ Number
+
+ // String is the ColumnType that indicates the column will have string values.
+ String
+
+ // Bool is the ColumnType that indicates the column will have boolean values.
+ Bool
+
+ // Other is the ColumnType that indicates the column has an unknown type or a mix of value types.
+ Other
+)
+
+// String returns the display name of the column type: "DateTime", "Number",
+// "String" or "Bool" for the corresponding constants, and "Other" for Other
+// and any unknown value.
+//
+// Bool has its own name so that a boolean column is not reported (and, via
+// MarshalJSON, serialized) as "Other".
+func (c ColumnType) String() string {
+	switch c {
+	case DateTime:
+		return "DateTime"
+	case Number:
+		return "Number"
+	case String:
+		return "String"
+	case Bool:
+		return "Bool"
+	default:
+		return "Other"
+	}
+}
+
+// MarshalJSON encodes the column type as its String name wrapped in double
+// quotes. It never returns an error.
+func (c ColumnType) MarshalJSON() ([]byte, error) {
+	quoted := fmt.Sprintf(`"%v"`, c.String())
+	return []byte(quoted), nil
+}
diff --git a/pkg/dataframe/testdata/simpleTimeSeries.csv b/pkg/dataframe/testdata/simpleTimeSeries.csv
new file mode 100644
index 0000000000..7f54305f26
--- /dev/null
+++ b/pkg/dataframe/testdata/simpleTimeSeries.csv
@@ -0,0 +1,9 @@
+time,value,color
+2006-01-02T15:04:05Z,3,blue
+2006-01-02T15:04:06Z,5,
+2006-01-02T15:04:07Z,6,yellow
+2006-01-02T15:04:08Z,1,blue
+2006-01-02T15:04:09Z,,blue
+,1,
+2006-01-02T15:04:11Z,2,blue
+,,
\ No newline at end of file
diff --git a/pkg/dataframe/testdata/stringNumber.csv b/pkg/dataframe/testdata/stringNumber.csv
new file mode 100644
index 0000000000..971968a914
--- /dev/null
+++ b/pkg/dataframe/testdata/stringNumber.csv
@@ -0,0 +1,4 @@
+thing,value
+foo,1
+bar,3
+why,7
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.