Skip to content

Instantly share code, notes, and snippets.

@jazzido
Created May 20, 2015 02:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jazzido/9369d7134a8acb64be54 to your computer and use it in GitHub Desktop.
Save jazzido/9369d7134a8acb64be54 to your computer and use it in GitHub Desktop.
package org.neo4j.elasticsearch;
import io.searchbox.action.BulkableAction;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
import io.searchbox.client.JestResultHandler;
import io.searchbox.core.Bulk;
import io.searchbox.core.Delete;
import io.searchbox.core.Index;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.event.TransactionData;
import org.neo4j.graphdb.event.TransactionEventHandler;
import org.neo4j.kernel.impl.util.StringLogger;
import com.graphaware.tx.event.improved.api.Change;
import com.graphaware.tx.event.improved.api.ImprovedTransactionData;
import com.graphaware.tx.event.improved.api.LazyTransactionData;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Neo4j transaction event handler that mirrors node changes into ElasticSearch via Jest.
 *
 * <p>{@link #beforeCommit(TransactionData)} turns the created, deleted and changed nodes of a
 * transaction into bulkable index/delete actions, and
 * {@link #afterCommit(TransactionData, Collection)} sends them as a single bulk request
 * (asynchronously unless {@link #setUseAsyncJest(boolean)} says otherwise). Only labels that
 * are keys of the index spec map passed to the constructor produce actions.
 *
 * @author mh
 * @since 25.04.15
 */
class ElasticSearchEventHandler implements TransactionEventHandler<Collection<BulkableAction>>, JestResultHandler<JestResult> {
    private final JestClient client;
    private final StringLogger logger;
    private final GraphDatabaseService gds;
    private final Map<Label, List<ElasticSearchIndexSpec>> indexSpecs;
    private final Set<Label> indexLabels;
    private boolean useAsyncJest = true;
    // Counts beforeCommit invocations; only used for debug logging.
    private final AtomicInteger cnt = new AtomicInteger();

    /**
     * @param client    Jest client used to execute the bulk requests
     * @param indexSpec index specifications keyed by the label they apply to
     * @param logger    logger receiving debug output and update failures
     * @param gds       graph database this handler is registered with
     */
    public ElasticSearchEventHandler(JestClient client, Map<Label, List<ElasticSearchIndexSpec>> indexSpec, StringLogger logger, GraphDatabaseService gds) {
        this.client = client;
        this.indexSpecs = indexSpec;
        this.indexLabels = indexSpec.keySet();
        this.logger = logger;
        this.gds = gds;
    }

    /**
     * Collects the ElasticSearch actions implied by the transaction.
     *
     * <p>Actions are keyed by {@link IndexId}, so a later action for the same index name and
     * document id replaces an earlier one.
     *
     * @param transactionData the changes about to be committed
     * @return the actions to execute after commit; an empty list when there is nothing to do
     */
    @Override
    public Collection<BulkableAction> beforeCommit(TransactionData transactionData) throws Exception {
        logger.debug("beforeCommit call count: " + cnt.incrementAndGet());
        ImprovedTransactionData improvedTransactionData = new LazyTransactionData(transactionData);
        Map<IndexId, BulkableAction> actions = new HashMap<>(1000);

        for (Node node : improvedTransactionData.getAllCreatedNodes()) {
            if (hasLabel(node)) {
                actions.putAll(indexRequests(node));
            }
        }

        for (Node node : improvedTransactionData.getAllDeletedNodes()) {
            if (hasLabel(node)) {
                actions.putAll(deleteRequests(node));
            }
        }

        for (Change<Node> nodeChange : improvedTransactionData.getAllChangedNodes()) {
            Node current = nodeChange.getCurrent();
            // Always re-index under the labels the node still carries: the stored document
            // contains the node's label list, so it changes even when a label is only removed.
            actions.putAll(indexRequests(current));
            // Deletes for removed labels are added last so that, when they share an IndexId
            // with an index request above, the delete is the action that is kept.
            for (Label removedLabel : improvedTransactionData.removedLabels(current)) {
                actions.putAll(deleteRequests(current, removedLabel));
            }
        }

        return actions.isEmpty() ? Collections.<BulkableAction>emptyList() : actions.values();
    }

    /**
     * @param useAsyncJest {@code true} to send bulk requests with {@code executeAsync},
     *                     {@code false} to send them with the blocking {@code execute}
     */
    public void setUseAsyncJest(boolean useAsyncJest) {
        this.useAsyncJest = useAsyncJest;
    }

    /**
     * Sends the collected actions to ElasticSearch as one bulk request.
     *
     * <p>Any exception is logged as a warning and not propagated. In synchronous mode the
     * result is passed to {@link #completed(JestResult)} so failures are logged the same way
     * as in asynchronous mode.
     *
     * @param transactionData the committed changes (unused)
     * @param actions         the actions returned by {@link #beforeCommit(TransactionData)}
     */
    @Override
    public void afterCommit(TransactionData transactionData, Collection<BulkableAction> actions) {
        if (actions == null || actions.isEmpty()) {
            return;
        }
        try {
            Bulk bulk = new Bulk.Builder()
                    .addAction(actions)
                    .build();
            if (useAsyncJest) {
                client.executeAsync(bulk, this);
            } else {
                completed(client.execute(bulk));
            }
        } catch (Exception e) {
            logger.warn("Error updating ElasticSearch ", e);
        }
    }

    /** Returns {@code true} if the node carries at least one label that has an index spec. */
    private boolean hasLabel(Node node) {
        for (Label l : node.getLabels()) {
            if (indexLabels.contains(l)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds one index request per index spec of every indexed label on the node.
     * The label name is used as the document type and the node id as the document id.
     */
    private Map<IndexId, Index> indexRequests(Node node) {
        Map<IndexId, Index> reqs = new HashMap<>();
        String id = id(node);
        for (Label l : node.getLabels()) {
            if (!indexLabels.contains(l)) {
                continue;
            }
            for (ElasticSearchIndexSpec spec : indexSpecs.get(l)) {
                String indexName = spec.getIndexName();
                Index request = new Index.Builder(nodeToJson(node, spec.getProperties()))
                        .type(l.name())
                        .index(indexName)
                        .id(id)
                        .build();
                reqs.put(new IndexId(indexName, id), request);
            }
        }
        return reqs;
    }

    /**
     * Builds one delete request per index spec registered for {@code removedLabel}.
     *
     * @return the delete requests; empty when the label has no index spec
     */
    private Map<IndexId, Delete> deleteRequests(Node node, Label removedLabel) {
        Map<IndexId, Delete> reqs = new HashMap<>();
        if (!indexSpecs.containsKey(removedLabel)) {
            return reqs;
        }
        String id = id(node);
        for (ElasticSearchIndexSpec spec : indexSpecs.get(removedLabel)) {
            String indexName = spec.getIndexName();
            reqs.put(new IndexId(indexName, id),
                    new Delete.Builder(id).index(indexName).type(removedLabel.name()).build());
        }
        return reqs;
    }

    /** Builds delete requests for every indexed label the node carries. */
    private Map<IndexId, Delete> deleteRequests(Node node) {
        Map<IndexId, Delete> reqs = new HashMap<>();
        for (Label l : node.getLabels()) {
            // The per-label overload already skips labels without an index spec.
            reqs.putAll(deleteRequests(node, l));
        }
        return reqs;
    }

    /** Returns the node id rendered as a string; used as the ElasticSearch document id. */
    private String id(Node node) {
        return String.valueOf(node.getId());
    }

    /**
     * Builds the document source for a node: its id, its label names, and those of the given
     * properties that the node actually has.
     */
    private Map<String, Object> nodeToJson(Node node, Set<String> properties) {
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("id", id(node));
        json.put("labels", labels(node));
        for (String prop : properties) {
            if (node.hasProperty(prop)) {
                json.put(prop, node.getProperty(prop));
            }
        }
        return json;
    }

    /** Returns the names of all labels on the node. */
    private String[] labels(Node node) {
        List<String> result = new ArrayList<>();
        for (Label label : node.getLabels()) {
            result.add(label.name());
        }
        return result.toArray(new String[result.size()]);
    }

    /** Nothing was sent to ElasticSearch before commit, so there is nothing to undo. */
    @Override
    public void afterRollback(TransactionData transactionData, Collection<BulkableAction> actions) {
    }

    /** Logs the outcome of a bulk request: debug on success, warning otherwise. */
    @Override
    public void completed(JestResult jestResult) {
        if (jestResult.isSucceeded() && jestResult.getErrorMessage() == null) {
            logger.debug("ElasticSearch Update Success");
        } else {
            logger.warn("ElasticSearch Update Failed: " + jestResult.getErrorMessage());
        }
    }

    /** Logs a bulk request that failed with an exception. */
    @Override
    public void failed(Exception e) {
        logger.warn("Problem Updating ElasticSearch ", e);
    }

    /**
     * Map key identifying a document by index name and document id.
     * The document type is not part of the key.
     */
    private static final class IndexId {
        final String indexName, id;

        IndexId(String indexName, String id) {
            this.indexName = indexName;
            this.id = id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(indexName, id);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof IndexId)) {
                return false;
            }
            IndexId other = (IndexId) obj;
            return Objects.equals(id, other.id) && Objects.equals(indexName, other.indexName);
        }

        @Override
        public String toString() {
            return "IndexId [indexName=" + indexName + ", id=" + id + "]";
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment