Skip to content

Instantly share code, notes, and snippets.

@angryTit
Created June 27, 2017 14:27
Show Gist options
  • Save angryTit/c9462c58a13619f700f18f1707b1ae53 to your computer and use it in GitHub Desktop.
Save angryTit/c9462c58a13619f700f18f1707b1ae53 to your computer and use it in GitHub Desktop.
package ru.angrytit;
import com.google.common.collect.Lists;
import com.opencsv.CSVReader;
import com.thinkaurelius.titan.core.TitanGraph;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import static java.util.Objects.nonNull;
import static java.util.stream.Collectors.toList;
import static ru.angrytit.Constants.CREATED_TIME;
import static ru.angrytit.Constants.USER_ID;
import static ru.angrytit.Constants.USER_LABEL;
/**
 * Migrates relationship (edge) data from CSV files into a Titan graph.
 *
 * <p>Every file whose name ends with {@code csv} and that lives directly inside the given
 * directory is read fully into memory, split into small chunks, and the chunks are processed
 * through a parallel stream. Each CSV line is read as
 * {@code sourceUserId, targetUserId, createdTime}; for every line whose two users are both found
 * in the graph an edge of the requested type is added from the source user to the target user.
 *
 * @author Mikhail Tyamin <a href="mailto:mikhail.tiamine@gmail.com">mikhail.tiamine@gmail.com</a>
 */
public class RelationshipsMigrationService implements Migrate {

    /** Number of edges a single chunk may add before an intermediate commit is issued. */
    private static final int BATCH_SIZE = 50;

    /**
     * Number of CSV lines handed to one parallel task.
     *
     * <p>NOTE(review): this value is smaller than {@link #BATCH_SIZE}, so the intermediate commit
     * inside a chunk can never trigger and every chunk is committed exactly once, at its end.
     * Confirm whether "8 lines per chunk" or "8 chunks" was intended.
     */
    private static final int PARTITION_SIZE = 8;

    /** Minimum number of columns a CSV line must have: source id, target id, created time. */
    private static final int EXPECTED_COLUMNS = 3;

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final TitanGraph graph;

    /**
     * @param graph the graph that receives the migrated edges
     */
    public RelationshipsMigrationService(TitanGraph graph) {
        this.graph = graph;
    }

    /**
     * Migrates every CSV file found in {@code filesPath} as edges labelled {@code relName}.
     *
     * @param filesPath directory that contains the CSV files
     * @param relName   label of the edges to create
     * @throws IOException if the directory can not be listed or a file can not be read
     */
    @Override
    public void startMigration(String filesPath, String relName) throws IOException {
        List<String> files = allFiles(filesPath);
        log.info("Files to migrate : {}", files.size());
        int fileCounter = 1;
        int howManyRels = 0;
        for (String each : files) {
            log.info("File number : {} at {}", fileCounter++, LocalDateTime.now());
            howManyRels += migrateDataFromFile(each, relName);
        }
        log.info("Finish relationships migration with relType : {} and relNumber : {} at : {}",
                relName, howManyRels, LocalDateTime.now());
    }

    /**
     * Lists the absolute paths of all regular files in {@code path} whose name ends with "csv".
     *
     * @throws IOException if {@code path} is not a directory or its content can not be listed
     */
    private List<String> allFiles(String path) throws IOException {
        File folder = new File(path);
        File[] listOfFiles = folder.listFiles();
        // listFiles() returns null (instead of throwing) when the path is not a directory
        // or an I/O error occurs; fail with a meaningful message rather than an NPE.
        if (listOfFiles == null) {
            throw new IOException("Can not list files, not a readable directory : " + path);
        }
        return Arrays.stream(listOfFiles)
                .filter(File::isFile)
                .filter(each -> each.getName().endsWith("csv"))
                .map(File::getAbsolutePath)
                .collect(toList());
    }

    /**
     * Reads one CSV file into memory and migrates its lines chunk by chunk in parallel.
     *
     * @return number of edges created from this file
     * @throws IOException if the file can not be opened or read
     */
    private int migrateDataFromFile(String fileName, String relType) throws IOException {
        List<String[]> lines;
        // Close both readers even when reading fails, so no file handle is leaked per file.
        try (FileReader fileReader = new FileReader(fileName);
             CSVReader reader = new CSVReader(fileReader)) {
            lines = reader.readAll();
        }
        log.info("Finish reading file into memory at {}, start migration", LocalDateTime.now());
        List<List<String[]>> partList = Lists.partition(lines, PARTITION_SIZE);
        return partList.parallelStream()
                .mapToInt(eachPart -> migrateEachList(eachPart, relType))
                .sum();
    }

    /**
     * Adds one edge per valid line of the chunk and commits the result.
     *
     * <p>Lines that are too short, whose ids are not integers, or whose users are not found in
     * the graph are logged and skipped; they do not abort the migration.
     *
     * @return number of edges created from this chunk
     */
    private int migrateEachList(List<String[]> lines, String relType) {
        GraphTraversalSource traversalSource = graph.traversal();
        int howManyRels = 0;
        int uncommitted = 0;
        for (String[] nextLine : lines) {
            if (nextLine.length < EXPECTED_COLUMNS) {
                log.error("Skip malformed line, expected at least {} columns but got : {}",
                        EXPECTED_COLUMNS, Arrays.toString(nextLine));
                continue;
            }
            String sourceId = nextLine[0];
            String targetId = nextLine[1];
            String createdTime = nextLine[2];
            try {
                Integer sourceUserId = Integer.valueOf(sourceId);
                Integer targetUserId = Integer.valueOf(targetId);
                if (addRelationship(traversalSource, sourceUserId, targetUserId, relType, createdTime)) {
                    howManyRels++;
                    uncommitted++;
                }
            } catch (NumberFormatException e) {
                log.error("Can not convert source or target id to integer, source : {}, target : {}", sourceId, targetId);
            }
            if (uncommitted >= BATCH_SIZE) {
                graph.tx().commit();
                uncommitted = 0;
            }
        }
        // Commit whatever is left from the last (possibly partial) batch.
        graph.tx().commit();
        return howManyRels;
    }

    /**
     * Adds an edge from the source user to the target user when both users are found.
     *
     * @return {@code true} if the edge was added, {@code false} if either user is missing
     */
    private boolean addRelationship(GraphTraversalSource traversalSource, Integer sourceUserId,
                                    Integer targetUserId, String relType, String createdTime) {
        Vertex source = findUser(traversalSource, sourceUserId);
        if (!nonNull(source)) {
            log.warn("Skip relationship, source user not found : {}", sourceUserId);
            return false;
        }
        Vertex target = findUser(traversalSource, targetUserId);
        if (!nonNull(target)) {
            log.warn("Skip relationship, target user not found : {}", targetUserId);
            return false;
        }
        source.addEdge(relType, target, CREATED_TIME, createdTime);
        return true;
    }

    /**
     * Looks up the user vertex with the given id.
     *
     * <p>Uses {@code tryNext()} because {@code next()} throws {@code NoSuchElementException}
     * on an empty traversal instead of returning {@code null}.
     *
     * @return the matching vertex, or {@code null} when no such user exists
     */
    private Vertex findUser(GraphTraversalSource traversalSource, Integer userId) {
        return traversalSource.V().has(USER_LABEL, USER_ID, userId).tryNext().orElse(null);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment