Skip to content

Instantly share code, notes, and snippets.

@scottf
Last active March 9, 2022 00:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save scottf/442a042f516cd21c56b04037465175d6 to your computer and use it in GitHub Desktop.
Save scottf/442a042f516cd21c56b04037465175d6 to your computer and use it in GitHub Desktop.
Code to Fix KV discard policy on upgrade to 2.7.2
import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.Nats;
import io.nats.client.api.DiscardPolicy;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.support.NatsKeyValueUtil;
import java.util.List;
public class KvFixDiscardPolicy {
// USAGE NOTES:
// 1. Use whatever connect(...) call you need, for instance if you need user/pass
// This would apply while connecting to the server on line 28
// public static Connection connect(Options options) throws IOException, InterruptedException {
// public static Connection connect(String url) throws IOException, InterruptedException {
// public static Connection connect(String url, AuthHandler handler) throws IOException, InterruptedException {
// public static Connection connect(Options options) throws IOException, InterruptedException {
//
// 2. You may need to use JetStreamOptions when creating your management context, for accounts/prefixes/domains
// This would apply to getting the JetStreamManagement context on line 31.
// JetStreamManagement jetStreamManagement() throws IOException;
// JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException;
public static void main(String[] args) {
try (Connection nc = Nats.connect()) {
// get a JetStreamManagement context
JetStreamManagement jsm = nc.jetStreamManagement();
// get a list of all the stream names and then loop through them all
List<String> streamNames = jsm.getStreamNames();
for (String streamName : streamNames) {
// bucket stream names start with 'KV_'
if (streamName.startsWith("KV_")) {
String bucket = NatsKeyValueUtil.extractBucketName(streamName);
System.out.print("Updating Bucket `" + bucket + "` -> ");
try {
// lookup the stream info which contains the stream config
StreamInfo si = jsm.getStreamInfo(streamName);
// if the policy is already correct, nothing to do
if (si.getConfiguration().getDiscardPolicy() == DiscardPolicy.New) {
System.out.println("Already Updated.");
}
else {
// build a new stream configuration with the old configuration
// plus the changed discard policy.
StreamConfiguration sc = StreamConfiguration.builder(si.getConfiguration())
.discardPolicy(DiscardPolicy.New)
.build();
// update the stream
si = jsm.updateStream(sc);
// double check that the update worked
if (si.getConfiguration().getDiscardPolicy() == DiscardPolicy.New) {
System.out.println("Update Successful");
}
else {
System.out.println("Update Failed Without Exception");
}
}
} catch (JetStreamApiException jsae) {
System.out.println("Update Failed With Exception: ");
jsae.printStackTrace();
}
}
else {
System.out.println("Skipping Non-Kv Stream `" + streamName + "`.");
}
}
}
catch (Exception e) {
System.out.println("Program Failed With Exception.");
e.printStackTrace();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment