Create a gist now

Instantly share code, notes, and snippets.

package osgi;
import java.io.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.*;
import org.osgi.util.promise.*;
import org.reactivestreams.*;
import io.reactivex.*;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public enum AsyncEventStream {
;
public interface PushEvent<T> {
public enum EventType {
CLOSE,
DATA,
ERROR
}
EventType getType();
T getData();
Throwable getFailure();
boolean isTerminal();
static <T> PushEvent<T> data(T value) {
return new PushEvent<T>() {
@Override
public EventType getType() {
return EventType.DATA;
}
@Override
public T getData() {
return value;
}
@Override
public Throwable getFailure() {
return null;
}
@Override
public boolean isTerminal() {
return false;
}
};
}
static <T> PushEvent<T> error(Throwable e) {
return new PushEvent<T>() {
@Override
public EventType getType() {
return EventType.ERROR;
}
@Override
public T getData() {
return null;
}
@Override
public Throwable getFailure() {
return e;
}
@Override
public boolean isTerminal() {
return false;
}
};
}
enum PushEventClose implements PushEvent<Object> {
INSTANCE;
@Override
public EventType getType() {
return EventType.CLOSE;
}
@Override
public Object getData() {
return null;
}
@Override
public Throwable getFailure() {
return null;
}
@Override
public boolean isTerminal() {
return true;
}
}
@SuppressWarnings("unchecked")
static <T> PushEvent<T> close() {
return (PushEvent<T>)PushEventClose.INSTANCE;
}
}
@FunctionalInterface
public interface PushEventConsumer<T> {
long ABORT = -1L;
long CONTINUE = 0L;
long accept(PushEvent<T> e) throws Exception;
}
public interface PushEventSource<T> {
Closeable open(PushEventConsumer<? super T> consumer) throws Exception;
}
public static <T> PushEventSource<T> from(
Publisher<? extends T> publisher, Scheduler scheduler) {
return c -> {
CompositeDisposable cd = new CompositeDisposable();
Scheduler.Worker w = scheduler.createWorker();
cd.add(w);
publisher.subscribe(new Subscriber<T>() {
Subscription s;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
cd.add(s::cancel);
if (!cd.isDisposed()) {
s.request(1);
}
}
@Override
public void onNext(T t) {
long backpressure;
try {
backpressure = c.accept(PushEvent.data(t));
} catch (Exception e) {
onError(e);
return;
}
if (backpressure <= PushEventConsumer.ABORT) {
cd.dispose();
} else
if (backpressure == PushEventConsumer.CONTINUE) {
s.request(1);
} else {
w.schedule(() -> s.request(1), backpressure, TimeUnit.MILLISECONDS);
}
}
@Override
public void onError(Throwable t) {
cd.dispose();
try {
c.accept(PushEvent.error(t));
} catch (Exception ex) {
RxJavaPlugins.onError(ex);
}
}
@Override
public void onComplete() {
cd.dispose();
try {
c.accept(PushEvent.close());
} catch (Exception ex) {
RxJavaPlugins.onError(ex);
}
}
});
return cd::dispose;
};
}
public static <T> Observable<T> to(PushEventSource<? extends T> source, long backpressure) {
return Observable.create(new Publisher<T>() {
@Override
public void subscribe(Subscriber<? super T> s) {
CompositeDisposable cd = new CompositeDisposable();
AtomicLong requested = new AtomicLong();
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
BackpressureHelper.add(requested, n);
}
@Override
public void cancel() {
cd.dispose();
}
});
try {
Closeable c = source.open(new PushEventConsumer<T>() {
@Override
public long accept(PushEvent<T> c) throws Exception {
if (cd.isDisposed()) {
return ABORT;
}
switch (c.getType()) {
case DATA:
s.onNext(c.getData());
for (;;) {
long r = requested.get();
if (r == 0) {
return backpressure;
}
if (requested.compareAndSet(r, r - 1)) {
return r > 0L ? 0 : backpressure;
}
}
case ERROR:
cd.dispose();
s.onError(c.getFailure());
return ABORT;
case CLOSE:
cd.dispose();
s.onComplete();
return ABORT;
}
return 0L;
}
});
cd.add(() -> {
try {
c.close();
} catch (IOException ex) {
RxJavaPlugins.onError(ex);
}
});
} catch (Exception ex) {
s.onError(ex);
}
}
});
}
public interface PushStream<T> extends Closeable {
<R> PushStream<R> flatMap(Function<? super T, ? extends PushStream<? extends R>> mapper);
static <T> PushStream<T> createStream(PushEventSource<T> source) {
return new PushStreamImpl<>(source);
}
Promise<Void> forEach(Consumer<? super T> consumer);
}
public static class PushStreamImpl<T> implements PushStream<T> {
protected final PushEventSource<T> source;
static final long DEFAULT_BACKPRESSURE = 1;
static final Scheduler DEFAULT_SCHEDULER = Schedulers.io();
protected PushStreamImpl(PushEventSource<T> source) {
this.source = source;
}
@Override
public <R> PushStream<R> flatMap(Function<? super T, ? extends PushStream<? extends R>> mapper) {
Observable<R> result = to(source, DEFAULT_BACKPRESSURE)
.flatMap(v -> {
PushStream<? extends R> ps = mapper.apply(v);
if (ps instanceof PushStreamImpl) {
PushStreamImpl<? extends R> psi = (PushStreamImpl<? extends R>) ps;
return to(psi.source, DEFAULT_BACKPRESSURE).onBackpressureBuffer();
}
throw new IllegalStateException("Unsupported PushStream: " + ps.getClass());
});
return PushStream.createStream(from(result, DEFAULT_SCHEDULER));
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Doesn't make sense.");
}
@Override
public Promise<Void> forEach(Consumer<? super T> consumer) {
Deferred<Void> d = new Deferred<>();
try {
source.open(c -> {
switch (c.getType()) {
case DATA:
consumer.accept(c.getData());
break;
case ERROR:
d.fail(c.getFailure());
break;
case CLOSE:
d.resolve(null);
break;
}
return 0L;
});
} catch (Exception ex) {
d.fail(ex);
}
return d.getPromise();
}
}
public static void main(String[] args) throws Exception {
PushEventSource<Integer> range10 = from(Observable.range(1, 10), Schedulers.computation());
PushEventSource<Integer> range2 = from(Observable.range(1, 2), Schedulers.computation());
PushStream<Integer> ps = PushStream.createStream(range10)
.flatMap(v -> PushStream.createStream(range2));
ps.forEach(System.out::println).getValue();
Thread.sleep(1000);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment