Skip to content

Instantly share code, notes, and snippets.

@andrask
Last active August 29, 2015 14:01
Show Gist options
  • Save andrask/fc06abfd70daa6f91edb to your computer and use it in GitHub Desktop.
Save andrask/fc06abfd70daa6f91edb to your computer and use it in GitHub Desktop.
package rx.behaviorsubject;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.subjects.BehaviorSubject;
public class BehaviorSubjectSubscribeAndSendConcurrently {
ExecutorService EXECUTOR = Executors.newCachedThreadPool();
@Test
public void t() throws InterruptedException, ExecutionException {
final BehaviorSubject<String> subj = BehaviorSubject.create((String) null);
final Observable<String> filtered = subj.filter(new Func1<String, Boolean>() {
@Override
public Boolean call(String t1) {
return t1 != null;
}
});
final CountDownLatch enterLatch = new CountDownLatch(2);
final CountDownLatch completedLatch = new CountDownLatch(1);
EXECUTOR.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (enterLatch) {
enterLatch.countDown();
enterLatch.wait();
}
// Thread.sleep(1);
filtered.subscribe(new Action1<String>() {
@Override
public void call(String t1) {
completedLatch.countDown();
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
EXECUTOR.submit(new Runnable() {
@Override
public void run() {
try {
synchronized (enterLatch) {
enterLatch.countDown();
enterLatch.wait();
}
subj.onNext("first");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
if (!enterLatch.await(1, TimeUnit.SECONDS))
throw new IllegalStateException("enter failed");
synchronized (enterLatch) {
enterLatch.notifyAll();
}
if (!completedLatch.await(1, TimeUnit.SECONDS))
throw new IllegalStateException("count failed");
}
@Test
public void m() throws InterruptedException, ExecutionException {
for (int i = 0; i < 1000000; i++) {
t();
if (i % 100 == 0)
System.out.print(".");
if (i % 10000 == 0)
System.out.println();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment