Skip to content

Instantly share code, notes, and snippets.

@mayankcpdixit
Last active December 13, 2018 07:27
Show Gist options
  • Save mayankcpdixit/8d38cbe0741a192e6a97be589d156842 to your computer and use it in GitHub Desktop.
Save mayankcpdixit/8d38cbe0741a192e6a97be589d156842 to your computer and use it in GitHub Desktop.
Getting started with SpringBoot and kafka
  1. Install Kafka and start zkServer and kafka-server
  2. Uncomment listeners and advertised.listeners in Kafka's server.properties.
  3. SpringBoot App for kafka (sample)
  4. Run Kafka locally
  5. Build this repo with: gradle build
  6. Start the app using: java -jar build/libs/spring-kafka-0.0.1-SNAPSHOT.jar
  7. Publish using http://localhost:8080/greetings?message=HelloManThisIsPublished
  8. Check terminal logs for publish and polling logs. Cheers!

App directory structure (download the project from the git repo):

├── build.gradle
└── src
    └── main
        ├── java
        │   └── com
        │       └── springkafka
        │           └── streamkafka
        │               ├── StreamkafkaApplication.java (Entry point)
        │               ├── config
        │               │   └── StreamsConfig.java
        │               ├── model
        │               │   └── Greetings.java
        │               ├── service
        │               │   ├── GreetingsListener.java
        │               │   └── GreetingsService.java
        │               ├── stream
        │               │   └── GreetingsStreams.java
        │               └── web
        │                   └── GreetingsController.java
        └── resources
            └── application.yaml

resources/application.yaml

# Spring Cloud Stream configuration: the Kafka binder plus the two channel bindings.
spring:
  cloud:
    stream:
      kafka:
        binder:
          # Address of the Kafka broker the binder connects to.
          brokers: localhost:9092
      bindings:
        # Binding names match GreetingsStreams.INPUT and GreetingsStreams.OUTPUT.
        # Both bindings use the same "greetings" destination, so the application
        # consumes the messages it publishes.
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json

build.gradle

// Build-script classpath: makes the Spring Boot Gradle plugin available to this build.
buildscript {
	ext {
		// Spring Boot version used for the Gradle plugin coordinate below.
		springBootVersion = '2.1.1.RELEASE'
	}
	repositories {
		mavenCentral()
	}
	dependencies {
		classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
	}
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

// Project coordinates; together with the project name they determine the jar file name.
group = 'com.springkafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
	mavenCentral()
	// Spring milestone repository (presumably required because springCloudVersion is a milestone build).
	maven { url "https://repo.spring.io/milestone" }
}

// Spring Cloud version; used by the BOM import in the dependencyManagement block.
ext['springCloudVersion'] = 'Greenwich.M3'
// There is a high chance you will run into compatibility issues: change springCloudVersion only after verifying it here: http://start.spring.io/actuator/info

// Application dependencies. No versions are given here; they are expected to come from
// the Spring Boot plugin and the Spring Cloud BOM imported in dependencyManagement.
dependencies {
	// Web layer (serves the /greetings endpoint) and actuator endpoints.
	// 'implementation' replaces the deprecated 'compile' configuration so that all
	// entries use the same, current configuration names.
	implementation('org.springframework.boot:spring-boot-starter-web')
	implementation('org.springframework.boot:spring-boot-starter-actuator')
	// Spring Cloud Stream with the Kafka binder.
	implementation('org.springframework.cloud:spring-cloud-stream')
	implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka')
	implementation('org.springframework.kafka:spring-kafka')
	runtimeOnly('org.springframework.boot:spring-boot-devtools')
	// Lombok is only needed while compiling. It must also be registered as an
	// annotation processor: newer Gradle versions no longer discover processors
	// on the compile classpath, which would leave @Getter/@Builder/@Slf4j unprocessed.
	compileOnly('org.projectlombok:lombok')
	annotationProcessor('org.projectlombok:lombok')
	testImplementation('org.springframework.boot:spring-boot-starter-test')
	testImplementation('org.springframework.cloud:spring-cloud-stream-test-support')
	testImplementation('org.springframework.kafka:spring-kafka-test')
}

// Imports the Spring Cloud BOM for the version set in ext['springCloudVersion'].
dependencyManagement {
	imports {
		mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
	}
}

StreamkafkaApplication.java

package com.springkafka.streamkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Entry point of the sample application.
 */
@SpringBootApplication
public class StreamkafkaApplication {

	/**
	 * Hands control to Spring Boot, using this class as the primary source.
	 *
	 * @param args command-line arguments, passed through to Spring Boot unchanged
	 */
	public static void main(final String[] args) {
		SpringApplication.run(StreamkafkaApplication.class, args);
	}
}

config/StreamsConfig.java

package com.springkafka.streamkafka.config;
import com.springkafka.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * Enables Spring Cloud Stream binding for the channels declared in {@link GreetingsStreams}.
 * The class is intentionally empty; it only carries the annotation.
 */
@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

model/Greetings.java

package com.springkafka.streamkafka.model;
// lombok autogenerates getters, setters, toString() and a builder (see https://projectlombok.org/):
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
@Getter @Setter @ToString @Builder
public class Greetings {
    private long timestamp;
    private String message;
}

service/GreetingsListener.java

package com.springkafka.streamkafka.service;
import com.springkafka.streamkafka.model.Greetings;
import com.springkafka.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
 * Listens on the {@link GreetingsStreams#INPUT} channel and logs every greeting it receives.
 */
@Slf4j
@Component
public class GreetingsListener {

    /**
     * Logs a single incoming greeting.
     *
     * @param greetings the message payload bound to the {@link Greetings} type
     */
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload final Greetings greetings) {
        log.info("Received greetings: {}", greetings);
    }
}

service/GreetingsService.java

package com.springkafka.streamkafka.service;
import com.springkafka.streamkafka.model.Greetings;
import com.springkafka.streamkafka.stream.GreetingsStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
/**
 * Publishes {@link Greetings} messages on the outbound greetings channel.
 */
@Service
@Slf4j
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    /**
     * @param greetingsStreams provides the outbound channel used for publishing
     */
    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    /**
     * Wraps the greeting in a message with a JSON content-type header and sends it
     * on the outbound channel.
     *
     * <p>{@code MessageChannel.send} reports a non-fatal failure by returning
     * {@code false}. That result is logged instead of being silently dropped.
     * No exception is thrown, so callers see the same behavior as before.
     *
     * @param greetings the payload to publish
     */
    public void sendGreeting(final Greetings greetings) {
        log.info("Sending greetings {}", greetings);
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        boolean sent = messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
        if (!sent) {
            log.warn("Greetings {} was not accepted by the outbound channel", greetings);
        }
    }
}

stream/GreetingsStreams.java

package com.springkafka.streamkafka.stream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  
/**
 * Declares the inbound and outbound message channels used by the application.
 * The channel names are the binding names configured in application.yaml.
 */
public interface GreetingsStreams {
    /** Name of the inbound channel binding. */
    String INPUT = "greetings-in";
    /** Name of the outbound channel binding. */
    String OUTPUT = "greetings-out";
    /** @return the channel on which greetings are received */
    @Input(INPUT)
    SubscribableChannel inboundGreetings();
    /** @return the channel on which greetings are published */
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}

web/GreetingsController.java

package com.springkafka.streamkafka.web;
import com.springkafka.streamkafka.model.Greetings;
import com.springkafka.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController; 
@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;
    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }
    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        Greetings greetings = Greetings.builder()
            .message(message)
            .timestamp(System.currentTimeMillis())
            .build();
        greetingsService.sendGreeting(greetings);
    }
}

PS: blog Tags: java-kafka, springboot, kafka, kafka in 5 minutes, quick start kafka, kafka and java, kafka and springboot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment