Skip to content

Instantly share code, notes, and snippets.

@sinujohn
Created April 26, 2020 19:03
Show Gist options
  • Save sinujohn/e2708943e884118b3440350778b2f9b0 to your computer and use it in GitHub Desktop.
Save sinujohn/e2708943e884118b3440350778b2f9b0 to your computer and use it in GitHub Desktop.
Stream Messaging Application with Partitioning - http://sinujohn.com/
# Producer Properties
# Declares "personSupplier" as a source; its output binding is "personSupplier-out-0",
# the same binding name the application passes to StreamBridge.send(...).
spring.cloud.stream.source=personSupplier
# Destination that the producer binding publishes to.
spring.cloud.stream.bindings.personSupplier-out-0.destination=people
# Expression evaluated against each outbound message to obtain the partition key;
# here it reads the "id" property of the Person payload.
spring.cloud.stream.bindings.personSupplier-out-0.producer.partitionKeyExpression=payload.id
# Number of partitions the producer spreads messages across.
spring.cloud.stream.bindings.personSupplier-out-0.producer.partitionCount=2
# Consumer Properties
# Name of the functional bean to bind; matches the personConsumer() @Bean method.
spring.cloud.function.definition=personConsumer
# The consumer reads from the same destination the producer writes to.
spring.cloud.stream.bindings.personConsumer-in-0.destination=people
# Consumer group name for this input binding.
spring.cloud.stream.bindings.personConsumer-in-0.group=peoplegroup
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseStatus;
import java.util.function.Consumer;
/**
 * Demo application that publishes {@link Person} messages through a
 * {@link StreamBridge} output binding and prints every {@link Person}
 * received by the {@code personConsumer} function bean.
 *
 * <p>The class doubles as the Spring Boot entry point and as the web
 * controller exposing {@code GET /send/{id}/{name}}.
 */
@SpringBootApplication
@Controller
public class StreamPartitionDemoApplication {

    /**
     * Name of the output binding messages are sent to. It must stay in sync
     * with the {@code spring.cloud.stream.bindings.personSupplier-out-0.*}
     * entries of the application properties.
     */
    private static final String PERSON_OUTPUT_BINDING = "personSupplier-out-0";

    @Autowired
    private StreamBridge streamBridge;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(StreamPartitionDemoApplication.class, args);
    }

    /**
     * Function bean that handles incoming {@link Person} messages.
     *
     * @return a consumer that prints each received person to standard output
     */
    @Bean
    public Consumer<Person> personConsumer() {
        return person -> System.out.println("Received: " + person);
    }

    /**
     * Builds a {@link Person} from the path variables and sends it to the
     * output binding. Responds with {@code 202 Accepted} when the message was
     * handed over.
     *
     * @param id   identifier of the person taken from the request path
     * @param name name of the person taken from the request path
     * @throws IllegalStateException if the bridge reports that the message
     *                               was not sent
     */
    @GetMapping("/send/{id}/{name}")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void delegateToSupplier(@PathVariable("id") final int id, @PathVariable("name") final String name) {
        final Person p = new Person(id, name);
        System.out.println("Sending person: " + p);
        // send(...) returns false when the message was not sent; do not answer
        // 202 Accepted in that case.
        final boolean sent = streamBridge.send(PERSON_OUTPUT_BINDING, p);
        if (!sent) {
            throw new IllegalStateException("Failed to send person: " + p);
        }
    }

    /**
     * Message payload carrying an id and a name. Accessors, mutators and the
     * all-arguments constructor are generated by Lombok.
     */
    @Getter
    @Setter
    @AllArgsConstructor
    public static class Person {
        private int id;
        private String name;

        /**
         * @return the person rendered as {@code [id, name]}
         */
        @Override
        public String toString() {
            return "[" + this.id + ", " + this.name + "]";
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment