Skip to content

Instantly share code, notes, and snippets.

@xenji
Created February 29, 2016 19:46
Show Gist options
  • Save xenji/e469abde2e0b80aadc4e to your computer and use it in GitHub Desktop.
Save xenji/e469abde2e0b80aadc4e to your computer and use it in GitHub Desktop.
package com.trivago.ckg.config.titan;
import com.thinkaurelius.titan.core.TitanGraph;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV1d0;
import org.apache.tinkerpop.gremlin.server.GraphManager;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.util.ThreadFactoryUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.HashMap;
import java.util.concurrent.ThreadFactory;
/**
* @author mmueller
* @since 2016-02-29 13:11.
*/
@Configuration
public class GremlinServerConfigurator
{
@Autowired
private TitanGraph graph;
private GremlinServer server;
/**
* The exception should be properly handled in here.
* It's bad practice to throw exceptions in @PostConstruct
* annotated methods. Do not copy this.
* @throws Exception
*/
@PostConstruct
@SuppressWarnings("unchecked")
public void init() throws Exception
{
// Setup settings, add titan plugin
Settings settings = new Settings();
settings.plugins.add("aurelius.titan");
// The gremlin shell sends weird offset errors and null-pointers, when we not add the config below
Settings.ScriptEngineSettings scriptEngineSettings = new Settings.ScriptEngineSettings();
settings.scriptEngines.put("gremlin-groovy", scriptEngineSettings);
final Settings.SerializerSettings gryoSerializerSettings = new Settings.SerializerSettings();
gryoSerializerSettings.className = GryoMessageSerializerV1d0.class.getName();
gryoSerializerSettings.config = new HashMap<>();
// This config ...
gryoSerializerSettings.config.put("serializeResultToString", true);
settings.serializers.add(gryoSerializerSettings);
// Create the graph manager and add the pre-configured titan graph
GraphManager gm = new GraphManager(settings);
gm.getGraphs().put("graph", graph);
// When we pass the ServerGremlinExecutor, we need to create this ThreadFactory on our own
// This is stolen from the convenience constructor of the GremlinServer
final ThreadFactory threadFactoryWorker = ThreadFactoryUtil.create("worker-%d");
NioEventLoopGroup workerGroup = new NioEventLoopGroup(settings.threadPoolWorker, threadFactoryWorker);
// All default, except the last argument, which is the GraphManager
ServerGremlinExecutorCustom<EventLoopGroup> serverGremlinExecutor = new ServerGremlinExecutorCustom<>(
settings,
null,
workerGroup,
EventLoopGroup.class,
gm
);
// The unchecked assignment is supressed
server = new GremlinServer(serverGremlinExecutor);
server.start();
}
@PreDestroy
public void destruct()
{
if (server != null)
{
server.stop();
}
}
}
package com.trivago.ckg.config.titan;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.server.GraphManager;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.util.LifeCycleHook;
import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor;
import org.apache.tinkerpop.gremlin.server.util.ThreadFactoryUtil;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* This is an extension to the original Executor that overrides _everything_ due to the
* constructor work in the parent class.
*
* @author mmueller
* @since 2016-02-29 17:12.
*/
public class ServerGremlinExecutorCustom<T extends ScheduledExecutorService> extends ServerGremlinExecutor
{
private static final Logger logger = LoggerFactory.getLogger(ServerGremlinExecutor.class);
private final GraphManager graphManager;
private final Settings settings;
private final List<LifeCycleHook> hooks;
private final T scheduledExecutorService;
private final ExecutorService gremlinExecutorService;
private final GremlinExecutor gremlinExecutor;
/**
* Create a new object from {@link Settings} where thread pools are internally created. Note that the
* {@code scheduleExecutorServiceClass} will be created via
* {@link Executors#newScheduledThreadPool(int, ThreadFactory)}.
*/
public ServerGremlinExecutorCustom(
final Settings settings,
final Class<T> scheduleExecutorServiceClass,
GraphManager graphManager
)
{
this(settings, null, null, scheduleExecutorServiceClass, graphManager);
}
/**
* Create a new object from {@link Settings} where thread pools are internally created. Note that if the
* {@code scheduleExecutorServiceClass} is set to {@code null} it will be created via
* {@link Executors#newScheduledThreadPool(int, ThreadFactory)}. If either of the {@link ExecutorService}
* instances are supplied, the {@link Settings#gremlinPool} value will be ignored for the pool size.
*/
public ServerGremlinExecutorCustom(
final Settings settings,
final ExecutorService gremlinExecutorService,
final T scheduledExecutorService,
final Class<T> scheduleExecutorServiceClass,
GraphManager gm
)
{
// We pass the ScheduledThreadPoolExecutor.class manually instead of the
// parameter. This prevents the class-cast exception in the super's constructor.
super(settings, null, null, ScheduledThreadPoolExecutor.class);
try
{
// Close down the pools from the parent context. They won't be used anyway
super.getGremlinExecutor().close();
super.getGremlinExecutorService().shutdown();
super.getScheduledExecutorService().shutdown();
}
catch (Exception e)
{
logger.warn("{}", e.getMessage(), e);
}
this.settings = settings;
if (null == gremlinExecutorService)
{
final ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("exec-%d");
this.gremlinExecutorService = Executors.newFixedThreadPool(settings.gremlinPool, threadFactoryGremlin);
}
else
{
this.gremlinExecutorService = gremlinExecutorService;
}
if (null == scheduledExecutorService)
{
final ThreadFactory threadFactoryGremlin = ThreadFactoryUtil.create("worker-%d");
this.scheduledExecutorService = scheduleExecutorServiceClass.cast(
Executors.newScheduledThreadPool(settings.threadPoolWorker, threadFactoryGremlin));
}
else
{
this.scheduledExecutorService = scheduledExecutorService;
}
// initialize graphs from paramter
this.graphManager = gm;
logger.info("Initialized Gremlin thread pool. Threads in pool named with pattern gremlin-*");
final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
.scriptEvaluationTimeout(settings.scriptEvaluationTimeout)
.afterFailure((b, e) -> graphManager
.rollbackAll())
.beforeEval(b -> graphManager
.rollbackAll())
.afterTimeout(b -> graphManager
.rollbackAll())
.enabledPlugins(new HashSet<>(settings.plugins))
.globalBindings(graphManager
.getAsBindings())
.executorService(this.gremlinExecutorService)
.scheduledExecutorService(this.scheduledExecutorService);
settings.scriptEngines.forEach((k, v) -> {
// make sure that server related classes are available at init
v.imports.add(LifeCycleHook.class.getCanonicalName());
v.imports.add(LifeCycleHook.Context.class.getCanonicalName());
gremlinExecutorBuilder.addEngineSettings(k, v.imports, v.staticImports, v.scripts, v.config);
});
gremlinExecutor = gremlinExecutorBuilder.create();
// @FIXME: is this really needed? does the global bindings stuff the rest?
gremlinExecutor.getGlobalBindings().put("g", gm.getGraphs().get("graph").traversal());
logger.info("Initialized GremlinExecutor and configured ScriptEngines.");
// script engine init may have altered the graph bindings or maybe even created new ones - need to
// re-apply those references back
gremlinExecutor.getGlobalBindings().entrySet().stream()
.filter(kv -> kv.getValue() instanceof Graph)
.forEach(kv -> graphManager.getGraphs().put(kv.getKey(), (Graph) kv.getValue()));
// script engine init may have constructed the TraversalSource bindings - store them in Graphs object
gremlinExecutor.getGlobalBindings().entrySet().stream()
.filter(kv -> kv.getValue() instanceof TraversalSource)
.forEach(kv -> {
logger.info(
"A {} is now bound to [{}] with {}",
kv.getValue().getClass().getSimpleName(),
kv.getKey(),
kv.getValue()
);
graphManager.getTraversalSources().put(kv.getKey(), (TraversalSource) kv.getValue());
});
// determine if the initialization scripts introduced LifeCycleHook objects - if so we need to gather them
// up for execution
hooks = gremlinExecutor.getGlobalBindings().entrySet().stream()
.filter(kv -> kv.getValue() instanceof LifeCycleHook)
.map(kv -> (LifeCycleHook) kv.getValue())
.collect(Collectors.toList());
}
public T getScheduledExecutorService()
{
return scheduledExecutorService;
}
public GremlinExecutor getGremlinExecutor()
{
return gremlinExecutor;
}
public ExecutorService getGremlinExecutorService()
{
return gremlinExecutorService;
}
public GraphManager getGraphManager()
{
return graphManager;
}
public Settings getSettings()
{
return settings;
}
public List<LifeCycleHook> getHooks()
{
return hooks;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment