// Source: GitHub gist UltiRequiem/10795ad9d06b4a87fd764d65763e2931
// Author: @UltiRequiem
// Created: February 6, 2022 03:24
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"github.com/yomorun/yomo"
"github.com/yomorun/yomo/rx"
)
// Event is the payload exchanged with dissect-tracker.
//
// Fields declared as plain values are always present in the JSON form.
// Pointer fields are optional: a nil pointer is encoded as JSON null (no
// field uses omitempty), and a missing or null key decodes to nil.
type Event struct {
	// Identifiers for the event and the parties involved.
	ID        string `json:"eventId"`
	Type      string `json:"eventType"`
	UserID    string `json:"userId"`
	ClientID  string `json:"clientId"`
	SessionID string `json:"sessionId"`

	// Location of the event and how the visitor arrived there.
	URL      string  `json:"url"`
	SourceID *string `json:"sourceId"`
	Referrer *string `json:"referrer"`

	// Client software.
	UserAgent      string `json:"userAgent"`
	Browser        string `json:"browser"`
	BrowserVersion string `json:"browserVersion"`
	Os             string `json:"os"`

	// Network, locale and time.
	IPAddress *string `json:"ipAddress"`
	Isp       *string `json:"isp"`
	Country   *string `json:"country"`
	TimeZone  *string `json:"timeZone"`
	Timestamp *int    `json:"timestamp"`

	// Page and campaign attribution.
	ClientUserID *string `json:"clientUserId"`
	PageID       *string `json:"pageId"`
	PageType     *string `json:"pageType"`
	Campaign     *string `json:"campaign"`
	Latitude     *string `json:"latitude"`
	Longitude    *string `json:"longitude"`

	// Screen and mouse coordinates, plus the targeted element.
	ScreenX      *int    `json:"screenX"`
	ScreenY      *int    `json:"screenY"`
	MouseClientX *int    `json:"mouseClientX"`
	MouseClientY *int    `json:"mouseClientY"`
	MousePageX   *int    `json:"mousePageX"`
	MousePageY   *int    `json:"mousePageY"`
	Delay        *int    `json:"delay"`
	Top          *int    `json:"top"`
	Left         *int    `json:"left"`
	Xpath        *string `json:"xpath"`

	// Keyboard details.
	KeyCode    *int  `json:"keyCode"`
	ControlKey *bool `json:"controlKey"`
	ShiftKey   *bool `json:"shiftKey"`
	AltKey     *bool `json:"altKey"`
}
// Batching parameters.
const (
	// batchSize is declared but not referenced by the code visible here.
	batchSize = 10
	// bufferTime is the argument Handler passes to BufferWithTime.
	// NOTE(review): presumably milliseconds — confirm against the rx API.
	bufferTime = 1e3
)

// Destination settings. Both start empty and are assigned in init.
var (
	pandaproxyURL string
	topic         string
)
// init resolves the proxy URL and topic from the environment.
//
// PANDAPROXY_URL and REDPANDA_TOPIC are used when they are non-empty;
// otherwise the built-in defaults below stay in effect. An environment
// variable that is set to the empty string is treated the same as unset.
func init() {
	pandaproxyURL = "http://localhost:8082"
	if fromEnv := os.Getenv("PANDAPROXY_URL"); fromEnv != "" {
		pandaproxyURL = fromEnv
	}

	topic = "dissect-events"
	if fromEnv := os.Getenv("REDPANDA_TOPIC"); fromEnv != "" {
		topic = fromEnv
	}
}
// Request envelope types for the proxy POST body.
type (
	// postData is the top-level JSON object; the batch lives under "records".
	postData struct {
		Records []eventDataItem `json:"records"`
	}

	// eventDataItem is one record: an arbitrary value plus the partition
	// number it is addressed to.
	eventDataItem struct {
		Value     interface{} `json:"value"`
		Partition int         `json:"partition"`
	}
)
// getPostBody builds the JSON request body for the Redpanda proxy.
//
// Every element of data is marshaled to JSON and wrapped in a record
// addressed to partition 0. The record value is stored as a []byte, which
// encoding/json writes out as a base64 string.
//
// An element that cannot be marshaled is logged and left out of the batch,
// so no record with a null value is ever sent on its behalf. The returned
// error, if any, comes from marshaling the surrounding envelope.
func getPostBody(data []interface{}) ([]byte, error) {
	// Allocated (non-nil) even when empty, so "records" encodes as [] and
	// never as null.
	items := make([]eventDataItem, 0, len(data))
	for _, event := range data {
		b, err := json.Marshal(event)
		if err != nil {
			log.Println("json.Marshal the event failed. ", err)
			continue
		}
		items = append(items, eventDataItem{
			Value:     b,
			Partition: 0,
		})
	}
	return json.Marshal(postData{
		Records: items,
	})
}
// produce posts one buffered batch of events to the Redpanda proxy.
//
// v must be a []interface{}; any other type yields an error. The batch is
// encoded with getPostBody and sent as a POST to
// "<pandaproxyURL>/topics/<topic>" with the Kafka binary v2 JSON content
// type. The response body is written to the log.
//
// On success it returns a summary string. A transport failure, an unreadable
// response or a non-2xx status is returned as an error rather than ending
// the process, so the caller decides how to react.
var produce = func(ctx context.Context, v interface{}) (interface{}, error) {
	data, ok := v.([]interface{})
	if !ok {
		return nil, errors.New("v is not a slice")
	}
	postBody, err := getPostBody(data)
	if err != nil {
		return nil, err
	}

	// Bind the request to the caller's context so cancellation aborts the
	// POST. A nil context is tolerated for callers that pass none.
	if ctx == nil {
		ctx = context.Background()
	}
	endpoint := fmt.Sprintf("%s/topics/%s", pandaproxyURL, topic)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(postBody))
	if err != nil {
		return nil, fmt.Errorf("building request for %s: %w", endpoint, err)
	}
	req.Header.Set("Content-Type", "application/vnd.kafka.binary.v2+json")

	// post data to Redpanda
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("posting %d events to %s: %w", len(data), endpoint, err)
	}
	defer resp.Body.Close()

	// read the response body
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response from %s: %w", endpoint, err)
	}
	// Print, not Printf: the body is data and may itself contain '%'.
	log.Print(string(body))

	// Only a 2xx answer counts as a successful write.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("redpanda proxy responded %s for %s", resp.Status, endpoint)
	}
	return fmt.Sprintf("⚡️ write %d items to redpanda successfully", len(data)), nil
}
// Handler assembles the processing pipeline on top of rxstream.
//
// Stages, in order: decode each item as JSON into a fresh *Event, collect
// items with BufferWithTime(bufferTime), hand every collected group to
// produce, and finish with StdOut.
func Handler(rxstream rx.Stream) rx.Stream {
	// Factory for the decode target; a new Event is allocated per item.
	newEvent := func() interface{} { return &Event{} }

	decoded := rxstream.Unmarshal(json.Unmarshal, newEvent)
	batched := decoded.BufferWithTime(bufferTime)
	written := batched.Map(produce)
	return written.StdOut()
}
// main creates an output connector named "RedPanda", connects it to
// localhost:9000 and runs Handler on the resulting client. If the connection
// fails, the error is logged and main returns without running the handler.
func main() {
	connector := yomo.NewOutputConnector(yomo.WithName("RedPanda"))

	cli, err := connector.Connect("localhost", 9000)
	if err != nil {
		log.Print("❌ Connect to yomo-server failure: ", err)
		return
	}
	defer cli.Close()

	cli.Run(Handler)
}
// (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")