Skip to content

Instantly share code, notes, and snippets.

@Danushka96
Created August 21, 2019 11:12
Show Gist options
  • Save Danushka96/90f8d246573ed971e21eea19be40666b to your computer and use it in GitHub Desktop.
Save Danushka96/90f8d246573ed971e21eea19be40666b to your computer and use it in GitHub Desktop.
Flux data stream methods in Java
package com.ustack.reactor.examples;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
/**
 * Small runnable tour of {@link Flux}: subscribing with a consumer, subscribing with a
 * hand-written {@link Subscriber}, requesting items in batches (back-pressure), mapping
 * values and zipping two streams together.
 *
 * <p>Each demo lives in its own private helper and {@link #main(String[])} runs them in a
 * fixed order. Every source is a {@code Flux.just(1,2,3,4)} with {@code log()} attached, so
 * the signals exchanged with the subscriber are logged as well.
 */
public class streamData {

    /**
     * Runs all demos in sequence.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // One list is shared by every demo and is never cleared, so each print of it
        // also shows the values collected by the demos that ran before.
        List<Integer> elements = new ArrayList<>();

        collectWithConsumer(elements);
        elements.forEach(System.out::println);

        collectWithSubscriber(elements);
        collectWithBackpressure(elements);

        collectMapped(elements);
        elements.forEach(System.out::println);

        printZipped();
    }

    /** Subscribes with a plain consumer that appends every emitted value to {@code elements}. */
    private static void collectWithConsumer(List<Integer> elements) {
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(elements::add);
    }

    /**
     * Subscribes through the {@link Subscriber} interface directly, requesting an
     * unbounded number of items; each value is stored in {@code elements} and printed.
     */
    private static void collectWithSubscriber(List<Integer> elements) {
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        subscription.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        elements.add(integer);
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        // Include the cause; a bare message makes the failure undiagnosable.
                        System.err.println("something went wrong: " + throwable);
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("Completed");
                    }
                });
    }

    /**
     * Back-pressure demo: asks for two items up front and for two more after every
     * second item received, storing each value in {@code elements}.
     */
    private static void collectWithBackpressure(List<Integer> elements) {
        Flux.just(1, 2, 3, 4)
                .log()
                .subscribe(new Subscriber<Integer>() {
                    private Subscription subscription;
                    private int received;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.subscription = subscription;
                        subscription.request(2);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        elements.add(integer);
                        received++;
                        // The current batch of two is used up: ask for the next one.
                        if (received % 2 == 0) {
                            subscription.request(2);
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        // Previously an empty body, which hid any upstream failure.
                        System.err.println("back-pressure demo failed: " + throwable);
                    }

                    @Override
                    public void onComplete() {
                        // Nothing to do when the stream finishes.
                    }
                });
    }

    /** Doubles each value with {@code map} and appends the results to {@code elements}. */
    private static void collectMapped(List<Integer> elements) {
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .subscribe(elements::add);
    }

    /**
     * Zips the doubled values with {@code Flux.range(0, Integer.MAX_VALUE)} and prints
     * one formatted line per pair.
     */
    private static void printZipped() {
        Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> i * 2)
                .zipWith(Flux.range(0, Integer.MAX_VALUE),
                        (one, two) -> String.format("First Flux: %d, Second Flux %d", one, two))
                .subscribe(System.out::println);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment