Skip to content

Instantly share code, notes, and snippets.

@robbieknuth
Created April 7, 2017 18:44
Show Gist options
  • Save robbieknuth/67e46195b30c7f99b1d742e5af415887 to your computer and use it in GitHub Desktop.
Save robbieknuth/67e46195b30c7f99b1d742e5af415887 to your computer and use it in GitHub Desktop.
vertx hazelcast clustemanager asyncmap repr
import com.hazelcast.config.Config;
import com.hazelcast.config.NetworkConfig;
import com.hazelcast.core.*;
import io.vertx.core.*;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import java.util.concurrent.Executor;
import static io.vertx.core.Future.failedFuture;
import static io.vertx.core.Future.succeededFuture;
public class EntryPoint {
static Logger logger = LoggerFactory.getLogger(EntryPoint.class);
public static void main(String[] args) {
try {
Config config = new Config();
NetworkConfig networkConfig = config.getNetworkConfig();
networkConfig
.setPort(5701)
.setPortAutoIncrement(true)
.getInterfaces()
.setEnabled(true)
.addInterface("127.0.0.1");
networkConfig
.getJoin()
.getMulticastConfig()
.setEnabled(true)
.setLoopbackModeEnabled(true);
HazelcastInstance hci = Hazelcast.newHazelcastInstance(config);
HazelcastClusterManager hazelcastClusterManager = new HazelcastClusterManager(hci);
VertxOptions vertxOptions = new VertxOptions().setClusterManager(hazelcastClusterManager);
Vertx.clusteredVertx(vertxOptions, res -> {
if (!res.succeeded())
throw new RuntimeException("vertx failed to start");
res.result().deployVerticle(new Verticle(hci), deployHandler -> {
if (!deployHandler.succeeded())
throw new RuntimeException("verticle deployment failed.");
logger.info("Deployment succeeded");
});
});
} catch (Exception e) {
System.err.println(e);
}
}
static class Verticle extends AbstractVerticle {
HazelcastInstance hazelCastInstance;
public Verticle(HazelcastInstance hci) {
this.hazelCastInstance = hci;
}
@Override
public void start(Future<Void> startFuture) throws Exception {
Router router = Router.router(this.vertx);
router.post("/with").handler(routingContext -> this.middleWare(routingContext, true));
router.post("/without").handler(routingContext -> this.middleWare(routingContext, false));
router.post("/with").handler(this::actualRoute);
router.post("/without").handler(this::actualRoute);
this.vertx.createHttpServer()
.requestHandler(router::accept)
.listen(9000);
startFuture.complete(null);
}
void middleWare(RoutingContext routingContext, boolean useContext) {
this.<String, Boolean>getAsyncMap(this.hazelCastInstance, "mapName", useContext,
getMapResult -> {
if (getMapResult.succeeded()) {
AsyncMap<String, Boolean> asyncMap = getMapResult.result();
asyncMap.get("keyName", getValueResult -> {
if (getValueResult.succeeded()) {
Boolean value = getValueResult.result();
if (value == null) {
logger.info("cache miss");
asyncMap.put("keyName", true, putKeyResult -> {
if (putKeyResult.succeeded()) {
logger.info("Key placed.");
routingContext.put("value", "some cached value");
routingContext.next();
} else {
logger.error("Failed to put key.");
routingContext.fail(putKeyResult.cause());
}
});
} else {
logger.info("cache hit");
routingContext.put("value", "some cached value");
routingContext.next();
}
} else {
logger.error("getting value failed");
routingContext.fail(getValueResult.cause());
}
});
} else {
logger.error("getting async map failed");
routingContext.fail(getMapResult.cause());
}
});
}
void actualRoute(RoutingContext routingContext) {
routingContext.request()
.endHandler(v ->
{
String value = routingContext.get("value");
routingContext
.response()
.setStatusCode(200)
.setChunked(true)
.write("Some data: " + value)
.end();
})
.handler(b -> {});
}
public <K, V> void getAsyncMap(HazelcastInstance hci, String name, boolean useContext, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) {
if (useContext) {
vertx.executeBlocking(
fut -> fut.complete(new HazelcastInternalAsyncMap<>(vertx, hci.<K, V>getMap(name), useContext)),
resultHandler);
}
else {
resultHandler.handle(Future.succeededFuture(new HazelcastInternalAsyncMap<>(vertx, hci.getMap(name), useContext)));
}
}
}
static class HazelcastInternalAsyncMap<K, V> implements AsyncMap<K, V> {
Vertx vertx;
IMap<K, V> map;
boolean useContext;
public HazelcastInternalAsyncMap(Vertx vertx, IMap<K, V> map, boolean useContext) {
this.vertx = vertx;
this.map = map;
this.useContext = useContext;
}
public void get(K k, Handler<AsyncResult<V>> asyncResultHandler) {
if (useContext) {
executeAsync((ICompletableFuture<V>)map.getAsync(convertParam(k)), asyncResultHandler);
}
else {
asyncResultHandler.handle(Future.succeededFuture(map.get(k)));
}
}
@Override
public void put(K k, V v, Handler<AsyncResult<Void>> completionHandler) {
K kk = convertParam(k);
V vv = convertParam(v);
if (useContext) {
executeAsyncVoid((ICompletableFuture<Void>)map.putAsync(kk, vv), completionHandler);
}
else {
map.putAsync(kk, vv);
completionHandler.handle(Future.succeededFuture(null));
}
}
@Override
public void put(K k, V v, long ttl, Handler<AsyncResult<Void>> completionHandler) { }
@Override
public void putIfAbsent(K k, V v, Handler<AsyncResult<V>> completionHandler) { }
@Override
public void putIfAbsent(K k, V v, long ttl, Handler<AsyncResult<V>> completionHandler) { }
@Override
public void remove(K k, Handler<AsyncResult<V>> asyncResultHandler) { }
@Override
public void removeIfPresent(K k, V v, Handler<AsyncResult<Boolean>> resultHandler) { }
@Override
public void replace(K k, V v, Handler<AsyncResult<V>> asyncResultHandler) { }
@Override
public void replaceIfPresent(K k, V oldValue, V newValue, Handler<AsyncResult<Boolean>> resultHandler) { }
@Override
public void clear(Handler<AsyncResult<Void>> resultHandler) { }
@Override
public void size(Handler<AsyncResult<Integer>> resultHandler) { }
private <T> void executeAsync(
ICompletableFuture<T> future,
Handler<AsyncResult<T>> resultHandler) {
future.andThen(
new HandlerCallBackAdapter(resultHandler),
VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext())
);
}
private void executeAsyncVoid(
ICompletableFuture<Void> future,
Handler<AsyncResult<Void>> resultHandler) {
future.andThen(
new VoidHandlerCallBackAdapter(resultHandler),
VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext())
);
}
}
static class VoidHandlerCallBackAdapter<T> extends HandlerCallBackAdapter<T> {
public VoidHandlerCallBackAdapter(Handler<AsyncResult<Void>> asyncResultHandler) {
super((Handler)asyncResultHandler);
}
@Override
public void onResponse(T v) {
setResult(Future.succeededFuture());
}
}
static class HandlerCallBackAdapter<V> implements ExecutionCallback<V> {
private final Handler<AsyncResult<V>> asyncResultHandler;
public HandlerCallBackAdapter(Handler<AsyncResult<V>> asyncResultHandler) {
this.asyncResultHandler = asyncResultHandler;
}
@Override
public void onResponse(V v) {
setResult(succeededFuture(convertReturn(v)));
}
@Override
public void onFailure(Throwable throwable) {
setResult(failedFuture(throwable));
}
protected void setResult(AsyncResult<V> object) {
asyncResultHandler.handle(object);
}
}
@SuppressWarnings("unchecked")
public static <T> T convertReturn(Object obj) {
return (T) obj;
}
@SuppressWarnings("unchecked")
public static <T> T convertParam(T obj) {
return obj;
}
final static class VertxExecutorAdapter implements Executor {
private static final String CONTEXT_KEY_HZ_VERTX_EXECUTOR_ADAPTER = "io.vertx.spi.cluster.hazelcast.impl.VertxExecutorAdapter";
private final Context context;
private VertxExecutorAdapter(Context context) {
this.context = context;
}
@Override
public void execute(Runnable command) {
context.runOnContext(aVoid -> command.run());
}
public static VertxExecutorAdapter getOrCreate(Context context) {
// Context.contextData() is a ConcurrentHashMap so we are not doing any external synchronisation here
VertxExecutorAdapter vertxExecutorAdapter = context.get(CONTEXT_KEY_HZ_VERTX_EXECUTOR_ADAPTER);
if (vertxExecutorAdapter != null) {
return vertxExecutorAdapter;
} else {
vertxExecutorAdapter = new VertxExecutorAdapter(context);
context.put(CONTEXT_KEY_HZ_VERTX_EXECUTOR_ADAPTER, vertxExecutorAdapter);
return vertxExecutorAdapter;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment