Skip to content

Instantly share code, notes, and snippets.

@hlubek

hlubek/main.go Secret

Created July 18, 2023 16:42
Show Gist options
  • Save hlubek/1a667ec6050bea703b58ba0036d26cc9 to your computer and use it in GitHub Desktop.
Save hlubek/1a667ec6050bea703b58ba0036d26cc9 to your computer and use it in GitHub Desktop.
Example application to publish and consume messages to showcase lost messages
package main
import (
"errors"
"fmt"
"os"
"os/signal"
"strconv"
"time"
"github.com/ThreeDotsLabs/go-event-driven/common/log"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"
)
// main builds the CLI application with its global Redis and logging flags,
// registers the consume, publish and clean subcommands, and runs it against
// the process arguments. Any error returned by a command is logged fatally.
func main() {
	app := cli.NewApp()
	app.Flags = []cli.Flag{
		&cli.StringFlag{
			Name:    "redis-addr",
			Value:   "localhost:6379",
			EnvVars: []string{"REDIS_ADDR"},
		},
		&cli.IntFlag{
			Name:    "redis-db",
			Value:   0,
			EnvVars: []string{"REDIS_DB"},
		},
		&cli.IntFlag{
			Name:    "verbosity",
			Value:   int(logrus.DebugLevel),
			Aliases: []string{"v"},
		},
	}
	// Apply the requested log level before any command runs.
	app.Before = func(c *cli.Context) error {
		logrus.SetLevel(logrus.Level(c.Int("verbosity")))
		return nil
	}
	app.Commands = []*cli.Command{
		consumeCommand(),
		publishCommand(),
		cleanCommand(),
	}

	if err := app.Run(os.Args); err != nil {
		logrus.Fatal(err)
	}
}

// newRedisClient creates a Redis client from the redis-addr and redis-db flags.
func newRedisClient(c *cli.Context) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr: c.String("redis-addr"),
		DB:   c.Int("redis-db"),
	})
}

// consumeCommand returns the "consume" subcommand. It subscribes to the
// "numbers" topic as a member of the "worker" consumer group, waits for the
// configured delay per message, and then pushes the payload onto the
// "results" list. It runs until the router stops or the process is interrupted.
func consumeCommand() *cli.Command {
	return &cli.Command{
		Name: "consume",
		Flags: []cli.Flag{
			&cli.DurationFlag{
				Name:  "delay",
				Value: 3 * time.Second,
			},
			&cli.StringFlag{
				Name: "worker-id",
			},
		},
		Action: func(c *cli.Context) error {
			logger := log.NewWatermill(logrus.NewEntry(logrus.StandardLogger()))

			router, err := message.NewRouter(message.RouterConfig{
				CloseTimeout: 30 * time.Second,
			}, logger)
			if err != nil {
				return fmt.Errorf("creating router: %w", err)
			}

			// The same client is used by the subscriber and by the handler below.
			rdb := newRedisClient(c)

			sub, err := redisstream.NewSubscriber(redisstream.SubscriberConfig{
				Client:        rdb,
				ConsumerGroup: "worker",
				Consumer:      c.String("worker-id"),
			}, logger)
			if err != nil {
				return fmt.Errorf("creating subscriber: %w", err)
			}

			// Read the flag once instead of on every handled message.
			delay := c.Duration("delay")

			router.AddNoPublisherHandler(
				"log-count",
				"numbers",
				sub,
				func(msg *message.Message) error {
					logrus.Infof("Handling message %s, taking some time...", msg.UUID)

					// Simulate slow work, but give up as soon as the message
					// context is cancelled; the returned error reports the
					// message as not handled.
					select {
					case <-msg.Context().Done():
						return errors.New("message context canceled")
					case <-time.After(delay):
					}

					if err := rdb.LPush(msg.Context(), "results", string(msg.Payload)).Err(); err != nil {
						return fmt.Errorf("pushing result to redis: %w", err)
					}
					logrus.
						WithField("number", string(msg.Payload)).
						Infof("Pushed number done for message %s!", msg.UUID)
					return nil
				},
			)

			// Stop the router when the process receives an interrupt signal.
			ctx, cancel := signal.NotifyContext(c.Context, os.Interrupt)
			defer cancel()

			if err := router.Run(ctx); err != nil {
				return fmt.Errorf("running router: %w", err)
			}
			logrus.Debug("Finished gracefully")
			return nil
		},
	}
}

// publishCommand returns the "publish" subcommand. It publishes "count"
// messages to the "numbers" topic, each carrying its sequence number as the
// payload, waiting "interval" before every publish (including the first).
// An interrupt signal stops publishing early without an error.
func publishCommand() *cli.Command {
	return &cli.Command{
		Name: "publish",
		Flags: []cli.Flag{
			&cli.IntFlag{
				Name:  "count",
				Value: 10,
			},
			&cli.DurationFlag{
				Name:  "interval",
				Value: 2 * time.Second,
			},
		},
		Action: func(c *cli.Context) error {
			logger := log.NewWatermill(logrus.NewEntry(logrus.StandardLogger()))

			pub, err := redisstream.NewPublisher(redisstream.PublisherConfig{
				Client: newRedisClient(c),
			}, logger)
			if err != nil {
				return fmt.Errorf("creating publisher: %w", err)
			}
			defer func() {
				if cerr := pub.Close(); cerr != nil {
					logrus.WithError(cerr).Warn("Closing publisher failed")
				}
			}()

			// Make the loop below interruptible, mirroring the consume command.
			ctx, cancel := signal.NotifyContext(c.Context, os.Interrupt)
			defer cancel()

			// Flag values do not change while the command runs; read them once.
			count := c.Int("count")
			interval := c.Duration("interval")

			for i := 0; i < count; i++ {
				select {
				case <-ctx.Done():
					return nil
				case <-time.After(interval):
				}

				messageUUID := watermill.NewUUID()
				if err := pub.Publish("numbers", message.NewMessage(messageUUID, []byte(strconv.Itoa(i)))); err != nil {
					return fmt.Errorf("publishing message %s: %w", messageUUID, err)
				}
				logrus.
					WithField("number", i).
					Infof("Published message %s", messageUUID)
			}
			return nil
		},
	}
}

// cleanCommand returns the "clean" subcommand, which deletes the "results"
// and "numbers" keys from Redis.
func cleanCommand() *cli.Command {
	return &cli.Command{
		Name: "clean",
		Action: func(c *cli.Context) error {
			rdb := newRedisClient(c)
			defer func() {
				if cerr := rdb.Close(); cerr != nil {
					logrus.WithError(cerr).Warn("Closing redis client failed")
				}
			}()

			if err := rdb.Del(c.Context, "results", "numbers").Err(); err != nil {
				return fmt.Errorf("deleting redis keys: %w", err)
			}
			return nil
		},
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment