Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active October 27, 2018 16:02
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/8f1a3e31882c076c704f to your computer and use it in GitHub Desktop.
Save akarnokd/8f1a3e31882c076c704f to your computer and use it in GitHub Desktop.
package hu.akarnokd.reactiveflowbridge;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Function;
/**
* Bridge between Reactive-Streams API and the Java 9 Flow API.
*/
public final class ReactiveFlowBridge {
/** Utility class. */
private ReactiveFlowBridge() {
throw new IllegalStateException("No instances!");
}
/**
* Converts a Flow Publisher into a Reactive Publisher.
* @param <T> the value type
* @param flowPublisher the source Flow Publisher to convert
* @return the equivalent Reactive Publisher
*/
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toReactive(
Flow.Publisher<? extends T> flowPublisher) {
Objects.requireNonNull(flowPublisher, "flowPublisher");
if (flowPublisher instanceof org.reactivestreams.Publisher) {
return (org.reactivestreams.Publisher<T>)flowPublisher;
}
if (flowPublisher instanceof FlowPublisherFromReactive) {
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactive);
}
return new ReactivePublisherFromFlow<>(flowPublisher);
}
/**
* Converts a Reactive Publisher into a Flow Publisher.
* @param <T> the value type
* @param reactivePublisher the source Reactive Publisher to convert
* @return the equivalent Flow Publisher
*/
@SuppressWarnings("unchecked")
public static <T> Flow.Publisher<T> toFlow(
org.reactivestreams.Publisher<? extends T> reactivePublisher
) {
Objects.requireNonNull(reactivePublisher, "reactivePublisher");
if (reactivePublisher instanceof org.reactivestreams.Publisher) {
return (Flow.Publisher<T>)reactivePublisher;
}
if (reactivePublisher instanceof ReactivePublisherFromFlow) {
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactivePublisher).flow);
}
return new FlowPublisherFromReactive<>(reactivePublisher);
}
/**
* Converts a Flow Processor into a Reactive Processor.
* @param <T> the input value type
* @param <U> the output value type
* @param flowProcessor the source Flow Processor to convert
* @return the equivalent Reactive Processor
*/
@SuppressWarnings("unchecked")
public static <T, U> org.reactivestreams.Processor<T, U> toReactive(
Flow.Processor<? super T, ? extends U> flowProcessor
) {
Objects.requireNonNull(flowProcessor, "flowProcessor");
if (flowProcessor instanceof org.reactivestreams.Processor) {
return (org.reactivestreams.Processor<T, U>)flowProcessor;
}
if (flowProcessor instanceof FlowToReactiveProcessor) {
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactive);
}
return new ReactiveToFlowProcessor<>(flowProcessor);
}
/**
* Converts a Reactive Processor into a Flow Processor.
* @param <T> the input value type
* @param <U> the output value type
* @param reactiveProcessor the source Reactive Processor to convert
* @return the equivalent Flow Processor
*/
@SuppressWarnings("unchecked")
public static <T, U> Flow.Processor<T, U> toFlow(
org.reactivestreams.Processor<? super T, ? extends U> reactiveProcessor
) {
Objects.requireNonNull(reactiveProcessor, "reactiveProcessor");
if (reactiveProcessor instanceof org.reactivestreams.Processor) {
return (Flow.Processor<T, U>)reactiveProcessor;
}
if (reactiveProcessor instanceof ReactiveToFlowProcessor) {
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveProcessor).flow);
}
return new FlowToReactiveProcessor<>(reactiveProcessor);
}
/**
* Converts a Reactive Publisher into a Flow Publisher via a function then
* applies the returned Flow Publisher to a transformer function that returns a value.
* <p>
* This allows a more inlined conversion from a Reactive Publisher to a
* fluent Flow-based API:
* <pre><code>
* org.reactivestreams.Publisher&lt;T> source = ...
*
* ReactiveFlowBridge.to(
* source,
* ReactiveFlowBridge::toFlow,
* FluentFlow::from)
* .map(v -> v)
* .subscribe(...);
* </code></pre>
* @param <T> the element type
* @param <U> the output type
* @param reactivePublisher the source Reactive Publisher
* @param toFlow the function that converts from Reactive Publisher into Flow Publisher
* @param transformer the transformer that takes a Flow Publisher and returns some transformation of it
* @return the transformation result
*/
public static <T, U> U to(
org.reactivestreams.Publisher<? extends T> reactivePublisher,
Function<? super org.reactivestreams.Publisher<? extends T>, ? extends Flow.Publisher<? extends T>> toFlow,
Function<Flow.Publisher<? extends T>, U> transformer) {
return transformer.apply(toFlow.apply(reactivePublisher));
}
/**
* Converts a Flow Publisher into a Reactive Publisher via a function then
* applies the returned Reactive Publisher to a transformer function that returns a value.
* <p>
* This allows a more inlined conversion from a Flow Publisher to a
* fluent Reactive-based API:
* <pre><code>
* Flow.Publisher&lt;T> source = ...
*
* ReactiveFlowBridge.to(
* source,
* ReactiveFlowBridge::toFlow,
* Observable::fromPublisher)
* .map(v -> v)
* .subscribe(...);
* </code></pre>
* @param <T> the element type
* @param <U> the output type
* @param flowPublisher the source Reactive Publisher
* @param toReactive the function that converts from Reactive Publisher into Flow Publisher
* @param transformer the transformer that takes a Flow Publisher and returns some transformation of it
* @return the transformation result
*/
public static <T, U> U to(
Flow.Publisher<? extends T> flowPublisher,
Function<? super Flow.Publisher<? extends T>, ? extends org.reactivestreams.Publisher<? extends T>> toReactive,
Function<org.reactivestreams.Publisher<? extends T>, U> transformer) {
return transformer.apply(toReactive.apply(flowPublisher));
}
/**
* Wraps a Reactive Subscription and converts the calls to a Flow Subscription.
*/
static final class FlowToReactiveSubscription implements Flow.Subscription {
private final org.reactivestreams.Subscription reactive;
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
this.reactive = reactive;
}
@Override
public void request(long n) {
reactive.request(n);
}
@Override
public void cancel() {
reactive.cancel();
}
}
/**
* Wraps a Flow Subscription and converts the calls to a Reactive Subscription.
*/
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
private final Flow.Subscription flow;
public ReactiveToFlowSubscription(Flow.Subscription flow) {
this.flow = flow;
}
@Override
public void request(long n) {
flow.request(n);
}
@Override
public void cancel() {
flow.cancel();
}
}
/**
* Wraps a Reactive Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the value type
*/
static final class FlowToReactiveSubscriber<T>
implements Flow.Subscriber<T> {
private final org.reactivestreams.Subscriber<? super T> reactive;
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
this.reactive = reactive;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
reactive.onSubscribe(new ReactiveToFlowSubscription(subscription));
}
@Override
public void onNext(T item) {
reactive.onNext(item);
}
@Override
public void onError(Throwable throwable) {
reactive.onError(throwable);
}
@Override
public void onComplete() {
reactive.onComplete();
}
}
/**
* Wraps a Reactive Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the value type
*/
static final class ReactiveToFlowSubscriber<T>
implements org.reactivestreams.Subscriber<T> {
private final Flow.Subscriber<? super T> flow;
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
this.flow = flow;
}
@Override
public void onSubscribe(org.reactivestreams.Subscription subscription) {
flow.onSubscribe(new FlowToReactiveSubscription(subscription));
}
@Override
public void onNext(T item) {
flow.onNext(item);
}
@Override
public void onError(Throwable throwable) {
flow.onError(throwable);
}
@Override
public void onComplete() {
flow.onComplete();
}
}
/**
* Wraps a Flow Processor and forwards methods of the Reactive Processor to it.
* @param <T> the input type
* @param <U> the output type
*/
static final class ReactiveToFlowProcessor<T, U>
implements org.reactivestreams.Processor<T, U> {
private final Flow.Processor<? super T, ? extends U> flow;
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
this.flow = flow;
}
@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
flow.onSubscribe(new FlowToReactiveSubscription(s));
}
@Override
public void onNext(T t) {
flow.onNext(t);
}
@Override
public void onError(Throwable t) {
flow.onError(t);
}
@Override
public void onComplete() {
flow.onComplete();
}
@Override
public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
flow.subscribe(new FlowToReactiveSubscriber<>(s));
}
}
/**
* Wraps a Reactive Processor and forwards methods of the Flow Processor to it.
* @param <T> the input type
* @param <U> the output type
*/
static final class FlowToReactiveProcessor<T, U>
implements Flow.Processor<T, U> {
private final org.reactivestreams.Processor<? super T, ? extends U> reactive;
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
this.reactive = reactive;
}
@Override
public void onSubscribe(Flow.Subscription s) {
reactive.onSubscribe(new ReactiveToFlowSubscription(s));
}
@Override
public void onNext(T t) {
reactive.onNext(t);
}
@Override
public void onError(Throwable t) {
reactive.onError(t);
}
@Override
public void onComplete() {
reactive.onComplete();
}
@Override
public void subscribe(Flow.Subscriber<? super U> s) {
reactive.subscribe(new ReactiveToFlowSubscriber<>(s));
}
}
/**
* Reactive Publisher that wraps a Flow Publisher.
* @param <T> the value type
*/
static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
final Flow.Publisher<? extends T> flow;
public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
this.flow = flowPublisher;
}
@Override
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
flow.subscribe(new FlowToReactiveSubscriber<>(reactive));
}
}
/**
* Flow Publisher that wraps a Reactive Publisher.
* @param <T> the value type
*/
static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
final org.reactivestreams.Publisher<? extends T> reactive;
public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
this.reactive = reactivePublisher;
}
@Override
public void subscribe(Flow.Subscriber<? super T> flow) {
reactive.subscribe(new ReactiveToFlowSubscriber<>(flow));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment