@fntlnz
Last active June 1, 2021 02:33
InfluxData Flux as a library

Flux as a Library Example

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.

This gist contains a main.go file that shows how Flux can be used as a library in your own programs.

Components

The main components you need are:

  • The interpreter
  • The scope (aka Prelude)
  • The builtin library and your additional functions, if you want to define them
  • The language specification compiler
  • A querier

What does this example do?

This example takes all the components described above from the flux repo and uses them to compile, execute, and print the results of the following query:

fromGenerator(start: now(), stop: now(), count: 5, fn: (n) => 1)

The fromGenerator function is not defined by the builtin library; it lives in the inputs package. You can define your own functions by registering them. To see how that is done, look at from_generator.go: https://github.com/influxdata/flux/blob/48f4e86c8512ff79983d2ed0668c295d74203822/stdlib/inputs/from_generator.go#L43
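To illustrate the registration idea in isolation, here is a minimal sketch that uses only the Go standard library. It is not Flux's actual registration API: the names BuiltinFn, Register, registry and the "double" function are hypothetical and exist only for this example. What it does show is the general Go pattern that makes the blank imports in main.go (such as `_ "github.com/influxdata/flux/stdlib/inputs"`) useful: a package registers its functions from init, and importing the package is enough to run that init.

```go
package main

import (
	"fmt"
	"sort"
)

// BuiltinFn is a hypothetical signature for a registered function.
// Flux's real function values are richer; this is for illustration only.
type BuiltinFn func(args map[string]int64) (int64, error)

// registry maps a function name to its implementation.
var registry = map[string]BuiltinFn{}

// Register adds fn under name. It panics on duplicates so that
// conflicting registrations fail at program start, not at query time.
func Register(name string, fn BuiltinFn) {
	if _, exists := registry[name]; exists {
		panic(fmt.Sprintf("function %q registered twice", name))
	}
	registry[name] = fn
}

// Names returns the registered function names in sorted order.
func Names() []string {
	names := make([]string, 0, len(registry))
	for name := range registry {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// In a real project this init would live in the package that defines the
// function, and a blank import of that package would trigger it.
func init() {
	Register("double", func(args map[string]int64) (int64, error) {
		n, ok := args["n"]
		if !ok {
			return 0, fmt.Errorf("missing argument n")
		}
		return 2 * n, nil
	})
}

func main() {
	fn := registry["double"]
	v, err := fn(map[string]int64{"n": 21})
	if err != nil {
		panic(err)
	}
	fmt.Println(Names(), v)
}
```

Panicking on a duplicate name is a design choice of this sketch: registration happens once at start-up, so failing loudly there is usually easier to debug than silently overwriting an earlier entry.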

Since fromGenerator functions as a data source, the example is completely decoupled from InfluxDB. To implement your own data source, or to see which ones are supported, take a look at this package: https://github.com/influxdata/flux/tree/master/stdlib/inputs

Usage

go run main.go
Result: _result
Table: keys: [_start, _stop]
                    _time:time                  _value:int  
------------------------------  --------------------------  
2019-01-15T14:10:55.400516705Z                           1  
2019-01-15T14:10:55.400517551Z                           1  
2019-01-15T14:10:55.400518397Z                           1  
2019-01-15T14:10:55.400519243Z                           1  
2019-01-15T14:10:55.400520089Z                           1  
package main

import (
	"context"
	"fmt"
	"math"
	"os"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/ast"
	// Blank import: pulls in the builtin library.
	_ "github.com/influxdata/flux/builtin"
	"github.com/influxdata/flux/control"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/interpreter"
	"github.com/influxdata/flux/lang"
	"github.com/influxdata/flux/parser"
	"github.com/influxdata/flux/semantic"
	// Blank import: pulls in the inputs package, which provides fromGenerator.
	_ "github.com/influxdata/flux/stdlib/inputs"
	"github.com/influxdata/flux/values"
)

func main() {
	t := `fromGenerator(start: now(), stop: now(), count: 5, fn: (n) => 1)`

	ctx, cancelFunc := context.WithCancel(context.Background())
	defer cancelFunc()

	// The interpreter and the scope (aka Prelude).
	i := interpreter.NewInterpreter()
	scope := flux.Prelude()

	// Parse the query text and check the AST for errors.
	astPkg := parser.ParseSource(t)
	if ast.Check(astPkg) > 0 {
		panic(ast.GetError(astPkg))
	}

	// Build the semantic graph and evaluate it against the standard library.
	semPkg, err := semantic.New(astPkg)
	if err != nil {
		panic(err)
	}
	if _, err := i.Eval(semPkg, scope, flux.StdLib()); err != nil {
		panic(err)
	}

	v := scope.Return()

	// Ignore statements that do not return a value
	if v == nil {
		return
	}

	// Check for yield and execute query
	if v.Type() == flux.TableObjectMonoType {
		t := v.(*flux.TableObject)

		now, ok := scope.Lookup("now")
		if !ok {
			panic(fmt.Errorf("now option not set"))
		}
		nowTime, err := now.Function().Call(nil)
		if err != nil {
			panic(err)
		}

		// Turn the table object into a query spec and wrap it in a compiler.
		spec, err := flux.ToSpec([]values.Value{t}, nowTime.Time().Time())
		if err != nil {
			panic(err)
		}
		compiler := lang.SpecCompiler{
			Spec: spec,
		}

		querier := NewQuerier()
		results, err := querier.Query(ctx, compiler)
		if err != nil {
			panic(err)
		}
		defer results.Release()

		// Print every table of every result to stdout.
		for results.More() {
			result := results.Next()
			tables := result.Tables()
			fmt.Println("Result:", result.Name())
			err := tables.Do(func(tbl flux.Table) error {
				_, err := execute.NewFormatter(tbl, nil).WriteTo(os.Stdout)
				return err
			})
			if err != nil {
				panic(err)
			}
		}
		if err := results.Err(); err != nil {
			panic(err.Error())
		}
	}

	// Print value
	if v.Type() != semantic.Invalid {
		fmt.Printf("%s", v.Str())
	}
}

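// Querier runs compiled queries through a flux controller.
type Querier struct {
	c *control.Controller
}

// Query submits the compiled query to the controller and returns an
// iterator over its results.
func (q *Querier) Query(ctx context.Context, c flux.Compiler) (flux.ResultIterator, error) {
	qry, err := q.c.Query(ctx, c)
	if err != nil {
		return nil, err
	}
	return flux.NewResultIteratorFromQuery(qry), nil
}

// NewQuerier builds a Querier that runs one query at a time with an
// effectively unlimited memory quota.
func NewQuerier() *Querier {
	config := control.Config{
		ConcurrencyQuota: 1,
		MemoryBytesQuota: math.MaxInt64,
	}
	c := control.New(config)
	return &Querier{
		c: c,
	}
}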