Skip to content

Instantly share code, notes, and snippets.

@cfchou
Last active October 24, 2017 10:13
Show Gist options
  • Save cfchou/c2ac4060aaf0fcada38a3d85b3c07a71 to your computer and use it in GitHub Desktop.
Save cfchou/c2ac4060aaf0fcada38a3d85b3c07a71 to your computer and use it in GitHub Desktop.
package main
import (
"gopkg.in/cfchou/go-gentle.v3/gentle"
"context"
"math/rand"
"database/sql"
_ "github.com/mattn/go-sqlite3"
"fmt"
"time"
)
// GameScore implements gentle.Message interface
type GameScore struct {
id string // better to be unique for tracing it in log
Score int
}
// ID is the only method that a gentle.Message must have
func (s GameScore) ID() string {
return s.id
}
// scoreStream is a gentle.Stream that wraps an API call to an external service for
// getting game scores.
// For simple cases that the logic can be defined entirely in a function, we can
// to just declare it to be a gentle.SimpleStream.
var scoreStream gentle.SimpleStream = func(_ context.Context) (gentle.Message, error) {
// simulate a result from an external service
return &GameScore{
id: "",
Score: rand.Intn(100),
}, nil
}
// DbWriter is a gentle.Handler which writes scores to the database.
// Instead of using gentle.SimpleHandler, we define a struct explicitly
// implementing gentle.Handler interface.
type DbWriter struct {
db *sql.DB
table string
}
func (h *DbWriter) Handle(_ context.Context, msg gentle.Message) (gentle.Message, error) {
gameScore := msg.(*GameScore)
statement := fmt.Sprintf("INSERT INTO %s (score, date) VALUES (?, DATETIME());", h.table)
_, err := h.db.Exec(statement, gameScore.Score)
if err != nil {
return nil, err
}
return msg, nil
}
func main() {
db, _ := sql.Open("sqlite3", "scores.sqlite")
defer db.Close()
db.Exec("DROP TABLE IF EXISTS game;")
db.Exec("CREATE TABLE game (score INTEGER, date DATETIME);")
dbWriter := &DbWriter{
db: db,
table: "game",
}
// Rate-limit the queries while allowing burst of some
gentleScoreStream := gentle.NewRateLimitedStream(
gentle.NewRateLimitedStreamOpts("myApp", "rlQuery",
gentle.NewTokenBucketRateLimit(500*time.Millisecond, 5)),
scoreStream)
// Limit concurrent writes to Db
limitedDbWriter := gentle.NewBulkheadHandler(
gentle.NewBulkheadHandlerOpts("myApp", "bkWrite", 16),
dbWriter)
// Constantly backing off when limitedDbWriter returns an error
backoffFactory := gentle.NewConstBackOffFactory(
gentle.NewConstBackOffFactoryOpts(500*time.Millisecond, 5*time.Minute))
gentleDbWriter := gentle.NewRetryHandler(
gentle.NewRetryHandlerOpts("myApp", "rtWrite", backoffFactory),
limitedDbWriter)
// Compose the final Stream
stream := gentle.AppendHandlersStream(gentleScoreStream, gentleDbWriter)
// Keep fetching scores from the remote service to our database.
// The amount of simultaneous go-routines are capped by the size of ticketPool.
ticketPool := make(chan struct{}, 1000)
for {
ticketPool <- struct{}{}
go func() {
defer func(){ <-ticketPool }()
stream.Get(context.Background())
}()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment