Skip to content

Instantly share code, notes, and snippets.

@danielcaro
Last active November 26, 2020 19:02
Show Gist options
  • Save danielcaro/9144b882428ba9b37f53885c1586a1da to your computer and use it in GitHub Desktop.
Save danielcaro/9144b882428ba9b37f53885c1586a1da to your computer and use it in GitHub Desktop.
package com.aim.atlas.utils.data.streams
import com.aim.atlas.core.channels.Channels.Companion.InstanceValidity
import com.aim.atlas.core.entity.resolver.validity.events.InstanceValidityCurrentStatusEvent
import com.aim.atlas.core.entity.tasks.ActivityConfiguration
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.kstream.KStream
import org.slf4j.LoggerFactory
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.annotation.Input
import org.springframework.cloud.stream.annotation.Output
import org.springframework.cloud.stream.annotation.StreamListener
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.handler.annotation.SendTo
import java.util.*
@Configuration
@EnableBinding(StreamForActivityConfig.StreamForActivityConfigChannels::class)
open class StreamForActivityConfig {
companion object {
private const val topic_resource_validity_current_status_event = InstanceValidity.INSTANCE_VALIDITY_CURRENT_STATUS_EVENT
private const val publish_activity_config_aggregate_event_resource_validity = "publish_activity_config_aggregates_resource_validity"
}
private val logger = LoggerFactory.getLogger(this::class.java)
@Bean
open fun kafkaStreamTopology(): Topology? {
val streamsBuilder = StreamsBuilder()
streamsBuilder.stream<Any, InstanceValidityCurrentStatusEvent>(
topic_resource_validity_current_status_event
).filter { key, value ->
value.ownerResource.resourceType == ActivityConfiguration::class.java.simpleName
&& value.ownerResource.resourceUuid != null
}.map { key, value -> KeyValue(key, value) }
.to(publish_activity_config_aggregate_event_resource_validity)
return streamsBuilder.build()
}
// https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder
// https://stackoverflow.com/questions/51454931/unable-to-run-just-this-simple-stream-serde-configuration-needed/51521687
// https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_apache_kafka_streams_binder.html#_multiple_input_bindings
@StreamListener
@SendTo("publish_activity_config_aggregate")
fun publishActivityConfigAggregator(
@Input(publish_activity_config_aggregate_event_resource_validity)
resourceValidityStream: KStream<*, InstanceValidityCurrentStatusEvent>
): KStream<*, InstanceValidityCurrentStatusEvent> {
return resourceValidityStream
}
interface StreamForActivityConfigChannels{
@Input(publish_activity_config_aggregate_event_resource_validity)
fun channelWithResourceValidityForActivityConfig(): KStream<*, InstanceValidityCurrentStatusEvent>
@Output("publish_activity_config_aggregate")
fun dispatch(): KStream<*, InstanceValidityCurrentStatusEvent>
}
}
/****
compile('org.springframework.cloud:spring-cloud-starter-stream-kafka')
compile('org.springframework.cloud:spring-cloud-stream-binder-kafka-streams')
cloud:
stream:
kafka:
streams:
properties:
binder:
timeWindow.length: 500
brokers: ${KAFKA:kafka-service}:${KAFKA_PORT:9092}
serdeError: logAndContinue
auto-add-partitions: true
auto-create-topics: true
configuration:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
value.serde: org.apache.kafka.common.serialization.Serdes$ByteArraySerde
bindings:
publishActivityConfigAggregator:
applicationId: publishActivityConfigAggregator
topic_resource_validity_current_status_event:
group: publish-activity-config-aggregator
***/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment