Skip to content

Instantly share code, notes, and snippets.

@sorsaffari
Created October 23, 2018 16:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sorsaffari/6e26c6c3ce5f12788e017f3cd67bf038 to your computer and use it in GitHub Desktop.
Save sorsaffari/6e26c6c3ce5f12788e017f3cd67bf038 to your computer and use it in GitHub Desktop.
Migrating JSON data into Grakn using the Java client
package ai.grakn.examples;
import ai.grakn.GraknTxType;
import ai.grakn.Keyspace;
import ai.grakn.client.Grakn;
import ai.grakn.util.SimpleURI;
/**
* reads a JSON encoded value as a stream of tokens,
* @see <a href="https://google.github.io/gson/apidocs/com/google/gson/stream/JsonReader.html">JsonReader</a>
*/
import com.google.gson.stream.JsonReader;
/**
* a lean JSON Library for Java,
* @see <a href="https://bolerio.github.io/mjson/">mjson</a>
*/
import mjson.Json;
import java.io.*;
import java.util.ArrayList;
import java.util.Collection;
public class JsonMigration {
/**
* representation of Input object that links an input file to its own templating function,
* which is used to map a Json object to Graql query string
*/
abstract static class Input {
String path;
public Input(String path) {
this.path = path;
}
String getDataPath(){ return path;}
abstract String template(Json data);
}
public static void main(String[] args) {
Collection<Input> inputs = initialiseInputs();
connectAndMigrate(inputs);
}
static Collection<Input> initialiseInputs() {
Collection<Input> inputs = new ArrayList<>();
// define template for constructing a company Graql insert query
inputs.add(new Input("data/companies") {
@Override
public String template(Json company) {
return "insert $company isa company has name " + company.at("name") + ";";
}
});
// define template for constructing a person Graql insert query
inputs.add(new Input("data/people") {
@Override
public String template(Json person) {
// insert person
String graqlInsertQuery = "insert $person isa person has phone-number " + person.at("phone_number");
if (! person.has("first_name")) {
// person is not a customer
graqlInsertQuery += " has is-customer false";
} else {
// person is a customer
graqlInsertQuery += " has is-customer true";
graqlInsertQuery += " has first-name " + person.at("first_name");
graqlInsertQuery += " has last-name " + person.at("last_name");
graqlInsertQuery += " has city " + person.at("city");
graqlInsertQuery += " has age " + person.at("age").asInteger();
}
graqlInsertQuery += ";";
return graqlInsertQuery;
}
});
// define template for constructing a contract Graql insert query
inputs.add(new Input("data/contracts") {
@Override
public String template(Json contract) {
// match company
String graqlInsertQuery = "match $company isa company has name " + contract.at("company_name") + ";";
// match person
graqlInsertQuery += " $customer isa person has phone-number " + contract.at("person_id") + ";";
// insert contract
graqlInsertQuery += " insert (provider: $company, customer: $customer) isa contract;";
return graqlInsertQuery;
}
});
// define template for constructing a call Graql insert query
inputs.add(new Input("data/calls") {
@Override
public String template(Json call) {
// match caller
String graqlInsertQuery = "match $caller isa person has phone-number " + call.at("caller_id") + ";";
// match callee
graqlInsertQuery += " $callee isa person has phone-number " + call.at("callee_id") + ";";
// insert call
graqlInsertQuery += " insert $call(caller: $caller, callee: $callee) isa call;" +
" $call has started-at " + call.at("started_at").asString() + ";" +
" $call has duration " + call.at("duration").asInteger() + ";";
return graqlInsertQuery;
}
});
return inputs;
}
/**
* 1. creates a Grakn instance
* 2. creates a session to the targeted keyspace
* 3. loads the csv data to Grakn for each file
* 4. closes the session
*/
static void connectAndMigrate(Collection<Input> inputs) {
SimpleURI localGrakn = new SimpleURI("localhost", 48555);
Grakn grakn = new Grakn(localGrakn); // 1
Keyspace keyspace = Keyspace.of("phone_calls");
Grakn.Session session = grakn.session(keyspace); // 2
inputs.forEach(input -> {
System.out.println("Loading from [" + input.getDataPath() + "] into Grakn ...");
try {
loadDataIntoGrakn(input, session); // 3
} catch (IOException e) {
e.printStackTrace();
}
});
session.close(); // 4
}
/**
* loads the json data into our Grakn phone_calls keyspace:
* 1. gets the data items as a list of json objects
* 2. for each json object
* a. creates a Grakn transaction
* b. constructs the corresponding Graql insert query
* c. runs the query
* d. commits the transaction
*
* @param input contains details required to parse the data
* @param session off of which a transaction will be created
* @throws UnsupportedEncodingException
*/
static void loadDataIntoGrakn(Input input, Grakn.Session session) throws IOException {
ArrayList<Json> items = parseDataToJson(input); // 1
items.forEach(item -> {
Grakn.Transaction transaction = session.transaction(GraknTxType.WRITE); // 2a
String graqlInsertQuery = input.template(item); // 2b
System.out.println("Executing Graql Query: " + graqlInsertQuery);
transaction.graql().parse(graqlInsertQuery).execute(); // 2c
transaction.commit(); // 2d
});
System.out.println("\nInserted " + items.size() + " items from [ " + input.getDataPath() + "] into Grakn.\n");
}
/**
* 1. reads a json file through a stream
* 2. parses each json object found in the file to a json object
* 3. adds the json object to the list of items
*
* @param input used to get the path to the data file, minus the format
* @return the list of json objects
* @throws IOException
*/
static ArrayList<Json> parseDataToJson(Input input) throws IOException {
ArrayList<Json> items = new ArrayList<>();
JsonReader jsonReader = new JsonReader(getReader(input.getDataPath() + ".json")); // 1
jsonReader.beginArray();
while (jsonReader.hasNext()) {
jsonReader.beginObject();
Json item = Json.object();
while (jsonReader.hasNext()) {
String key = jsonReader.nextName();
switch (jsonReader.peek()) {
case STRING:
item.set(key, jsonReader.nextString()); // 2
break;
case NUMBER:
item.set(key, jsonReader.nextInt()); // 2
break;
}
}
jsonReader.endObject();
items.add(item); // 3
}
jsonReader.endArray();
return items;
}
public static Reader getReader(String relativePath) throws UnsupportedEncodingException {
return new InputStreamReader(JsonMigration.class.getClassLoader().getResourceAsStream(relativePath), "UTF-8");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment