Created
October 26, 2022 13:13
-
-
Save gel-hidden/0a8627cf93f5396d6b73c2a6e71aad3e to your computer and use it in GitHub Desktop.
SpringIntegration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demo | |
import org.slf4j.Logger | |
import org.slf4j.LoggerFactory | |
import org.springframework.boot.autoconfigure.SpringBootApplication | |
import org.springframework.boot.runApplication | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
import org.springframework.integration.annotation.* | |
import org.springframework.integration.channel.QueueChannel | |
import org.springframework.integration.dsl.AggregatorSpec | |
import org.springframework.integration.dsl.IntegrationFlowAdapter | |
import org.springframework.integration.dsl.IntegrationFlowDefinition | |
import org.springframework.integration.dsl.MessageChannels | |
import org.springframework.integration.store.MessageGroup | |
import org.springframework.messaging.Message | |
import org.springframework.messaging.handler.annotation.Header | |
import org.springframework.messaging.support.MessageBuilder | |
import org.springframework.stereotype.Component | |
/**
 * Spring Boot application marker class.
 *
 * Carries the [SpringBootApplication] annotation and is the type handed to
 * `runApplication` in [main]; it declares no members of its own.
 */
@SpringBootApplication
class DemoApplication
/**
 * JVM entry point: starts the Spring Boot application defined by [DemoApplication].
 *
 * @param args command-line arguments, forwarded unchanged to Spring Boot.
 */
fun main(args: Array<String>) {
    // Block body on purpose: an expression body would make main return the application context.
    runApplication<DemoApplication>(*args)
}
/** Name of the message header used as the aggregator's correlation key. */
const val CORRELATION_ID_HEADER = "correlation_id"

/** Name of the message header holding the channel name the router sends the message to. */
const val DESTINATION_CHANNEL_HEADER = "dest_channel"

/** Name of the message header carrying the id of the [MyModel] a message was derived from. */
const val MODEL_ID_HEADER = "model_id"
/**
 * Payload type pushed through the integration flows.
 *
 * @property id identifier of the model.
 * @property data optional content of the model; may be `null`.
 */
data class MyModel(
    val id: Long,
    val data: String?,
)
/**
 * Integration flow that reads from the `updateModels` channel, splits each incoming message,
 * drops models without data, converts the remaining models into string messages and forwards
 * them to the `updateModelData` channel.
 */
@Component
class UpdateShowLocationFlow : IntegrationFlowAdapter() {

    /**
     * Filter step: lets a model through only when it carries data.
     *
     * @param model the model extracted from the message payload.
     * @return `true` when [MyModel.data] is non-null.
     */
    @Filter
    fun filter(model: MyModel): Boolean = model.data != null

    /**
     * Transformer step: turns a [MyModel] message into a message whose payload is the model's data.
     *
     * The resulting message carries three headers: the correlation id (taken from the incoming
     * `model_parent_id` header), the model id, and the destination channel name.
     *
     * @param message the incoming message; its payload must have non-null data.
     * @return a new message with the model's data as payload.
     * @throws IllegalStateException if the model has no data.
     */
    @Transformer
    fun transform(message: Message<MyModel>): Message<String> {
        val model = message.payload
        // buildFlow() places filter() ahead of this step, so null data here means the wiring changed.
        val data = checkNotNull(model.data) {
            "MyModel ${model.id} reached the transformer without data"
        }
        return MessageBuilder
            .withPayload(data)
            .setHeader(CORRELATION_ID_HEADER, message.headers["model_parent_id"])
            .setHeader(MODEL_ID_HEADER, model.id)
            .setHeader(DESTINATION_CHANNEL_HEADER, "finalOutputActivator")
            .build()
    }

    /**
     * Wires the flow: `updateModels` -> split (polled at a fixed rate of 500, one message per
     * poll) -> [filter] -> [transform] -> `updateModelData`.
     */
    override fun buildFlow(): IntegrationFlowDefinition<*> = from("updateModels")
        .split { spec ->
            spec.poller { pf ->
                pf.fixedRate(500).maxMessagesPerPoll(1)
            }
        }
        .filter(this)
        .transform(this)
        .channel("updateModelData")
}
/**
 * Integration flow that reads from the `updateModelData` channel, runs [handle] on every
 * message, aggregates the results by correlation id and routes each aggregate to the channel
 * named in its [DESTINATION_CHANNEL_HEADER] header.
 */
@Component
class UpdateLocationFlow : IntegrationFlowAdapter() {
    val logger: Logger = LoggerFactory.getLogger(this::class.java)

    /**
     * Service-activator step: logs before and after a 300 ms sleep and returns a fixed string.
     *
     * @param message incoming message; only its [MODEL_ID_HEADER] header is read (for logging).
     * @return the constant reply payload `"Some new data"`.
     */
    @ServiceActivator
    fun handle(message: Message<String>): String {
        val modelId = message.headers[MODEL_ID_HEADER]
        logger.info("Doing some work for model with id $modelId")
        Thread.sleep(300)
        logger.info("Completed some work for model with id $modelId")
        return "Some new data"
    }

    /**
     * Builds the aggregate message for a released group: the payload is the list of the grouped
     * messages' payloads rendered with `toString()`.
     *
     * The correlation and destination-channel headers are carried over from the first message of
     * the group. The message built here is a fresh one, so without them the routing expression in
     * [buildFlow] would find no [DESTINATION_CHANNEL_HEADER] to resolve.
     *
     * @param messageGroup the group being released by the aggregator.
     * @return the aggregate message.
     */
    private fun outputProcessor(messageGroup: MessageGroup): Message<List<String>> {
        val groupedMessages = messageGroup.messages
        val payloads = groupedMessages.map { it.payload.toString() }
        // Every member of a group shares the correlation id; the first message stands in for all.
        val sourceHeaders = groupedMessages.firstOrNull()?.headers
        return MessageBuilder
            .withPayload(payloads)
            .setHeader(CORRELATION_ID_HEADER, sourceHeaders?.get(CORRELATION_ID_HEADER))
            .setHeader(DESTINATION_CHANNEL_HEADER, sourceHeaders?.get(DESTINATION_CHANNEL_HEADER))
            .build()
    }

    /**
     * Configures the aggregator: correlate on [CORRELATION_ID_HEADER], release once a group holds
     * more than 100 messages, apply a group timeout of 1000, send partial results on expiry, do
     * not expire groups upon timeout, and build the output with [outputProcessor].
     *
     * @param spec the aggregator spec supplied by the DSL.
     * @return the same spec, configured.
     */
    private fun aggregate(spec: AggregatorSpec): AggregatorSpec {
        return spec
            .correlationStrategy { cs -> cs.headers[CORRELATION_ID_HEADER] }
            .releaseStrategy { rs -> rs.size() > 100 }
            .groupTimeout(1000)
            .sendPartialResultOnExpiry(true)
            .expireGroupsUponTimeout(false)
            .outputProcessor { outputProcessor(it) }
    }

    /**
     * Wires the flow: `updateModelData` -> [handle] (polled with a fixed delay of 10000, one
     * message per poll) -> aggregate -> route on the [DESTINATION_CHANNEL_HEADER] header.
     */
    // NOTE(review): the poller delay (10000) is much larger than the aggregator's group timeout
    // (1000); presumably groups time out between polls - confirm this is intended.
    override fun buildFlow(): IntegrationFlowDefinition<*> = from("updateModelData")
        .handle(this, "handle") { spec ->
            spec.poller { pf ->
                pf.fixedDelay(10000)
                    .maxMessagesPerPoll(1)
            }
        }
        .aggregate { aggregate(it) }
        .route("headers['${DESTINATION_CHANNEL_HEADER}']")
}
/**
 * Messaging gateway that exposes the `updateModels` channel as a plain Kotlin call.
 */
@MessagingGateway
interface MyGateway {

    /**
     * Sends [models] as a message payload to the `updateModels` channel.
     *
     * @param models the models to send.
     * @param parentId value placed in the `model_parent_id` message header.
     */
    @Gateway(requestChannel = "updateModels")
    fun updateModels(
        models: List<MyModel>,
        @Header("model_parent_id") parentId: Int,
    )
}
/**
 * Declares the queue channels used by the flows. Each bean takes its name from its method,
 * which is how the string channel names used elsewhere in this file resolve to these channels.
 */
@Configuration
class ChannelConfiguration {

    /** Queue channel that the gateway writes to and the first flow reads from. */
    @Bean
    fun updateModels(): QueueChannel = MessageChannels.queue().get()

    /** Queue channel connecting the first flow's output to the second flow's input. */
    @Bean
    fun updateModelData(): QueueChannel = MessageChannels.queue().get()

    /** Queue channel named by the destination header set in the transformer. */
    @Bean
    fun finalOutputActivator(): QueueChannel = MessageChannels.queue().get()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment