@lburgazzoli
Created March 3, 2021 11:29
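import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Sketch of a Kafka Connect source task fed by a Camel route.
// version() and stop(), which SourceTask also requires, are omitted here.
class CamelSource extends SourceTask {
    // Placeholder config keys: the original calls props.get() without naming them.
    static final String MAX_ELEMENTS = "max.elements";
    static final String TIMEOUT = "timeout";
    static final String TIMEOUT_UNIT = "timeout.unit";
    // Assumption: the task owns its own CamelContext.
    final CamelContext context = new DefaultCamelContext();
    int maxElements;
    int timeout;
    TimeUnit timeoutUnit;
    List<SourceRecord> records;
    BlockingQueue<SourceRecord> queue;
    @Override
    public void start(Map<String, String> props) {
        maxElements = Integer.parseInt(props.get(MAX_ELEMENTS));
        timeout = Integer.parseInt(props.get(TIMEOUT));
        timeoutUnit = TimeUnit.valueOf(props.get(TIMEOUT_UNIT));
        records = new ArrayList<>(maxElements);
        queue = new ArrayBlockingQueue<>(maxElements);
        try {
            // KafkaConnectDataFormat is not defined in this gist; it is assumed
            // to marshal the message body into a SourceRecord.
            context.getRegistry().bind("kafka-connect-record", new KafkaConnectDataFormat());
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("kamelet:staceppa")
                        .marshal("kafka-connect-record")
                        // offer() never blocks: the record is dropped when the queue is full.
                        .process(exchange -> queue.offer(exchange.getMessage().getBody(SourceRecord.class)));
                }
            });
            context.start();
        } catch (Exception e) {
            throw new ConnectException(e);
        }
    }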
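    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Wait up to the configured timeout for the first record.
        final SourceRecord record = queue.poll(timeout, timeoutUnit);
        if (record != null) {
            records.clear();
            records.add(record);
            // Drain whatever else is already queued, up to maxElements in total.
            queue.drainTo(records, maxElements - records.size());
            return records;
        } else {
            // null tells Kafka Connect that no data is available right now.
            return null;
        }
    }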
}
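
The batching idea in poll() can be tried without Camel or Kafka Connect on the classpath. The following is a minimal sketch using only java.util.concurrent; the class name BatchingBuffer and its methods are hypothetical and not part of the gist. Unlike the gist, it returns a fresh list per call rather than reusing one shared list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not in the gist): hands items from a producer
// to a consumer that reads them in batches of at most maxElements.
class BatchingBuffer<T> {
    private final int maxElements;
    private final BlockingQueue<T> queue;

    BatchingBuffer(int maxElements) {
        this.maxElements = maxElements;
        this.queue = new ArrayBlockingQueue<>(maxElements);
    }

    // Producer side: never blocks; returns false when the queue is full.
    boolean offer(T item) {
        return queue.offer(item);
    }

    // Consumer side: waits up to the timeout for a first item, then drains
    // whatever else is already queued. Returns null when nothing arrived.
    List<T> poll(long timeout, TimeUnit unit) throws InterruptedException {
        T first = queue.poll(timeout, unit);
        if (first == null) {
            return null;
        }
        List<T> batch = new ArrayList<>(maxElements);
        batch.add(first);
        queue.drainTo(batch, maxElements - 1);
        return batch;
    }
}
```

Because offer() does not block, a producer that outpaces the consumer loses items once the queue holds maxElements entries. Swapping in put() would apply back-pressure instead, at the cost of blocking the producing thread.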