Skip to content

Instantly share code, notes, and snippets.

@owulveryck
Last active September 13, 2022 19:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save owulveryck/08405837b13ed215de79214e40e20a3b to your computer and use it in GitHub Desktop.
Save owulveryck/08405837b13ed215de79214e40e20a3b to your computer and use it in GitHub Desktop.
Trivial server that validates data and sends it to Kafka

Running the sample:

go run . definition.cue

Getting the OpenAPI

❯ curl http://localhost:8181/openapi returns:

{
   "openapi": "3.0.0",
   "info": {
      "title": "Generated by cue.",
      "version": "no version"
   },
   "paths": {},
   "components": {
      "schemas": {
         "Identity": {
            "type": "object",
            "required": [
               "first",
               "Last"
            ],
            "properties": {
               "first": {
                  "description": "first name of the person",
                  "type": "string",
                  "pattern": "[A-Z].*"
               },
               "Last": {
                  "description": "Last name of the person",
                  "type": "string",
                  "pattern": "[A-Z].*"
               },
               "Age": {
                  "description": "Age of the person",
                  "type": "number",
                  "maximum": 130,
                  "exclusiveMaximum": true
               }
            }
         }
      }
   }
}

Posting and validating messages:

❯ curl -XPOST -d@good.json http://localhost:8181/
ok%          
❯ curl -XPOST -d@bad.json http://localhost:8181/
#Identity.Age: invalid value 140 (out of bound <130)
{
"first": "John",
"Last": "Doe",
"Age": 140
}
#Identity: {
// first name of the person
first: =~"[A-Z].*"
// Last name of the person
Last: =~"[A-Z].*"
// Age of the person
Age?: number & <130
}
module owulveryck.github.io/test1
go 1.16
require (
cuelang.org/go v0.4.0 // indirect
github.com/Shopify/sarama v1.25.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.4.1 // indirect
github.com/cloudevents/sdk-go/v2 v2.4.1 // indirect
github.com/cockroachdb/apd/v2 v2.0.1 // indirect
github.com/google/uuid v1.2.0 // indirect
)
{
"first": "John",
"Last": "Doe",
"Age": 40
}
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"time"
"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
"cuelang.org/go/cue/load"
"cuelang.org/go/encoding/openapi"
"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/google/uuid"
)
// DataProduct is a structure that holds the data definition as a CUE value,
// together with the event source name and the client used to forward events.
type DataProduct struct {
// definition is the CUE value incoming payloads are unified with and validated against.
definition cue.Value
source string // the source of the event
// dest is the client events are sent with; ServeHTTP skips sending when it is nil.
dest cloudevents.Client
// ...
}
// ServeHTTP makes *DataProduct an http.Handler.
//
// It reads the whole request body, validates it with ExtractData, wraps the
// validated payload in a CloudEvent and, when a destination client is
// configured, sends that event keyed by its ID.
//
// This is an example: neither the HTTP method nor the content type is checked.
//
// Responses:
//   - 400 when the body cannot be read, fails validation, or cannot be
//     turned into an event;
//   - 500 when the destination reports the event as undelivered;
//   - otherwise the default status with the body "ok", preceded by
//     "sent to the channel" when a destination is configured.
func (d *DataProduct) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	b, err := ioutil.ReadAll(r.Body)
	if err != nil {
		// A truncated or broken upload must not be validated as if it were complete.
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	data, err := d.ExtractData(b)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	e, err := d.CreateEvent("mysubject", "mytype", data)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if d.dest != nil {
		// Set the producer message key to the event ID and tie the send to
		// the lifetime of the request.
		ctx := kafka_sarama.WithMessageKey(r.Context(), sarama.StringEncoder(e.ID()))
		if result := d.dest.Send(ctx, e); cloudevents.IsUndelivered(result) {
			http.Error(w, result.Error(), http.StatusInternalServerError)
			return
		}
		fmt.Fprint(w, "sent to the channel")
	}
	fmt.Fprint(w, "ok")
}
// ExtractData compiles the payload b into a CUE value, unifies it with the
// data definition and validates the result.
//
// It returns the compiled payload (not the unified value) together with the
// first problem found: either the payload itself does not compile, or the
// unification with the definition does not validate.
//
// Validation requires concrete values, so a payload that omits a regular
// (non-optional) field of the definition is rejected instead of being
// accepted as an incomplete but consistent value.
func (d *DataProduct) ExtractData(b []byte) (cue.Value, error) {
	data := d.definition.Context().CompileBytes(b)
	// Report a malformed payload as such rather than as a validation failure.
	if err := data.Err(); err != nil {
		return data, err
	}

	unified := d.definition.Unify(data)
	return data, unified.Validate(
		cue.Attributes(true),
		cue.Definitions(true),
		cue.Hidden(true),
		cue.Concrete(true),
	)
}
// CreateEvent builds a CloudEvent whose data is the payload encoded as JSON.
//
// The event gets typ as its type, sub as its subject, d.source as its source,
// the current time and a freshly generated random UUID as its ID.
//
// An error is returned when the payload cannot be marshalled to JSON, when no
// random ID can be generated, when the data cannot be attached to the event,
// or when the resulting event cannot be encoded as JSON.
func (d *DataProduct) CreateEvent(sub, typ string, payload cue.Value) (event.Event, error) {
	e := cloudevents.NewEvent()

	b, err := payload.MarshalJSON()
	if err != nil {
		return e, err
	}

	// Report a failing random source as an error instead of panicking in the
	// middle of a request.
	id, err := uuid.NewRandom()
	if err != nil {
		return e, err
	}

	e.SetID(id.String())
	e.SetType(typ)
	e.SetSource(d.source)
	e.SetSubject(sub)
	e.SetTime(time.Now())

	if err := e.SetData(cloudevents.ApplicationJSON, b); err != nil {
		return e, err
	}

	// Encode the event once so that an event that cannot be serialised is
	// reported here; the encoded bytes themselves are not needed.
	_, err = json.Marshal(e)
	return e, err
}
// generateOpenAPI loads the CUE file defFile with the given load
// configuration (which may be nil), generates the OpenAPI document of the
// first instance and returns it as indented JSON.
//
// It returns an error when nothing could be loaded, when the loaded instance
// carries an error, or when the generation or the indentation fails.
func generateOpenAPI(defFile string, config *load.Config) ([]byte, error) {
	insts := cue.Build(load.Instances([]string{defFile}, config))
	if len(insts) == 0 {
		return nil, fmt.Errorf("no CUE instance found for %q", defFile)
	}
	// A returned instance may be incomplete; its Err field then says why.
	if insts[0].Err != nil {
		return nil, insts[0].Err
	}

	b, err := openapi.Gen(insts[0], nil)
	if err != nil {
		return nil, err
	}

	var out bytes.Buffer
	if err := json.Indent(&out, b, "", " "); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
// extractValue loads the CUE file defFile and returns the value of the first
// definition found among the fields of the first built instance.
//
// It returns an error when the file cannot be built, when its fields cannot
// be iterated, or when it holds no definition ("nothing found").
func extractValue(defFile string) (cue.Value, error) {
	ctx := cuecontext.New()

	values, err := ctx.BuildInstances(load.Instances([]string{defFile}, nil))
	if err != nil {
		// The build error used to be overwritten before it was looked at.
		return cue.Value{}, err
	}
	if len(values) == 0 {
		return cue.Value{}, errors.New("nothing found")
	}

	it, err := values[0].Fields(cue.All())
	if err != nil {
		return cue.Value{}, err
	}
	for it.Next() {
		if it.Selector().IsDefinition() {
			return it.Value(), nil
		}
	}
	return cue.Value{}, errors.New("nothing found")
}
// main is the program entry point; it only delegates to run.
func main() {
run()
}
// run wires the whole sample together and serves it on :8181.
//
// It reads the CUE definition file named by the first command-line argument,
// pre-generates its OpenAPI document, extracts the definition used for
// validation and tries to create a Kafka-backed CloudEvents client for the
// broker 127.0.0.1:9092 and the topic "test-topic". When the Kafka sender
// cannot be created the failure is only logged and the handler runs without
// a destination.
//
// Routes:
//   - "/"        validates the posted payload and forwards it as an event;
//   - "/openapi" returns the pre-generated OpenAPI document.
//
// Any other setup failure, and the termination of the HTTP server, ends the
// process through log.Fatal.
func run() {
	// Read the definition file name
	if len(os.Args) < 2 {
		log.Fatal("usage: go run . <definition.cue>")
	}
	defFile := os.Args[1]

	// Named spec rather than openapi so the imported package is not shadowed.
	spec, err := generateOpenAPI(defFile, nil)
	if err != nil {
		log.Fatal(err)
	}
	def, err := extractValue(defFile)
	if err != nil {
		log.Fatal(err)
	}

	// Configure the channel
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	var c cloudevents.Client
	sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
	if err != nil {
		log.Printf("failed to create protocol: %s", err.Error())
	} else {
		defer sender.Close(context.Background())
		c, err = cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
		if err != nil {
			log.Fatalf("failed to create client, %v", err)
		}
	}

	mux := http.NewServeMux()
	mux.Handle("/", &DataProduct{
		definition: def,
		source:     "sample",
		dest:       c,
	})
	mux.HandleFunc("/openapi", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if _, err := io.Copy(w, bytes.NewReader(spec)); err != nil {
			log.Printf("writing the OpenAPI document: %v", err)
		}
	})
	log.Fatal(http.ListenAndServe(":8181", mux))
}
package main
import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/event"
)
// TestHandler starts an HTTP test server with the DataProduct's handler,
// posts various payloads to it and checks the status code of each response.
// No destination client is configured, so nothing is sent anywhere.
func TestHandler(t *testing.T) {
	ctx := cuecontext.New()
	val := ctx.CompileString(`
{
// first name of the person
first: =~ "[A-Z].*"
// Last name of the person
Last: =~ "[A-Z].*"
// Age of the person
Age?: number & < 130
}
`)
	ts := httptest.NewServer(&DataProduct{
		definition: val,
	})
	defer ts.Close()

	tests := []struct {
		name           string
		payload        io.Reader
		expectedStatus int
	}{
		{
			name: "valid",
			payload: bytes.NewBufferString(`
{
"first": "John",
"Last": "Doe",
"Age": 40
}
`),
			expectedStatus: http.StatusOK,
		},
		{
			name: "invalid",
			payload: bytes.NewBufferString(`
{
"first": "John",
"Last": "Doe",
"Age": 140
}
`),
			expectedStatus: http.StatusBadRequest,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			req, err := http.NewRequest(http.MethodPost, ts.URL, tt.payload)
			if err != nil {
				t.Fatal(err)
			}
			res, err := http.DefaultClient.Do(req)
			if err != nil {
				// res is nil here; going on would dereference it.
				t.Fatal(err)
			}
			defer res.Body.Close()
			if res.StatusCode != tt.expectedStatus {
				t.Errorf("expected %v, got %v", tt.expectedStatus, res.StatusCode)
			}
		})
	}
}
// TestDataProduct_CreateEvent checks that CreateEvent produces an event whose
// data, source, type and subject match a reference event decoded from JSON.
// The ID and the time are generated by CreateEvent and are not compared.
func TestDataProduct_CreateEvent(t *testing.T) {
	ctx := cuecontext.New()
	val := ctx.CompileString(`{
"first": "John",
"Last": "Doe",
"Age": 40
}`)
	expectedEventJSON := []byte(`{
"specversion": "1.0",
"id": "",
"source": "mysource",
"type": "mytype",
"subject": "subject",
"datacontenttype": "application/json",
"time": "2021-06-14T09:34:34.297881Z",
"data_base64": "eyJmaXJzdCI6IkpvaG4iLCJMYXN0IjoiRG9lIiwiQWdlIjo0MH0="
}`)
	expectedEvent := cloudevents.NewEvent()
	err := expectedEvent.UnmarshalJSON(expectedEventJSON)
	if err != nil {
		t.Fatal(err)
	}

	type args struct {
		sub     string
		typ     string
		payload cue.Value
	}
	tests := []struct {
		name    string
		d       *DataProduct
		args    args
		want    event.Event
		wantErr bool
	}{
		{
			"simple valid",
			&DataProduct{
				source: "mysource",
			},
			args{
				sub:     "subject",
				typ:     "mytype",
				payload: val,
			},
			expectedEvent,
			false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := tt.d.CreateEvent(tt.args.sub, tt.args.typ, tt.args.payload)
			if (err != nil) != tt.wantErr {
				t.Errorf("DataProduct.CreateEvent() error = %v, wantErr %v", err, tt.wantErr)
				return
			}
			// One message per attribute, so a failure names what differs.
			if !reflect.DeepEqual(got.Data(), tt.want.Data()) {
				t.Errorf("DataProduct.CreateEvent() data = %s, want %s", got.Data(), tt.want.Data())
			}
			if got.Source() != tt.want.Source() {
				t.Errorf("DataProduct.CreateEvent() source = %q, want %q", got.Source(), tt.want.Source())
			}
			if got.Type() != tt.want.Type() {
				t.Errorf("DataProduct.CreateEvent() type = %q, want %q", got.Type(), tt.want.Type())
			}
			if got.Subject() != tt.want.Subject() {
				t.Errorf("DataProduct.CreateEvent() subject = %q, want %q", got.Subject(), tt.want.Subject())
			}
		})
	}
}
// BenchmarkRun measures a full POST round trip through the handler, using the
// definition in testdata/definition.cue.
//
// It tries to create a Kafka-backed client for 127.0.0.1:9092; when the
// sender cannot be created the failure is logged and the handler runs
// without a destination. Setup is excluded from the measurement, and each
// response body is drained and closed inside the loop.
func BenchmarkRun(b *testing.B) {
	def, err := extractValue("testdata/definition.cue")
	if err != nil {
		b.Fatal(err)
	}

	// Configure the channel
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	var c cloudevents.Client
	sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
	if err != nil {
		log.Printf("failed to create protocol: %s", err.Error())
	} else {
		defer sender.Close(context.Background())
		c, err = cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
		if err != nil {
			// b.Fatalf lets the deferred clean-up run, unlike log.Fatalf.
			b.Fatalf("failed to create client, %v", err)
		}
	}

	mux := http.NewServeMux()
	mux.Handle("/", &DataProduct{
		definition: def,
		source:     "sample",
		dest:       c,
	})
	ts := httptest.NewServer(mux)
	defer ts.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		req, err := http.NewRequest(http.MethodPost, ts.URL, bytes.NewBufferString(`{ "first": "John", "Last": "Doe", "Age": 40 }`))
		if err != nil {
			b.Fatal(err)
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			b.Fatal(err)
		}
		// Drain and close the body; the content is irrelevant and a failed
		// drain only matters through the status check below.
		_, _ = io.Copy(io.Discard, res.Body)
		res.Body.Close()
		if res.StatusCode != http.StatusOK {
			b.Fatalf("expected %v, got %v", http.StatusOK, res.StatusCode)
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment