Last active
November 21, 2015 10:35
-
-
Save akarnokd/3427a078255cca898c84 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package osgi; | |
import java.io.*; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.*; | |
import org.osgi.util.promise.*; | |
import org.reactivestreams.*; | |
import io.reactivex.*; | |
import io.reactivex.disposables.CompositeDisposable; | |
import io.reactivex.internal.subscriptions.EmptySubscription; | |
import io.reactivex.internal.util.BackpressureHelper; | |
import io.reactivex.plugins.RxJavaPlugins; | |
import io.reactivex.schedulers.Schedulers; | |
public enum AsyncEventStream { | |
; | |
/**
 * A single signal delivered to a consumer: a data item, a failure, or the
 * end-of-stream marker.
 *
 * @param <T> the type of the data payload
 */
public interface PushEvent<T> {

    /** The kind of signal an event carries. */
    public enum EventType {
        CLOSE,
        DATA,
        ERROR
    }

    /** @return the kind of this event */
    EventType getType();

    /** @return the payload of a DATA event; {@code null} for ERROR and CLOSE events */
    T getData();

    /** @return the cause of an ERROR event; {@code null} for DATA and CLOSE events */
    Throwable getFailure();

    /** @return {@code true} for ERROR and CLOSE events, {@code false} for DATA events */
    boolean isTerminal();

    /**
     * Creates a DATA event wrapping the given value.
     *
     * @param value the payload to carry
     * @param <T> the payload type
     * @return a non-terminal event of type {@link EventType#DATA}
     */
    static <T> PushEvent<T> data(T value) {
        return new PushEvent<T>() {
            @Override
            public EventType getType() {
                return EventType.DATA;
            }

            @Override
            public T getData() {
                return value;
            }

            @Override
            public Throwable getFailure() {
                return null;
            }

            @Override
            public boolean isTerminal() {
                return false;
            }
        };
    }

    /**
     * Creates an ERROR event wrapping the given failure.
     *
     * @param e the failure to carry
     * @param <T> the payload type of the stream the event belongs to
     * @return a terminal event of type {@link EventType#ERROR}
     */
    static <T> PushEvent<T> error(Throwable e) {
        return new PushEvent<T>() {
            @Override
            public EventType getType() {
                return EventType.ERROR;
            }

            @Override
            public T getData() {
                return null;
            }

            @Override
            public Throwable getFailure() {
                return e;
            }

            @Override
            public boolean isTerminal() {
                // An error ends the stream just like CLOSE does.
                return true;
            }
        };
    }

    /** Stateless singleton backing every CLOSE event. */
    enum PushEventClose implements PushEvent<Object> {
        INSTANCE;

        @Override
        public EventType getType() {
            return EventType.CLOSE;
        }

        @Override
        public Object getData() {
            return null;
        }

        @Override
        public Throwable getFailure() {
            return null;
        }

        @Override
        public boolean isTerminal() {
            return true;
        }
    }

    /**
     * Returns the shared CLOSE event.
     *
     * @param <T> the payload type of the stream the event belongs to
     * @return the terminal event of type {@link EventType#CLOSE}
     */
    @SuppressWarnings("unchecked")
    static <T> PushEvent<T> close() {
        // Safe: the singleton never exposes a payload (getData() is always null).
        return (PushEvent<T>) PushEventClose.INSTANCE;
    }
}
@FunctionalInterface | |
public interface PushEventConsumer<T> { | |
long ABORT = -1L; | |
long CONTINUE = 0L; | |
long accept(PushEvent<T> e) throws Exception; | |
} | |
public interface PushEventSource<T> { | |
Closeable open(PushEventConsumer<? super T> consumer) throws Exception; | |
} | |
public static <T> PushEventSource<T> from( | |
Publisher<? extends T> publisher, Scheduler scheduler) { | |
return c -> { | |
CompositeDisposable cd = new CompositeDisposable(); | |
Scheduler.Worker w = scheduler.createWorker(); | |
cd.add(w); | |
publisher.subscribe(new Subscriber<T>() { | |
Subscription s; | |
@Override | |
public void onSubscribe(Subscription s) { | |
this.s = s; | |
cd.add(s::cancel); | |
if (!cd.isDisposed()) { | |
s.request(1); | |
} | |
} | |
@Override | |
public void onNext(T t) { | |
long backpressure; | |
try { | |
backpressure = c.accept(PushEvent.data(t)); | |
} catch (Exception e) { | |
onError(e); | |
return; | |
} | |
if (backpressure <= PushEventConsumer.ABORT) { | |
cd.dispose(); | |
} else | |
if (backpressure == PushEventConsumer.CONTINUE) { | |
s.request(1); | |
} else { | |
w.schedule(() -> s.request(1), backpressure, TimeUnit.MILLISECONDS); | |
} | |
} | |
@Override | |
public void onError(Throwable t) { | |
cd.dispose(); | |
try { | |
c.accept(PushEvent.error(t)); | |
} catch (Exception ex) { | |
RxJavaPlugins.onError(ex); | |
} | |
} | |
@Override | |
public void onComplete() { | |
cd.dispose(); | |
try { | |
c.accept(PushEvent.close()); | |
} catch (Exception ex) { | |
RxJavaPlugins.onError(ex); | |
} | |
} | |
}); | |
return cd::dispose; | |
}; | |
} | |
public static <T> Observable<T> to(PushEventSource<? extends T> source, long backpressure) { | |
return Observable.create(new Publisher<T>() { | |
@Override | |
public void subscribe(Subscriber<? super T> s) { | |
CompositeDisposable cd = new CompositeDisposable(); | |
AtomicLong requested = new AtomicLong(); | |
s.onSubscribe(new Subscription() { | |
@Override | |
public void request(long n) { | |
BackpressureHelper.add(requested, n); | |
} | |
@Override | |
public void cancel() { | |
cd.dispose(); | |
} | |
}); | |
try { | |
Closeable c = source.open(new PushEventConsumer<T>() { | |
@Override | |
public long accept(PushEvent<T> c) throws Exception { | |
if (cd.isDisposed()) { | |
return ABORT; | |
} | |
switch (c.getType()) { | |
case DATA: | |
s.onNext(c.getData()); | |
for (;;) { | |
long r = requested.get(); | |
if (r == 0) { | |
return backpressure; | |
} | |
if (requested.compareAndSet(r, r - 1)) { | |
return r > 0L ? 0 : backpressure; | |
} | |
} | |
case ERROR: | |
cd.dispose(); | |
s.onError(c.getFailure()); | |
return ABORT; | |
case CLOSE: | |
cd.dispose(); | |
s.onComplete(); | |
return ABORT; | |
} | |
return 0L; | |
} | |
}); | |
cd.add(() -> { | |
try { | |
c.close(); | |
} catch (IOException ex) { | |
RxJavaPlugins.onError(ex); | |
} | |
}); | |
} catch (Exception ex) { | |
s.onError(ex); | |
} | |
} | |
}); | |
} | |
public interface PushStream<T> extends Closeable { | |
<R> PushStream<R> flatMap(Function<? super T, ? extends PushStream<? extends R>> mapper); | |
static <T> PushStream<T> createStream(PushEventSource<T> source) { | |
return new PushStreamImpl<>(source); | |
} | |
Promise<Void> forEach(Consumer<? super T> consumer); | |
} | |
public static class PushStreamImpl<T> implements PushStream<T> { | |
protected final PushEventSource<T> source; | |
static final long DEFAULT_BACKPRESSURE = 1; | |
static final Scheduler DEFAULT_SCHEDULER = Schedulers.io(); | |
protected PushStreamImpl(PushEventSource<T> source) { | |
this.source = source; | |
} | |
@Override | |
public <R> PushStream<R> flatMap(Function<? super T, ? extends PushStream<? extends R>> mapper) { | |
Observable<R> result = to(source, DEFAULT_BACKPRESSURE) | |
.flatMap(v -> { | |
PushStream<? extends R> ps = mapper.apply(v); | |
if (ps instanceof PushStreamImpl) { | |
PushStreamImpl<? extends R> psi = (PushStreamImpl<? extends R>) ps; | |
return to(psi.source, DEFAULT_BACKPRESSURE).onBackpressureBuffer(); | |
} | |
throw new IllegalStateException("Unsupported PushStream: " + ps.getClass()); | |
}); | |
return PushStream.createStream(from(result, DEFAULT_SCHEDULER)); | |
} | |
@Override | |
public void close() throws IOException { | |
throw new UnsupportedOperationException("Doesn't make sense."); | |
} | |
@Override | |
public Promise<Void> forEach(Consumer<? super T> consumer) { | |
Deferred<Void> d = new Deferred<>(); | |
try { | |
source.open(c -> { | |
switch (c.getType()) { | |
case DATA: | |
consumer.accept(c.getData()); | |
break; | |
case ERROR: | |
d.fail(c.getFailure()); | |
break; | |
case CLOSE: | |
d.resolve(null); | |
break; | |
} | |
return 0L; | |
}); | |
} catch (Exception ex) { | |
d.fail(ex); | |
} | |
return d.getPromise(); | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
PushEventSource<Integer> range10 = from(Observable.range(1, 10), Schedulers.computation()); | |
PushEventSource<Integer> range2 = from(Observable.range(1, 2), Schedulers.computation()); | |
PushStream<Integer> ps = PushStream.createStream(range10) | |
.flatMap(v -> PushStream.createStream(range2)); | |
ps.forEach(System.out::println).getValue(); | |
Thread.sleep(1000); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment