Skip to content

Instantly share code, notes, and snippets.

@vicety
Created June 17, 2022 12:11
Show Gist options
  • Save vicety/0a8aa4cf6f8d0e50f8e789266277f4f7 to your computer and use it in GitHub Desktop.
Save vicety/0a8aa4cf6f8d0e50f8e789266277f4f7 to your computer and use it in GitHub Desktop.
package io.github.vicety;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
/**
 * Demo entry point: wires two {@link SubmissionPublisher} sources through one
 * {@link MyProcessor} into a {@link MyFreelancerSubscriber} and pushes the
 * sample employees from {@link EmpHelper} through that chain.
 */
public class Main {
    /**
     * Builds the chain, submits the sample data, pauses so the asynchronous
     * deliveries can run, then closes both sources and the processor.
     *
     * @param args ignored
     * @throws InterruptedException if one of the two sleeps is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        // Upstream sources.
        SubmissionPublisher<Employee> firstSource = new SubmissionPublisher<>();
        SubmissionPublisher<Employee> secondSource = new SubmissionPublisher<>();

        // Middle stage: turns each Employee into a Freelancer whose fid is id + 100.
        MyProcessor transformProcessor =
                new MyProcessor(emp -> new Freelancer(emp.getId(), emp.getId() + 100, emp.getName()));

        // Terminal stage.
        MyFreelancerSubscriber sink = new MyFreelancerSubscriber();

        // Wire the chain: both sources feed the same processor, which feeds the sink.
        firstSource.subscribe(transformProcessor);
        secondSource.subscribe(transformProcessor);
        transformProcessor.subscribe(sink);

        List<Employee> firstBatch = EmpHelper.getEmps();
        List<Employee> secondBatch = EmpHelper.getEmps2();

        // NOTE(review): presumably gives the asynchronous onSubscribe calls time to
        // arrive before anything is submitted - confirm.
        Thread.sleep(1000);

        System.out.println("Publishing Items to Subscriber");
        for (Employee employee : firstBatch) {
            firstSource.submit(employee);
        }
        for (Employee employee : secondBatch) {
            secondSource.submit(employee);
        }

        // Fixed pause standing in for a real "wait until everything is processed" check.
        Thread.sleep(1000);

        // Shut down in the same order the stages were created.
        firstSource.close();
        secondSource.close();
        transformProcessor.close();
        System.out.println("Exiting the app");
    }
}
/** Factory for the hard-coded sample {@link Employee} lists used by the demo. */
class EmpHelper {
    /**
     * Returns a new mutable list of five employees with ids 1 to 5, in id order.
     *
     * @return a freshly allocated list; every call creates new Employee objects
     */
    public static List<Employee> getEmps() {
        String[] names = {"Pankaj", "David", "Lisa", "Ram", "Anupam"};
        List<Employee> result = new ArrayList<>();
        for (int idx = 0; idx < names.length; idx++) {
            // Ids are 1-based and follow the position in the name table.
            result.add(new Employee(idx + 1, names[idx]));
        }
        return result;
    }

    /**
     * Returns a new mutable list holding a single employee with id 1 and name "qqqqq".
     *
     * @return a freshly allocated one-element list
     */
    public static List<Employee> getEmps2() {
        List<Employee> result = new ArrayList<>();
        result.add(new Employee(1, "qqqqq"));
        return result;
    }
}
/**
 * Terminal subscriber that prints every {@link Freelancer} it receives and
 * counts how many items it has seen. It asks for one item at a time.
 */
class MyFreelancerSubscriber implements Flow.Subscriber<Freelancer> {
    private Flow.Subscription subscription;

    // Written from onNext and read through getCounter(), which another thread may
    // call (for example to poll for completion); volatile makes each update visible.
    private volatile int counter = 0;

    /**
     * Stores the subscription and asks for the first item.
     *
     * @param subscription handle used to request further items
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1); // request the first item from the publisher
        System.out.println("Freelancer onSubscribe requested 1 item");
    }

    /**
     * Prints the item, bumps the counter and asks for the next item.
     *
     * @param item the freelancer delivered by the publisher
     */
    @Override
    public void onNext(Freelancer item) {
        System.out.println("Processing Freelancer " + item);
        // NOTE(review): a plain increment is enough only while onNext calls arrive one
        // at a time, as expected for a single subscription - confirm if this subscriber
        // is ever attached to several publishers.
        counter++;
        this.subscription.request(1);
    }

