-
-
Save jeromegn/22d88b1b0399b40d4a714b4367dec430 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package corrosion2 | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"fmt" | |
"io" | |
"net/http" | |
"reflect" | |
"github.com/pkg/errors" | |
"github.com/sirupsen/logrus" | |
) | |
type Subscriber struct { | |
api string | |
logger *logrus.Logger | |
httpClient *http.Client | |
} | |
// Event is a single JSON object decoded from a subscription stream. The
// helpers in this file classify an event by which field is populated.
type Event struct {
	// Change is read by ParseChange as [change type, row id, row values, ...].
	Change []any `json:"change"`

	// Row is read by ParseRow as [row id, row values].
	Row []any `json:"row"`

	// Eoq is the "eoq" object; presumably an end-of-query marker — confirm
	// against the server's documentation.
	Eoq map[string]any `json:"eoq"`

	// Columns is the "columns" array; presumably the result column names —
	// confirm against the server's documentation.
	Columns []any `json:"columns"`
}
func NewCorrosionSubscriber(logger *logrus.Logger, httpClient *http.Client, baseURL string) *Subscriber { | |
return &Subscriber{ | |
api: fmt.Sprintf("%s/subscriptions", baseURL), | |
httpClient: httpClient, | |
logger: logger, | |
} | |
} | |
func (s *Subscriber) Subscribe(ctx context.Context, query string, params []string, events chan Event, errChan chan error) { | |
var payloadJSON []byte | |
var err error | |
if len(params) == 0 { | |
payloadJSON, err = json.Marshal(query) | |
} else { | |
payloadJSON, err = json.Marshal([]interface{}{query, params}) | |
} | |
if err != nil { | |
errChan <- err | |
return | |
} | |
req, err := http.NewRequest("POST", s.api, bytes.NewBuffer(payloadJSON)) | |
if err != nil { | |
errChan <- err | |
return | |
} | |
req.Header.Set("Content-Type", "application/json") | |
resp, err := s.httpClient.Do(req) | |
if err != nil { | |
errChan <- err | |
return | |
} | |
defer resp.Body.Close() | |
if resp.StatusCode != http.StatusOK { | |
body, err := io.ReadAll(resp.Body) | |
if err != nil { | |
errChan <- err | |
return | |
} | |
errChan <- fmt.Errorf("unexpected status code: %d, %s", resp.StatusCode, string(body)) | |
return | |
} | |
decoder := json.NewDecoder(resp.Body) | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
var e Event | |
if err := decoder.Decode(&e); err != nil { | |
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { | |
errChan <- err | |
return | |
} | |
s.logger.Error("failed to decode JSON", err) | |
continue | |
} | |
events <- e | |
} | |
} | |
} | |
func DetectEvent(e Event) string { | |
if e.Eoq != nil { | |
return "eoq" | |
} | |
if len(e.Row) > 0 { | |
return "row" | |
} | |
if len(e.Change) > 0 { | |
return "change" | |
} | |
if len(e.Columns) > 0 { | |
return "columns" | |
} | |
return "unknown" | |
} | |
// scanChangeType reads change[index] as a string and stores it in *cType.
//
// It returns an error, leaving *cType untouched, when index lies outside
// change or the element is not a string. The type parameter T is not used by
// the body; it is kept so that existing call sites keep compiling.
func scanChangeType[T string](change []interface{}, index int, cType *string) error {
	// Guard the index: a short or empty event must not panic.
	if index < 0 || index >= len(change) {
		return fmt.Errorf("index %d out of range for change of length %d", index, len(change))
	}
	changeType, ok := change[index].(string)
	if !ok {
		return fmt.Errorf("value not string")
	}
	*cType = changeType
	return nil
}
// scan copies the leading values of row into the leading fields of the struct
// that s points to, by position: row[i] is converted to the type of field i.
//
// s must be a non-nil pointer to a struct. The last three fields of the struct
// are never written (NOTE(review): presumably bookkeeping fields that are not
// part of the query result — confirm against the row types used by callers).
// Extra row values beyond the mapped fields are ignored.
//
// A nil row value (JSON null) sets the field to its zero value. Any other
// value must be convertible to the field type under Go's conversion rules;
// since JSON numbers arrive as float64, integer fields are filled by
// truncating conversion.
//
// An error is returned when s is not a non-nil struct pointer, when row has
// fewer values than there are mapped fields, when a mapped field cannot be set
// (for example because it is unexported), or when a value cannot be converted.
// Fields written before the failing one keep their new values.
func scan[T any](row []interface{}, s T) error {
	target := reflect.ValueOf(s)
	if target.Kind() != reflect.Ptr || target.IsNil() {
		return fmt.Errorf("scan target must be a non-nil pointer to a struct, got %T", s)
	}
	structValues := target.Elem()
	if structValues.Kind() != reflect.Struct {
		return fmt.Errorf("scan target must be a non-nil pointer to a struct, got %T", s)
	}

	const fieldsToIgnore = 3
	mapped := structValues.NumField() - fieldsToIgnore
	if len(row) < mapped {
		return fmt.Errorf("row has %d values, need at least %d", len(row), mapped)
	}

	structType := structValues.Type()
	for i := 0; i < mapped; i++ {
		fieldValue := structValues.Field(i)
		fieldName := structType.Field(i).Name
		if !fieldValue.CanSet() {
			return fmt.Errorf("field %s cannot be set", fieldName)
		}

		// reflect.ValueOf(nil) is the zero Value, whose Type method panics,
		// so a null column is handled before any reflection on the value.
		if row[i] == nil {
			fieldValue.Set(reflect.Zero(fieldValue.Type()))
			continue
		}

		columnValue := reflect.ValueOf(row[i])
		if !columnValue.CanConvert(fieldValue.Type()) {
			return fmt.Errorf("value type mismatch for field %s", fieldName)
		}
		fieldValue.Set(columnValue.Convert(fieldValue.Type()))
	}
	return nil
}
// scanRowID reads change[index] as a row id and stores it in *row.
//
// encoding/json decodes every JSON number into float64 when the destination is
// interface{}, so the element is expected to be a float64 holding an integral
// value.
//
// It returns an error, leaving *row untouched, when index lies outside change,
// when the element is not a float64, or when the number is not an integer
// representable as int. The type parameter T is not used by the body; it is
// kept so that existing call sites keep compiling.
func scanRowID[T int](change []interface{}, index int, row *int) error {
	// Guard the index: a short or empty event must not panic.
	if index < 0 || index >= len(change) {
		return fmt.Errorf("index %d out of range for change of length %d", index, len(change))
	}
	val, ok := change[index].(float64)
	if !ok {
		return fmt.Errorf("row_id is not a float64")
	}
	id := int(val)
	// The round trip fails for fractions, NaN, infinities and magnitudes that
	// do not fit in int, all of which would otherwise yield a wrong id.
	if float64(id) != val {
		return fmt.Errorf("row_id %v is not an integer", val)
	}
	*row = id
	return nil
}
func scanChange[T any](change []interface{}, index int, row T) error { | |
dbRow, ok := (change)[index].([]interface{}) | |
if !ok { | |
return fmt.Errorf("row not array") | |
} | |
return scan[T](dbRow, row) | |
} | |
func ParseRow[T any](ctx context.Context, event Event) (string, int, T, error) { | |
var svc T | |
if err := scanChange(event.Row, 1, &svc); err != nil { | |
return "", 0, svc, errors.Wrap(err, "[row] failed to scan row") | |
} | |
var rowID int | |
if err := scanRowID(event.Row, 0, &rowID); err != nil { | |
return "", 0, svc, errors.Wrap(err, "[row] failed to scan rowid") | |
} | |
return "insert", rowID, svc, nil | |
} | |
func ParseChange[T any](ctx context.Context, event Event) (string, int, T, error) { | |
var svc T | |
if err := scanChange(event.Change, 2, &svc); err != nil { | |
return "", 0, svc, errors.Wrap(err, "[change] failed to scan row") | |
} | |
var rowID int | |
if err := scanRowID(event.Change, 1, &rowID); err != nil { | |
return "", 0, svc, errors.Wrap(err, "[change] failed to scan rowid") | |
} | |
var changeType string | |
if err := scanChangeType(event.Change, 0, &changeType); err != nil { | |
return "", 0, svc, errors.Wrap(err, "[change] failed to scan changeType") | |
} | |
return changeType, rowID, svc, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment