Skip to content

Instantly share code, notes, and snippets.

@sinujohn
Created April 26, 2020 18:58
Show Gist options
  • Save sinujohn/b1466e9be6a55eb7f9c848a23a2825ba to your computer and use it in GitHub Desktop.
Save sinujohn/b1466e9be6a55eb7f9c848a23a2825ba to your computer and use it in GitHub Desktop.
Cloud Stream App with No Partitioning.
spring.cloud.function.definition=timeSupplier;personConsumer;uppercase
spring.cloud.stream.bindings.timeSupplier-out-0.destination=time
spring.cloud.stream.bindings.personConsumer-in-0.destination=people
spring.cloud.stream.bindings.uppercase-in-0.destination=upper-input
spring.cloud.stream.bindings.uppercase-out-0.destination=upper-output
spring.cloud.stream.source=personSupplier
spring.cloud.stream.bindings.personSupplier-out-0.destination=people
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseStatus;
import java.util.function.Consumer;
import java.util.function.Function;
@SpringBootApplication
@Controller
public class StreamDemoApplication {
public static void main(String[] args) {
SpringApplication.run(StreamDemoApplication.class, args);
}
// °º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`
// Basics
// °º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`
//Uncommenting following will publish event every second
/*@Bean
public Supplier<String> timeSupplier() {
return () -> "Time is " + System.currentTimeMillis();
}*/
@Bean
public Consumer<Person> personConsumer() {
return person -> {
System.out.println("Received: " + person);
};
}
@Bean
public Function<String, String> uppercase() {
return str -> {
System.out.println("Got this: " + str);
return str.toUpperCase();
};
}
// °º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`
// Foreign Event driven producer
// °º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`°º¤ø,¸¸,ø¤º°`
@Autowired
private StreamBridge streamBridge;
@GetMapping("/send/{name}")
@ResponseStatus(HttpStatus.ACCEPTED)
public void delegateToSupplier(@PathVariable("name") final String name) {
final Person p = new Person();
p.setName(name);
System.out.println("Sending person: " + p);
streamBridge.send("personSupplier-out-0", p);
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment