Last active
February 8, 2022 18:08
-
-
Save scottf/80a9647cf7f60d69dd2d56d030ea1015 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
class FetchTests { | |
private static final String URL = "nats://localhost:4222"; | |
private static final String STREAM_NAME = "TestStream"; | |
private static final String SUBJECT = "test"; | |
private static final String DURABLE = "testDurable"; | |
private static final long DELAY = 10; // seconds | |
private static final int PUB_COUNT = 200; | |
public static void main(String[] args) { | |
try (Connection conn = Nats.connect(URL)) { | |
JetStreamManagement jsm = conn.jetStreamManagement();; | |
JetStream js = conn.jetStream(); | |
// make sure the stream is gone before starting. delete exceptions when stream does not exist | |
log("Testing with JetStream publishes..."); | |
resetStream(jsm); | |
execute(jsm, js, FetchTests::jsPublishAfterDelay); | |
log("Testing with core publishes..."); | |
resetStream(jsm); | |
execute(jsm, js, FetchTests::corePublishAfterDelay); | |
} | |
catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
private static void execute(JetStreamManagement jsm, JetStream js, Runnable runnable) throws IOException, JetStreamApiException { | |
// Create the consumer | |
ConsumerConfiguration cc = | |
ConsumerConfiguration.builder() | |
.durable(DURABLE) | |
.ackWait(300) | |
.maxAckPending(2000) | |
.build(); | |
jsm.addOrUpdateConsumer(STREAM_NAME, cc); | |
// make the pull subscription | |
PullSubscribeOptions so = PullSubscribeOptions.builder().durable(DURABLE).build(); | |
JetStreamSubscription sub = js.subscribe(SUBJECT, so); | |
// start the publisher | |
Thread publisher = new Thread(runnable); | |
publisher.start(); | |
// Start fetch before messages are published | |
log("Before-1: Starting Fetch..."); | |
List<Message> messages = sub.fetch(10, Duration.ofSeconds(DELAY * 3)); | |
log("Before-2: Fetched message: " + messages.size()); | |
// Start fetch after messages are published | |
log("After-1: Starting Fetch..."); | |
List<Message> messages2 = sub.fetch(PUB_COUNT, Duration.ofSeconds(DELAY * 30)); | |
log("After-2: Fetched messages second try: " + messages2.size()); | |
} | |
// This publisher uses JetStream to publish messages to a subject | |
private static void jsPublishAfterDelay() { | |
try (Connection conn = Nats.connect(URL)) { | |
JetStream js = conn.jetStream(); | |
// Wait DELAY seconds so we publish in the middle of the first fetch | |
log("Sleep before publish..."); | |
Thread.sleep(DELAY * 1000); | |
log("About to publish..."); | |
for (int i = 0; i < PUB_COUNT; i++) { | |
String str = "Test" + i; | |
js.publish(SUBJECT, str.getBytes()); | |
} | |
log("Done publishing"); | |
} catch (Exception e) { | |
log("Failed loading messages into nats! " + e); | |
} | |
} | |
// This publisher uses core publish to publish messages to a subject | |
// only use core if you don't care about PublishAcks | |
private static void corePublishAfterDelay() { | |
try (Connection conn = Nats.connect(URL)) { | |
// Wait DELAY seconds so we publish in the middle of the first fetch | |
log("Sleep before publish..."); | |
Thread.sleep(DELAY); | |
log("About to publish..."); | |
for (int i = 0; i < PUB_COUNT; i++) { | |
String str = "Test" + i; | |
conn.publish(SUBJECT, str.getBytes()); | |
} | |
log("Done publishing"); | |
} catch (Exception e) { | |
log("Failed loading messages into nats! " + e); | |
} | |
} | |
private static void resetStream(JetStreamManagement jsm) throws IOException, JetStreamApiException { | |
try { jsm.deleteStream(STREAM_NAME); } catch (Exception ignore) {} | |
// Create the stream | |
StreamConfiguration streamConfig = | |
StreamConfiguration.builder() | |
.name(STREAM_NAME) | |
.storageType(StorageType.File) | |
.subjects(SUBJECT) | |
.maxAge(Duration.ofDays(3)) | |
// .noAck(false) // default is already false | |
.build(); | |
jsm.addStream(streamConfig); | |
} | |
private static void log(String s) { | |
System.out.println("" + System.currentTimeMillis() + " [" + Thread.currentThread().getName() + "] " + s); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment