-
-
Save hlubek/1a667ec6050bea703b58ba0036d26cc9 to your computer and use it in GitHub Desktop.
Example application to publish and consume messages to showcase lost messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"fmt" | |
"os" | |
"os/signal" | |
"strconv" | |
"time" | |
"github.com/ThreeDotsLabs/go-event-driven/common/log" | |
"github.com/ThreeDotsLabs/watermill" | |
"github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream" | |
"github.com/ThreeDotsLabs/watermill/message" | |
"github.com/redis/go-redis/v9" | |
"github.com/sirupsen/logrus" | |
"github.com/urfave/cli/v2" | |
) | |
func main() { | |
app := cli.NewApp() | |
app.Flags = []cli.Flag{ | |
&cli.StringFlag{ | |
Name: "redis-addr", | |
Value: "localhost:6379", | |
EnvVars: []string{"REDIS_ADDR"}, | |
}, | |
&cli.IntFlag{ | |
Name: "redis-db", | |
Value: 0, | |
EnvVars: []string{"REDIS_DB"}, | |
}, | |
&cli.IntFlag{ | |
Name: "verbosity", | |
Value: int(logrus.DebugLevel), | |
Aliases: []string{"v"}, | |
}, | |
} | |
app.Before = func(c *cli.Context) error { | |
logrus.SetLevel(logrus.Level(c.Int("verbosity"))) | |
return nil | |
} | |
app.Commands = []*cli.Command{ | |
{ | |
Name: "consume", | |
Flags: []cli.Flag{ | |
&cli.DurationFlag{ | |
Name: "delay", | |
Value: 3 * time.Second, | |
}, | |
&cli.StringFlag{ | |
Name: "worker-id", | |
}, | |
}, | |
Action: func(c *cli.Context) error { | |
logger := log.NewWatermill(logrus.NewEntry(logrus.StandardLogger())) | |
router, err := message.NewRouter(message.RouterConfig{ | |
CloseTimeout: 30 * time.Second, | |
}, logger) | |
if err != nil { | |
return fmt.Errorf("creating router: %w", err) | |
} | |
rdb := redis.NewClient(&redis.Options{ | |
Addr: c.String("redis-addr"), | |
DB: c.Int("redis-db"), | |
}) | |
sub, err := redisstream.NewSubscriber(redisstream.SubscriberConfig{ | |
Client: rdb, | |
ConsumerGroup: "worker", | |
Consumer: c.String("worker-id"), | |
}, logger) | |
if err != nil { | |
return fmt.Errorf("creating subscriber: %w", err) | |
} | |
router.AddNoPublisherHandler( | |
"log-count", | |
"numbers", | |
sub, | |
func(msg *message.Message) (err error) { | |
logrus.Infof("Handling message %s, taking some time...", msg.UUID) | |
select { | |
case <-msg.Context().Done(): | |
return errors.New("message context canceled") | |
case <-time.After(c.Duration("delay")): | |
err := rdb.LPush(msg.Context(), "results", string(msg.Payload)).Err() | |
if err != nil { | |
return fmt.Errorf("pushing result to redis: %w", err) | |
} | |
logrus. | |
WithField("number", string(msg.Payload)). | |
Infof("Pushed number done for message %s!", msg.UUID) | |
return nil | |
} | |
}, | |
) | |
ctx, cancel := signal.NotifyContext(c.Context, os.Interrupt) | |
defer cancel() | |
err = router.Run(ctx) | |
if err != nil { | |
return fmt.Errorf("running router: %w", err) | |
} | |
logrus.Debug("Finished gracefully") | |
return nil | |
}, | |
}, | |
{ | |
Name: "publish", | |
Flags: []cli.Flag{ | |
&cli.IntFlag{ | |
Name: "count", | |
Value: 10, | |
}, | |
&cli.DurationFlag{ | |
Name: "interval", | |
Value: 2 * time.Second, | |
}, | |
}, | |
Action: func(c *cli.Context) error { | |
logger := log.NewWatermill(logrus.NewEntry(logrus.StandardLogger())) | |
rdb := redis.NewClient(&redis.Options{ | |
Addr: c.String("redis-addr"), | |
DB: c.Int("redis-db"), | |
}) | |
pub, err := redisstream.NewPublisher(redisstream.PublisherConfig{ | |
Client: rdb, | |
}, logger) | |
if err != nil { | |
return fmt.Errorf("creating publisher: %w", err) | |
} | |
for i := 0; i < c.Int("count"); i++ { | |
select { | |
case <-c.Context.Done(): | |
return nil | |
case <-time.After(c.Duration("interval")): | |
messageUUID := watermill.NewUUID() | |
err := pub.Publish("numbers", message.NewMessage(messageUUID, []byte(strconv.Itoa(i)))) | |
if err != nil { | |
return fmt.Errorf("publishing message %s: %w", messageUUID, err) | |
} | |
logrus. | |
WithField("number", i). | |
Infof("Published message %s", messageUUID) | |
} | |
} | |
return nil | |
}, | |
}, | |
{ | |
Name: "clean", | |
Action: func(c *cli.Context) error { | |
rdb := redis.NewClient(&redis.Options{ | |
Addr: c.String("redis-addr"), | |
DB: c.Int("redis-db"), | |
}) | |
err := rdb.Del(c.Context, "results", "numbers").Err() | |
if err != nil { | |
return fmt.Errorf("deleting redis keys: %w", err) | |
} | |
return nil | |
}, | |
}, | |
} | |
err := app.Run(os.Args) | |
if err != nil { | |
logrus.Fatal(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment