Skip to content

Instantly share code, notes, and snippets.

@soulne4ny
Last active September 22, 2018 17:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soulne4ny/9ef5becbbc44423b400ff12bb3fdf135 to your computer and use it in GitHub Desktop.
Save soulne4ny/9ef5becbbc44423b400ff12bb3fdf135 to your computer and use it in GitHub Desktop.
stan-0.11.0-extra-redelivery-case.go
package main
import (
"fmt"
"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
nats "github.com/nats-io/go-nats"
stan "github.com/nats-io/go-nats-streaming"
)
func main() {
err := run()
if err != nil {
log.Fatal(err)
}
}
func run() (err error) {
dt, err := strconv.ParseFloat(os.Args[1], 64)
if err != nil {
return
}
nc, err := connectNATS("nats://127.0.0.1:4222")
if err != nil {
return
}
sc, err := connectSTAN(nc)
if err != nil {
return
}
defer func() {
if e := sc.Close(); e == nil {
log.Print("stan.Conn.Close()d")
} else {
log.Print("stan.Conn.Close", e)
}
}()
sub, err := sc.QueueSubscribe(
"test-durable-queue",
"queue-group",
sleepyHandler(time.Duration(dt*float64(time.Second))),
stan.DurableName("test-durable"),
stan.SetManualAckMode(),
stan.MaxInflight(1),
stan.DeliverAllAvailable(),
stan.AckWait(1*time.Second),
)
if err != nil {
return
}
defer func() {
if e := sub.Close(); e == nil {
log.Print("stan.Subscription.Close()d")
} else {
log.Print("stan.Subscription.Close", e)
}
}()
waitSIGINT()
return
}
func sleepyHandler(dt time.Duration) func(*stan.Msg) {
log.Print("sleepyHandler will sleep ", dt)
return func(msg *stan.Msg) {
log.Printf("msg %d sleeping...", msg.Sequence)
time.Sleep(dt)
_ = msg.Ack()
log.Print("ack()ed.")
}
}
func connectSTAN(nc *nats.Conn) (sc stan.Conn, err error) {
clientID := fmt.Sprintf("long-acking-client-%d", os.Getpid())
sc, err = stan.Connect("test-cluster", clientID, stan.NatsConn(nc))
if err != nil {
nc.Close()
err = fmt.Errorf("stan.Connect(): %v", err)
return
}
log.Println("STAN connected.")
return
}
func connectNATS(url string) (nc *nats.Conn, err error) {
opts := []nats.Option{
nats.MaxReconnects(-1),
nats.ReconnectWait(250 * time.Millisecond),
nats.Name("stan-never-ack-in-time"),
}
nc, err = nats.Connect(url, opts...)
if err != nil {
err = fmt.Errorf("nats.Connect(): %v", err)
return
}
log.Println("NATS connected.")
nc.SetErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
log.Print("conn error", err)
})
nc.SetClosedHandler(func(nc *nats.Conn) {
log.Print("conn closed.")
})
nc.SetDisconnectHandler(func(nc *nats.Conn) {
log.Print("disconnect.")
})
nc.SetDiscoveredServersHandler(func(nc *nats.Conn) {
log.Print("discovered.")
})
nc.SetReconnectHandler(func(nc *nats.Conn) {
log.Print("reconnect.")
})
return
}
func waitSIGINT() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Signal(syscall.SIGHUP))
signal.Notify(signalChan, os.Interrupt)
signal.Notify(signalChan, os.Signal(syscall.SIGQUIT))
signal.Notify(signalChan, os.Signal(syscall.SIGTERM))
<-signalChan
}
cluster_id: "test-cluster"
store: "file"
dir: "./data"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment