Created
November 8, 2017 15:18
-
-
Save maxdemarzi/21dd90418d4f0883d9246f761e666556 to your computer and use it in GitHub Desktop.
Importer for Swapnil
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.maxdemarzi.imports; | |
import com.maxdemarzi.results.StringResult; | |
import org.neo4j.kernel.internal.GraphDatabaseAPI; | |
import org.neo4j.logging.Log; | |
import org.neo4j.procedure.*; | |
import java.util.concurrent.TimeUnit; | |
import java.util.stream.Stream; | |
public class Imports { | |
@Context | |
public GraphDatabaseAPI db; | |
@Context | |
public Log log; | |
@Procedure(name = "com.maxdemarzi.import.swapnil", mode = Mode.WRITE) | |
@Description("CALL com.maxdemarzi.import.swapnil(file)") | |
public Stream<StringResult> importSwapnil(@Name("file") String file) throws InterruptedException { | |
long start = System.nanoTime(); | |
Thread t1 = new Thread(new ImportSwapnilRunnable(file, db, log)); | |
t1.start(); | |
t1.join(); | |
return Stream.of(new StringResult("Locations data in " + TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) + " seconds")); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.maxdemarzi.imports; | |
import com.github.benmanes.caffeine.cache.Caffeine; | |
import com.github.benmanes.caffeine.cache.LoadingCache; | |
import com.maxdemarzi.schema.Labels; | |
import com.maxdemarzi.schema.RelationshipTypes; | |
import org.apache.commons.csv.CSVFormat; | |
import org.apache.commons.csv.CSVRecord; | |
import org.neo4j.graphdb.*; | |
import org.neo4j.kernel.internal.GraphDatabaseAPI; | |
import org.neo4j.logging.Log; | |
import java.io.FileNotFoundException; | |
import java.io.FileReader; | |
import java.io.IOException; | |
import java.io.Reader; | |
import java.util.HashMap; | |
import java.util.Map; | |
public class ImportSwapnilRunnable implements Runnable { | |
private static final int TRANSACTION_LIMIT = 100; | |
private String file; | |
private static GraphDatabaseAPI db; | |
private static Log log; | |
public ImportSwapnilRunnable(String file, GraphDatabaseAPI db, Log log) { | |
this.file = file; | |
this.db = db; | |
this.log = log; | |
} | |
private static LoadingCache<String, Long> nodeIds = Caffeine.newBuilder() | |
.maximumSize(1_000_000) | |
.build(value -> getNodeId(Labels.User, "uid", value)); | |
private static Long getNodeId(Label label, String key, String value) { | |
Node node = db.findNode(label, key, value); | |
if (log.isDebugEnabled()) { | |
log.debug("Node does not exists: " + key + " `%s`", value); | |
} | |
return node.getId(); | |
} | |
@Override | |
public void run() { | |
// Step 1: Get the file. We are assuming this file is ordered by the starting user id | |
Reader in; | |
Iterable<CSVRecord> records = null; | |
try { | |
in = new FileReader("/" + file); | |
records = CSVFormat.EXCEL.withHeader().parse(in); | |
} catch (FileNotFoundException e) { | |
e.printStackTrace(); | |
log.error("ImportSwapnilRunnable - File not found: " + file); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
log.error("ImportSwapnilRunnable - IO Exception: " + file); | |
} | |
// Step 2: Setup our temporary variables, that will change when the starting node id changes. | |
String lastUserId = ""; | |
Node currentNode = null; | |
// We are going to be storing the node id of the other node in the relationship as the key | |
Map<Long, Relationship> otherUserRelMap = new HashMap<>(); | |
// Step 3: Start a transaction manually, we are going to commit every TRANSACTION_LIMIT users (not lines) | |
Transaction tx = db.beginTx(); | |
try { | |
int count = 0; | |
assert records != null; | |
for (CSVRecord record : records) { | |
// Step 4: If we haven't seen this starting id, reset the temporary variables | |
if (!record.get("startUid").equals(lastUserId)) { | |
count++; | |
currentNode = db.getNodeById(nodeIds.get(record.get("startUid"))); | |
// Load the per node relationship cache | |
otherUserRelMap.clear(); | |
for (Relationship r : currentNode.getRelationships(Direction.OUTGOING, RelationshipTypes.FRIEND)) { | |
otherUserRelMap.put(r.getEndNodeId(), r); | |
} | |
} | |
Long otherUserId = nodeIds.get(record.get("endUid")); | |
if (otherUserRelMap.containsKey(otherUserId)) { | |
otherUserRelMap.get(otherUserId).setProperty("d_chat", record.get("chatVol")); | |
} else { | |
Node otherNode = db.getNodeById(otherUserId); | |
Relationship r = currentNode.createRelationshipTo(otherNode, RelationshipTypes.FRIEND); | |
r.setProperty("d_chat", record.get("chatVol")); | |
} | |
if (count % TRANSACTION_LIMIT == 0) { | |
tx.success(); | |
tx.close(); | |
tx = db.beginTx(); | |
} | |
} | |
tx.success(); | |
} finally { | |
tx.close(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Apache Commons CSV: supplies the CSVFormat and CSVRecord classes imported by ImportSwapnilRunnable. -->
<dependency> | |
<groupId>org.apache.commons</groupId> | |
<artifactId>commons-csv</artifactId> | |
<version>1.2</version> | |
</dependency> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment