@dwursteisen
Created April 2, 2015 12:56
This code snippet shows how the MongoDB async driver can be used with RxJava
import java.util.concurrent.CountDownLatch;
import com.mongodb.CursorType;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
import org.bson.Document;
import rx.Observable;
/**
 * This code snippet shows how the MongoDB async driver can be used
 * with RxJava to consume a capped collection with a tailable cursor.
 * <p>
 * Each time a document is inserted into the collection,
 * the subscriber is notified.
 */
public class MongoExample {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);

        MongoDatabase db = MongoClients.create().getDatabase("test");
        MongoCollection<Document> collection = db.getCollection("helloworld");

        Observable.create(subscriber ->
                collection.find()
                        .cursorType(CursorType.TailableAwait)
                        .forEach(subscriber::onNext, (aVoid, throwable) -> {
                            if (throwable == null) {
                                subscriber.onCompleted();
                            } else {
                                subscriber.onError(throwable);
                            }
                        })
        ).subscribe(System.out::println, Throwable::printStackTrace, latch::countDown);

        latch.await();
        System.out.println("Completed");
    }
}
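
The snippet above bridges the driver's callback-style `forEach` to an RxJava subscriber: each document goes to `onNext`, and the final callback maps a null throwable to `onCompleted` and anything else to `onError`. The following is a minimal sketch of that same bridging logic using only the JDK, so it can be run without MongoDB or RxJava. The `Observer` and `CallbackSource` interfaces, and the `bridge` and `run` helpers, are hypothetical stand-ins invented for illustration. They are not part of either library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class CallbackBridgeSketch {

    /** Hypothetical observer with the same three callbacks as an RxJava subscriber. */
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical callback-style source: passes each item to block, then calls done once. */
    interface CallbackSource<T> {
        void forEach(Consumer<T> block, BiConsumer<Void, Throwable> done);
    }

    /** Same mapping as the gist: a null throwable means completion, otherwise an error. */
    static <T> void bridge(CallbackSource<T> source, Observer<T> observer) {
        source.forEach(observer::onNext, (ignored, throwable) -> {
            if (throwable == null) {
                observer.onCompleted();
            } else {
                observer.onError(throwable);
            }
        });
    }

    /** Runs the bridge over a fixed list and records the events the observer receives. */
    static List<String> run(List<String> items, Throwable failure) {
        List<String> events = new ArrayList<>();
        // In-memory source standing in for the driver's find().forEach(...).
        CallbackSource<String> source = (block, done) -> {
            items.forEach(block);
            done.accept(null, failure);
        };
        bridge(source, new Observer<String>() {
            @Override public void onNext(String item) { events.add("next:" + item); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b"), null));
        // prints [next:a, next:b, completed]
        System.out.println(run(Arrays.asList("a"), new IllegalStateException("boom")));
        // prints [next:a, error:boom]
    }
}
```

Unlike this finite in-memory source, a tailable cursor on a capped collection normally keeps waiting for new documents, so in the original snippet the completion callback is typically reached only when the cursor is closed or fails.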