Skip to content

Instantly share code, notes, and snippets.

@maxdemarzi
Created November 8, 2017 15:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxdemarzi/21dd90418d4f0883d9246f761e666556 to your computer and use it in GitHub Desktop.
Save maxdemarzi/21dd90418d4f0883d9246f761e666556 to your computer and use it in GitHub Desktop.
Importer for Swapnil
package com.maxdemarzi.imports;
import com.maxdemarzi.results.StringResult;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import org.neo4j.procedure.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
 * Stored procedures that bulk-import data into the graph.
 */
public class Imports {
    @Context
    public GraphDatabaseAPI db;

    @Context
    public Log log;

    /**
     * Runs {@link ImportSwapnilRunnable} for the given CSV file and waits for it to finish.
     *
     * <p>The import is executed on a separate thread that this call joins. NOTE(review): presumably
     * the separate thread exists so the runnable's own, manually batched transactions are
     * independent of the procedure's transaction — confirm before changing.
     *
     * <p>Any exception that escapes the import thread is logged and rethrown here, so a failed
     * import makes the procedure call fail instead of reporting a timing row.
     *
     * @param file path of the CSV file, passed unchanged to {@link ImportSwapnilRunnable}
     * @return a single row reporting the elapsed time in whole seconds
     * @throws InterruptedException if interrupted while waiting for the import thread
     * @throws RuntimeException if the import thread failed; the original failure is the cause
     */
    @Procedure(name = "com.maxdemarzi.import.swapnil", mode = Mode.WRITE)
    @Description("CALL com.maxdemarzi.import.swapnil(file)")
    public Stream<StringResult> importSwapnil(@Name("file") String file) throws InterruptedException {
        long start = System.nanoTime();

        // Captures whatever escapes the worker; join() below guarantees the write is visible here.
        Throwable[] failure = new Throwable[1];
        Thread importer = new Thread(new ImportSwapnilRunnable(file, db, log), "import-swapnil");
        importer.setUncaughtExceptionHandler((thread, error) -> failure[0] = error);
        importer.start();
        importer.join();

        if (failure[0] != null) {
            log.error("ImportSwapnilRunnable failed for file: " + file, failure[0]);
            throw new RuntimeException("Import of " + file + " failed", failure[0]);
        }

        // NOTE(review): the message says "Locations" although this imports FRIEND relationships;
        // kept byte-for-byte in case callers match on it.
        return Stream.of(new StringResult("Locations data in " + TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) + " seconds"));
    }
}
package com.maxdemarzi.imports;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.maxdemarzi.schema.Labels;
import com.maxdemarzi.schema.RelationshipTypes;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.neo4j.graphdb.*;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;
/**
 * Imports FRIEND relationships from a CSV file into the graph.
 *
 * <p>The file is parsed with {@code CSVFormat.EXCEL.withHeader()}. Each row is read by the column
 * names {@code startUid}, {@code endUid} and {@code chatVol}. Both uids are resolved to
 * {@code User} nodes via their {@code uid} property.
 *
 * <p>If the start user already has an outgoing FRIEND relationship to the end user, that
 * relationship's {@code d_chat} property is overwritten with {@code chatVol}. Otherwise a new
 * FRIEND relationship carrying that property is created. Rows whose start or end user cannot be
 * found are logged and skipped.
 *
 * <p>The file is assumed to be grouped (ordered) by {@code startUid}. The outgoing relationships of
 * the current start user are loaded once per group. The transaction is committed every
 * {@link #TRANSACTION_LIMIT} distinct start users, always at a group boundary.
 *
 * <p>Each instance owns its database handle, log and uid-to-node-id cache, so separate imports do
 * not share or overwrite each other's state.
 */
public class ImportSwapnilRunnable implements Runnable {
    /** Number of distinct start users written per transaction. */
    private static final int TRANSACTION_LIMIT = 100;

    /** Maximum number of uid -> node id mappings kept in {@link #nodeIds}. */
    private static final int NODE_ID_CACHE_SIZE = 1_000_000;

    private final String file;
    private final GraphDatabaseAPI db;
    private final Log log;

    /**
     * Caches uid -> node id for {@code User} nodes. The loader returns null for an unknown uid, in
     * which case {@code get} returns null and nothing is cached.
     */
    private final LoadingCache<String, Long> nodeIds;

    /**
     * @param file path of the CSV file; "/" is prepended before opening it
     * @param db   database to read users from and write relationships to
     * @param log  log used for error and diagnostic messages
     */
    public ImportSwapnilRunnable(String file, GraphDatabaseAPI db, Log log) {
        this.file = file;
        this.db = db;
        this.log = log;
        this.nodeIds = Caffeine.newBuilder()
                .maximumSize(NODE_ID_CACHE_SIZE)
                .build(uid -> getNodeId(Labels.User, "uid", uid));
    }

    /**
     * Looks up the id of the node with the given label and property value.
     *
     * <p>Must be called inside a transaction. The cache loader runs on the calling thread, which
     * is inside {@link #importRecords}'s transaction.
     *
     * @return the node id, or null if no such node exists
     */
    private Long getNodeId(Label label, String key, String value) {
        Node node = db.findNode(label, key, value);
        if (node == null) {
            if (log.isDebugEnabled()) {
                log.debug("Node does not exists: " + key + " `%s`", value);
            }
            return null;
        }
        return node.getId();
    }

    /**
     * Step 1: opens the CSV file and imports every row (see the class documentation).
     *
     * @throws java.io.UncheckedIOException if the file cannot be opened or read
     */
    @Override
    public void run() {
        // The reader is closed on every path, including when the import itself fails.
        try (Reader in = new FileReader("/" + file)) {
            Iterable<CSVRecord> records = CSVFormat.EXCEL.withHeader().parse(in);
            importRecords(records);
        } catch (FileNotFoundException e) {
            log.error("ImportSwapnilRunnable - File not found: " + file, e);
            throw new java.io.UncheckedIOException("File not found: " + file, e);
        } catch (IOException e) {
            log.error("ImportSwapnilRunnable - IO Exception: " + file, e);
            throw new java.io.UncheckedIOException("IO Exception: " + file, e);
        }
    }

    /**
     * Step 2: writes every record, committing after every {@link #TRANSACTION_LIMIT} distinct start
     * users (not lines).
     */
    private void importRecords(Iterable<CSVRecord> records) {
        String lastUserId = null;   // startUid of the group currently being processed
        Node currentNode = null;    // node of that start user, or null if the uid is unknown
        // Outgoing FRIEND relationships of currentNode, keyed by the other node's id.
        Map<Long, Relationship> otherUserRelMap = new HashMap<>();
        int usersInTx = 0;

        Transaction tx = db.beginTx();
        try {
            for (CSVRecord record : records) {
                String startUid = record.get("startUid");
                if (!startUid.equals(lastUserId)) {
                    // Commit only between groups so one user's rows never span two transactions.
                    if (usersInTx == TRANSACTION_LIMIT) {
                        tx.success();
                        tx.close();
                        tx = db.beginTx();
                        usersInTx = 0;
                    }
                    usersInTx++;
                    lastUserId = startUid;

                    Long startId = nodeIds.get(startUid);
                    if (startId == null) {
                        log.warn("ImportSwapnilRunnable - start user not found, skipping its rows: uid `%s`", startUid);
                        currentNode = null;
                    } else {
                        currentNode = db.getNodeById(startId);
                    }
                    loadOutgoingFriends(currentNode, otherUserRelMap);
                }
                if (currentNode == null) {
                    continue; // unknown start user: the whole group is skipped (already logged)
                }

                String endUid = record.get("endUid");
                Long otherUserId = nodeIds.get(endUid);
                if (otherUserId == null) {
                    log.warn("ImportSwapnilRunnable - end user not found, skipping row: uid `%s`", endUid);
                    continue;
                }

                Relationship rel = otherUserRelMap.get(otherUserId);
                if (rel == null) {
                    rel = currentNode.createRelationshipTo(db.getNodeById(otherUserId), RelationshipTypes.FRIEND);
                    // Remember it so a repeated (startUid, endUid) row updates instead of duplicating.
                    otherUserRelMap.put(otherUserId, rel);
                }
                rel.setProperty("d_chat", record.get("chatVol"));
            }
            tx.success();
        } finally {
            tx.close();
        }
    }

    /**
     * Replaces the contents of {@code into} with the outgoing FRIEND relationships of {@code node},
     * keyed by end node id. Leaves {@code into} empty when {@code node} is null.
     */
    private static void loadOutgoingFriends(Node node, Map<Long, Relationship> into) {
        into.clear();
        if (node == null) {
            return;
        }
        for (Relationship r : node.getRelationships(Direction.OUTGOING, RelationshipTypes.FRIEND)) {
            into.put(r.getEndNodeId(), r);
        }
    }
}
<!-- Apache Commons CSV: provides CSVFormat / CSVRecord, used by ImportSwapnilRunnable to parse the import file. -->
<!-- NOTE(review): the importer also imports Caffeine (com.github.benmanes.caffeine); confirm it is declared elsewhere in the pom. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.2</version>
</dependency>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment