Skip to content

Instantly share code, notes, and snippets.

@anuchandy
Created May 31, 2023 23:22
Show Gist options
  • Save anuchandy/9b46fb005e783f16fad38e04c1f7ed4a to your computer and use it in GitHub Desktop.
Save anuchandy/9b46fb005e783f16fad38e04c1f7ed4a to your computer and use it in GitHub Desktop.
PublishEventListUsingBatch.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventhubs;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static java.nio.charset.StandardCharsets.UTF_8;
public class PublishEventListUsingBatch {
private static final EventData END = new EventData();
public static void main(String[] args) {
String connectionString = "<connection-string>";
Mono<Void> publish = Mono.using(() -> {
return new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();
}, producer -> {
final List<EventData> events = getEvents();
return publishEventList(producer, new CreateBatchOptions(), events);
}, producer -> producer.close());
publish.block();
}
private static Mono<Void> publishEventList(EventHubProducerAsyncClient producer,
CreateBatchOptions options, List<EventData> events) {
final Iterator<EventData> eventItr = events.iterator();
if (eventItr.hasNext()) {
return publishNextBatch(producer, options, eventItr.next(), eventItr);
} else {
return Mono.empty();
}
}
private static Mono<Void> publishNextBatch(EventHubProducerAsyncClient producer,
CreateBatchOptions options, EventData first, Iterator<EventData> eventsItr) {
return producer.createBatch(options).flatMap(batch -> {
EventData next = first;
do {
if (!batch.tryAdd(next)) {
if (next == first) {
return Mono.error(
new IllegalArgumentException("The event " + first + " is too big to send even in a Batch."));
}
return producer.send(batch).then(Mono.just(next));
}
if (eventsItr.hasNext()) {
next = eventsItr.next();
} else {
return producer.send(batch).then(Mono.just(END));
}
} while (true);
}).flatMap(missed -> {
if (missed == END) {
return Mono.empty();
} else {
return publishNextBatch(producer, options, missed, eventsItr);
}
});
}
private static List<EventData> getEvents() {
final List<EventData> events = new ArrayList<>();
events.add(new EventData("Roast beef".getBytes(UTF_8)));
events.add(new EventData("Cheese".getBytes(UTF_8)));
events.add(new EventData("Tofu".getBytes(UTF_8)));
events.add(new EventData("Turkey".getBytes(UTF_8)));
return events;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment