Created
October 23, 2018 16:37
-
-
Save sorsaffari/6e26c6c3ce5f12788e017f3cd67bf038 to your computer and use it in GitHub Desktop.
Migrating JSON data into Grakn using the Java client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ai.grakn.examples; | |
import ai.grakn.GraknTxType; | |
import ai.grakn.Keyspace; | |
import ai.grakn.client.Grakn; | |
import ai.grakn.util.SimpleURI; | |
/** | |
* reads a JSON encoded value as a stream of tokens, | |
* @see <a href="https://google.github.io/gson/apidocs/com/google/gson/stream/JsonReader.html">JsonReader</a> | |
*/ | |
import com.google.gson.stream.JsonReader; | |
/** | |
* a lean JSON Library for Java, | |
* @see <a href="https://bolerio.github.io/mjson/">mjson</a> | |
*/ | |
import mjson.Json; | |
import java.io.*; | |
import java.util.ArrayList; | |
import java.util.Collection; | |
public class JsonMigration { | |
/** | |
* representation of Input object that links an input file to its own templating function, | |
* which is used to map a Json object to Graql query string | |
*/ | |
abstract static class Input { | |
String path; | |
public Input(String path) { | |
this.path = path; | |
} | |
String getDataPath(){ return path;} | |
abstract String template(Json data); | |
} | |
public static void main(String[] args) { | |
Collection<Input> inputs = initialiseInputs(); | |
connectAndMigrate(inputs); | |
} | |
static Collection<Input> initialiseInputs() { | |
Collection<Input> inputs = new ArrayList<>(); | |
// define template for constructing a company Graql insert query | |
inputs.add(new Input("data/companies") { | |
@Override | |
public String template(Json company) { | |
return "insert $company isa company has name " + company.at("name") + ";"; | |
} | |
}); | |
// define template for constructing a person Graql insert query | |
inputs.add(new Input("data/people") { | |
@Override | |
public String template(Json person) { | |
// insert person | |
String graqlInsertQuery = "insert $person isa person has phone-number " + person.at("phone_number"); | |
if (! person.has("first_name")) { | |
// person is not a customer | |
graqlInsertQuery += " has is-customer false"; | |
} else { | |
// person is a customer | |
graqlInsertQuery += " has is-customer true"; | |
graqlInsertQuery += " has first-name " + person.at("first_name"); | |
graqlInsertQuery += " has last-name " + person.at("last_name"); | |
graqlInsertQuery += " has city " + person.at("city"); | |
graqlInsertQuery += " has age " + person.at("age").asInteger(); | |
} | |
graqlInsertQuery += ";"; | |
return graqlInsertQuery; | |
} | |
}); | |
// define template for constructing a contract Graql insert query | |
inputs.add(new Input("data/contracts") { | |
@Override | |
public String template(Json contract) { | |
// match company | |
String graqlInsertQuery = "match $company isa company has name " + contract.at("company_name") + ";"; | |
// match person | |
graqlInsertQuery += " $customer isa person has phone-number " + contract.at("person_id") + ";"; | |
// insert contract | |
graqlInsertQuery += " insert (provider: $company, customer: $customer) isa contract;"; | |
return graqlInsertQuery; | |
} | |
}); | |
// define template for constructing a call Graql insert query | |
inputs.add(new Input("data/calls") { | |
@Override | |
public String template(Json call) { | |
// match caller | |
String graqlInsertQuery = "match $caller isa person has phone-number " + call.at("caller_id") + ";"; | |
// match callee | |
graqlInsertQuery += " $callee isa person has phone-number " + call.at("callee_id") + ";"; | |
// insert call | |
graqlInsertQuery += " insert $call(caller: $caller, callee: $callee) isa call;" + | |
" $call has started-at " + call.at("started_at").asString() + ";" + | |
" $call has duration " + call.at("duration").asInteger() + ";"; | |
return graqlInsertQuery; | |
} | |
}); | |
return inputs; | |
} | |
/** | |
* 1. creates a Grakn instance | |
* 2. creates a session to the targeted keyspace | |
* 3. loads the csv data to Grakn for each file | |
* 4. closes the session | |
*/ | |
static void connectAndMigrate(Collection<Input> inputs) { | |
SimpleURI localGrakn = new SimpleURI("localhost", 48555); | |
Grakn grakn = new Grakn(localGrakn); // 1 | |
Keyspace keyspace = Keyspace.of("phone_calls"); | |
Grakn.Session session = grakn.session(keyspace); // 2 | |
inputs.forEach(input -> { | |
System.out.println("Loading from [" + input.getDataPath() + "] into Grakn ..."); | |
try { | |
loadDataIntoGrakn(input, session); // 3 | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} | |
}); | |
session.close(); // 4 | |
} | |
/** | |
* loads the json data into our Grakn phone_calls keyspace: | |
* 1. gets the data items as a list of json objects | |
* 2. for each json object | |
* a. creates a Grakn transaction | |
* b. constructs the corresponding Graql insert query | |
* c. runs the query | |
* d. commits the transaction | |
* | |
* @param input contains details required to parse the data | |
* @param session off of which a transaction will be created | |
* @throws UnsupportedEncodingException | |
*/ | |
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws IOException { | |
ArrayList<Json> items = parseDataToJson(input); // 1 | |
items.forEach(item -> { | |
Grakn.Transaction transaction = session.transaction(GraknTxType.WRITE); // 2a | |
String graqlInsertQuery = input.template(item); // 2b | |
System.out.println("Executing Graql Query: " + graqlInsertQuery); | |
transaction.graql().parse(graqlInsertQuery).execute(); // 2c | |
transaction.commit(); // 2d | |
}); | |
System.out.println("\nInserted " + items.size() + " items from [ " + input.getDataPath() + "] into Grakn.\n"); | |
} | |
/** | |
* 1. reads a json file through a stream | |
* 2. parses each json object found in the file to a json object | |
* 3. adds the json object to the list of items | |
* | |
* @param input used to get the path to the data file, minus the format | |
* @return the list of json objects | |
* @throws IOException | |
*/ | |
static ArrayList<Json> parseDataToJson(Input input) throws IOException { | |
ArrayList<Json> items = new ArrayList<>(); | |
JsonReader jsonReader = new JsonReader(getReader(input.getDataPath() + ".json")); // 1 | |
jsonReader.beginArray(); | |
while (jsonReader.hasNext()) { | |
jsonReader.beginObject(); | |
Json item = Json.object(); | |
while (jsonReader.hasNext()) { | |
String key = jsonReader.nextName(); | |
switch (jsonReader.peek()) { | |
case STRING: | |
item.set(key, jsonReader.nextString()); // 2 | |
break; | |
case NUMBER: | |
item.set(key, jsonReader.nextInt()); // 2 | |
break; | |
} | |
} | |
jsonReader.endObject(); | |
items.add(item); // 3 | |
} | |
jsonReader.endArray(); | |
return items; | |
} | |
public static Reader getReader(String relativePath) throws UnsupportedEncodingException { | |
return new InputStreamReader(JsonMigration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment