Skip to content

Instantly share code, notes, and snippets.

@bitkid
Created December 18, 2017 19:23
Show Gist options
  • Save bitkid/0ccc45d8d58dcf2b1ca88f9f9ed9dec6 to your computer and use it in GitHub Desktop.
Save bitkid/0ccc45d8d58dcf2b1ca88f9f9ed9dec6 to your computer and use it in GitHub Desktop.
/**
 * Publishes three messages through a batching {@code Publisher} and pulls from the
 * subscription until three distinct message ids have been collected.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Create the topic and the subscription, waiting for each call to complete.</li>
 *   <li>Build a publisher (batch size 10, max latency 1000 ms), publish "m0", "m1" and "m2",
 *       then close it and wait on its close future.</li>
 *   <li>Pull repeatedly, recording every received message by its message id, until three
 *       ids are known or the 30 second deadline has passed.</li>
 * </ol>
 *
 * @throws IOException declared by the original signature
 * @throws ExecutionException if one of the awaited futures completed exceptionally
 * @throws InterruptedException if the thread is interrupted while waiting on a future
 */
public void testPullBatch() throws IOException, ExecutionException, InterruptedException {
  pubsub.createTopic(PROJECT, TOPIC).get();
  pubsub.createSubscription(PROJECT, SUBSCRIPTION, TOPIC).get();

  final Publisher publisher = Publisher.builder()
      .project(PROJECT)
      .pubsub(pubsub)
      .batchSize(10)
      .maxLatencyMs(1000)
      .build();

  final List<Message> messages = ImmutableList.of(
      Message.ofEncoded("m0"),
      Message.ofEncoded("m1"),
      Message.ofEncoded("m2"));
  for (final Message outgoing : messages) {
    publisher.publish(TOPIC, outgoing);
  }

  // Close the publisher and block until the close future completes before pulling.
  publisher.close();
  publisher.closeFuture().get();

  // Keyed by message id: arrival order is not relied upon, and a message that shows up
  // in more than one pull response is only counted once.
  final Map<String, ReceivedMessage> received = new HashMap<>();
  final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
  while (received.size() < 3) {
    final List<ReceivedMessage> batch = pubsub.pull(PROJECT, SUBSCRIPTION).get();
    for (final ReceivedMessage incoming : batch) {
      final String messageId = incoming.message().messageId().get();
      received.put(messageId, incoming);
    }
    // The deadline is only consulted while messages are still missing, and only after a
    // pull has returned; a pull that never completes is not interrupted by this check.
    if (received.size() < 3 && System.nanoTime() > deadlineNanos) {
      fail("timeout");
    }
  }

  // Verify received messages
  assertThat(received.size(), is(3));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment