Skip to content

Instantly share code, notes, and snippets.

@mrsufgi
Created December 6, 2021 09:31
Show Gist options
  • Save mrsufgi/99ad8f697c2d2894b9904067d28223e5 to your computer and use it in GitHub Desktop.
Save mrsufgi/99ad8f697c2d2894b9904067d28223e5 to your computer and use it in GitHub Desktop.
// newEventCreatedTask builds the "event:created" asynq task for event s.
//
// The event ID and payload are wrapped in a domain.NewTask envelope created
// with ctx, and the JSON encoding of that envelope becomes the asynq task
// payload.
//
// It returns a nil task and the underlying error when the envelope cannot be
// created or encoded.
func newEventCreatedTask(ctx context.Context, s domain.Event) (*asynq.Task, error) {
	payload := map[string]interface{}{"event_id": s.ID, "payload": string(s.Payload)} // some type for payload
	task, err := domain.NewTask(ctx, payload)
	if err != nil {
		// Errorf, not Error: the message contains a format verb.
		log.Errorf("unable to create new task: %s", err.Error())
		return nil, err
	}
	// transform to raw data
	data, err := json.Marshal(*task)
	if err != nil {
		log.Errorf("unable to marshal task: %s", err.Error())
		return nil, err
	}
	return asynq.NewTask("event:created", data), nil
}
// CreateEvent wraps event in a domain.Event carrying a freshly generated UUID,
// turns it into an "event:created" task and enqueues that task through es.ac.
//
// Any error from building or enqueueing the task is logged and returned
// unchanged; on success the ID of the enqueued task is logged at debug level.
func (es *EventsService) CreateEvent(ctx context.Context, event SomeInput) error {
	evt := domain.Event{
		ID:      uuid.NewString(),
		Payload: json.RawMessage(*event),
	}

	task, buildErr := newEventCreatedTask(ctx, evt)
	if buildErr != nil {
		log.Errorf("could not create task: %v", buildErr)
		return buildErr
	}

	// NOTE(review): if es.ac is an *asynq.Client, EnqueueContext expects
	// asynq.Option values here (e.g. asynq.Queue("some-queue")) rather than a
	// bare string — confirm against the type of es.ac.
	info, enqueueErr := es.ac.EnqueueContext(ctx, task, "some-queue")
	if enqueueErr != nil {
		log.Errorf("could not enqueue task: %v", enqueueErr)
		return enqueueErr
	}

	log.Debugf("new event created task enqueued: %#v", info.ID)
	return nil
}
// CustomClientOpt holds the Redis connection settings that MakeRedisClient
// copies into redis.Options when it builds a client.
type CustomClientOpt struct {
// Network type to use, either tcp or unix.
// Default is tcp.
Network string
// Redis server address in "host:port" format.
Addr string
// Username to authenticate the current connection when Redis ACLs are used.
// See: https://redis.io/commands/auth.
Username string
// Password to authenticate the current connection.
// See: https://redis.io/commands/auth.
Password string
// Redis DB to select after connecting to a server.
// See: https://redis.io/commands/select.
DB int
// Dial timeout for establishing new connections.
// Default is 5 seconds.
DialTimeout time.Duration
// Timeout for socket reads.
// If timeout is reached, read commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is 3 seconds.
ReadTimeout time.Duration
// Timeout for socket writes.
// If timeout is reached, write commands will fail with a timeout error
// instead of blocking.
//
// Use value -1 for no timeout and 0 for default.
// Default is ReadTimeout.
WriteTimeout time.Duration
// Maximum number of socket connections.
// Default is 10 connections per every CPU as reported by runtime.NumCPU.
PoolSize int
// TLS Config used to connect to a server.
// TLS will be negotiated only if this field is set.
TLSConfig *tls.Config
}
// MakeRedisClient creates a go-redis client from the settings in opt, attaches
// the redisotel tracing hook to it and returns the client as an interface{}.
//
// nolint
func (opt CustomClientOpt) MakeRedisClient() interface{} {
	// Copy every setting across one-to-one.
	options := redis.Options{
		Network:      opt.Network,
		Addr:         opt.Addr,
		Username:     opt.Username,
		Password:     opt.Password,
		DB:           opt.DB,
		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,
		PoolSize:     opt.PoolSize,
		TLSConfig:    opt.TLSConfig,
	}

	client := redis.NewClient(&options)
	client.AddHook(redisotel.TracingHook{})
	return client
}
// Task is the JSON envelope stored as an asynq task payload: the
// JSON-encoded business payload plus the propagated tracing context.
type Task struct {
// OtelContext is the carrier that NewTask fills through the global
// OpenTelemetry text-map propagator.
OtelContext propagation.MapCarrier `json:"otel_context"`
// Payload is the JSON-encoded payload, kept as a string.
Payload string `json:"payload"`
}
// NewTask JSON-encodes p and returns a Task carrying that encoding together
// with the propagation data the global text-map propagator injects from ctx.
//
// It returns a nil Task and the encoding error when p cannot be marshalled.
func NewTask(ctx context.Context, p interface{}) (*Task, error) {
	encoded, err := json.Marshal(p)
	if err != nil {
		return nil, err
	}

	// Fill the carrier first, then hand the populated map to the Task.
	carrier := make(propagation.MapCarrier)
	otel.GetTextMapPropagator().Inject(ctx, &carrier)

	return &Task{
		OtelContext: carrier,
		Payload:     string(encoded),
	}, nil
}
// NewEventHandler creates a SnapshotsHandler backed by service s and registers
// its HandleEventCreatedTask method on mux m for "event:created" tasks.
//
// The constructed value matches the declared *SnapshotsHandler return type,
// and the registered method is the handler defined on that type.
func NewEventHandler(m *asynq.ServeMux, s someService) *SnapshotsHandler {
	handler := &SnapshotsHandler{
		Service: s,
	}
	// register handlers
	m.HandleFunc("event:created", handler.HandleEventCreatedTask)
	return handler
}
// HandleEventCreatedTask decodes an "event:created" task in two steps: first
// the domain.Task envelope from the raw asynq payload, then the JSON payload
// string inside that envelope.
//
// It returns the decoding error if either step fails and nil otherwise.
func (vh *SnapshotsHandler) HandleEventCreatedTask(ctx context.Context, t *asynq.Task) error {
	var payload domain.Task
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return err
	}
	var taskPayload map[string]interface{} // TODO: use type!
	if err := json.Unmarshal([]byte(payload.Payload), &taskPayload); err != nil {
		return err
	}
	// do something with payload...

	// Report success explicitly; the function has an error result and must
	// end in a return.
	return nil
}
// Build the Redis connection options and create the asynq client from them.
r := setupRedis()
rc := asynq.NewClient(r)
// Close the client when the enclosing function returns.
defer rc.Close()
...
// Create the task mux and install the tracing middleware on it.
mux := asynq.NewServeMux()
mux.Use(TaskTracingMiddleware)
...
// setupRedis returns Redis connection options whose address is built as
// "host:port" from the REDIS_HOST and REDIS_PORT configuration values.
func setupRedis() asynq.RedisConnOpt {
	host := viper.GetString("REDIS_HOST")
	port := viper.GetString("REDIS_PORT")

	opt := domain.CustomClientOpt{
		Addr: fmt.Sprintf("%s:%s", host, port),
	}
	return opt
}
// tracer is the tracer used for the spans started by TaskTracingMiddleware.
var tracer = otel.Tracer("asynq/tasks")

// TaskTracingMiddleware wraps h so that each task is processed inside a span
// named "middleware-task-<task type>".
//
// The task payload is decoded as a domain.Task, the tracing context stored in
// its OtelContext carrier is extracted into ctx, and a span is started from
// that context before h is invoked with it.
//
// If the payload cannot be decoded, the error is logged and returned without
// calling h.
func TaskTracingMiddleware(h asynq.Handler) asynq.Handler {
	return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
		var task domain.Task
		if err := json.Unmarshal(t.Payload(), &task); err != nil {
			// Fail only this task: a fatal log call would terminate the
			// whole worker process over a single malformed payload.
			log.Errorf("error with unmarshaling: %v", err)
			return err
		}
		// extract the context from the task
		ctx = otel.GetTextMapPropagator().Extract(ctx, &task.OtelContext)
		ctx, span := tracer.Start(ctx, fmt.Sprintf("middleware-task-%s", t.Type()))
		defer span.End()
		return h.ProcessTask(ctx, t)
	})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment