Skip to content

Instantly share code, notes, and snippets.

@SpringBootTest
@EmbeddedKafka(ports = [9092])
class KafkaExampleApplicationTests(
@Autowired val embeddedKafkaBroker: EmbeddedKafkaBroker,
@Autowired val adminClient: AdminClient,
@Autowired val topicPurger: TopicPurger
) {
private val topicName = "test.topic"
@Test
@Component
class TopicPurger(private val adminClient: AdminClient) {
fun purge(topicName: String) {
val topicDescription = adminClient.describeTopics(listOf(topicName)).all().get()[topicName]
val partitionSize = topicDescription?.partitions()?.size
val recordsToDelete = (0..partitionSize!!).associate { partitionIndex ->
TopicPartition(topicName, partitionIndex) to RecordsToDelete.beforeOffset(-1)
}
adminClient.deleteRecords(recordsToDelete)
}
@Configuration
class AppConfig {
@Value(value = "\${kafka.bootstrapAddress}")
lateinit var bootstrapAddress: String
@Bean
fun adminClient(): AdminClient {
return AdminClient.create(mapOf(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress)
@mjedynak
mjedynak / kafka_spring_deps.txt
Last active February 12, 2022 11:26
kafka_spring_deps
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.kafka:spring-kafka")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.awaitility:awaitility-kotlin:4.1.1")
public class CityRetrieverWithOptional {
private final PersonService personService;
public Optional<String> retrieveCity(String name) {
return Optional.ofNullable(personService.getByName(name))
.map(Person::getAddress)
.map(Address::getCity);
}
}
public class CityRetriever {
private final PersonService personService;
public String retrieveCity(String name) {
Person person = personService.getByName(name);
if (person != null) {
Address address = person.getAddress();
if (address != null) {
if (address.getCity() != null) {
return address.getCity();
public interface PersonService {
Person getByName(String name);
}
public class Person {
private final String name;
private final int age;
private final Address address;
// ...
}
public class App {
private static final Logger logger = LoggerFactory.getLogger(App.class.getName());
public static void main(String[] args) {
invoke(new DefaultService());
logger.debug("----------------------\n");
invoke(new LazyInitService());
logger.debug("----------------------\n");
invoke(LazyInitWithProxyServiceFactory.createService());
logger.debug("----------------------\n");
public class LazyInitWithProxyThreadSafeServiceFactory {
public static Service createService() {
return Reflection.newProxy(Service.class, new ThreadSafeLazyInitInvocationHandler());
}
private static class ThreadSafeLazyInitInvocationHandler extends AbstractInvocationHandler {
private static final LazyInitializer<Service> initializer = new LazyInitializer<Service>() {
@Override
protected Service initialize() {
public class LazyInitWithProxyServiceFactory {
public static Service createService() {
return Reflection.newProxy(Service.class, new LazyInitInvocationHandler());
}
private static class LazyInitInvocationHandler extends AbstractInvocationHandler {
private Service service;
@Override