Skip to content

Instantly share code, notes, and snippets.

@lukaso
Last active February 2, 2024 11:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukaso/347c8d9afdc6b21ce8fbd15456594480 to your computer and use it in GitHub Desktop.
Save lukaso/347c8d9afdc6b21ce8fbd15456594480 to your computer and use it in GitHub Desktop.
NATS: update the filters on a consumer (a workaround for the NACK bug where filters on an existing consumer cannot be updated)
// Change filters on a NATS consumer (workaround until nack can support this)
// No warranty. Use at your own risk. Mutates your system so do some tests before going all in.
package main
import (
"context"
"fmt"
"os"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// envOr returns the value of the environment variable key, or fallback when
// the variable is unset or empty.
func envOr(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// parseFilters splits a comma separated list of subjects, trims surrounding
// whitespace from each entry and drops entries that end up empty (for example
// from a trailing comma or "a,,b"), so no empty subject is sent to the server.
func parseFilters(text string) []string {
	parts := strings.Split(text, ",")
	filters := make([]string, 0, len(parts))
	for _, part := range parts {
		if f := strings.TrimSpace(part); f != "" {
			filters = append(filters, f)
		}
	}
	return filters
}

// run replaces the filter subjects of an existing JetStream consumer.
//
// Configuration is read from the environment:
//   - NATS_URL:      server URL (defaults to nats.DefaultURL)
//   - FILTERS:       comma separated list of the new filter subjects (required)
//   - STREAM_NAME:   stream that owns the consumer (defaults to "all-requests")
//   - CONSUMER_NAME: consumer to update (defaults to "specific-requests")
//
// It returns nil without contacting the server when no filters are provided,
// and a wrapped error naming the failing step otherwise.
func run() error {
	url := envOr("NATS_URL", nats.DefaultURL)

	// Comma separated list of filters you want to change to
	filters := parseFilters(os.Getenv("FILTERS"))
	if len(filters) == 0 {
		fmt.Println("No filters provided (comma separated list of filters)")
		return nil
	}
	fmt.Printf("Filters: %v %v\n", filters, len(filters))

	// The original hard-coded names remain the defaults, so existing usage is
	// unchanged unless the override variables are set.
	streamName := envOr("STREAM_NAME", "all-requests")
	consumerName := envOr("CONSUMER_NAME", "specific-requests")

	// Connect to NATS
	nc, err := nats.Connect(url)
	if err != nil {
		return fmt.Errorf("connecting to %q: %w", url, err)
	}
	defer nc.Drain()

	js, err := jetstream.New(nc)
	if err != nil {
		return fmt.Errorf("creating JetStream context: %w", err)
	}

	// One deadline covers every request made below.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Checking this error matters: a missing stream would otherwise surface
	// as a nil dereference on the next call instead of a readable message.
	stream, err := js.Stream(ctx, streamName)
	if err != nil {
		return fmt.Errorf("looking up stream %q: %w", streamName, err)
	}

	consumer, err := stream.Consumer(ctx, consumerName)
	if err != nil {
		return fmt.Errorf("looking up consumer %q on stream %q: %w", consumerName, streamName, err)
	}

	consumerInfo, err := consumer.Info(ctx)
	if err != nil {
		return fmt.Errorf("fetching info for consumer %q: %w", consumerName, err)
	}

	// Start from the current config so every other setting is preserved.
	consumerConfig := consumerInfo.Config
	fmt.Printf("Consumer config : %v\n", consumerConfig)

	// Update filters on existing consumer config. A consumer created with a
	// single filter reports it in FilterSubject; the server rejects a config
	// that sets both fields, so the singular one is cleared here.
	consumerConfig.FilterSubject = ""
	consumerConfig.FilterSubjects = filters
	fmt.Printf("Consumer config updated: %v\n", consumerConfig)

	// Update consumer
	if _, err := stream.CreateOrUpdateConsumer(ctx, consumerConfig); err != nil {
		return fmt.Errorf("updating consumer %q: %w", consumerName, err)
	}
	return nil
}

// main runs the update and exits with status 1 if any step fails.
func main() {
	if err := run(); err != nil {
		fmt.Fprintln(os.Stderr, "error:", err)
		os.Exit(1)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment