Last active
February 2, 2024 11:24
-
-
Save lukaso/347c8d9afdc6b21ce8fbd15456594480 to your computer and use it in GitHub Desktop.
Update the subject filters on a NATS consumer (works around a NACK bug that prevents updating filters on existing consumers)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Change filters on a NATS consumer (workaround until NACK can support this).
// No warranty; use at your own risk. This mutates your system, so run some tests before going all in.
package main | |
import ( | |
"context" | |
"fmt" | |
"os" | |
"strings" | |
"time" | |
"github.com/nats-io/nats.go" | |
"github.com/nats-io/nats.go/jetstream" | |
) | |
func main() { | |
// Set NATS url | |
url := os.Getenv("NATS_URL") | |
if url == "" { | |
url = nats.DefaultURL | |
} | |
// Comma separated list of filters you want to change to | |
filtersText := os.Getenv("FILTERS") | |
if filtersText == "" { | |
fmt.Println("No filters provided (comma separated list of filters)") | |
return | |
} | |
filters := strings.Split(filtersText, ",") | |
for i, filter := range filters { | |
filters[i] = strings.TrimSpace(filter) | |
} | |
fmt.Printf("Filters: %v %v\n", filters, len(filters)) | |
// Connect to NATS | |
nc, err := nats.Connect(url) | |
if err != nil { | |
panic(err) | |
} | |
defer nc.Drain() | |
js, _ := jetstream.New(nc) | |
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | |
defer cancel() | |
// Replace with your stream and consumer names | |
streamName := "all-requests" | |
consumerName := "specific-requests" | |
// Retrieve the consumer configuration | |
// Get requests stream | |
stream, _ := js.Stream(ctx, streamName) | |
consumer, err := stream.Consumer(ctx, consumerName) | |
if err != nil { | |
panic(err) | |
} | |
consumerInfo, err := consumer.Info(ctx) | |
if err != nil { | |
panic(err) | |
} | |
consumerConfig := consumerInfo.Config | |
fmt.Printf("Consumer config : %v\n", consumerConfig) | |
// Update filters on existing consumer config | |
consumerConfig.FilterSubjects = filters | |
fmt.Printf("Consumer config updated: %v\n", consumerConfig) | |
// Update consumer | |
_, err = stream.CreateOrUpdateConsumer(ctx, consumerConfig) | |
if err != nil { | |
panic(err) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment