Skip to content

Instantly share code, notes, and snippets.

@oscerd
Created March 31, 2023 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oscerd/e160575b92df13ae43c66f3b5473636e to your computer and use it in GitHub Desktop.
Save oscerd/e160575b92df13ae43c66f3b5473636e to your computer and use it in GitHub Desktop.
template:
beans:
- name: kafkaHeaderDeserializer
type: "#class:org.apache.camel.kamelets.utils.serialization.kafka.KafkaHeaderDeserializer"
property:
- key: enabled
value: '{{deserializeHeaders}}'
- name: myAggregatorStrategy
type: "#class:org.apache.camel.processor.aggregate.StringAggregationStrategy"
property:
- key: delimiter
value: ' - '
from:
uri: "kafka:{{topic}}"
parameters:
brokers: "{{bootstrapServers}}"
autoCommitEnable: "{{autoCommitEnable}}"
allowManualCommit: "{{allowManualCommit}}"
pollOnError: "{{pollOnError}}"
autoOffsetReset: "{{autoOffsetReset}}"
groupId: "{{?consumerGroup}}"
steps:
- aggregate:
aggregation-strategy: "{{myAggregatorStrategy}}"
aggregation-repository: "#class:org.apache.camel.processor.aggregate.MemoryAggregationRepository"
completion-size: 10
correlation-expression:
simple: "${headers[kafka.KEY]}"
- process:
ref: "{{kafkaHeaderDeserializer}}"
- choice:
when:
- simple: '${exchangeProperty.CamelAggregationCompleteCurrentGroup} != null'
steps:
- log: "Daje"
- to: "kamelet:sink"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment