Skip to content

Instantly share code, notes, and snippets.

@JaiHirsch
Created February 26, 2016 17:44
Show Gist options
  • Save JaiHirsch/6dca4fbd07f9a024c03a to your computer and use it in GitHub Desktop.
Save JaiHirsch/6dca4fbd07f9a024c03a to your computer and use it in GitHub Desktop.
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.bson.Document;
import rx.RxReactiveStreams;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import com.cfx.tail.ConnectionController;
import com.mongodb.CursorType;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
/**
 * Command-line runner that opens a tailable-await cursor on every collection
 * returned by {@code ConnectionController.getOplogConnection} and prints each
 * document it receives to standard output.
 *
 * <p>Documents are excluded when they carry a {@code fromMigrate} field or
 * when their {@code op} field equals {@code "n"}.
 */
public class ReactiveOplogTailRunner {

    /**
     * Subscribes to every oplog collection and keeps the process alive while
     * the subscriptions deliver documents.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException if the main thread is interrupted while
     *         waiting on the executor
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = null;
        try {
            // NOTE(review): the meaning of the two null arguments is defined by
            // ConnectionController, which is not visible here - confirm.
            List<MongoCollection<Document>> connection = ConnectionController
                    .getOplogConnection(null, null);

            // Executors.newFixedThreadPool rejects a pool size of zero, so
            // bail out early instead of failing with IllegalArgumentException.
            if (connection == null || connection.isEmpty()) {
                System.err.println("No oplog collections available; nothing to tail.");
                return;
            }

            executor = Executors.newFixedThreadPool(connection.size());

            for (MongoCollection<Document> col : connection) {
                // A single filter document is required: each call to filter()
                // replaces the previous one, so chaining two calls would
                // silently drop the first criterion.
                FindPublisher<Document> publisher = col.find()
                        .filter(getOplogFilter())
                        .sort(new Document("$natural", 1))
                        .cursorType(CursorType.TailableAwait);

                RxReactiveStreams.toObservable(publisher)
                        .subscribeOn(Schedulers.from(executor))
                        .subscribe(new Action1<Document>() {
                            @Override
                            public void call(Document t) {
                                System.out.println(t);
                            }
                        }, new Action1<Throwable>() {
                            @Override
                            public void call(Throwable error) {
                                // Without an error handler RxJava raises
                                // OnErrorNotImplementedException instead.
                                System.err.println("Oplog tail failed: " + error);
                            }
                        });
            }

            // The executor is never asked to shut down before this point, so
            // this call simply parks the main thread (for up to 365 days)
            // while the subscriptions run.
            executor.awaitTermination(365, TimeUnit.DAYS);
        } finally {
            // Release resources on every exit path, including interruption.
            if (executor != null) {
                executor.shutdownNow();
            }
            ConnectionController.close();
        }
    }

    /**
     * Builds the single query filter applied to each oplog cursor by merging
     * the criteria of {@link #getFilterw()} and {@link #getElectionFilter()}.
     * The two criteria use different field names, so neither overwrites the
     * other.
     *
     * @return a new document containing both criteria
     */
    private static Document getOplogFilter() {
        Document filter = new Document();
        filter.putAll(getFilterw());
        filter.putAll(getElectionFilter());
        return filter;
    }

    /**
     * @return a criterion matching documents whose {@code op} field is not
     *         {@code "n"}
     */
    private static Document getElectionFilter() {
        return new Document("op", new Document("$ne", "n"));
    }

    /**
     * @return a criterion matching documents that have no {@code fromMigrate}
     *         field
     */
    private static Document getFilterw() {
        return new Document("fromMigrate", new Document("$exists", false));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment