Skip to content

Instantly share code, notes, and snippets.

@unixunion
Created March 4, 2016 08:56
Show Gist options
  • Save unixunion/c5095872ae6bb0e3a049 to your computer and use it in GitHub Desktop.
Save unixunion/c5095872ae6bb0e3a049 to your computer and use it in GitHub Desktop.
package com.deblox.clustering;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
/**
* Created by keghol on 18/11/14.
*
* awaits a certain number of "callbacks" and then responds to a message with results from each callee in a JsonArray
*
* instantiate this, then set the expected response count and the original message (or HTTP request)
* to reply to; each subsequent EventBus message whose response you are interested in should call
* this.resultUpdate()
*
* Once callback count == expectedResponseCount, the original message is sent the result set.
*
* TODO FIXME: buffer the responses to save memory.
*
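* e.g:
*   ResponseConcentrator rc = new ResponseConcentrator();
*   rc.setExpectedResponseCount(2);
*   rc.setOriginalMessage(message);
*   // then, in the reply handler of each callee:
*   rc.resultUpdate(replyBody, results -> {});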
*/
public class ResponseConcentrator {

  /** Where the aggregated result is delivered once all expected responses have arrived. */
  public enum Method {
    MESSAGE, NONE, HTTP
  }

  private Integer expectedResponseCount; // number of responses expected; must be set before resultUpdate()
  private final JsonArray resultArray = new JsonArray(); // results collected from the callees
  private Message<?> originalMessage; // the original message we will reply to with all the results
  private HttpServerRequest originalRequest; // the original HTTP request we will respond to with all the results
  private Method method; // delivery target; selected by one of the set* methods below
  private boolean completed; // true once the aggregated result has been delivered

  /** Selects {@link Method#NONE}: only the handler passed to {@code resultUpdate} is notified. */
  public void setNoContext() {
    this.method = Method.NONE;
  }

  /**
   * Selects {@link Method#MESSAGE}: the aggregated result is sent as a reply to the given message.
   *
   * @param originalMessage the message to reply to on completion
   */
  public void setOriginalMessage(Message<?> originalMessage) {
    this.originalMessage = originalMessage;
    this.method = Method.MESSAGE;
  }

  /**
   * Selects {@link Method#HTTP}: the aggregated result is written as the body of the response
   * to the given request.
   *
   * @param originalRequest the HTTP request whose response is ended on completion
   */
  public void setOriginalHttpRequest(HttpServerRequest originalRequest) {
    this.originalRequest = originalRequest;
    this.method = Method.HTTP;
  }

  /**
   * Sets how many results must be collected before the aggregated result is delivered.
   *
   * @param expectedResponseCount number of {@code resultUpdate} calls to wait for
   */
  public void setExpectedResponseCount(Integer expectedResponseCount) {
    this.expectedResponseCount = expectedResponseCount;
  }

  /**
   * Records one result and, once the expected number of results has been collected, delivers
   * {@code {"list": [...]}} to the configured target and then passes the result array to
   * {@code handler}.
   *
   * <p>Delivery happens at most once. Results that arrive after completion are ignored, so the
   * original message/request is never answered a second time and the array already handed to
   * the handler is not modified afterwards.
   *
   * @param r       the result from one callee
   * @param handler called with the full result array on completion; may be {@code null} when
   *                no callback is wanted
   * @throws IllegalStateException if the expected response count or the delivery target has
   *                               not been configured
   */
  public void resultUpdate(JsonObject r, Handler<JsonArray> handler) {
    // Fail with a descriptive error instead of a bare NullPointerException from unboxing/switch.
    if (expectedResponseCount == null) {
      throw new IllegalStateException(
          "expectedResponseCount is not set; call setExpectedResponseCount() first");
    }
    if (method == null) {
      throw new IllegalStateException(
          "no reply target is set; call setOriginalMessage(), setOriginalHttpRequest() or setNoContext() first");
    }
    if (completed) {
      return; // late result: the aggregated reply has already gone out
    }

    resultArray.add(r);
    if (resultArray.size() < expectedResponseCount) {
      return; // still waiting for more callees
    }

    // Mark completion before delivering so a re-entrant call from the handler cannot reply twice.
    completed = true;
    switch (method) {
      case HTTP:
        originalRequest.response().end(new JsonObject().put("list", resultArray).toString());
        break;
      case MESSAGE:
        originalMessage.reply(new JsonObject().put("list", resultArray));
        break;
      case NONE:
        break;
    }
    if (handler != null) {
      handler.handle(resultArray);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment