Last active
July 2, 2021 12:04
-
-
Save fragaLY/f0a9a235c3e924b90dc83de5ec964271 to your computer and use it in GitHub Desktop.
Spring Boot Kafka EOS Support [ for 'prod' profile - config server in use ]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spring: | |
cloud: | |
config: | |
enabled: false | |
thymeleaf: | |
enabled: false | |
application: | |
name: consumer-service | |
main: | |
lazy-initialization: false | |
web-application-type: servlet | |
banner-mode: off | |
lifecycle: | |
timeout-per-shutdown-phase: 60s | |
jackson: | |
time-zone: UTC | |
locale: en_US | |
datasource: | |
driverClassName: org.postgresql.Driver | |
url: "jdbc:postgresql://localhost:5432/notification-dev" | |
username: "user" | |
password: "P@55w0rd" | |
hikari: | |
minimumIdle: 1 | |
maximumPoolSize: 3 | |
autoCommit: false | |
connectionTestQuery: SELECT 1 | |
jpa: | |
database-platform: org.hibernate.dialect.PostgreSQL95Dialect | |
properties: | |
hibernate: | |
default_schema: "notification" | |
flyway: | |
enabled: true | |
locations: "classpath:db/migration" | |
url: "jdbc:postgresql://localhost:5432/notification-dev" | |
username: "user" | |
password: "P@55w0rd" | |
schemas: "notification" | |
zipkin: | |
kafka: | |
topic: zipkin | |
sender: | |
type: KAFKA | |
kafka: | |
bootstrap-servers: http://localhost:9092, http://localhost:9093, http://localhost:9094 | |
consumer: | |
key-deserializer: org.apache.kafka.common.serialization.LongDeserializer | |
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer | |
group-id: notification-events-listener-group | |
enable-auto-commit: false | |
auto-offset-reset: earliest | |
properties: | |
isolation.level: read_committed | |
template: | |
default-topic: notification-event | |
listener: | |
ack-mode: manual | |
server: | |
port: 8084 | |
shutdown: graceful | |
servlet: | |
session: | |
timeout: 60m | |
cookie: | |
http-only: true | |
application-display-name: kafka-consumer | |
compression: | |
enabled: true | |
mime-types: "text/html, text/xml, text/plain, text/css, text/javascript, application/javascript, application/json, application/xml, image/jpeg, image/png, application/octet-stream" | |
error: | |
whitelabel: | |
enabled: false | |
logging: | |
level: | |
root: info | |
org: | |
springframework: | |
boot: info | |
kafka: info | |
--- | |
spring: | |
application: | |
name: consumer-service | |
main: | |
banner-mode: off | |
cloud: | |
config: | |
enabled: true | |
config: | |
activate: | |
on-profile: prod | |
import: configserver:http://config-server-prod:8088?fail-fast=true&max-attempts=3&max-interval=1500&multiplier=1.2&initial-interval=1100 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spring: | |
application: | |
name: producer-service | |
cloud: | |
config: | |
enabled: false | |
main: | |
lazy-initialization: false | |
web-application-type: servlet | |
banner-mode: off | |
jackson: | |
time-zone: UTC | |
locale: en_US | |
mvc: | |
format: | |
date: iso | |
date-time: iso | |
lifecycle: | |
timeout-per-shutdown-phase: 60s | |
zipkin: | |
kafka: | |
topic: zipkin | |
sender: | |
type: KAFKA | |
kafka: | |
producer: | |
bootstrap-servers: http://localhost:9092, http://localhost:9093, http://localhost:9094 | |
key-serializer: org.apache.kafka.common.serialization.LongSerializer | |
value-serializer: org.apache.kafka.common.serialization.StringSerializer | |
acks: all | |
retries: 5 | |
transaction-id-prefix: kafka-producer | |
properties: | |
enable.idempotence: true | |
compression.type: none | |
max.in.flight.requests.per.connection: 1 | |
auto.create.topics.enable: false | |
template: | |
default-topic: notification-event | |
properties: | |
min.insync.replicas: 2 | |
listener: | |
ack-mode: manual | |
missing-topics-fatal: false | |
server: | |
port: 8083 | |
shutdown: graceful | |
servlet: | |
session: | |
timeout: 60m | |
cookie: | |
http-only: true | |
application-display-name: kafka-producer | |
compression: | |
enabled: true | |
mime-types: "text/html, text/xml, text/plain, text/css, text/javascript, application/javascript, application/json, application/xml, image/jpeg, image/png, application/octet-stream" | |
error: | |
whitelabel: | |
enabled: false | |
logging: | |
level: | |
root: info | |
org: | |
springframework: | |
boot: info | |
kafka: info | |
--- | |
spring: | |
application: | |
name: producer-service | |
main: | |
banner-mode: off | |
cloud: | |
config: | |
enabled: true | |
config: | |
activate: | |
on-profile: prod | |
import: configserver:http://config-server-prod:8088?fail-fast=true&max-attempts=3&max-interval=1500&multiplier=1.2&initial-interval=1100 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile | |
import org.jetbrains.kotlin.util.parseSpaceSeparatedArgs | |
plugins { | |
application | |
id("org.springframework.boot") version "2.5.2" | |
id("io.spring.dependency-management") version "1.0.11.RELEASE" | |
kotlin("jvm") version "1.5.20" | |
kotlin("plugin.spring") version "1.5.20" | |
kotlin("plugin.jpa") version "1.5.20" | |
id("com.google.cloud.tools.jib") version "3.1.1" | |
id("org.flywaydb.flyway") version "7.10.0" | |
} | |
springBoot { | |
buildInfo() | |
} | |
group = "by.vk" | |
version = "0.0.1" | |
java.sourceCompatibility = JavaVersion.VERSION_11 | |
application { | |
mainClass.set("com.kafka.consumer.demo.NotificationConsumerKt") | |
applicationName = "kafka-consumer" | |
} | |
repositories { | |
mavenCentral() | |
} | |
extra["springCloudVersion"] = "2020.0.3" | |
configurations.all { | |
exclude(group = "org.springframework.boot", module = "spring-boot-starter-tomcat") | |
exclude(group = "org.junit.vintage", module = "junit-vintage-engine") | |
} | |
dependencies { | |
implementation("org.springframework.boot:spring-boot-starter-data-jpa") | |
implementation("org.springframework.boot:spring-boot-starter-web") | |
implementation("org.springframework.boot:spring-boot-starter-undertow") | |
implementation("org.springframework.cloud:spring-cloud-config-client") | |
implementation("com.fasterxml.jackson.module:jackson-module-kotlin") | |
implementation("org.springframework.boot:spring-boot-starter-validation") | |
implementation("org.springframework.cloud:spring-cloud-starter-sleuth") | |
implementation("org.springframework.cloud:spring-cloud-sleuth-zipkin") | |
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka") | |
implementation("org.jetbrains.kotlin:kotlin-reflect") | |
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") | |
implementation("org.springframework.kafka:spring-kafka") | |
implementation("org.postgresql:postgresql") | |
implementation("org.flywaydb:flyway-core") | |
implementation("ch.qos.logback.contrib:logback-json-classic:0.1.5") | |
implementation("ch.qos.logback.contrib:logback-jackson:0.1.5") | |
testImplementation("org.springframework.boot:spring-boot-starter-test") | |
testImplementation("org.springframework.kafka:spring-kafka-test") | |
testImplementation("org.testcontainers:postgresql:1.14.+") | |
testImplementation("org.testcontainers:junit-jupiter:1.14.+") | |
testImplementation("com.h2database:h2") | |
} | |
dependencyManagement { | |
imports { | |
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}") | |
} | |
} | |
tasks.withType<Test> { | |
useJUnitPlatform() | |
} | |
tasks.withType<KotlinCompile> { | |
kotlinOptions { | |
freeCompilerArgs = listOf("-Xjsr305=strict") | |
jvmTarget = "11" | |
} | |
} | |
kotlin { | |
sourceSets["test"].apply { | |
kotlin.srcDirs("src/test/kotlin/unit", "src/test/kotlin/integration") | |
} | |
} | |
object DockerProps { | |
const val BASE_IMAGE = "gcr.io/distroless/java:11" | |
const val APP_PORT = "8084" | |
const val DEBUG_PORT = "5084" | |
const val JMX_PORT = "38084" | |
} | |
object JVMProps { | |
const val XMX = "512m" | |
const val XMS = "128m" | |
const val MAX_METASPACE_SIZE = "128m" | |
const val MAX_DIRECT_MEMORY_SIZE = "256m" | |
const val HEAPDUMP_PATH = "/opt/tmp/heapdump.bin" | |
} | |
jib { | |
from { | |
image = DockerProps.BASE_IMAGE | |
} | |
container { | |
jvmFlags = parseSpaceSeparatedArgs("-noverify -XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -XX:InitialRAMPercentage=50.0 -XX:+OptimizeStringConcat -XX:+UseStringDeduplication -XX:+ExitOnOutOfMemoryError -XX:+AlwaysActAsServerClassMachine -Xmx${JVMProps.XMX} -Xms${JVMProps.XMS} -XX:MaxMetaspaceSize=${JVMProps.MAX_METASPACE_SIZE} -XX:MaxDirectMemorySize=${JVMProps.MAX_DIRECT_MEMORY_SIZE} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${JVMProps.HEAPDUMP_PATH} -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${DockerProps.JMX_PORT} -Dcom.sun.management.jmxremote.rmi.port=${DockerProps.JMX_PORT} -Dspring.profiles.active=prod") | |
ports = listOf(DockerProps.APP_PORT, DockerProps.DEBUG_PORT, DockerProps.JMX_PORT) | |
labels.set(mapOf("maintainer" to "Vadzim Kavalkou <vadzim.kavalkou@gmail.com>", | |
"app-name" to application.applicationName, | |
"service-version" to version.toString())) | |
creationTime = "USE_CURRENT_TIMESTAMP" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
class Consumer(private val mapper: ObjectMapper, private val notificationService: NotificationService) { | |
private val logger = LoggerFactory.getLogger(javaClass) | |
@KafkaListener(groupId = "\${spring.kafka.consumer.group-id}", topics = ["notification-event"]) | |
fun onMessage(@Payload payload: String, meta: ConsumerRecordMetadata, acknowledgment: Acknowledgment) { | |
val event = mapper.readValue(payload, NotificationEvent::class.java) | |
val notification = when (event.type) { | |
EventType.CREATE_NOTIFICATION -> notificationService.create(event.notification) | |
EventType.UPDATE_NOTIFICATION -> notificationService.update(event.notification) | |
} | |
acknowledgment.acknowledge() | |
.run { logger.info("[CONSUMER] Notification [${notification}] from partition ${meta.partition()} with offset ${meta.offset()} is processed") } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
version: '3.7' | |
services: | |
config-server-prod: | |
image: fragaly/config-server | |
restart: on-failure | |
postgres-prod: | |
image: postgres:13.3-alpine | |
restart: on-failure | |
environment: | |
- POSTGRES_USER=user | |
- POSTGRES_PASSWORD=P@55w0rd | |
- POSTGRES_DB=notification-dev | |
zookeeper-prod: | |
image: confluentinc/cp-zookeeper:6.2.0 | |
healthcheck: | |
test: echo stat | nc localhost 2181 | |
interval: 2s | |
timeout: 2s | |
retries: 15 | |
ports: | |
- 2181:2181 | |
environment: | |
ZOOKEEPER_SERVER_ID: 1 | |
ZOOKEEPER_CLIENT_PORT: 2181 | |
ZOOKEEPER_TICK_TIME: 2000 | |
kafka-prod-1: | |
image: confluentinc/cp-kafka:6.2.0 | |
healthcheck: | |
test: ps augwwx | egrep [S]upportedKafka | |
depends_on: | |
- zookeeper-prod | |
environment: | |
KAFKA_BROKER_ID: 1 | |
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-prod:2181' | |
KAFKA_LISTENERS: INTERNAL://kafka-prod-1:29092 | |
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-prod-1:29092 | |
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT | |
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 | |
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" | |
kafka-prod-2: | |
image: confluentinc/cp-kafka:6.2.0 | |
healthcheck: | |
test: ps augwwx | egrep [S]upportedKafka | |
depends_on: | |
- zookeeper-prod | |
environment: | |
KAFKA_BROKER_ID: 2 | |
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-prod:2181' | |
KAFKA_LISTENERS: INTERNAL://kafka-prod-2:29092 | |
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-prod-2:29092 | |
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT | |
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 | |
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" | |
kafka-prod-3: | |
image: confluentinc/cp-kafka:6.2.0 | |
healthcheck: | |
test: ps augwwx | egrep [S]upportedKafka | |
depends_on: | |
- zookeeper-prod | |
environment: | |
KAFKA_BROKER_ID: 3 | |
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-prod:2181' | |
KAFKA_LISTENERS: INTERNAL://kafka-prod-3:29092 | |
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-prod-3:29092 | |
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT | |
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL | |
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 | |
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" | |
schema-registry-prod: | |
image: confluentinc/cp-schema-registry:6.2.0 | |
restart: always | |
depends_on: | |
- kafka-prod-1 | |
environment: | |
SCHEMA_REGISTRY_HOST_NAME: schema-registry-prod | |
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 | |
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka-prod-1:29092' | |
kafka-setup: | |
image: confluentinc/cp-kafka:6.2.0 | |
depends_on: | |
- kafka-prod-1 | |
- kafka-prod-2 | |
- kafka-prod-3 | |
command: "bash -c 'echo Waiting for Kafka to be ready... && \ | |
cub kafka-ready -b kafka-prod-1:29092 1 20 && \ | |
kafka-topics --create --if-not-exists --zookeeper zookeeper-prod:2181 --partitions 3 --replication-factor 2 --topic notification-event && \ | |
kafka-topics --create --if-not-exists --zookeeper zookeeper-prod:2181 --partitions 3 --replication-factor 2 --topic zipkin'" | |
environment: | |
KAFKA_BROKER_ID: ignored | |
KAFKA_ZOOKEEPER_CONNECT: ignored | |
kafka-ui-prod: | |
image: obsidiandynamics/kafdrop:3.27.0 | |
restart: on-failure | |
depends_on: | |
- kafka-prod-1 | |
- kafka-prod-2 | |
- kafka-prod-3 | |
environment: | |
KAFKA_BROKERCONNECT: http://kafka-prod-1:29092, http://kafka-prod-2:29092, http://kafka-prod-3:29092 | |
SERVER_SERVLET_CONTEXTPATH: / | |
SCHEMAREGISTRY_CONNECT: http://schema-registry-prod:8081 | |
JVM_OPTS: "-Xms32M -Xmx64M" | |
ports: | |
- 9000:9000 | |
gateway-prod: | |
image: fragaly/gateway | |
restart: on-failure | |
depends_on: | |
- config-server-prod | |
producer-prod: | |
image: fragaly/producer | |
restart: on-failure | |
depends_on: | |
- schema-registry-prod | |
- gateway-prod | |
consumer-prod: | |
image: fragaly/consumer | |
restart: on-failure | |
depends_on: | |
- schema-registry-prod | |
- gateway-prod | |
web-prod: | |
image: fragaly/web | |
restart: on-failure | |
ports: | |
- 80:80 | |
elasticsearch: | |
image: openzipkin/zipkin-elasticsearch7:2.22.2 | |
container_name: elasticsearch | |
zipkin: | |
image: openzipkin/zipkin:2.23.2 | |
container_name: zipkin | |
ports: | |
- 9411:9411 | |
environment: | |
- STORAGE_TYPE=elasticsearch | |
- ES_HOSTS=elasticsearch:9200 | |
- KAFKA_BOOTSTRAP_SERVERS=http://kafka-prod-1:29092, http://kafka-prod-2:29092, http://kafka-prod-3:29092 | |
depends_on: | |
- elasticsearch |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private fun String.beautify() = this.removeSuffix("]").removePrefix("[") | |
@RestControllerAdvice | |
class ExceptionHandler { | |
data class Information(val status: HttpStatus, val code: Int, val message: String) | |
@ExceptionHandler(IllegalArgumentException::class) | |
@ResponseStatus(BAD_REQUEST) | |
fun handleIllegalArgumentException(exception: IllegalArgumentException) = | |
Information(BAD_REQUEST, BAD_REQUEST.value(), exception.toString()) | |
@ExceptionHandler(MethodArgumentNotValidException::class) | |
@ResponseStatus(BAD_REQUEST) | |
fun handleMethodArgumentNotValidException(exception: MethodArgumentNotValidException) = | |
Information( | |
BAD_REQUEST, | |
BAD_REQUEST.value(), | |
exception.bindingResult.allErrors.map { it.defaultMessage }.joinToString(separator = ", ") | |
) | |
@ExceptionHandler(HttpMessageNotReadableException::class) | |
@ResponseStatus(BAD_REQUEST) | |
fun handleHttpMessageNotReadableException(exception: HttpMessageNotReadableException) = | |
Information(BAD_REQUEST, BAD_REQUEST.value(), exception.localizedMessage.beautify()) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Entity | |
@Table(name = "notification") | |
data class Notification( | |
@Id @GeneratedValue(generator = "UUID") | |
@GenericGenerator(name = "UUID", strategy = "org.hibernate.id.UUIDGenerator") | |
@Column(name = "id", updatable = false, nullable = false) | |
@Type(type = "org.hibernate.type.PostgresUUIDType") | |
var id: UUID? = null, | |
@Column(nullable = false, length = 25) | |
var sender: String = "", | |
@Column(nullable = false, length = 25) | |
var receiver: String = "" | |
) | |
@Repository | |
interface NotificationRepository : CrudRepository<Notification, UUID> | |
@Service | |
class NotificationService(private val repository: NotificationRepository) { | |
private val logger = LoggerFactory.getLogger(javaClass) | |
fun create(notification: Notification) = repository.save(notification) | |
.apply { logger.debug("[CONSUMER] The notification with id [${this.id}] has been created")} | |
fun update(notification: Notification) = | |
if (notification.id == null) throw IllegalArgumentException("The notification should not be null for update request") | |
else repository.findById(notification.id!!).orElseThrow { RuntimeException("Notification not found") } | |
.run { repository.save(notification) } | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
enum class EventType { CREATE_NOTIFICATION, UPDATE_NOTIFICATION } | |
data class NotificationEvent(val key: Long? = null, val notification: Notification, val type: EventType) | |
@Entity | |
@Table(name = "notification") | |
data class Notification( | |
@Id @GeneratedValue(generator = "UUID") | |
@GenericGenerator(name = "UUID", strategy = "org.hibernate.id.UUIDGenerator") | |
@Column(name = "id", updatable = false, nullable = false) | |
@Type(type = "org.hibernate.type.PostgresUUIDType") | |
var id: UUID? = null, | |
@Column(nullable = false, length = 25) | |
var sender: String = "", | |
@Column(nullable = false, length = 25) | |
var receiver: String = "" | |
) | |
@Repository | |
interface NotificationRepository : CrudRepository<Notification, UUID> | |
@Service | |
class NotificationService(private val repository: NotificationRepository) { | |
private val logger = LoggerFactory.getLogger(javaClass) | |
fun create(notification: Notification) = repository.save(notification) | |
.apply { logger.debug("[CONSUMER] The notification with id [${this.id}] has been created")} | |
fun update(notification: Notification) = | |
if (notification.id == null) throw IllegalArgumentException("The notification should not be null for update request") | |
else repository.findById(notification.id!!).orElseThrow { RuntimeException("Notification not found") } | |
.run { repository.save(notification) } | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Configuration | |
@EnableKafka | |
class NotificationEventsConsumer { | |
@Bean | |
fun factory( | |
errorHandler: KafkaErrorHandlingLogger, | |
configurer: ConcurrentKafkaListenerContainerFactoryConfigurer, | |
consumerFactory: ConsumerFactory<Any, Any>, | |
retryTemplate: RetryTemplate | |
): ConcurrentKafkaListenerContainerFactory<Any, Any> { | |
val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>() | |
configurer.configure(factory, consumerFactory) | |
factory.setConcurrency(1) | |
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL | |
factory.setErrorHandler(errorHandler) | |
factory.setRetryTemplate(retryTemplate) | |
return factory | |
} | |
@Bean | |
fun retry(backOffPolicy: FixedBackOffPolicy, retryPolicy: SimpleRetryPolicy): RetryTemplate { | |
val template = RetryTemplate() | |
template.setBackOffPolicy(backOffPolicy) | |
template.setRetryPolicy(retryPolicy) | |
return template | |
} | |
@Bean | |
fun retryPolicy() = SimpleRetryPolicy(3) | |
@Bean | |
fun backOffPolicy(): FixedBackOffPolicy { | |
val policy = FixedBackOffPolicy() | |
policy.backOffPeriod = 1000L //default value | |
return policy | |
} | |
} | |
@Component | |
class KafkaErrorHandlingLogger : ErrorHandler { | |
private val logger = LoggerFactory.getLogger(javaClass) | |
override fun handle(exception: java.lang.Exception, data: ConsumerRecord<*, *>?) { | |
logger.error("[CONSUMER] Error during processing the topic [${data?.topic()}] message[key = [${data?.key()}], value = [${data?.value()}]] from partition [${data?.partition()}] with offset [${data?.offset()}]: $exception") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
enum class EventType { CREATE_NOTIFICATION, UPDATE_NOTIFICATION } | |
data class Notification( | |
val id: UUID?, | |
@field:NotBlank(message = "The sender should not be blank") val sender: String, | |
@field:NotBlank(message = "The receiver should not be blank") val receiver: String | |
) | |
data class NotificationEvent( | |
@field:NotNull(message = "The event id should not be null") val key: Long, | |
@field:Valid val notification: Notification, | |
val type: EventType? | |
) | |
@CrossOrigin(origins = ["http://localhost"]) | |
@RequestMapping("/api/notifications") | |
interface Api { | |
@PostMapping(consumes = [MediaType.APPLICATION_JSON_VALUE]) | |
@ResponseStatus(HttpStatus.NO_CONTENT) | |
fun create(@NotNull(message = "The event cannot be null") @Valid @RequestBody event: NotificationEvent): ResponseEntity<Unit> | |
@PutMapping(value = ["/{id}"], consumes = [MediaType.APPLICATION_JSON_VALUE]) | |
@ResponseStatus(HttpStatus.NO_CONTENT) | |
fun update( | |
@NotNull(message = "The notification id cannot be null") @PathVariable id: UUID, | |
@NotNull(message = "The event cannot be null") @Valid @RequestBody event: NotificationEvent | |
): ResponseEntity<Unit> | |
} | |
@RestController | |
class Controller(private val service: NotificationService) : Api { | |
override fun create(event: NotificationEvent) = | |
service.create(event).run { ResponseEntity.noContent().build<Unit>() } | |
override fun update(id: UUID, event: NotificationEvent) = | |
service.update(id, event).run { ResponseEntity.noContent().build<Unit>() } | |
} | |
@Service | |
class NotificationService(private val producer: EventProducer) { | |
fun create(event: NotificationEvent) = producer.produce( | |
event.copy(type = EventType.CREATE_NOTIFICATION, notification = event.notification.copy(id = null)) | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@SpringBootApplication | |
@EnableTransactionManagement | |
class NotificationProducer | |
fun main(args: Array<String>) { | |
runApplication<NotificationProducer>(*args) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile | |
import org.jetbrains.kotlin.util.parseSpaceSeparatedArgs | |
plugins { | |
application | |
id("org.springframework.boot") version "2.5.2" | |
id("io.spring.dependency-management") version "1.0.11.RELEASE" | |
kotlin("jvm") version "1.5.20" | |
kotlin("plugin.spring") version "1.5.20" | |
id("com.google.cloud.tools.jib") version "3.1.1" | |
} | |
springBoot { | |
buildInfo() | |
} | |
group = "by.vk" | |
version = "0.0.1" | |
java.sourceCompatibility = JavaVersion.VERSION_11 | |
application { | |
mainClass.set("com.kafka.producer.demo.NotificationProducerKt") | |
applicationName = "kafka-producer" | |
} | |
repositories { | |
mavenCentral() | |
} | |
extra["springCloudVersion"] = "2020.0.3" | |
configurations.all { | |
exclude(group = "org.springframework.boot", module ="spring-boot-starter-tomcat") | |
exclude(group = "org.junit.vintage", module = "junit-vintage-engine") | |
} | |
dependencies { | |
implementation("org.springframework.boot:spring-boot-starter-web") | |
implementation("com.fasterxml.jackson.module:jackson-module-kotlin") | |
implementation("org.springframework.boot:spring-boot-starter-undertow") | |
implementation("org.springframework.cloud:spring-cloud-config-client") | |
implementation("org.springframework.cloud:spring-cloud-starter-sleuth") | |
implementation("org.springframework.cloud:spring-cloud-sleuth-zipkin") | |
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka") | |
implementation("org.springframework.boot:spring-boot-starter-validation") | |
implementation("org.jetbrains.kotlin:kotlin-reflect") | |
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8") | |
implementation("org.springframework.kafka:spring-kafka") | |
implementation("ch.qos.logback.contrib:logback-json-classic:0.1.5") | |
implementation("ch.qos.logback.contrib:logback-jackson:0.1.5") | |
testImplementation("org.springframework.boot:spring-boot-starter-test") | |
testImplementation("org.springframework.kafka:spring-kafka-test") | |
} | |
dependencyManagement { | |
imports { | |
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}") | |
} | |
} | |
tasks.withType<Test> { | |
useJUnitPlatform() | |
} | |
tasks.withType<KotlinCompile> { | |
kotlinOptions { | |
freeCompilerArgs = listOf("-Xjsr305=strict") | |
jvmTarget = "11" | |
} | |
} | |
kotlin { | |
sourceSets["test"].apply { | |
kotlin.srcDirs("src/test/kotlin/unit", "src/test/kotlin/integration") | |
} | |
} | |
object DockerProps { | |
const val BASE_IMAGE = "gcr.io/distroless/java:11" | |
const val APP_PORT = "8083" | |
const val DEBUG_PORT = "5083" | |
const val JMX_PORT = "38083" | |
} | |
object JVMProps { | |
const val XMX = "512m" | |
const val XMS = "128m" | |
const val MAX_METASPACE_SIZE = "128m" | |
const val MAX_DIRECT_MEMORY_SIZE = "256m" | |
const val HEAPDUMP_PATH = "/opt/tmp/heapdump.bin" | |
} | |
jib { | |
from { | |
image = DockerProps.BASE_IMAGE | |
} | |
container { | |
jvmFlags = parseSpaceSeparatedArgs("-noverify -XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -XX:InitialRAMPercentage=50.0 -XX:+OptimizeStringConcat -XX:+UseStringDeduplication -XX:+ExitOnOutOfMemoryError -XX:+AlwaysActAsServerClassMachine -Xmx${JVMProps.XMX} -Xms${JVMProps.XMS} -XX:MaxMetaspaceSize=${JVMProps.MAX_METASPACE_SIZE} -XX:MaxDirectMemorySize=${JVMProps.MAX_DIRECT_MEMORY_SIZE} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${JVMProps.HEAPDUMP_PATH} -Djava.rmi.server.hostname=localhost -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=${DockerProps.JMX_PORT} -Dcom.sun.management.jmxremote.rmi.port=${DockerProps.JMX_PORT} -Dspring.profiles.active=prod") | |
ports = listOf(DockerProps.APP_PORT, DockerProps.DEBUG_PORT, DockerProps.JMX_PORT) | |
labels.set(mapOf("maintainer" to "Vadzim Kavalkou <vadzim.kavalkou@gmail.com>", | |
"app-name" to application.applicationName, | |
"service-version" to version.toString())) | |
creationTime = "USE_CURRENT_TIMESTAMP" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment