Skip to content

Instantly share code, notes, and snippets.

@jeromegn
Created May 28, 2024 13:59
Show Gist options
  • Save jeromegn/22d88b1b0399b40d4a714b4367dec430 to your computer and use it in GitHub Desktop.
Save jeromegn/22d88b1b0399b40d4a714b4367dec430 to your computer and use it in GitHub Desktop.
package corrosion2
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"reflect"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// Subscriber issues subscription requests over HTTP and streams the decoded
// events back to the caller (see Subscribe).
type Subscriber struct {
// api is the subscriptions endpoint: the base URL passed to
// NewCorrosionSubscriber with "/subscriptions" appended.
api string
// logger receives a message when a streamed JSON value fails to decode.
logger *logrus.Logger
// httpClient performs the subscription POST request.
httpClient *http.Client
}
// Event is one JSON value decoded from the subscription response stream.
// DetectEvent reports which of the four fields is populated.
type Event struct {
	// Change is read by ParseChange as [change type (string), row id (number), row values (array)].
	Change []any `json:"change"`
	// Row is read by ParseRow as [row id (number), row values (array)].
	Row []any `json:"row"`
	// Eoq, when non-nil, makes DetectEvent report "eoq".
	Eoq map[string]any `json:"eoq"`
	// Columns, when non-empty, makes DetectEvent report "columns".
	Columns []any `json:"columns"`
}
// NewCorrosionSubscriber builds a Subscriber that posts to
// baseURL + "/subscriptions" using httpClient and logs through logger.
func NewCorrosionSubscriber(logger *logrus.Logger, httpClient *http.Client, baseURL string) *Subscriber {
	sub := new(Subscriber)
	sub.logger = logger
	sub.httpClient = httpClient
	sub.api = baseURL + "/subscriptions"
	return sub
}
// Subscribe posts query (and params, if any) to the subscriptions endpoint and
// forwards every Event decoded from the response stream to events.
//
// The call blocks until ctx is cancelled, the stream ends, or an unrecoverable
// error occurs, so it is normally run in its own goroutine. Errors, including
// io.EOF / io.ErrUnexpectedEOF when the server closes the stream, are delivered
// on errChan. Once ctx is cancelled the function returns without waiting for a
// receiver, so an error raised at that point may be dropped.
//
// The request is bound to ctx, so cancelling ctx also aborts a read that is
// blocked waiting for the next event.
func (s *Subscriber) Subscribe(ctx context.Context, query string, params []string, events chan Event, errChan chan error) {
	// report hands err to the caller but never blocks past cancellation, so an
	// abandoned errChan cannot leak this goroutine.
	report := func(err error) {
		select {
		case errChan <- err:
		case <-ctx.Done():
		}
	}

	// Without parameters the body is the bare JSON string; with parameters it
	// is the two-element array [query, params].
	var payload any = query
	if len(params) > 0 {
		payload = []any{query, params}
	}
	payloadJSON, err := json.Marshal(payload)
	if err != nil {
		report(err)
		return
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.api, bytes.NewReader(payloadJSON))
	if err != nil {
		report(err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := s.httpClient.Do(req)
	if err != nil {
		report(err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			report(err)
			return
		}
		report(fmt.Errorf("unexpected status code: %d, %s", resp.StatusCode, string(body)))
		return
	}

	decoder := json.NewDecoder(resp.Body)
	for {
		if ctx.Err() != nil {
			return
		}

		var e Event
		if err := decoder.Decode(&e); err != nil {
			// A read aborted by cancellation is a normal shutdown, not an error.
			if ctx.Err() != nil {
				return
			}
			// A type mismatch only affects the value just consumed; the
			// decoder is still usable, so log it and move on.
			var typeErr *json.UnmarshalTypeError
			if errors.As(err, &typeErr) {
				s.logger.Errorf("failed to decode JSON: %v", err)
				continue
			}
			// Everything else (EOF, syntax errors, transport errors) is
			// sticky inside json.Decoder: retrying would spin forever on the
			// same error, so surface it and stop.
			report(err)
			return
		}

		select {
		case events <- e:
		case <-ctx.Done():
			return
		}
	}
}
// DetectEvent classifies e by the first populated field, checked in the order
// eoq, row, change, columns. It returns "unknown" when none of them is set.
func DetectEvent(e Event) string {
	switch {
	case e.Eoq != nil:
		return "eoq"
	case len(e.Row) > 0:
		return "row"
	case len(e.Change) > 0:
		return "change"
	case len(e.Columns) > 0:
		return "columns"
	default:
		return "unknown"
	}
}
// scanChangeType stores change[index] in *cType.
//
// It returns an error, rather than panicking, when index is outside change or
// when the element is not a string. The type parameter T is unused in the body
// and is kept only so existing call sites continue to compile.
func scanChangeType[T string](change []interface{}, index int, cType *string) error {
	if index < 0 || index >= len(change) {
		return fmt.Errorf("change type index %d out of range (len %d)", index, len(change))
	}
	changeType, ok := change[index].(string)
	if !ok {
		return fmt.Errorf("value not string")
	}
	*cType = changeType
	return nil
}
// scan copies row values positionally into the fields of the struct that s
// points to: row[i] is converted to the type of field i and assigned.
//
// The last three fields of the struct are skipped and never written. s must be
// a non-nil pointer to a struct. A nil column (JSON null) sets the field to its
// zero value.
//
// An error is returned, and nothing panics, when s has the wrong shape, when
// row has fewer values than there are fields to fill, when a field cannot be
// set (for example because it is unexported), or when a value's type is not
// convertible to the field's type.
func scan[T any](row []interface{}, s T) error {
	target := reflect.ValueOf(s)
	if target.Kind() != reflect.Pointer || target.IsNil() || target.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("scan target must be a non-nil pointer to a struct, got %T", s)
	}
	structValues := target.Elem()
	structType := structValues.Type()

	const fieldsToIgnore = 3
	fieldCount := structValues.NumField() - fieldsToIgnore
	if fieldCount > len(row) {
		return fmt.Errorf("row has %d values, need %d", len(row), fieldCount)
	}

	for i := 0; i < fieldCount; i++ {
		fieldValue := structValues.Field(i)
		fieldName := structType.Field(i).Name
		if !fieldValue.CanSet() {
			return fmt.Errorf("field %s cannot be set", fieldName)
		}

		column := row[i]
		if column == nil {
			// reflect.ValueOf(nil) is the zero Value and has no Type; treat a
			// null column as the field's zero value instead.
			fieldValue.Set(reflect.Zero(fieldValue.Type()))
			continue
		}

		columnValue := reflect.ValueOf(column)
		if !columnValue.Type().ConvertibleTo(fieldValue.Type()) {
			return fmt.Errorf("value type mismatch for field %s", fieldName)
		}
		fieldValue.Set(columnValue.Convert(fieldValue.Type()))
	}
	return nil
}
// scanRowID stores change[index], truncated to an int, in *row.
//
// encoding/json decodes every JSON number into float64 when the target is
// interface{}, so the element is expected to be a float64. An error is
// returned, rather than a panic, when index is outside change or the element
// has another type. The type parameter T is unused in the body and is kept
// only so existing call sites continue to compile.
func scanRowID[T int](change []interface{}, index int, row *int) error {
	if index < 0 || index >= len(change) {
		return fmt.Errorf("row_id index %d out of range (len %d)", index, len(change))
	}
	val, ok := change[index].(float64)
	if !ok {
		return fmt.Errorf("row_id is not a float64")
	}
	*row = int(val)
	return nil
}
// scanChange treats change[index] as an array of column values and scans it
// into row via scan.
//
// An error is returned, rather than a panic, when index is outside change or
// the element is not an array; errors from scan are passed through unchanged.
func scanChange[T any](change []interface{}, index int, row T) error {
	if index < 0 || index >= len(change) {
		return fmt.Errorf("row index %d out of range (len %d)", index, len(change))
	}
	dbRow, ok := change[index].([]interface{})
	if !ok {
		return fmt.Errorf("row not array")
	}
	return scan[T](dbRow, row)
}
// ParseRow decodes a "row" event into a T.
//
// event.Row[1] is scanned into a new T and event.Row[0] is read as the row id.
// On success the change type is always "insert". On failure the change type is
// "" and the row id is 0, with the wrapped scan error. ctx is currently unused.
func ParseRow[T any](ctx context.Context, event Event) (string, int, T, error) {
	var (
		record T
		id     int
	)

	err := scanChange(event.Row, 1, &record)
	if err != nil {
		return "", 0, record, errors.Wrap(err, "[row] failed to scan row")
	}

	err = scanRowID(event.Row, 0, &id)
	if err != nil {
		return "", 0, record, errors.Wrap(err, "[row] failed to scan rowid")
	}

	return "insert", id, record, nil
}
// ParseChange decodes a "change" event into a T.
//
// event.Change[2] is scanned into a new T, event.Change[1] is read as the row
// id, and event.Change[0] as the change type, in that order. The first failure
// stops processing and is returned wrapped, with "" and 0 for the change type
// and row id. ctx is currently unused.
func ParseChange[T any](ctx context.Context, event Event) (string, int, T, error) {
	var (
		record T
		id     int
		kind   string
	)

	err := scanChange(event.Change, 2, &record)
	if err != nil {
		return "", 0, record, errors.Wrap(err, "[change] failed to scan row")
	}

	err = scanRowID(event.Change, 1, &id)
	if err != nil {
		return "", 0, record, errors.Wrap(err, "[change] failed to scan rowid")
	}

	err = scanChangeType(event.Change, 0, &kind)
	if err != nil {
		return "", 0, record, errors.Wrap(err, "[change] failed to scan changeType")
	}

	return kind, id, record, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment