Skip to content

Instantly share code, notes, and snippets.

@martin-cowie
Created May 17, 2016 09:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save martin-cowie/3a70fed41e11be03e33b5f812447be57 to your computer and use it in GitHub Desktop.
Save martin-cowie/3a70fed41e11be03e33b5f812447be57 to your computer and use it in GitHub Desktop.
Building the Adapter
/**
* Build an Adapter.
* <P>
*
* @param session Configured Diffusion session.
* @param collection Source of documents for reflection.
* @param oplog Collection holding the MongoDB replication operation log.
* @param topicRoot Topic under which other topics are to be created.
* @return a ready-to-use Adapter
* @throws InterruptedException if the current thread was interrupted while waiting for update rights
* @throws TimeoutException if the adapter takes more than 10s to exclusive update rights on the topic tree
*/
public static Adapter build(Session session,
MongoCollection<Document> collection,
MongoCollection<Document> oplog,
String topicRoot) throws InterruptedException, TimeoutException {
final Exchanger<ValueUpdater<JSON>> exchanger = new Exchanger<>();
final TopicUpdateControl topicUpdateControl =
session.feature(TopicUpdateControl.class);
topicUpdateControl.registerUpdateSource(topicRoot,
new UpdateSource.Default() {
@Override
public void onActive(String topicPath, Updater updater) {
try {
exchanger.exchange(updater.valueUpdater(JSON.class));
}
catch (InterruptedException ex) {
throw new AssertionError(ex);
}
}
});
final ValueUpdater<JSON> valueUpdater =
exchanger.exchange(null, 10, TimeUnit.SECONDS);
final TopicControl topicControl = session.feature(TopicControl.class);
topicControl.removeTopicsWithSession(topicRoot,
new TopicTreeHandler.Default());
final MongoNamespace namespace =
new MongoNamespace(collection.getNamespace().getDatabaseName(),
"$cmd");
return new Adapter(topicControl,
valueUpdater,
collection,
oplog,
namespace.toString(),
topicRoot);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment