Skip to content

Instantly share code, notes, and snippets.

@gel-hidden
Created October 26, 2022 13:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gel-hidden/0a8627cf93f5396d6b73c2a6e71aad3e to your computer and use it in GitHub Desktop.
Save gel-hidden/0a8627cf93f5396d6b73c2a6e71aad3e to your computer and use it in GitHub Desktop.
SpringIntegration
package com.example.demo
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.*
import org.springframework.integration.channel.QueueChannel
import org.springframework.integration.dsl.AggregatorSpec
import org.springframework.integration.dsl.IntegrationFlowAdapter
import org.springframework.integration.dsl.IntegrationFlowDefinition
import org.springframework.integration.dsl.MessageChannels
import org.springframework.integration.store.MessageGroup
import org.springframework.messaging.Message
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
@SpringBootApplication
class DemoApplication
fun main(args: Array<String>) {
runApplication<DemoApplication>(*args)
}
const val CORRELATION_ID_HEADER = "correlation_id"
const val DESTINATION_CHANNEL_HEADER = "dest_channel"
const val MODEL_ID_HEADER = "model_id"
data class MyModel(val id: Long, val data: String?)
@Component
class UpdateShowLocationFlow : IntegrationFlowAdapter() {
@Filter
fun filter(model: MyModel): Boolean = model.data != null
@Transformer
fun transform(message: Message<MyModel>): Message<String> {
val model = message.payload
return MessageBuilder
.withPayload(model.data!!)
.setHeader(CORRELATION_ID_HEADER, message.headers["model_parent_id"])
.setHeader(MODEL_ID_HEADER, model.id)
.setHeader(DESTINATION_CHANNEL_HEADER, "finalOutputActivator")
.build()
}
override fun buildFlow(): IntegrationFlowDefinition<*> = from("updateModels")
.split { spec ->
spec.poller { pf ->
pf.fixedRate(500).maxMessagesPerPoll(1)
}
}
.filter(this)
.transform(this)
.channel("updateModelData")
}
@Component
class UpdateLocationFlow : IntegrationFlowAdapter() {
val logger: Logger = LoggerFactory.getLogger(this::class.java)
@ServiceActivator
fun handle(message: Message<String>): String {
logger.info("Doing some work for model with id ${message.headers[MODEL_ID_HEADER]}")
Thread.sleep(300)
logger.info("Completed some work for model with id ${message.headers[MODEL_ID_HEADER]}")
return "Some new data"
}
private fun outputProcessor(messageGroup: MessageGroup): Message<List<String>> {
return MessageBuilder
.withPayload(messageGroup.messages.map { it.payload.toString() })
.build()
}
private fun aggregate(spec: AggregatorSpec): AggregatorSpec {
return spec.correlationStrategy { cs -> cs.headers[CORRELATION_ID_HEADER] }
.releaseStrategy { rs -> rs.size() > 100; }
.groupTimeout(1000)
.sendPartialResultOnExpiry(true)
.expireGroupsUponTimeout(false)
.outputProcessor { outputProcessor(it) }
}
override fun buildFlow(): IntegrationFlowDefinition<*> = from("updateModelData")
.handle(this, "handle") { spec ->
spec.poller { pf ->
pf.fixedDelay(10000)
.maxMessagesPerPoll(1)
}
}
.aggregate { aggregate(it) }
.route("headers['${DESTINATION_CHANNEL_HEADER}']")
}
@MessagingGateway
interface MyGateway {
@Gateway(requestChannel = "updateModels")
fun updateModels(models: List<MyModel>, @Header(value = "model_parent_id") parentId: Int)
}
@Configuration
class ChannelConfiguration {
@Bean
fun updateModels(): QueueChannel {
return MessageChannels.queue().get()
}
@Bean
fun updateModelData(): QueueChannel {
return MessageChannels.queue().get()
}
@Bean
fun finalOutputActivator(): QueueChannel {
return MessageChannels.queue().get()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment