| > bin/kafka-topics.sh --list --zookeeper localhost:2181 | |
| test-topic-1 | |
| test-topic-2 | |
| test-topic-3 |
| @Configuration | |
| public class TopicAdministrator { | |
| private final TopicConfigurations configurations; | |
| private final GenericWebApplicationContext context; | |
| public TopicAdministrator(TopicConfigurations configurations, GenericWebApplicationContext genericContext) { | |
| this.configurations = configurations; | |
| this.context = genericContext; | |
| } | |
| // Kotlin data class (equals/hashCode/copy are generated) | |
| data class Song(val name: String, val artist: Artist, var length: Int?) | |
| // Java POJO with only one of the Song properties, for brevity (and no equals/hashCode) | |
| public class Song { | |
| private String name; | |
| public Song(String name) { | |
| this.name = name; | |
| } |
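To make the equals/hashCode difference concrete, here is a minimal plain-Java sketch. The class names `PlainSong` and `ValueSong` are hypothetical and exist only for this illustration. The first class mirrors the POJO above and inherits identity-based equality from `Object`; the second adds by hand what a Kotlin data class generates automatically.

```java
import java.util.Objects;

// Hypothetical illustration classes; not part of the original project.
class PlainSong {
    private final String name;

    PlainSong(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
    // No equals/hashCode: two instances are equal only if they are the same object.
}

class ValueSong {
    private final String name;

    ValueSong(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    // Value-based equality, written by hand.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ValueSong)) {
            return false;
        }
        return Objects.equals(name, ((ValueSong) other).name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
}

class SongEqualityDemo {
    public static void main(String[] args) {
        // Same name, different instances.
        System.out.println(new PlainSong("Intro").equals(new PlainSong("Intro"))); // false
        System.out.println(new ValueSong("Intro").equals(new ValueSong("Intro"))); // true
    }
}
```

The boilerplate grows with every property added, which is the cost the data class removes.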
| // extension on model from 3rd party library | |
| fun ArtistModel.toStruct() : Struct = Struct(Artist.SCHEMA) | |
| .put(BaseSchema.HREF_FIELD, this.href) | |
| .put(BaseSchema.ID_FIELD, this.id) | |
| .put(BaseSchema.NAME_FIELD, this.name) | |
| // example when creating list of SourceRecords from result set | |
| val results: List<ArtistModel> = client.getResults() | |
| results.map { | |
| SourceRecord(partition, offset, topic, keySchema, it.id, valueSchema, it.toStruct()) |
| spring: | |
| kafka: | |
| # point the bootstrap servers to the running embedded kafka | |
| bootstrap-servers: ${spring.embedded.kafka.brokers} | |
| consumer: | |
| client-id: test-avro-consumer | |
| group-id: test-avro-group | |
| value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer | |
| producer: | |
| value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer |
| @Configuration | |
| class MockSerdeConfig { | |
| // KafkaProperties groups all properties prefixed with `spring.kafka` | |
| private KafkaProperties props | |
| MockSerdeConfig(KafkaProperties kafkaProperties) { | |
| props = kafkaProperties | |
| } | |
| /** | |
| * Mock schema registry bean used by Kafka Avro Serde since |
| repositories { | |
| maven { | |
| url "http://packages.confluent.io/maven/" | |
| } | |
| } | |
| compile 'org.springframework.kafka:spring-kafka' | |
| compile "org.apache.avro:avro" | |
| compile "io.confluent:kafka-avro-serializer" | |
| testCompile 'org.springframework.kafka:spring-kafka-test' |
| @Bean | |
| public KafkaListenerContainerFactory kafkaListenerContainerFactory(RetryTemplate retryTemplate) { | |
| def factory = /** configure factory **/ | |
| // configure the listener container factory with retry support | |
| factory.setRetryTemplate(retryTemplate); | |
| factory.setRecoveryCallback(context -> { | |
| log.error("RetryPolicy limit has been exceeded! You should really handle this better."); | |
| return null; | |
| }); |
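The interplay between retries and the recovery callback can be sketched without any framework. The following is a toy model in plain Java, not Spring Retry's implementation. The class `RetryWithRecoverySketch` and its `execute` method are hypothetical names chosen for this example. The action is attempted a fixed number of times, and only when every attempt has failed is the last exception handed to the recovery function.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Toy model of "retry, then recover"; names are hypothetical.
class RetryWithRecoverySketch {

    /**
     * Calls the action up to maxAttempts times. If every attempt throws,
     * the last exception is passed to the recovery function instead of
     * being rethrown.
     */
    static <T> T execute(Callable<T> action, int maxAttempts, Function<Exception, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        return recovery.apply(last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = RetryWithRecoverySketch.<String>execute(
                () -> {
                    calls[0]++;
                    throw new IllegalStateException("broker unavailable");
                },
                3,
                e -> "recovered after " + calls[0] + " attempts");
        System.out.println(result); // recovered after 3 attempts
    }
}
```

In a real listener the recovery step is where a failed record would typically be logged, parked, or forwarded somewhere for later inspection, rather than silently dropped.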
| AlwaysRetryPolicy: always permits a retry; a subclass of NeverRetryPolicy that is mainly useful as a base for other policies (e.g. a test stub) | |
| CircuitBreakerRetryPolicy: Trips circuit open after a given number of failures and stays open until a set timeout elapses | |
| CompositeRetryPolicy: mix and match multiple policies (they will be called in the order given) | |
| ExceptionClassifierRetryPolicy: specify different policies for different exception types | |
| ExpressionRetryPolicy: subclass of SimpleRetryPolicy that evaluates an expression against the last thrown exception | |
| NeverRetryPolicy: allows the first attempt but never permits a retry | |
| SimpleRetryPolicy: retry a fixed number of times for a set of named exceptions (and subclasses) | |
| TimeoutRetryPolicy: allows a retry as long as it hasn’t timed out (the clock is started on a call to open(RetryContext)) |
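As a rough picture of what "a fixed number of times for a set of named exceptions (and subclasses)" means, here is a small plain-Java sketch. It only models the decision a SimpleRetryPolicy-style policy makes and is not the library's code. `SimplePolicySketch` and `canRetry(int, Throwable)` are hypothetical names, and counting the failed attempt toward the limit is an assumption of this sketch.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a "max attempts + retryable exception types" decision.
class SimplePolicySketch {
    private final int maxAttempts;
    private final List<Class<? extends Throwable>> retryable;

    SimplePolicySketch(int maxAttempts, List<Class<? extends Throwable>> retryable) {
        this.maxAttempts = maxAttempts;
        this.retryable = retryable;
    }

    /**
     * attemptsSoFar counts the attempts already made, including the one
     * that just failed with lastFailure.
     */
    boolean canRetry(int attemptsSoFar, Throwable lastFailure) {
        if (attemptsSoFar >= maxAttempts) {
            return false; // attempt budget is used up
        }
        for (Class<? extends Throwable> type : retryable) {
            if (type.isInstance(lastFailure)) {
                return true; // matches the named type or one of its subclasses
            }
        }
        return false; // exception type is not in the retryable set
    }

    public static void main(String[] args) {
        List<Class<? extends Throwable>> types = new ArrayList<>();
        types.add(IOException.class);
        SimplePolicySketch policy = new SimplePolicySketch(3, types);

        // FileNotFoundException is a subclass of IOException, so it qualifies.
        System.out.println(policy.canRetry(1, new FileNotFoundException("missing"))); // true
        System.out.println(policy.canRetry(3, new IOException("still failing")));     // false
        System.out.println(policy.canRetry(1, new IllegalStateException("other")));   // false
    }
}
```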
| ExponentialBackOffPolicy: increases back off period exponentially. The initial interval and multiplier are configurable. | |
| ExponentialRandomBackOffPolicy: chooses a random multiple of the interval that a simple deterministic exponential policy would produce. This has been shown to be useful at least in testing scenarios where the test needs many retries and generates excessive contention. | |
| FixedBackOffPolicy: pauses for a fixed period of time (using Sleeper.sleep(long)) before continuing. | |
| NoBackOffPolicy: performs all retries one after the other without pause | |
| UniformRandomBackOffPolicy: pauses for a random period of time before continuing |
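To get a feel for how the fixed and exponential strategies differ, the delay sequences can be computed by hand. This is a plain-Java sketch of the arithmetic only and does not use or reproduce the Spring Retry classes. `BackOffSketch` and its methods are hypothetical names, and the cap on the exponential delay (`maxIntervalMs`) is a parameter of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that only computes delay sequences; it does not sleep.
class BackOffSketch {

    /** Delay before each of the first `retries` retries, capped at maxIntervalMs. */
    static List<Long> exponentialDelays(long initialMs, double multiplier, long maxIntervalMs, int retries) {
        List<Long> delays = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < retries; i++) {
            delays.add(Math.min((long) current, maxIntervalMs));
            current *= multiplier; // grow the interval for the next retry
        }
        return delays;
    }

    /** The same delay before every retry. */
    static List<Long> fixedDelays(long periodMs, int retries) {
        return new ArrayList<>(Collections.nCopies(retries, periodMs));
    }

    public static void main(String[] args) {
        System.out.println(exponentialDelays(100, 2.0, 1000, 5)); // [100, 200, 400, 800, 1000]
        System.out.println(fixedDelays(500, 3));                  // [500, 500, 500]
    }
}
```

With an initial interval of 100 ms and a multiplier of 2, the fifth delay would be 1600 ms, so the 1000 ms cap takes over from that point on.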