Kafka Producer Integration Testing
import java.nio.file.Paths

class ExampleSpec extends spock.lang.Specification {
    LoggingProducerOutput output

    void setup() {
        // withEmpty() skips anything already in the log, so each test sees only its own records
        output = new LoggingProducerOutput(logPath: Paths.get('/tmp/producer.log')).withEmpty()
    }

    void "records should be produced"() {
        when: "a record is produced"
        String key = "the_record_key"
        // ...

        then: "record is found in log"
        output.find { it.key == key }
    }
}
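
The production step elided above is application specific. A minimal sketch of what it might look like, assuming the domainEventProducer bean defined further below is injected into the spec, and using a hypothetical topic name 'users':

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// onSend() runs synchronously inside send(), so the interceptor has logged the record
// before send() returns; get() additionally waits for the broker acknowledgement.
KafkaProducer<String, String> producer = domainEventProducer
producer.send(new ProducerRecord<String, String>('users', key, '{"name":"John Doe"}')).get()
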
import groovy.util.logging.Slf4j
import org.apache.kafka.clients.producer.ProducerInterceptor
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata
import org.yaml.snakeyaml.DumperOptions
import org.yaml.snakeyaml.Yaml

import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

/**
 * Logs messages sent to a Kafka producer to a YAML file for use by test verifications. This class is
 * not intended for use in production.
 *
 * Use the configuration key "interceptor.LoggingProducerInterceptor.file" to specify the output file;
 * otherwise a unique temporary file name is chosen.
 */
@Slf4j
class LoggingProducerInterceptor implements ProducerInterceptor {
    /** SnakeYAML instances are not thread safe, so keep one per thread. */
    static ThreadLocal<Yaml> YAML = new ThreadLocal<Yaml>() {
        @Override
        protected Yaml initialValue() {
            new Yaml(new DumperOptions(defaultFlowStyle: DumperOptions.FlowStyle.FLOW))
        }
    }

    Path logPath

    @Override
    void configure(Map<String, ?> configs) {
        String configuredLogPath = configs.get("interceptor.${LoggingProducerInterceptor.simpleName}.file".toString())
        logPath = configuredLogPath ? Paths.get(configuredLogPath) : Files.createTempFile('producer', '.log')
        try {
            logPath.withWriterAppend {
                it.write('- ')
                YAML.get().dump([configs: configs], it)
            }
        } catch (IOException e) {
            // ignore, in case of multiple threads
            log.debug("Writing configs to ${logPath}", e)
        }
    }

    @Override
    ProducerRecord onSend(ProducerRecord record) {
        // the log file may be contended by multiple producer threads, so retry a few times
        int retries = 3
        while (retries-- > 0) {
            try {
                logPath.withWriterAppend {
                    it.write('- ')
                    YAML.get().dump([thread: Thread.currentThread().id, key: record.key(), value: record.value()], it)
                }
                retries = 0 // success, exit the retry loop
            } catch (IOException e) {
                // ignore and retry, in case of multiple threads
                log.debug("Writing producer record to ${logPath}", e)
                Thread.sleep(100)
            }
        }
        return record
    }

    @Override
    void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // nothing to do
    }

    @Override
    void close() {
        Files.delete(logPath)
    }
}
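
For tests that construct a producer directly rather than through dependency injection, the interceptor can be wired in with ordinary producer configuration. A minimal sketch; the broker address and log file path are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig

// Placeholder broker address and log file path; adjust for the test environment.
def producer = new KafkaProducer<String, String>([
        (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)     : 'localhost:9092',
        (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)  : 'org.apache.kafka.common.serialization.StringSerializer',
        (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG): 'org.apache.kafka.common.serialization.StringSerializer',
        (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)   : LoggingProducerInterceptor.name,
        'interceptor.LoggingProducerInterceptor.file' : '/tmp/producer.log',
])
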
import groovy.transform.AutoClone
import org.yaml.snakeyaml.DumperOptions
import org.yaml.snakeyaml.Yaml

import java.nio.file.Path

/**
 * Provides the output of LoggingProducerInterceptor as an Iterable so that Groovy constructs can be used to
 * filter and process the YAML output.
 */
@AutoClone
class LoggingProducerOutput implements Iterable {
    /** SnakeYAML instances are not thread safe, so keep one per thread. */
    static ThreadLocal<Yaml> YAML = new ThreadLocal<Yaml>() {
        @Override
        protected Yaml initialValue() {
            new Yaml(new DumperOptions(defaultFlowStyle: DumperOptions.FlowStyle.FLOW))
        }
    }

    /** The path of the YAML log file. */
    Path logPath

    private boolean limitToCurrentThread
    private Integer startingEvent

    /** Loads every event logged so far. */
    private Iterator all() {
        logPath.withReader {
            YAML.get().load(it).iterator()
        }
    }

    @Override
    Iterator iterator() {
        Iterator result = all()
        if (startingEvent != null) {
            result = result.drop(startingEvent)
        }
        if (limitToCurrentThread) {
            result = result.findAll { it.thread == Thread.currentThread().id }.iterator()
        }
        result
    }

    /**
     * Limits the results to only the current Thread id. Note that per the JDK, a Thread id may be reused.
     */
    Iterator findAllByThread() {
        iterator().findAll { it.thread == Thread.currentThread().id }.iterator()
    }

    /**
     * Returns a new object limited to the current thread. Note that per the JDK, a Thread id may be reused.
     */
    LoggingProducerOutput withThread() {
        LoggingProducerOutput result = clone()
        result.limitToCurrentThread = true
        result
    }

    /**
     * Returns a new object excluding all current messages, i.e. only messages logged after this call are visible.
     */
    LoggingProducerOutput withEmpty() {
        LoggingProducerOutput result = clone()
        result.startingEvent = all().size()
        result
    }
}
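
Because LoggingProducerOutput is an Iterable of maps, the usual Groovy collection methods apply directly. An illustrative sketch; the path and record key are placeholders:

import java.nio.file.Paths

def output = new LoggingProducerOutput(logPath: Paths.get('/tmp/producer.log'))

// Only records logged from this point on, and only from the current thread.
def fresh = output.withEmpty().withThread()

// ... produce some records ...

assert fresh.any { it.key == 'the_record_key' }
def values = fresh.findAll { it.key == 'the_record_key' }*.value
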
- {configs: {compression.type: snappy, value.serializer: org.apache.kafka.common.serialization.StringSerializer,
    interceptor.LoggingProducerInterceptor.file: /var/folders/3v/05f6zqc164g*b55zrdlhk*z00000gn/T/producer3683756155677198489.log,
    acks: '1', bootstrap.servers: 'localhost:32818', interceptor.classes: LoggingProducerInterceptor,
    key.serializer: org.apache.kafka.common.serialization.StringSerializer, client.id: api,
    linger.ms: '100'}}
- {thread: 10, key: 5af19764bfe52300144eea35, value: '{"_links":{"self":{"href":"http://169.254.169.254:8080/app/api/user/doe","hreflang":"en","type":"user"}},"id":"5af19764bfe52300144eea35","name":"John Doe"}'}
- {thread: 10, key: 5af19764bfe52300144eea35, value: '{"_links":{"self":{"href":"http://169.254.169.254:8080/app/api/user/doe","hreflang":"en","type":"user"}},"id":"5af19764bfe52300144eea35","seasons":"John Doe"}'}
// Bean definitions, e.g. inside a Grails resources.groovy beans block. The interceptor and its
// log file are registered only in the TEST environment, so production builds are unaffected.
Map producerTestConfig = [:]
if (Environment.current == Environment.TEST) {
    Path producerLogPath = Files.createTempFile('producer', '.log')
    loggingProducerOutput(LoggingProducerOutput) {
        logPath = producerLogPath
    }
    producerTestConfig['interceptor.classes'] = LoggingProducerInterceptor.name
    producerTestConfig["interceptor.${LoggingProducerInterceptor.simpleName}.file".toString()] = producerLogPath.toString()
}
domainEventProducer(KafkaProducer, commonKafkaConfig + producerTestConfig + [
        'key.serializer'  : 'org.apache.kafka.common.serialization.StringSerializer',
        'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer',
        'acks'            : '1',
        'linger.ms'       : '100',
])
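
In an integration test, both beans can then be injected and tied together. A sketch; the wiring annotations and the topic name 'user-events' are assumptions that depend on the framework in use:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.beans.factory.annotation.Autowired

class DomainEventProducerSpec extends spock.lang.Specification {
    @Autowired KafkaProducer domainEventProducer
    @Autowired LoggingProducerOutput loggingProducerOutput

    void "domain events are logged by the interceptor"() {
        given: "a view of only the records produced from here on"
        def output = loggingProducerOutput.withEmpty()

        when: "an event is produced"
        domainEventProducer.send(new ProducerRecord('user-events', 'key-1', '{"id":"key-1"}')).get()

        then:
        output.find { it.key == 'key-1' }
    }
}
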