Skip to content

Instantly share code, notes, and snippets.

@Scottmitch
Last active April 4, 2020 01:49
Show Gist options
  • Save Scottmitch/79db65e9f70992616c7b4e5f991f35da to your computer and use it in GitHub Desktop.
Save Scottmitch/79db65e9f70992616c7b4e5f991f35da to your computer and use it in GitHub Desktop.
Attempt to provide an isolated reproducer for https://github.com/reactive-streams/reactive-streams-jvm/issues/477
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.reactivestreams.tck;
import org.jctools.queues.SpscLinkedQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.servicetalk.concurrent.reactivestreams.ReactiveStreamsAdapters.toReactiveStreamsPublisher;
@Test
public class AsyncPublisherTest extends PublisherVerification<Integer> {
private static ExecutorService exec;
private static io.servicetalk.concurrent.api.Executor stExec;
/**
* Creates the verification, passing the superclass a {@link TestEnvironment} built from the value {@code 200}.
* NOTE(review): presumably this is the TCK's default timeout in milliseconds - confirm against the
* {@code TestEnvironment} constructor documentation.
*/
public AsyncPublisherTest() {
super(new TestEnvironment(200));
}
/**
* Creates the two class-wide executors: a JDK cached thread pool ({@code exec}) and a ServiceTalk
* cached-thread executor ({@code stExec}). Annotated {@code @BeforeClass}, so TestNG is expected to run it
* once before the tests of this class.
*/
@BeforeClass
public static void before() {
// JDK pool; in this file it is only referenced by the commented-out alternative in createPublisher.
exec = Executors.newCachedThreadPool();
// ServiceTalk executor that createPublisher passes to publishOnOverride.
stExec = io.servicetalk.concurrent.api.Executors.newCachedThreadExecutor();
}
/**
 * Releases the executors created by {@code before()}.
 * <p>
 * The JDK pool is shut down first; the ServiceTalk executor is then closed and this method blocks until that
 * close has completed.
 *
 * @throws ExecutionException if closing the ServiceTalk executor completes with a failure.
 * @throws InterruptedException if the calling thread is interrupted while waiting for the close to finish.
 */
@AfterClass
public static void after() throws ExecutionException, InterruptedException {
    try {
        exec.shutdown();
    } finally {
        // Close the ServiceTalk executor even when shutting down the JDK pool throws, so it is never leaked.
        stExec.closeAsync().toFuture().get();
    }
}
/**
 * Builds the publisher under test: the integers produced by {@code items(elements)}, passed through
 * {@code publishOnOverride(stExec)} and adapted to a Reactive Streams {@link Publisher}.
 *
 * @param elements the number of items the returned publisher should emit.
 * @return the adapted Reactive Streams publisher.
 */
@Override
public Publisher<Integer> createPublisher(long elements) {
    // Alternative sources kept around for experimenting with the reproducer:
    //   io.servicetalk.concurrent.api.Publisher.range(0, (int) elements)
    //   return new OffloadedSubscribePublisher<Integer>(exec, new RangePublisher(1, elements));
    final io.servicetalk.concurrent.api.Publisher<Integer> offloaded =
            io.servicetalk.concurrent.api.Publisher.from(items(elements)).publishOnOverride(stExec);
    return toReactiveStreamsPublisher(offloaded);
}
/**
* Supplies no failed publisher: this implementation always returns {@code null}.
* NOTE(review): the Reactive Streams TCK documents a {@code null} return as opting out of its
* failed-publisher tests - confirm that the opt-out is intended rather than a leftover generated stub.
*
* @return always {@code null}.
*/
@Override
public Publisher<Integer> createFailedPublisher() {
// TODO: return a publisher that fails after onSubscribe if the failed-publisher tests should run.
return null;
}
/**
 * Creates the sequence {@code 0, 1, ..., count - 1} as boxed integers.
 *
 * @param count the number of items to generate; must be in the range {@code [0, Integer.MAX_VALUE]}.
 * @return a new array of length {@code count} whose element at index {@code i} is {@code i}.
 * @throws IllegalArgumentException if {@code count} is negative or does not fit in an {@code int}.
 */
private static Integer[] items(long count) {
    // A bare (int) cast silently wraps values above Integer.MAX_VALUE (e.g. 1L << 32 becomes 0), which would
    // hand back an array of the wrong size instead of reporting the problem.
    if (count < 0 || count > Integer.MAX_VALUE) {
        throw new IllegalArgumentException(
                "count: " + count + " (expected: 0 <= count <= " + Integer.MAX_VALUE + ")");
    }
    final Integer[] result = new Integer[(int) count];
    for (int i = 0; i < result.length; i++) {
        result[i] = i;
    }
    return result;
}
/**
 * Reports the largest element count this verification advertises for its publishers.
 *
 * @return {@code 1024}.
 */
@Override
public long maxElementsFromPublisher() {
    final long maxElements = 1024L;
    return maxElements;
}
/**
* Runs a fixed sequence of three inherited TCK checks back to back; the {@code invocationCount} asks TestNG
* to invoke this method 1000 times.
* NOTE(review): presumably the repetition is meant to surface an intermittent failure - confirm against the
* linked reactive-streams-jvm issue.
*
* @throws Throwable if any of the delegated verification methods throws.
*/
@Test(invocationCount = 1000)
public void loopTests() throws Throwable {
// The calls go through super so the inherited TCK implementations are used, in this exact order.
super.optional_spec309_requestNegativeNumberMaySignalIllegalArgumentExceptionWithSpecificMessage();
super.required_createPublisher1MustProduceAStreamOfExactly1Element();
super.required_spec101_subscriptionRequestMustResultInTheCorrectNumberOfProducedElements();
}
/**
 * A {@link Publisher} decorator that hands every signal of the wrapped {@link Publisher} to the
 * {@link Subscriber} from a single task running on the supplied {@link Executor}.
 * <p>
 * Signals from the wrapped publisher (and a marker for {@code cancel()}) are placed on a queue; the task
 * drains that queue and invokes the subscriber until it sees a terminal signal or the cancel marker.
 * <p>
 * This isn't exactly what we do for our default offloading strategy, as we may hop threads between signals,
 * but provides some motivation.
 * @param <T> The type of onNext.
 */
public static final class OffloadedSubscribePublisher<T> implements Publisher<T> {
    /** Queue marker for {@code onComplete}. */
    private static final Object ON_COMPLETE = new Object();
    /** Queue marker standing in for a {@code null} item, because the queue itself rejects {@code null}. */
    private static final Object NULL = new Object();
    /** Queue marker telling the draining task that the subscriber cancelled. */
    private static final Object CANCEL = new Object();
    private final Executor executor;
    private final Publisher<T> publisher;

    /**
     * Creates a new instance.
     *
     * @param executor runs the task that delivers signals to each subscriber.
     * @param publisher the publisher whose signals are offloaded.
     */
    public OffloadedSubscribePublisher(Executor executor, Publisher<T> publisher) {
        this.executor = executor;
        this.publisher = publisher;
    }

    /**
     * Starts the delivery task on the executor and then subscribes to the wrapped publisher.
     *
     * @param s the subscriber that receives the offloaded signals.
     * @throws NullPointerException if {@code s} is {@code null}.
     */
    @Override
    public void subscribe(final Subscriber<? super T> s) {
        if (s == null) { throw new NullPointerException(); }
        // The queue has more than one producer: the thread on which the wrapped publisher signals, and
        // whichever thread calls cancel() on the subscription handed to the subscriber. A single-producer
        // queue is therefore not safe here; ConcurrentLinkedQueue supports concurrent offers.
        final Queue<Object> signals = new ConcurrentLinkedQueue<Object>();
        executor.execute(new Runnable() {
            @Override
            public void run() {
                Object signal = null;
                for (;;) {
                    try {
                        // poll() never blocks, so this spins until a signal shows up.
                        do {
                            signal = signals.poll();
                        } while (signal == null);
                        if (signal instanceof OnSubscribeWrapper) {
                            s.onSubscribe(((OnSubscribeWrapper) signal).subscription);
                        } else if (signal == ON_COMPLETE) {
                            s.onComplete();
                            break;
                        } else if (signal instanceof OnErrorWrapper) {
                            s.onError(((OnErrorWrapper) signal).throwable);
                            break;
                        } else if (signal == CANCEL) {
                            break;
                        } else {
                            @SuppressWarnings("unchecked")
                            T next = signal == NULL ? null : (T) signal;
                            s.onNext(next);
                        }
                    } catch (Throwable cause) {
                        if (signal == ON_COMPLETE || signal instanceof OnErrorWrapper) {
                            // The subscriber threw from a terminal signal; it must not be terminated twice.
                            cause.printStackTrace(); // test code, log this error
                        } else {
                            s.onError(cause);
                        }
                        break;
                    }
                }
            }
        });
        publisher.subscribe(new Subscriber<T>() {
            @Override
            public void onSubscribe(final Subscription upstream) {
                signals.offer(new OnSubscribeWrapper(new Subscription() {
                    @Override
                    public void request(long n) {
                        upstream.request(n);
                    }

                    @Override
                    public void cancel() {
                        try {
                            // Lets the delivery task leave its loop.
                            signals.offer(CANCEL);
                        } finally {
                            upstream.cancel();
                        }
                    }
                }));
            }

            @Override
            public void onNext(T t) {
                signals.offer(t == null ? NULL : t);
            }

            @Override
            public void onError(Throwable t) {
                signals.offer(new OnErrorWrapper(t));
            }

            @Override
            public void onComplete() {
                signals.offer(ON_COMPLETE);
            }
        });
    }

    /** Queue entry carrying the {@link Throwable} of an {@code onError} signal. */
    private static final class OnErrorWrapper {
        final Throwable throwable;

        private OnErrorWrapper(Throwable throwable) {
            this.throwable = throwable;
        }
    }

    /** Queue entry carrying the {@link Subscription} to pass to {@code onSubscribe}. */
    private static final class OnSubscribeWrapper {
        final Subscription subscription;

        private OnSubscribeWrapper(Subscription subscription) {
            this.subscription = subscription;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment