Skip to content

Instantly share code, notes, and snippets.

View diogodanielsoaresferreira's full-sized avatar
🚀

Diogo Ferreira diogodanielsoaresferreira

🚀
View GitHub Profile
//...
rest("/api").get("/hello").to("direct:hello");
from("direct:hello").transform().constant("Hello world!");
//...
//...
errorHandler(deadLetterChannel("kafka:dead_letter?brokers=localhost:9092")
.useOriginalMessage());
//...
//...
from("timer://foo?period=5000").log("timer");
//...
public class CamelRoutes extends EndpointRouteBuilder {
public void configure() {
from("kafka:input_topic?brokers=localhost:9092&groupId=group1")
.unmarshal().json(UserMessage.class)
.filter(simple("${body.type} == 'chat'"))
.to("seda:incoming_event");
from("seda:incoming_event?multipleConsumers=true")
//...
from("kafka:input_topic?brokers=localhost:9092&groupId=group1")
.unmarshal().json(UserMessage.class)
.filter(simple("${body.type} == 'chat'"))
.log("${body}")
.split(simple("${body.devices}"))
.log("${body}")
.end()
.marshal().json()
.to("kafka:output_topic?brokers=localhost:9092")
from("kafka:input_topic?brokers=localhost:9092&groupId=group2")
.unmarshal().json(UserMessage.class)
.aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
.completionInterval(5000)
.bean(NLPUtils.class, "createUserMessages")
.log("${body}");
public class NLPUtils {
public static UserMessages createUserMessages(final CombinedUserMessage event) {
return UserMessages.builder()
.emitter(event.getEmitter())
.text(String.join(". ", event.getText()))
.build();
}
}
from("kafka:input_topic?brokers=localhost:9092&groupId=group2")
.unmarshal().json(UserMessage.class)
.aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
.completionInterval(5000)
.bean(NLPUtils.class, "createTranscription")
.log("${body}");
public class CombinedUserMessagesAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
final UserMessage body = newExchange.getIn().getBody(UserMessage.class);
final CombinedUserMessage newEventBody = CombinedUserMessage.builder()
.emitter(body.getEmitter())
.text(List.of(body.getText()))
.build();
newExchange.getIn().setBody(newEventBody);
//...
from("kafka:input_topic?brokers=localhost:9092&groupId=group2")
.unmarshal().json(UserMessage.class)
.aggregate(simple("${body.emitter}"), new CombinedUserMessagesAggregationStrategy())
.completionInterval(5000)
.log("${body}");
//...