    /**
     * Reports the failure on standard output and prints the stack trace.
     *
     * @param e the error signalled by the publisher
     */
    @Override
    public void onError(Throwable e) {
        System.out.println("Some error happened in MyFreelancerSubscriber");
        e.printStackTrace();
    }

    /** Prints a completion message. */
    @Override
    public void onComplete() {
        System.out.println("All Processing Done for MyFreelancerSubscriber");
    }

    /**
     * Returns how many items onNext has handled so far.
     *
     * @return the number of items received
     */
    public int getCounter() {
        return counter;
    }
}
/**
 * Processor stage that converts each incoming {@link Employee} with the supplied
 * function and republishes the resulting {@link Freelancer} to its own subscribers.
 *
 * <p>The processor keeps no per-upstream state, so it may be subscribed to more
 * than one publisher. Each upstream is given unbounded demand; flow control comes
 * from {@link #submit(Object)} called inside {@link #onNext(Employee)}.
 * NOTE(review): this relies on {@code submit} blocking while downstream subscribers
 * are saturated and on it tolerating concurrent callers when several upstreams are
 * attached - confirm against the SubmissionPublisher documentation.
 */
class MyProcessor extends SubmissionPublisher<Freelancer> implements Flow.Processor<Employee, Freelancer> {
    private final Function<Employee, Freelancer> function;

    /**
     * Creates a processor using the default SubmissionPublisher configuration.
     *
     * @param function conversion applied to every received employee; must not be null
     * @throws NullPointerException if {@code function} is null
     */
    public MyProcessor(Function<Employee, Freelancer> function) {
        super();
        // Fail here rather than on the first delivered item.
        if (function == null) {
            throw new NullPointerException("function");
        }
        this.function = function;
    }

    /**
     * Requests an effectively unbounded number of items from the new upstream.
     *
     * <p>The subscription is deliberately not stored: onNext cannot tell which
     * upstream delivered an item, so a single remembered subscription would send
     * per-item demand to the wrong publisher once a second one subscribes.
     *
     * @param subscription handle for the upstream that just accepted this processor
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        System.out.println("Processor onSubscribe requested unbounded items");
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Converts the employee and publishes the result downstream.
     *
     * @param emp the employee delivered by an upstream publisher
     */
    @Override
    public void onNext(Employee emp) {
        System.out.println("processor onNext 1 item " + emp.getName());
        submit(function.apply(emp));
    }

    /**
     * Prints the stack trace and forwards the failure to downstream subscribers
     * by closing this publisher exceptionally.
     *
     * @param e the error signalled by an upstream publisher
     */
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
        closeExceptionally(e);
    }

    /**
     * Prints a completion message. The processor is intentionally not closed here:
     * another upstream may still be delivering, so closing is left to the owner.
     */
    @Override
    public void onComplete() {
        System.out.println("Done");
    }
}
/** Mutable value holder for an employee: a numeric id and a name. */
class Employee {
    private int id;
    private String name;

    /** Creates an employee with id 0 and a null name. */
    public Employee() {
        this(0, null);
    }

    /**
     * Creates an employee with the given id and name.
     *
     * @param i the employee id
     * @param s the employee name
     */
    public Employee(int i, String s) {
        this.id = i;
        this.name = s;
    }

    /** @return the employee id */
    public int getId() {
        return this.id;
    }

    /** @param id the new employee id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the employee name, possibly null */
    public String getName() {
        return this.name;
    }

    /** @param name the new employee name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the text {@code [id=<id>,name=<name>]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("[id=");
        text.append(id);
        text.append(",name=");
        text.append(name);
        text.append("]");
        return text.toString();
    }
}
/** An {@link Employee} that also carries a freelancer id ({@code fid}). */
class Freelancer extends Employee {
    private int fid;

    /**
     * Creates a freelancer.
     *
     * @param id   the employee id passed to the Employee constructor
     * @param fid  the freelancer id
     * @param name the name passed to the Employee constructor
     */
    public Freelancer(int id, int fid, String name) {
        super(id, name);
        this.fid = fid;
    }

    /** @return the freelancer id */
    public int getFid() {
        return this.fid;
    }

    /** @param fid the new freelancer id */
    public void setFid(int fid) {
        this.fid = fid;
    }

    /** @return the text {@code [id=<id>,name=<name>,fid=<fid>]} */
    @Override
    public String toString() {
        // Read id and name through the Employee implementations of the getters.
        StringBuilder text = new StringBuilder("[id=");
        text.append(super.getId());
        text.append(",name=");
        text.append(super.getName());
        text.append(",fid=");
        text.append(fid);
        text.append("]");
        return text.toString();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment