Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package dispatcher
import (
"crypto/md5"
"fmt"
"os"
"time"
"github.com/go-redis/redis"
"github.com/nats-io/go-nats"
)
type Work struct {
Topic string
Message []byte
}
var WorkQueue = make(chan Work, 1000)
func StartDispatcher() {
// Setup NATS
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL
}
nc, _ := nats.Connect(natsURL)
defer nc.Close()
// Setup Redis
rc := redis.NewClient(&redis.Options{
Addr: os.Getenv("REDIS_ADDR"),
Password: os.Getenv("REDIS_PASS"),
DB: 0,
})
for {
select {
case work := <-WorkQueue:
hash := md5.Sum(work.Message)
key := fmt.Sprintf("%v-%v", work.Topic, hash)
_, err := rc.Get(key).Result()
if err == redis.Nil {
nc.Publish(work.Topic, work.Message)
_, err := rc.Set(key, 1, 600*time.Second).Result()
if err != nil {
fmt.Printf("Something wrong seting redis key: %v", err)
}
} else if err != nil {
fmt.Printf("Error while getting from Redis: %v", err)
} else {
// If we saw the value again reset the expiry
_, err := rc.Set(key, 1, 600*time.Second).Result()
if err != nil {
fmt.Printf("Something wrong seting redis key: %v", err)
}
}
}
}
}
package handlers
import (
"github.com/gin-gonic/gin"
"github.com/regner/albiondata-backend/dispatcher"
)
type ingestPostRequest struct {
Items []string `json:"items"`
}
func IngestMarketOrders(c *gin.Context) {
var incomingRequest ingestPostRequest
c.BindJSON(&incomingRequest)
for _, v := range incomingRequest.Items {
work := dispatcher.Work{
Topic: "market-order",
Message: []byte(v),
}
dispatcher.WorkQueue <- work
}
}
package main
import (
"github.com/gin-gonic/gin"
"github.com/regner/albiondata-backend/dispatcher"
"github.com/regner/albiondata-backend/handlers"
)
func main() {
go dispatcher.StartDispatcher()
r := gin.Default()
r.POST("/api/v1/ingest/", handlers.IngestMarketOrders)
r.Run(":8080")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment