Skip to content

Instantly share code, notes, and snippets.

@jac18281828
Created September 25, 2018 18:25
Show Gist options
  • Save jac18281828/1ce8aee492ec7a24b13298ede7da1310 to your computer and use it in GitHub Desktop.
Save jac18281828/1ce8aee492ec7a24b13298ede7da1310 to your computer and use it in GitHub Desktop.
Embedded Kafka server based on kafka-junit. Works with Spring dependency injection.
import com.salesforce.kafka.test.AbstractKafkaTestResource;
import com.salesforce.kafka.test.KafkaCluster;
import com.salesforce.kafka.test.KafkaTestCluster;
import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Properties;
/**
 * Embedded Kafka service suitable for dependency injection in Spring.
 *
 * <p>The bean is registered only under the {@code "IT"} profile. Its lifecycle is tied to the
 * Spring container through {@link PostConstruct} ({@link #start()}) and {@link PreDestroy}
 * ({@link #shutdown()}): a {@link KafkaTestCluster} is created and started on initialization and
 * closed on destruction.
 *
 * <p>NOTE(review): {@code @Ignore} is presumably here so that JUnit runners do not treat this
 * class as a test case - confirm against the build's test discovery rules.
 */
@Service
@Profile("IT")
@Ignore
class KafkaEmbedded extends AbstractKafkaTestResource<KafkaEmbedded> {

    // Log under this class's own category (the original logged as SharedKafkaTestResource).
    private static final Logger logger = LoggerFactory.getLogger(KafkaEmbedded.class);

    /** Creates the service with the parent class's default configuration. */
    public KafkaEmbedded() {
    }

    /**
     * Creates the service with explicit broker configuration.
     *
     * @param brokerProperties properties handed to the parent test resource and later passed to
     *                         the {@link KafkaTestCluster} created by {@link #start()}
     */
    public KafkaEmbedded(Properties brokerProperties) {
        super(brokerProperties);
    }

    /**
     * Creates and starts the embedded Kafka cluster.
     *
     * @throws IllegalStateException if a cluster is already registered on this resource
     * @throws Exception             if the cluster fails to start
     */
    @PostConstruct
    public void start() throws Exception {
        logger.info("Starting kafka test server");
        if (getKafkaCluster() != null) {
            throw new IllegalStateException("Unknown State! Kafka Test Server already exists!");
        }
        setKafkaCluster(new KafkaTestCluster(getNumberOfBrokers(), getBrokerProperties()));
        getKafkaCluster().start();
    }

    /**
     * Closes the embedded Kafka cluster, if one is registered, and clears the reference.
     *
     * <p>The reference is cleared even when closing fails, so a later {@link #start()} is not
     * rejected because of a cluster that is already broken.
     *
     * @throws RuntimeException wrapping any exception raised while closing the cluster
     */
    @PreDestroy
    public void shutdown() {
        logger.info("Shutting down kafka test server");
        final KafkaCluster cluster = getKafkaCluster();
        if (cluster == null) {
            return;
        }
        try {
            cluster.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            // Always drop the reference, otherwise a failed close leaves this bean unusable.
            setKafkaCluster((KafkaCluster) null);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment