Skip to content

Instantly share code, notes, and snippets.

@damithadayananda
Created September 18, 2020 12:12
Show Gist options
  • Save damithadayananda/70c5c3556a9194e67abbf58c6e7bd6f1 to your computer and use it in GitHub Desktop.
Save damithadayananda/70c5c3556a9194e67abbf58c6e7bd6f1 to your computer and use it in GitHub Desktop.
package main.reactiveAPI;
public class Employee {
private int id;
private String fname;
private String sname;
private String fullName;
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public String getSname() {
return sname;
}
public void setSname(String sname) {
this.sname = sname;
}
public Employee(String fname, String sname) {
this.fname = fname;
this.sname = sname;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getFName() {
return fname;
}
public void setFName(String name) {
this.fname = name;
}
@Override
public String toString() {
return "Employee{" +
"id=" + id +
", name='" + fname + '\'' +
'}';
}
}
package main.reactiveAPI;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class MyProcessor extends SubmissionPublisher<Employee> implements Flow.Processor<Employee, Employee> {
private Flow.Subscription subscription;
private Function<Employee, Employee> function;
public MyProcessor(Function<Employee, Employee> function){
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Employee item) {
submit((Employee) function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}
package main.reactiveAPI;
import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<Employee> {
private Flow.Subscription subscription;
int count =0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("Subscribed for employees");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(Employee item) {
System.out.println("Processing employee:"+item);
count++;
this.subscription.request(1);
}
@Override
public void onError(Throwable e) {
System.out.println("Error occurred");
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("All processing done");
}
public int getCounter(){
return count;
}
}
package main.reactiveAPI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
public class ReactiveAPIDemo {
public ReactiveAPIDemo() {
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
MyProcessor transformProcessor = new MyProcessor(s -> {
s.setFullName(s.getFName()+" "+s.getSname());
return s;
});
MySubscriber subs = new MySubscriber();
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subs);
List<Employee> emps = new ArrayList<>(Arrays.asList(
new Employee("D","d"),
new Employee("A","a"),
new Employee("B","b"),
new Employee("C","c"),
new Employee("E", "e")
));
System.out.println("Publishing items");
emps.stream().forEach(
i -> publisher.submit(i));
while (emps.size() != subs.getCounter()){
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
publisher.close();
transformProcessor.close();
System.out.println("done");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment