Created
February 6, 2022 03:24
-
-
Save UltiRequiem/10795ad9d06b4a87fd764d65763e2931 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"encoding/json" | |
"errors" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net/http" | |
"os" | |
"github.com/yomorun/yomo" | |
"github.com/yomorun/yomo/rx" | |
) | |
// Event is the JSON payload of a single dissect-tracker event.
//
// Fields declared as plain strings decode to "" when the key is missing.
// Pointer fields stay nil when the key is missing or null, and are written
// back out as JSON null.
type Event struct {
	// Plain string fields.
	ID        string `json:"eventId"`
	Type      string `json:"eventType"`
	UserID    string `json:"userId"`
	ClientID  string `json:"clientId"`
	SessionID string `json:"sessionId"`
	URL       string `json:"url"`

	// Nullable strings.
	SourceID *string `json:"sourceId"`
	Referrer *string `json:"referrer"`

	// Plain string fields.
	UserAgent      string `json:"userAgent"`
	Browser        string `json:"browser"`
	BrowserVersion string `json:"browserVersion"`
	Os             string `json:"os"`

	// Everything below is nullable.
	IPAddress    *string `json:"ipAddress"`
	Isp          *string `json:"isp"`
	Country      *string `json:"country"`
	TimeZone     *string `json:"timeZone"`
	Timestamp    *int    `json:"timestamp"`
	ClientUserID *string `json:"clientUserId"`
	PageID       *string `json:"pageId"`
	PageType     *string `json:"pageType"`
	Campaign     *string `json:"campaign"`
	Latitude     *string `json:"latitude"`
	Longitude    *string `json:"longitude"`

	// Nullable integers, followed by a nullable xpath string.
	ScreenX      *int    `json:"screenX"`
	ScreenY      *int    `json:"screenY"`
	MouseClientX *int    `json:"mouseClientX"`
	MouseClientY *int    `json:"mouseClientY"`
	MousePageX   *int    `json:"mousePageX"`
	MousePageY   *int    `json:"mousePageY"`
	Delay        *int    `json:"delay"`
	Top          *int    `json:"top"`
	Left         *int    `json:"left"`
	Xpath        *string `json:"xpath"`

	// Nullable key code and modifier-key flags.
	KeyCode    *int  `json:"keyCode"`
	ControlKey *bool `json:"controlKey"`
	ShiftKey   *bool `json:"shiftKey"`
	AltKey     *bool `json:"altKey"`
}
// Batching parameters.
const (
	// batchSize is declared but not referenced by any code visible in this file.
	batchSize = 10

	// bufferTime is the argument Handler passes to BufferWithTime.
	// NOTE(review): presumably milliseconds (1e3 = one second) — confirm
	// against the rx package.
	bufferTime = 1e3
)
// Redpanda proxy settings. Both start out empty and are assigned in init.
var (
	// pandaproxyURL is the base URL that produce posts to.
	pandaproxyURL string
	// topic is the topic name appended to the URL path by produce.
	topic string
)
func init() { | |
pandaproxyURL, topic = os.Getenv("PANDAPROXY_URL"), os.Getenv("REDPANDA_TOPIC") | |
if pandaproxyURL == "" { | |
pandaproxyURL = "http://localhost:8082" | |
} | |
if topic == "" { | |
topic = "dissect-events" | |
} | |
} | |
// postData is the JSON request body sent to the Redpanda proxy: an object
// with a single "records" array.
type postData struct {
	Records []eventDataItem `json:"records"`
}

// eventDataItem is one entry of postData.Records.
type eventDataItem struct {
	// Value is the record payload. getPostBody stores a []byte here, which
	// encoding/json writes as a base64 string.
	Value interface{} `json:"value"`
	// Partition is the target partition; getPostBody always sets it to 0.
	Partition int `json:"partition"`
}

// getPostBody builds the POST body for the Redpanda proxy from a batch of
// events.
//
// Each element of data is marshalled to JSON and wrapped in an eventDataItem
// on partition 0. An element that cannot be marshalled is logged and left out
// of the batch, so the result never contains a placeholder record with a null
// value. An empty or nil batch produces {"records":[]}.
//
// The returned error is the one from marshalling the outer postData.
func getPostBody(data []interface{}) ([]byte, error) {
	// Length 0 with capacity len(data): only successfully encoded events
	// are appended, so failures do not leave zero-valued slots behind.
	items := make([]eventDataItem, 0, len(data))
	for _, event := range data {
		b, err := json.Marshal(event)
		if err != nil {
			log.Println("json.Marshal the event failed. ", err)
			continue
		}
		items = append(items, eventDataItem{Value: b, Partition: 0})
	}
	return json.Marshal(postData{Records: items})
}
// write to Redpanda | |
var produce = func(_ context.Context, v interface{}) (interface{}, error) { | |
data, ok := v.([]interface{}) | |
if !ok { | |
return nil, errors.New("v is not a slice") | |
} | |
postBody, err := getPostBody(data) | |
if err != nil { | |
return nil, err | |
} | |
// post data to Redpanda | |
resp, err := http.Post(fmt.Sprintf("%s/topics/%s", pandaproxyURL, topic), "application/vnd.kafka.binary.v2+json", bytes.NewBuffer(postBody)) | |
if err != nil { | |
log.Fatalln(err) | |
return nil, err | |
} | |
defer resp.Body.Close() | |
// read the response body | |
body, err := ioutil.ReadAll(resp.Body) | |
if err != nil { | |
log.Fatalln(err) | |
return nil, err | |
} | |
log.Printf(string(body)) | |
return fmt.Sprintf("⚡️ write %d items to redpanda successfully", len(data)), nil | |
} | |
// Handler will handle data in Rx way | |
func Handler(rxstream rx.Stream) rx.Stream { | |
return rxstream. | |
Unmarshal(json.Unmarshal, func() interface{} { return &Event{} }). | |
BufferWithTime(bufferTime). | |
Map(produce). | |
StdOut() | |
} | |
func main() { | |
cli, err := yomo.NewOutputConnector(yomo.WithName("RedPanda")).Connect("localhost", 9000) | |
if err != nil { | |
log.Print("❌ Connect to yomo-server failure: ", err) | |
return | |
} | |
defer cli.Close() | |
cli.Run(Handler) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment