Skip to content

Instantly share code, notes, and snippets.

@doapp-jeremy
Created April 18, 2014 15:34
Show Gist options
  • Save doapp-jeremy/11050303 to your computer and use it in GitHub Desktop.
Save doapp-jeremy/11050303 to your computer and use it in GitHub Desktop.
Observable buffer losing items when onNext called in another thread
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import rx.subjects.PublishSubject;
/**
* BufferMissingTest.java
* mln-feed-processing-feed-loading
*
* Created by jeremy on Apr 18, 2014
* DoApp, Inc. owns and reserves all rights to the intellectual
* property and design of the following application.
*
* Copyright 2014 - All rights reserved. Created by DoApp, Inc.
*/
/**
* @author jeremy
*
*/
public class BufferMissingTest {
public static AtomicInteger createdCount = new AtomicInteger(0);
public static AtomicInteger scheduledCount = new AtomicInteger(0);
public static AtomicInteger scheduledAfterCount = new AtomicInteger(0);
public static AtomicInteger firstSubjectCount = new AtomicInteger(0);
public static AtomicInteger firstSubjectBufferCount = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(100);
PublishSubject<Integer> firstSubject = PublishSubject.create();
firstSubject.buffer(100, TimeUnit.MILLISECONDS, 10).subscribe(list -> {
firstSubjectBufferCount.addAndGet(list.size());
});
firstSubject.subscribe(i -> firstSubjectCount.incrementAndGet());
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
for (int i = 0; i < 8000; ++i) {
final int j = i;
createdCount.incrementAndGet();
scheduler.schedule(() -> {
scheduledCount.incrementAndGet();
firstSubject.onNext(j);
scheduledAfterCount.incrementAndGet();
}, 10, TimeUnit.MILLISECONDS);
}
try {
Thread.sleep(5000);
} catch (Exception e) {}
}, 0, 10, TimeUnit.SECONDS);
while (true) {
System.out.println("**************** " + System.currentTimeMillis() + " *******************");
System.out.println("Created: " + createdCount);
System.out.println("ScheduledCount: " + scheduledCount);
System.out.println("ScheduledAfterCount: " + scheduledAfterCount);
System.out.println("FirstSubject: " + firstSubjectCount);
System.out.println("FirstSubjectBuffer: " + firstSubjectBufferCount);
Thread.sleep(5000);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment