@robie2011
Last active July 4, 2019 03:22
Kafka Streams: suppress() doesn't work
package ch.rajakone.demo

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded
import org.apache.kafka.streams.kstream.TimeWindows
import java.time.Duration
import java.util.Properties
import kotlin.math.roundToLong

val kafkaServer = "kafka:9091"
val inputTopic = "test-input-long"

object ExampleProducer {
    private fun createProperties(): Properties {
        val props = Properties()
        props.put("bootstrap.servers", kafkaServer)
        props.put("acks", "all")
        props.put("retries", 0)
        props.put("batch.size", 16384)
        props.put("linger.ms", 1)
        props.put("buffer.memory", 33554432)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer")
        return props
    }

    fun start() {
        val producer = KafkaProducer<String, Long>(createProperties())
        Runtime.getRuntime().addShutdownHook(Thread(producer::close))
        // Emit one random Long per second under the fixed key "random".
        while (true) {
            val value = (Math.random() * 1000).roundToLong()
            val rec = ProducerRecord(inputTopic, "random", value)
            producer.send(rec)
            Thread.sleep(1000)
        }
    }
}

object ExampleStreamProcessing {
    private fun createProperties(): Properties {
        val props = Properties()
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-long-consumer-001")
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass.name)
        // Note: the producer writes Long values, but the default value serde here
        // is Double. Both are 8 bytes and count() ignores the record value, so
        // this mismatch is unrelated to the suppress() failure in the log below.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().javaClass.name)
        return props
    }

    fun start() {
        val builder = StreamsBuilder()
        builder.stream<String, Double>(inputTopic)
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
            .count()
            .suppress(Suppressed.untilWindowCloses(unbounded())) // not working: causes the ClassCastException in the log below
            .toStream()
            .print(Printed.toSysOut())
        val topology = builder.build()
        val streams = KafkaStreams(topology, createProperties())
        streams.cleanUp()
        streams.start()
        Runtime.getRuntime().addShutdownHook(Thread(streams::close))
    }
}

object KafkaStreamDemoStarter {
    @JvmStatic
    fun main(args: Array<String>) {
        Thread(ExampleProducer::start).start()
        ExampleStreamProcessing.start()
        readLine()
    }
}
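
For comparison, here is a sketch of the same topology with every serde spelled out, combining the two workarounds reported in the comments below (materializing the aggregation with explicit serdes and shortening the grace period). The stack trace further down shows the suppress buffer handing the Windowed key to the default StringSerializer, which is what the explicit Materialized serdes work around. This is an untested sketch against the 2.1 API; the store name "suppress-demo-counts" and the switch to Serdes.Long() values (matching the producer's LongSerializer) are editorial assumptions, not part of the original gist.

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Grouped
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Printed
import org.apache.kafka.streams.kstream.Suppressed
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.state.WindowStore
import java.time.Duration

object ExplicitSerdeStreamProcessing {
    fun buildTopology(builder: StreamsBuilder) {
        builder.stream("test-input-long", Consumed.with(Serdes.String(), Serdes.Long()))
            // Explicit grouping serdes instead of the config-level defaults.
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            // untilWindowCloses() cannot emit before window end + grace;
            // the default grace period in 2.1 is roughly 24 hours.
            .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
            // Materializing the count with explicit serdes is the workaround
            // commenters report for the Windowed -> String ClassCastException.
            .count(
                Materialized.`as`<String, Long, WindowStore<Bytes, ByteArray>>("suppress-demo-counts")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long())
            )
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .print(Printed.toSysOut())
    }
}
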
[IntelliJ java run command trimmed: JDK 1.8.0_151, with kafka-streams-2.1.0.jar, kafka-clients-2.1.0.jar, and kotlin-stdlib-1.3.11 on the classpath; main class ch.rajakone.demo.KafkaStreamDemoStarter]
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - StreamsConfig values:
application.id = example-long-consumer-001
application.server =
bootstrap.servers = [kafka:9091]
buffered.records.per.partition = 1000
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$DoubleSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /tmp/kafka-streams
topology.optimization = none
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ProducerConfig values:
acks = all
batch.size = 16384
bootstrap.servers = [kafka:9091]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 1
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.LongSerializer
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - AdminClientConfig values:
bootstrap.servers = [kafka:9091]
client.dns.lookup = default
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-admin
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:604 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Creating restore consumer client
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [kafka:9091]
check.crcs = true
client.dns.lookup = default
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id =
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:614 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Creating shared producer client
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [kafka:9091]
buffer.memory = 33554432
client.dns.lookup = default
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-producer
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 100
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:657 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Creating consumer client
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9091]
check.crcs = true
client.dns.lookup = default
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = example-long-consumer-001
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 2147483647
max.poll.records = 1000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
13:07:03 WARN org.apache.kafka.common.config.AbstractConfig:287 - The configuration 'admin.retries' was supplied but isn't a known config.
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f
13:07:03 INFO org.apache.kafka.streams.processor.internals.StateDirectory:284 - stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:740 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Starting
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from CREATED to RUNNING
13:07:03 INFO org.apache.kafka.streams.KafkaStreams:800 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] Started Streams client
13:07:03 INFO org.apache.kafka.clients.Metadata:285 - Cluster ID: H2BKYAjhTy-S8PgdUjVG3w
13:07:03 INFO org.apache.kafka.clients.Metadata:285 - Cluster ID: H2BKYAjhTy-S8PgdUjVG3w
13:07:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler:654 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Discovered group coordinator kafka:9091 (id: 2147483646 rack: null)
13:07:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Revoking previously assigned partitions []
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
13:07:03 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from RUNNING to REBALANCING
13:07:03 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener:324 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] partition revocation took 1 ms.
suspended active tasks: []
suspended standby tasks: []
13:07:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator:486 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] (Re-)joining group
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor:632 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer] Assigned tasks to clients as {cea43c93-117d-49e7-8635-2ef47eae9885=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
13:07:06 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1:450 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Successfully joined group with generation 3
13:07:06 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:289 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Setting newly assigned partitions [test-input-long-0]
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener:284 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] partition assignment took 12 ms.
current active tasks: [0_0]
current standby tasks: []
previous active tasks: []
13:07:06 INFO org.apache.kafka.clients.Metadata:285 - Cluster ID: H2BKYAjhTy-S8PgdUjVG3w
13:07:06 INFO org.apache.kafka.streams.processor.internals.StoreChangelogReader:224 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Restoring task 0_0's state store KSTREAM-AGGREGATE-STATE-STORE-0000000001 from beginning of the changelog example-long-consumer-001-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog-0
13:07:06 INFO org.apache.kafka.clients.consumer.internals.Fetcher:583 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Resetting offset for partition example-long-consumer-001-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog-0 to offset 0.
13:07:06 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
13:07:06 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
13:07:06 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from REBALANCING to RUNNING
13:07:33 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager:222 - task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:114) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:98) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0.jar:?]
13:07:33 ERROR org.apache.kafka.streams.processor.internals.AssignedTasks:359 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Failed to commit stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0.jar:?]
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:114) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:98) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177) ~[kafka-streams-2.1.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217) ~[kafka-streams-2.1.0.jar:?]
... 10 more
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:1178 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Shutting down
13:07:33 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
13:07:33 INFO org.apache.kafka.clients.producer.KafkaProducer:1136 - [Producer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
13:07:33 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from RUNNING to ERROR
13:07:33 WARN org.apache.kafka.streams.KafkaStreams$StreamStateListener:418 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] All stream threads have died. The instance will be in error state and should be closed.
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:1198 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Shutdown complete
Exception in thread "example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:114)
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36)
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:98)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217)
... 10 more
13:07:37 INFO org.apache.kafka.clients.producer.KafkaProducer:1136 - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
13:07:37 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from ERROR to PENDING_SHUTDOWN
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
@the4thamigo-uk

Did you solve this?

@marwakrouma

I am also facing the same issue.

@mjsax commented May 2, 2019

@the4thamigo-uk

@xeronm commented Jul 1, 2019

I faced the same issue. It went away once I explicitly defined the state store for the reduce DSL via Materialized.
This does not work:

            .reduce( (aggValue, newValue) -> aggValue + newValue )
            .suppress( Suppressed.untilWindowCloses(BufferConfig.unbounded()) );

but this works fine:

            .reduce( (aggValue, newValue) -> aggValue + newValue,
                     Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("streams-cell-tp-winagg")
                     .withKeySerde(Serdes.String())
                   )
            .suppress( Suppressed.untilWindowCloses(BufferConfig.unbounded()) );
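
One Kotlin-specific detail when porting this workaround to the gist's code: `as` is a keyword in Kotlin, so the Materialized factory has to be escaped with backticks. A minimal sketch, reusing the store name from the snippet above; the withValueSerde call is an editorial addition (the snippet above only sets the key serde):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.WindowStore

// `as` is a Kotlin keyword, so Materialized.as(...) needs backticks.
val windowStoreMaterialized =
    Materialized.`as`<String, Long, WindowStore<Bytes, ByteArray>>("streams-cell-tp-winagg")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()) // added for illustration; not in the snippet above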

@iorzt commented Jul 4, 2019

I faced the issue too, and I solved it by adding grace(0) after the fixed window.

            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(Duration.ofMillis(5000L)).grace(Duration.ofMillis(0)))
            .aggregate()
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))

As I tested the code, I found that the calculation is triggered by stream events, not by a scheduler: the suppressed data is only emitted when the next record arrives after the window has closed.
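
In other words, suppress() is driven purely by stream time, so the last open window sits in the buffer until a later record arrives. A hedged sketch of one way to flush it with the gist's producer; the "tick" key and the explicit record timestamp are illustrative, not part of any comment above:

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord

// Stream time advances only with incoming records: a window's suppressed
// result is forwarded once some record carries a timestamp past
// window end + grace. One extra "tick" record is enough to flush it.
fun flushLastWindow(producer: KafkaProducer<String, Long>, topic: String, windowEndMs: Long) {
    // partition = null lets the default partitioner decide; the timestamp is set explicitly.
    producer.send(ProducerRecord<String, Long>(topic, null, windowEndMs + 1, "tick", 0L))
}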
