// Created June 17, 2022 12:11
// Save vicety/0a8aa4cf6f8d0e50f8e789266277f4f7 to your computer and use it in GitHub Desktop.
// This file contains bidirectional Unicode text that may be interpreted or compiled differently
// than what appears below. To review, open the file in an editor that reveals hidden Unicode
// characters. Learn more about bidirectional Unicode characters.
package io.github.vicety;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
public class Main { | |
public static void main(String[] args) throws InterruptedException { | |
// Create End Publisher | |
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); | |
SubmissionPublisher<Employee> publisher2 = new SubmissionPublisher<>(); | |
// Create Processor | |
MyProcessor transformProcessor = new MyProcessor(s -> { | |
return new Freelancer(s.getId(), s.getId() + 100, s.getName()); | |
}); | |
//Create End Subscriber | |
MyFreelancerSubscriber subs = new MyFreelancerSubscriber(); | |
//Create chain of publisher, processor and subscriber | |
publisher.subscribe(transformProcessor); // publisher to processor | |
publisher2.subscribe(transformProcessor); | |
transformProcessor.subscribe(subs); // processor to subscriber | |
List<Employee> emps = EmpHelper.getEmps(); | |
List<Employee> emps2 = EmpHelper.getEmps2(); | |
Thread.sleep(1000); | |
// Publish items | |
System.out.println("Publishing Items to Subscriber"); | |
emps.stream().forEach(i -> { | |
publisher.submit(i); | |
}); | |
emps2.stream().forEach(i -> publisher2.submit(i)); | |
// Logic to wait for messages processing to finish | |
// while (emps.size() != subs.getCounter()) { | |
Thread.sleep(1000); | |
// } | |
// Closing publishers | |
publisher.close(); | |
publisher2.close(); | |
transformProcessor.close(); | |
System.out.println("Exiting the app"); | |
} | |
} | |
class EmpHelper { | |
public static List<Employee> getEmps() { | |
Employee e1 = new Employee(1, "Pankaj"); | |
Employee e2 = new Employee(2, "David"); | |
Employee e3 = new Employee(3, "Lisa"); | |
Employee e4 = new Employee(4, "Ram"); | |
Employee e5 = new Employee(5, "Anupam"); | |
List<Employee> emps = new ArrayList<>(); | |
emps.add(e1); | |
emps.add(e2); | |
emps.add(e3); | |
emps.add(e4); | |
emps.add(e5); | |
return emps; | |
} | |
public static List<Employee> getEmps2() { | |
Employee e1 = new Employee(1, "qqqqq"); | |
List<Employee> emps = new ArrayList<>(); | |
emps.add(e1); | |
return emps; | |
} | |
} | |
class MyFreelancerSubscriber implements Flow.Subscriber<Freelancer> { | |
private Flow.Subscription subscription; | |
private int counter = 0; | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
// System.out.println("Subscribed for Freelancer"); | |
this.subscription = subscription; | |
this.subscription.request(1); //requesting data from publisher | |
System.out.println("Freelancer onSubscribe requested 1 item"); | |
} | |
@Override | |
public void onNext(Freelancer item) { | |
System.out.println("Processing Freelancer " + item); | |
counter++; | |
this.subscription.request(1); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.println("Some error happened in MyFreelancerSubscriber"); | |
e.printStackTrace(); | |
} | |
@Override | |
public void onComplete() { | |
System.out.println("All Processing Done for MyFreelancerSubscriber"); | |
} | |
public int getCounter() { | |
return counter; | |
} | |
} | |
class MyProcessor extends SubmissionPublisher<Freelancer> implements Flow.Processor<Employee, Freelancer> { | |
private Flow.Subscription subscription; | |
private Function<Employee, Freelancer> function; | |
public MyProcessor(Function<Employee, Freelancer> function) { | |
super(); | |
this.function = function; | |
} | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
this.subscription = subscription; | |
System.out.println("Processor onSubscribe requested 1 item"); | |
subscription.request(1); | |
} | |
@Override | |
public void onNext(Employee emp) { | |
System.out.println("processor onNext 1 item " + emp.getName()); | |
submit((Freelancer) function.apply(emp)); | |
subscription.request(1); | |
} | |
@Override | |
public void onError(Throwable e) { | |
e.printStackTrace(); | |
} | |
@Override | |
public void onComplete() { | |
System.out.println("Done"); | |
} | |
} | |
/** Mutable value holder for an employee: a numeric id and a name. */
class Employee {

    private int id;
    private String name;

    /** Creates an employee with id {@code 0} and a {@code null} name. */
    public Employee() {
    }

    /**
     * Creates an employee with the given id and name.
     *
     * @param i the employee id
     * @param s the employee name
     */
    public Employee(int i, String s) {
        this.id = i;
        this.name = s;
    }

    /**
     * Returns the employee id.
     *
     * @return the id
     */
    public int getId() {
        return id;
    }

    /**
     * Sets the employee id.
     *
     * @param id the new id
     */
    public void setId(int id) {
        this.id = id;
    }

    /**
     * Returns the employee name.
     *
     * @return the name, possibly {@code null}
     */
    public String getName() {
        return name;
    }

    /**
     * Sets the employee name.
     *
     * @param name the new name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Formats the employee as {@code [id=<id>,name=<name>]}.
     *
     * @return the textual form
     */
    @Override
    public String toString() {
        return "[id=" + id + ",name=" + name + "]";
    }
}
class Freelancer extends Employee { | |
private int fid; | |
public int getFid() { | |
return fid; | |
} | |
public void setFid(int fid) { | |
this.fid = fid; | |
} | |
public Freelancer(int id, int fid, String name) { | |
super(id, name); | |
this.fid = fid; | |
} | |
@Override | |
public String toString() { | |
return "[id=" + super.getId() + ",name=" + super.getName() + ",fid=" + fid + "]"; | |
} | |
} |
// Sign up for free to join this conversation on GitHub.
// Already have an account? Sign in to comment.