Created
April 7, 2017 18:44
-
-
Save robbieknuth/67e46195b30c7f99b1d742e5af415887 to your computer and use it in GitHub Desktop.
Vert.x Hazelcast ClusterManager AsyncMap repro
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.hazelcast.config.Config; | |
import com.hazelcast.config.NetworkConfig; | |
import com.hazelcast.core.*; | |
import io.vertx.core.*; | |
import io.vertx.core.logging.Logger; | |
import io.vertx.core.logging.LoggerFactory; | |
import io.vertx.core.shareddata.AsyncMap; | |
import io.vertx.ext.web.Router; | |
import io.vertx.ext.web.RoutingContext; | |
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; | |
import java.util.concurrent.Executor; | |
import static io.vertx.core.Future.failedFuture; | |
import static io.vertx.core.Future.succeededFuture; | |
public class EntryPoint { | |
static Logger logger = LoggerFactory.getLogger(EntryPoint.class); | |
public static void main(String[] args) { | |
try { | |
Config config = new Config(); | |
NetworkConfig networkConfig = config.getNetworkConfig(); | |
networkConfig | |
.setPort(5701) | |
.setPortAutoIncrement(true) | |
.getInterfaces() | |
.setEnabled(true) | |
.addInterface("127.0.0.1"); | |
networkConfig | |
.getJoin() | |
.getMulticastConfig() | |
.setEnabled(true) | |
.setLoopbackModeEnabled(true); | |
HazelcastInstance hci = Hazelcast.newHazelcastInstance(config); | |
HazelcastClusterManager hazelcastClusterManager = new HazelcastClusterManager(hci); | |
VertxOptions vertxOptions = new VertxOptions().setClusterManager(hazelcastClusterManager); | |
Vertx.clusteredVertx(vertxOptions, res -> { | |
if (!res.succeeded()) | |
throw new RuntimeException("vertx failed to start"); | |
res.result().deployVerticle(new Verticle(hci), deployHandler -> { | |
if (!deployHandler.succeeded()) | |
throw new RuntimeException("verticle deployment failed."); | |
logger.info("Deployment succeeded"); | |
}); | |
}); | |
} catch (Exception e) { | |
System.err.println(e); | |
} | |
} | |
static class Verticle extends AbstractVerticle { | |
HazelcastInstance hazelCastInstance; | |
public Verticle(HazelcastInstance hci) { | |
this.hazelCastInstance = hci; | |
} | |
@Override | |
public void start(Future<Void> startFuture) throws Exception { | |
Router router = Router.router(this.vertx); | |
router.post("/with").handler(routingContext -> this.middleWare(routingContext, true)); | |
router.post("/without").handler(routingContext -> this.middleWare(routingContext, false)); | |
router.post("/with").handler(this::actualRoute); | |
router.post("/without").handler(this::actualRoute); | |
this.vertx.createHttpServer() | |
.requestHandler(router::accept) | |
.listen(9000); | |
startFuture.complete(null); | |
} | |
void middleWare(RoutingContext routingContext, boolean useContext) { | |
this.<String, Boolean>getAsyncMap(this.hazelCastInstance, "mapName", useContext, | |
getMapResult -> { | |
if (getMapResult.succeeded()) { | |
AsyncMap<String, Boolean> asyncMap = getMapResult.result(); | |
asyncMap.get("keyName", getValueResult -> { | |
if (getValueResult.succeeded()) { | |
Boolean value = getValueResult.result(); | |
if (value == null) { | |
logger.info("cache miss"); | |
asyncMap.put("keyName", true, putKeyResult -> { | |
if (putKeyResult.succeeded()) { | |
logger.info("Key placed."); | |
routingContext.put("value", "some cached value"); | |
routingContext.next(); | |
} else { | |
logger.error("Failed to put key."); | |
routingContext.fail(putKeyResult.cause()); | |
} | |
}); | |
} else { | |
logger.info("cache hit"); | |
routingContext.put("value", "some cached value"); | |
routingContext.next(); | |
} | |
} else { | |
logger.error("getting value failed"); | |
routingContext.fail(getValueResult.cause()); | |
} | |
}); | |
} else { | |
logger.error("getting async map failed"); | |
routingContext.fail(getMapResult.cause()); | |
} | |
}); | |
} | |
void actualRoute(RoutingContext routingContext) { | |
routingContext.request() | |
.endHandler(v -> | |
{ | |
String value = routingContext.get("value"); | |
routingContext | |
.response() | |
.setStatusCode(200) | |
.setChunked(true) | |
.write("Some data: " + value) | |
.end(); | |
}) | |
.handler(b -> {}); | |
} | |
public <K, V> void getAsyncMap(HazelcastInstance hci, String name, boolean useContext, Handler<AsyncResult<AsyncMap<K, V>>> resultHandler) { | |
if (useContext) { | |
vertx.executeBlocking( | |
fut -> fut.complete(new HazelcastInternalAsyncMap<>(vertx, hci.<K, V>getMap(name), useContext)), | |
resultHandler); | |
} | |
else { | |
resultHandler.handle(Future.succeededFuture(new HazelcastInternalAsyncMap<>(vertx, hci.getMap(name), useContext))); | |
} | |
} | |
} | |
static class HazelcastInternalAsyncMap<K, V> implements AsyncMap<K, V> { | |
Vertx vertx; | |
IMap<K, V> map; | |
boolean useContext; | |
public HazelcastInternalAsyncMap(Vertx vertx, IMap<K, V> map, boolean useContext) { | |
this.vertx = vertx; | |
this.map = map; | |
this.useContext = useContext; | |
} | |
public void get(K k, Handler<AsyncResult<V>> asyncResultHandler) { | |
if (useContext) { | |
executeAsync((ICompletableFuture<V>)map.getAsync(convertParam(k)), asyncResultHandler); | |
} | |
else { | |
asyncResultHandler.handle(Future.succeededFuture(map.get(k))); | |
} | |
} | |
@Override | |
public void put(K k, V v, Handler<AsyncResult<Void>> completionHandler) { | |
K kk = convertParam(k); | |
V vv = convertParam(v); | |
if (useContext) { | |
executeAsyncVoid((ICompletableFuture<Void>)map.putAsync(kk, vv), completionHandler); | |
} | |
else { | |
map.putAsync(kk, vv); | |
completionHandler.handle(Future.succeededFuture(null)); | |
} | |
} | |
@Override | |
public void put(K k, V v, long ttl, Handler<AsyncResult<Void>> completionHandler) { } | |
@Override | |
public void putIfAbsent(K k, V v, Handler<AsyncResult<V>> completionHandler) { } | |
@Override | |
public void putIfAbsent(K k, V v, long ttl, Handler<AsyncResult<V>> completionHandler) { } | |
@Override | |
public void remove(K k, Handler<AsyncResult<V>> asyncResultHandler) { } | |
@Override | |
public void removeIfPresent(K k, V v, Handler<AsyncResult<Boolean>> resultHandler) { } | |
@Override | |
public void replace(K k, V v, Handler<AsyncResult<V>> asyncResultHandler) { } | |
@Override | |
public void replaceIfPresent(K k, V oldValue, V newValue, Handler<AsyncResult<Boolean>> resultHandler) { } | |
@Override | |
public void clear(Handler<AsyncResult<Void>> resultHandler) { } | |
@Override | |
public void size(Handler<AsyncResult<Integer>> resultHandler) { } | |
private <T> void executeAsync( | |
ICompletableFuture<T> future, | |
Handler<AsyncResult<T>> resultHandler) { | |
future.andThen( | |
new HandlerCallBackAdapter(resultHandler), | |
VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext()) | |
); | |
} | |
private void executeAsyncVoid( | |
ICompletableFuture<Void> future, | |
Handler<AsyncResult<Void>> resultHandler) { | |
future.andThen( | |
new VoidHandlerCallBackAdapter(resultHandler), | |
VertxExecutorAdapter.getOrCreate(vertx.getOrCreateContext()) | |
); | |
} | |
} | |
static class VoidHandlerCallBackAdapter<T> extends HandlerCallBackAdapter<T> { | |
public VoidHandlerCallBackAdapter(Handler<AsyncResult<Void>> asyncResultHandler) { | |
super((Handler)asyncResultHandler); | |
} | |
@Override | |
public void onResponse(T v) { | |
setResult(Future.succeededFuture()); | |
} | |
} | |
static class HandlerCallBackAdapter<V> implements ExecutionCallback<V> { | |
private final Handler<AsyncResult<V>> asyncResultHandler; | |
public HandlerCallBackAdapter(Handler<AsyncResult<V>> asyncResultHandler) { | |
this.asyncResultHandler = asyncResultHandler; | |
} | |
@Override | |
public void onResponse(V v) { | |
setResult(succeededFuture(convertReturn(v))); | |
} | |
@Override | |
public void onFailure(Throwable throwable) { | |
setResult(failedFuture(throwable)); | |
} | |
protected void setResult(AsyncResult<V> object) { | |
asyncResultHandler.handle(object); | |
} | |
} | |
@SuppressWarnings("unchecked") | |
public static <T> T convertReturn(Object obj) { | |
return (T) obj; | |
} | |
@SuppressWarnings("unchecked") | |
public static <T> T convertParam(T obj) { | |
return obj; | |
} | |
final static class VertxExecutorAdapter implements Executor { | |
private static final String CONTEXT_KEY_HZ_VERTX_EXECUTOR_ADAPTER = "io.vertx.spi.cluster.hazelcast.impl.VertxExecutorAdapter"; | |
private final Context context; | |
private VertxExecutorAdapter(Context context) { | |
this.context = context; | |
} | |
@Override | |
public void execute(Runnable command) { | |
context.runOnContext(aVoid -> command.run()); | |
} | |
public static VertxExecutorAdapter getOrCreate(Context context) { | |
// Context.contextData() is a ConcurrentHashMap so we are not doing any external synchronisation here | |
VertxExecutorAdapter vertxExecutorAdapter = context.get(CONTEXT_KEY_HZ_VERTX_EXECUTOR_ADAPTER); | |
if (vertxExecutorAdapter != null) { | |
return vertxExecutorAdapter; | |
} else { | |
vertxExecutorAdapter = new VertxExecutorAdapter(context); | |
context.put(CONTEXT_KEY_HZ_VERTX_EXECUTOR_ADAPTER, vertxExecutorAdapter); | |
return vertxExecutorAdapter; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment