Last active
March 9, 2022 00:02
-
-
Save scottf/442a042f516cd21c56b04037465175d6 to your computer and use it in GitHub Desktop.
Code to Fix KV discard policy on upgrade to 2.7.2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.nats.client.Connection; | |
import io.nats.client.JetStreamApiException; | |
import io.nats.client.JetStreamManagement; | |
import io.nats.client.Nats; | |
import io.nats.client.api.DiscardPolicy; | |
import io.nats.client.api.StreamConfiguration; | |
import io.nats.client.api.StreamInfo; | |
import io.nats.client.support.NatsKeyValueUtil; | |
import java.util.List; | |
public class KvFixDiscardPolicy { | |
// USAGE NOTES: | |
// 1. Use whatever connect(...) call you need, for instance if you need user/pass | |
// This would apply while connecting to the server on line 28 | |
// public static Connection connect(Options options) throws IOException, InterruptedException { | |
// public static Connection connect(String url) throws IOException, InterruptedException { | |
// public static Connection connect(String url, AuthHandler handler) throws IOException, InterruptedException { | |
// public static Connection connect(Options options) throws IOException, InterruptedException { | |
// | |
// 2. You may need to use JetStreamOptions when creating your management context, for accounts/prefixes/domains | |
// This would apply to getting the JetStreamManagement context on line 31. | |
// JetStreamManagement jetStreamManagement() throws IOException; | |
// JetStreamManagement jetStreamManagement(JetStreamOptions options) throws IOException; | |
public static void main(String[] args) { | |
try (Connection nc = Nats.connect()) { | |
// get a JetStreamManagement context | |
JetStreamManagement jsm = nc.jetStreamManagement(); | |
// get a list of all the stream names and then loop through them all | |
List<String> streamNames = jsm.getStreamNames(); | |
for (String streamName : streamNames) { | |
// bucket stream names start with 'KV_' | |
if (streamName.startsWith("KV_")) { | |
String bucket = NatsKeyValueUtil.extractBucketName(streamName); | |
System.out.print("Updating Bucket `" + bucket + "` -> "); | |
try { | |
// lookup the stream info which contains the stream config | |
StreamInfo si = jsm.getStreamInfo(streamName); | |
// if the policy is already correct, nothing to do | |
if (si.getConfiguration().getDiscardPolicy() == DiscardPolicy.New) { | |
System.out.println("Already Updated."); | |
} | |
else { | |
// build a new stream configuration with the old configuration | |
// plus the changed discard policy. | |
StreamConfiguration sc = StreamConfiguration.builder(si.getConfiguration()) | |
.discardPolicy(DiscardPolicy.New) | |
.build(); | |
// update the stream | |
si = jsm.updateStream(sc); | |
// double check that the update worked | |
if (si.getConfiguration().getDiscardPolicy() == DiscardPolicy.New) { | |
System.out.println("Update Successful"); | |
} | |
else { | |
System.out.println("Update Failed Without Exception"); | |
} | |
} | |
} catch (JetStreamApiException jsae) { | |
System.out.println("Update Failed With Exception: "); | |
jsae.printStackTrace(); | |
} | |
} | |
else { | |
System.out.println("Skipping Non-Kv Stream `" + streamName + "`."); | |
} | |
} | |
} | |
catch (Exception e) { | |
System.out.println("Program Failed With Exception."); | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment