Last active
July 4, 2019 03:22
-
-
Save robie2011/1caa4772b60b5a6f993e6f98e792a380 to your computer and use it in GitHub Desktop.
KafkaStream: suppress-function doesn't work
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ch.rajakone.demo | |
import org.apache.kafka.clients.producer.KafkaProducer | |
import org.apache.kafka.clients.producer.ProducerRecord | |
import org.apache.kafka.common.serialization.Serdes | |
import org.apache.kafka.streams.KafkaStreams | |
import org.apache.kafka.streams.StreamsBuilder | |
import org.apache.kafka.streams.StreamsConfig | |
import org.apache.kafka.streams.kstream.Printed | |
import org.apache.kafka.streams.kstream.Suppressed | |
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded | |
import org.apache.kafka.streams.kstream.TimeWindows | |
import java.time.Duration | |
import java.util.* | |
import kotlin.math.roundToLong | |
val kafkaServer = "kafka:9091" | |
val inputTopic = "test-input-long" | |
object ExampleProducer { | |
private fun createProperties(): Properties { | |
val props = Properties() | |
props.put("bootstrap.servers", kafkaServer) | |
props.put("acks", "all") | |
props.put("retries", 0) | |
props.put("batch.size", 16384) | |
props.put("linger.ms", 1) | |
props.put("buffer.memory", 33554432) | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") | |
props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer") | |
return props | |
} | |
fun start() { | |
val producer = KafkaProducer<String, Long>(createProperties()) | |
Runtime.getRuntime().addShutdownHook(Thread(producer::close)) | |
while(true) { | |
val value = (Math.random() * 1000).roundToLong() | |
val rec = ProducerRecord(inputTopic, "random", value) | |
producer.send(rec) | |
Thread.sleep(1 * 1000) | |
} | |
} | |
} | |
object ExampleStreamProcessing { | |
private fun createProperties(): Properties { | |
val props = Properties() | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-long-consumer-001") | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer) | |
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass.name) | |
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Double().javaClass.name) | |
return props | |
} | |
fun start(){ | |
val builder = StreamsBuilder() | |
builder.stream<String,Double>(inputTopic) | |
.groupByKey() | |
.windowedBy(TimeWindows.of(Duration.ofSeconds(15))) | |
.count() | |
.suppress(Suppressed.untilWindowCloses(unbounded())) // not working) | |
.toStream() | |
.print(Printed.toSysOut()) | |
val topology = builder.build() | |
val streams = KafkaStreams(topology, createProperties()) | |
streams.cleanUp() | |
streams.start() | |
Runtime.getRuntime().addShutdownHook(Thread(streams::close)) | |
} | |
} | |
object KafkaStreamDemoStarter { | |
@JvmStatic | |
fun main(args: Array<String>) { | |
Thread(ExampleProducer::start).start() | |
ExampleStreamProcessing.start() | |
readLine() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA CE.app/Contents/lib/idea_rt.jar=60594:/Applications/IntelliJ IDEA CE.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/lib/tools.jar:/Users/robert.rajakone/repos/2018_p7/kotlin-mse-project7/kafka-streams/out/production/classes:/Users/robert.rajakone/repos/2018_p7/kotlin-mse-project7/kafka-streams/out/production/resources:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-slf4j-impl/2.11.1/4b41b53a3a2d299ce381a69d165381ca19f62912/log4j-slf4j-impl-2.11.1.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.3.11/dd9bff00d6cfca58b6c1fe89be8e0678e35cf35f/kotlin-stdlib-jdk8-1.3.11.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-streams/2.1.0/ab1d9cf35cf0040a804266279faeaf082d3f66c3/kafka-streams-2.1.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.kafka/connect-json/2.1.0/6e2b621686935ee93e3f627477fcf7483769953d/connect-json-2.1.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.kafka/connect-api/2.1.0/589e4d3a3f90446fd97059e4cc13975f01d9a1d0/connect-api-2.1.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.1.0/34d9983705c953b97abb01e1cd04647f47272fe5/kafka-clients-2.1.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.module/jackson-module-kotlin/2.9.8/7ea1867daf999d6a3028362a0418da44b751bc13/jackson-module-kotlin-2.9.8.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/da76ca59f6a57ee3102f8f9bd9cee742973efa8a/slf4j-api-1.7.25.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-core/2.11.1/592a48674c926b01a9a747c7831bcd82a9e6d6e4/log4j-core-2.11.1.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-api/2.11.1/268f0fe4df3eefe052b57c87ec48517d64fb2a10/log4j-api-2.11.1.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.3.11/4839661cf6ce3c14b65ed7dcf5b9249b44ecca16/kotlin-stdlib-jdk7-1.3.11.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-reflect/1.3.10/dd02865be0351707554b16a896b766b2396cdafa/kotlin-reflect-1.3.10.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib/1.3.11/4cbc5922a54376018307a731162ccaf3ef851a39/kotlin-stdlib-1.3.11.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.3.5-4/550b6393a007d0867c98611ca8cfbcf53f2eb991/zstd-jni-1.3.5-4.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.5.0/d36fb639f06aaa4f17307625f80e2e32f815672a/lz4-java-1.5.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.2/307b286efd119ad2c6d4291128bf110bddc68088/snappy-java-1.1.7.2.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.rocksdb/rocksdbjni/5.14.2/a6087318fab540ba0b4c6ff68475ffbedc0b3d10/rocksdbjni-5.14.2.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-databind/2.9.8/11283f21cc480aa86c4df7a0a3243ec508372ed2/jackson-databind-2.9.8.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-annotations/2.9.0/7c10d545325e3a6e72e06381afe469fd40eb701/jackson-annotations-2.9.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-common/1.3.11/d8b8e746e279f1c4f5e08bc14a96b82e6bb1de02/kotlin-stdlib-common-1.3.11.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/org.jetbrains/annotations/13.0/919f0dfe192fb4e063e7dacadee7f8bb9a2672a9/annotations-13.0.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-core/2.9.8/f5a654e4675769c716e5b387830d19b501ca191/jackson-core-2.9.8.jar:/Users/robert.rajakone/.gradle/caches/modules-2/files-2.1/javax.ws.rs/javax.ws.rs-api/2.1.1/d3466bc9321fe84f268a1adb3b90373fc14b0eb5/javax.ws.rs-api-2.1.1.jar ch.rajakone.demo.KafkaStreamDemoStarter | |
objc[36365]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/bin/java (0x10626c4c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x1062e64e0). One of the two will be used. Which one is undefined. | |
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - StreamsConfig values: | |
application.id = example-long-consumer-001 | |
application.server = | |
bootstrap.servers = [kafka:9091] | |
buffered.records.per.partition = 1000 | |
cache.max.bytes.buffering = 10485760 | |
client.id = | |
commit.interval.ms = 30000 | |
connections.max.idle.ms = 540000 | |
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndFailExceptionHandler | |
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde | |
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler | |
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp | |
default.value.serde = class org.apache.kafka.common.serialization.Serdes$DoubleSerde | |
max.task.idle.ms = 0 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
num.standby.replicas = 0 | |
num.stream.threads = 1 | |
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper | |
poll.ms = 100 | |
processing.guarantee = at_least_once | |
receive.buffer.bytes = 32768 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
replication.factor = 1 | |
request.timeout.ms = 40000 | |
retries = 0 | |
retry.backoff.ms = 100 | |
rocksdb.config.setter = null | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
state.cleanup.delay.ms = 600000 | |
state.dir = /tmp/kafka-streams | |
topology.optimization = none | |
upgrade.from = null | |
windowstore.changelog.additional.retention.ms = 86400000 | |
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ProducerConfig values: | |
acks = all | |
batch.size = 16384 | |
bootstrap.servers = [kafka:9091] | |
buffer.memory = 33554432 | |
client.dns.lookup = default | |
client.id = | |
compression.type = none | |
connections.max.idle.ms = 540000 | |
delivery.timeout.ms = 120000 | |
enable.idempotence = false | |
interceptor.classes = [] | |
key.serializer = class org.apache.kafka.common.serialization.StringSerializer | |
linger.ms = 1 | |
max.block.ms = 60000 | |
max.in.flight.requests.per.connection = 5 | |
max.request.size = 1048576 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner | |
receive.buffer.bytes = 32768 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 30000 | |
retries = 0 | |
retry.backoff.ms = 100 | |
sasl.client.callback.handler.class = null | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.login.callback.handler.class = null | |
sasl.login.class = null | |
sasl.login.refresh.buffer.seconds = 300 | |
sasl.login.refresh.min.period.seconds = 60 | |
sasl.login.refresh.window.factor = 0.8 | |
sasl.login.refresh.window.jitter = 0.05 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = https | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
transaction.timeout.ms = 60000 | |
transactional.id = null | |
value.serializer = class org.apache.kafka.common.serialization.LongSerializer | |
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - AdminClientConfig values: | |
bootstrap.servers = [kafka:9091] | |
client.dns.lookup = default | |
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-admin | |
connections.max.idle.ms = 300000 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
receive.buffer.bytes = 65536 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 120000 | |
retries = 5 | |
retry.backoff.ms = 100 | |
sasl.client.callback.handler.class = null | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.login.callback.handler.class = null | |
sasl.login.class = null | |
sasl.login.refresh.buffer.seconds = 300 | |
sasl.login.refresh.min.period.seconds = 60 | |
sasl.login.refresh.window.factor = 0.8 | |
sasl.login.refresh.window.jitter = 0.05 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = https | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0 | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0 | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:604 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Creating restore consumer client | |
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ConsumerConfig values: | |
auto.commit.interval.ms = 5000 | |
auto.offset.reset = none | |
bootstrap.servers = [kafka:9091] | |
check.crcs = true | |
client.dns.lookup = default | |
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer | |
connections.max.idle.ms = 540000 | |
default.api.timeout.ms = 60000 | |
enable.auto.commit = false | |
exclude.internal.topics = true | |
fetch.max.bytes = 52428800 | |
fetch.max.wait.ms = 500 | |
fetch.min.bytes = 1 | |
group.id = | |
heartbeat.interval.ms = 3000 | |
interceptor.classes = [] | |
internal.leave.group.on.close = false | |
isolation.level = read_uncommitted | |
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
max.partition.fetch.bytes = 1048576 | |
max.poll.interval.ms = 2147483647 | |
max.poll.records = 1000 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] | |
receive.buffer.bytes = 65536 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 30000 | |
retry.backoff.ms = 100 | |
sasl.client.callback.handler.class = null | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.login.callback.handler.class = null | |
sasl.login.class = null | |
sasl.login.refresh.buffer.seconds = 300 | |
sasl.login.refresh.min.period.seconds = 60 | |
sasl.login.refresh.window.factor = 0.8 | |
sasl.login.refresh.window.jitter = 0.05 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
session.timeout.ms = 10000 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = https | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0 | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:614 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Creating shared producer client | |
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ProducerConfig values: | |
acks = 1 | |
batch.size = 16384 | |
bootstrap.servers = [kafka:9091] | |
buffer.memory = 33554432 | |
client.dns.lookup = default | |
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-producer | |
compression.type = none | |
connections.max.idle.ms = 540000 | |
delivery.timeout.ms = 120000 | |
enable.idempotence = false | |
interceptor.classes = [] | |
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer | |
linger.ms = 100 | |
max.block.ms = 60000 | |
max.in.flight.requests.per.connection = 5 | |
max.request.size = 1048576 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner | |
receive.buffer.bytes = 32768 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 30000 | |
retries = 2147483647 | |
retry.backoff.ms = 100 | |
sasl.client.callback.handler.class = null | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.login.callback.handler.class = null | |
sasl.login.class = null | |
sasl.login.refresh.buffer.seconds = 300 | |
sasl.login.refresh.min.period.seconds = 60 | |
sasl.login.refresh.window.factor = 0.8 | |
sasl.login.refresh.window.jitter = 0.05 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = https | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
transaction.timeout.ms = 60000 | |
transactional.id = null | |
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0 | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:657 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Creating consumer client | |
13:07:03 INFO org.apache.kafka.common.config.AbstractConfig:279 - ConsumerConfig values: | |
auto.commit.interval.ms = 5000 | |
auto.offset.reset = earliest | |
bootstrap.servers = [kafka:9091] | |
check.crcs = true | |
client.dns.lookup = default | |
client.id = example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer | |
connections.max.idle.ms = 540000 | |
default.api.timeout.ms = 60000 | |
enable.auto.commit = false | |
exclude.internal.topics = true | |
fetch.max.bytes = 52428800 | |
fetch.max.wait.ms = 500 | |
fetch.min.bytes = 1 | |
group.id = example-long-consumer-001 | |
heartbeat.interval.ms = 3000 | |
interceptor.classes = [] | |
internal.leave.group.on.close = false | |
isolation.level = read_uncommitted | |
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
max.partition.fetch.bytes = 1048576 | |
max.poll.interval.ms = 2147483647 | |
max.poll.records = 1000 | |
metadata.max.age.ms = 300000 | |
metric.reporters = [] | |
metrics.num.samples = 2 | |
metrics.recording.level = INFO | |
metrics.sample.window.ms = 30000 | |
partition.assignment.strategy = [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor] | |
receive.buffer.bytes = 65536 | |
reconnect.backoff.max.ms = 1000 | |
reconnect.backoff.ms = 50 | |
request.timeout.ms = 30000 | |
retry.backoff.ms = 100 | |
sasl.client.callback.handler.class = null | |
sasl.jaas.config = null | |
sasl.kerberos.kinit.cmd = /usr/bin/kinit | |
sasl.kerberos.min.time.before.relogin = 60000 | |
sasl.kerberos.service.name = null | |
sasl.kerberos.ticket.renew.jitter = 0.05 | |
sasl.kerberos.ticket.renew.window.factor = 0.8 | |
sasl.login.callback.handler.class = null | |
sasl.login.class = null | |
sasl.login.refresh.buffer.seconds = 300 | |
sasl.login.refresh.min.period.seconds = 60 | |
sasl.login.refresh.window.factor = 0.8 | |
sasl.login.refresh.window.jitter = 0.05 | |
sasl.mechanism = GSSAPI | |
security.protocol = PLAINTEXT | |
send.buffer.bytes = 131072 | |
session.timeout.ms = 10000 | |
ssl.cipher.suites = null | |
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] | |
ssl.endpoint.identification.algorithm = https | |
ssl.key.password = null | |
ssl.keymanager.algorithm = SunX509 | |
ssl.keystore.location = null | |
ssl.keystore.password = null | |
ssl.keystore.type = JKS | |
ssl.protocol = TLS | |
ssl.provider = null | |
ssl.secure.random.implementation = null | |
ssl.trustmanager.algorithm = PKIX | |
ssl.truststore.location = null | |
ssl.truststore.password = null | |
ssl.truststore.type = JKS | |
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer | |
13:07:03 WARN org.apache.kafka.common.config.AbstractConfig:287 - The configuration 'admin.retries' was supplied but isn't a known config. | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:109 - Kafka version : 2.1.0 | |
13:07:03 INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo:110 - Kafka commitId : eec43959745f444f | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StateDirectory:284 - stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:740 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Starting | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from CREATED to RUNNING | |
13:07:03 INFO org.apache.kafka.streams.KafkaStreams:800 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] Started Streams client | |
13:07:03 INFO org.apache.kafka.clients.Metadata:285 - Cluster ID: H2BKYAjhTy-S8PgdUjVG3w | |
13:07:03 INFO org.apache.kafka.clients.Metadata:285 - Cluster ID: H2BKYAjhTy-S8PgdUjVG3w | |
13:07:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler:654 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Discovered group coordinator kafka:9091 (id: 2147483646 rack: null) | |
13:07:03 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Revoking previously assigned partitions [] | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED | |
13:07:03 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from RUNNING to REBALANCING | |
13:07:03 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions | |
13:07:03 INFO org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener:324 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] partition revocation took 1 ms. | |
suspended active tasks: [] | |
suspended standby tasks: [] | |
13:07:03 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator:486 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] (Re-)joining group | |
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor:632 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer] Assigned tasks to clients as {cea43c93-117d-49e7-8635-2ef47eae9885=[activeTasks: ([0_0]) standbyTasks: ([]) assignedTasks: ([0_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}. | |
13:07:06 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1:450 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Successfully joined group with generation 3 | |
13:07:06 INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:289 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-consumer, groupId=example-long-consumer-001] Setting newly assigned partitions [test-input-long-0] | |
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED | |
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener:284 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] partition assignment took 12 ms. | |
current active tasks: [0_0] | |
current standby tasks: [] | |
previous active tasks: [] | |
13:07:06 INFO org.apache.kafka.clients.Metadata:285 - Cluster ID: H2BKYAjhTy-S8PgdUjVG3w | |
13:07:06 INFO org.apache.kafka.streams.processor.internals.StoreChangelogReader:224 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Restoring task 0_0's state store KSTREAM-AGGREGATE-STATE-STORE-0000000001 from beginning of the changelog example-long-consumer-001-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog-0 | |
13:07:06 INFO org.apache.kafka.clients.consumer.internals.Fetcher:583 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Resetting offset for partition example-long-consumer-001-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog-0 to offset 0. | |
13:07:06 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions | |
13:07:06 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions | |
13:07:06 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING | |
13:07:06 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from REBALANCING to RUNNING | |
13:07:33 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager:222 - task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001: | |
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String | |
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:114) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:98) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0.jar:?] | |
13:07:33 ERROR org.apache.kafka.streams.processor.internals.AssignedTasks:359 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Failed to commit stream task 0_0 due to the following error: | |
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001 | |
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) [kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) [kafka-streams-2.1.0.jar:?] | |
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String | |
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:114) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:98) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177) ~[kafka-streams-2.1.0.jar:?] | |
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217) ~[kafka-streams-2.1.0.jar:?] | |
... 10 more | |
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN | |
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:1178 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Shutting down | |
13:07:33 INFO org.apache.kafka.clients.consumer.KafkaConsumer:1031 - [Consumer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions | |
13:07:33 INFO org.apache.kafka.clients.producer.KafkaProducer:1136 - [Producer clientId=example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. | |
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:207 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD | |
13:07:33 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from RUNNING to ERROR | |
13:07:33 WARN org.apache.kafka.streams.KafkaStreams$StreamStateListener:418 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] All stream threads have died. The instance will be in error state and should be closed. | |
13:07:33 INFO org.apache.kafka.streams.processor.internals.StreamThread:1198 - stream-thread [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1] Shutdown complete | |
Exception in thread "example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001 | |
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220) | |
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204) | |
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491) | |
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) | |
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431) | |
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346) | |
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405) | |
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029) | |
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883) | |
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777) | |
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747) | |
Caused by: java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String | |
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86) | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78) | |
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37) | |
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115) | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146) | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129) | |
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93) | |
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:114) | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36) | |
at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:98) | |
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) | |
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) | |
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) | |
at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:130) | |
at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:177) | |
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:217) | |
... 10 more | |
13:07:37 INFO org.apache.kafka.clients.producer.KafkaProducer:1136 - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. | |
13:07:37 INFO org.apache.kafka.streams.KafkaStreams:257 - stream-client [example-long-consumer-001-cea43c93-117d-49e7-8635-2ef47eae9885] State transition from ERROR to PENDING_SHUTDOWN | |
Process finished with exit code 130 (interrupted by signal 2: SIGINT) |
I am also facing the same issue
Not sure. This test seems to do the same thing and works: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java#L359-L366
FYI : I raised this https://issues.apache.org/jira/browse/KAFKA-8317
Faced with the same issue. Solved when i explicitly defined storage for reduce DSL through Materialize.
This not works:
.reduce( (aggValue, newValue) -> aggValue + newValue )
.suppress( Suppressed.untilWindowCloses(BufferConfig.unbounded()) );
, but this works fine:
.reduce( (aggValue, newValue) -> aggValue + newValue,
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("streams-cell-tp-winagg")
.withKeySerde(Serdes.String())
)
.suppress( Suppressed.untilWindowCloses(BufferConfig.unbounded()) );
I faced the issue, but I solve this problem to add grace(0) after the fixed window.
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofMillis(5000L)).grace(Duration.ofMillis(0)))
.aggregate()
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
as I test the code, I find out that the calculation is triggered by stream event not a scheduler, it will calculate the date when the next window event arrives
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Did you solve this?