Skip to content

Instantly share code, notes, and snippets.

@graben1437
Last active October 23, 2015 12:30
Show Gist options
  • Save graben1437/94c63caaeff97d8c368a to your computer and use it in GitHub Desktop.
Save graben1437/94c63caaeff97d8c368a to your computer and use it in GitHub Desktop.
Data loader that loads the sample MovieLens data (movies, users, ratings) into a Titan graph
# titan 1.0.0
storage.backend=hbase
storage.hostname=<zookeeper server ips here>
# IMPORTANT: must match the zookeeper.znode.parent property
# configured for HBase on your Hadoop cluster (typically in hbase-site.xml)
storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure
cache.db-cache = true
cache.db-cache-clean-wait = 20
cache.db-cache-time = 180000
cache.db-cache-size = 0.5
import com.thinkaurelius.titan.core.schema.TitanManagement
import com.thinkaurelius.titan.core.schema.TitanGraphIndex
import com.thinkaurelius.titan.core.TitanGraph
// Supports Titan 1.0.0
// credit: original version was written for TP3 by Daniel Kuppitz
/**
 * Loads the MovieLens sample files (movies.dat, users.dat, ratings.dat) into a
 * Titan 1.0.0 graph.
 *
 * Every vertex gets a 'uid' property made of a one-letter type prefix plus its id
 * ('m' movie, 'g' genre, 'u' person, 'o' occupation); all lookups go through 'uid'.
 *
 * NOTE(review): `label` and `Vertex` are not imported in this file; presumably they
 * are supplied by the Gremlin console's default imports when the script is loaded
 * with :load - confirm before using this class outside the console.
 */
class MovieLensParser {

    /** Maps the numeric occupation code found in users.dat to its display name. */
    static Map occupations

    static {
        occupations = [0: "other", 1: "academic/educator", 2: "artist",
                       3: "clerical/admin", 4: "college/grad student", 5: "customer service",
                       6: "doctor/health care", 7: "executive/managerial", 8: "farmer",
                       9: "homemaker", 10: "K-12 student", 11: "lawyer", 12: "programmer",
                       13: "retired", 14: "sales/marketing", 15: "scientist", 16: "self-employed",
                       17: "technician/engineer", 18: "tradesman/craftsman", 19: "unemployed", 20: "writer"]
    }

    /**
     * Reads the three data files from {@code dataDirectory} and writes the
     * corresponding vertices and edges to {@code graph}, committing after each file
     * (and periodically while reading ratings).
     *
     * @param graph         the graph to write to
     * @param dataDirectory directory containing movies.dat, users.dat and ratings.dat
     */
    public static void parse(final TitanGraph graph, final String dataDirectory) {
        def g = graph.traversal()
        loadMovies(graph, g, dataDirectory)
        loadUsers(graph, g, dataDirectory)
        loadRatings(graph, g, dataDirectory)
    }

    /** Creates one 'movie' vertex per line of movies.dat plus its 'hasGenre' edges, then commits. */
    private static void loadMovies(final TitanGraph graph, def g, final String dataDirectory) {
        println 'Processing movies.dat...'
        // MovieID::Title::Genres
        new File(dataDirectory + '/movies.dat').eachLine { final String line ->
            def components = line.split("::")
            def movieId = components[0].toInteger()
            // Splits "Some Title (1995)" into the title and the year.
            def movieTitleYear = components[1] =~ /(.*\b)\s*\((\d+)\)/
            // Lines whose title does not end in "(year)" are skipped entirely.
            if (!movieTitleYear.find()) return
            def movieTitle = movieTitleYear.group(1)
            def movieYear = movieTitleYear.group(2).toInteger()
            def genres = components[2]
            def movieVertex = graph.addVertex(label, 'movie', 'uid', 'm' + movieId, 'movieId', movieId, 'name', movieTitle, 'year', movieYear)
            genres.split('\\|').each { def genre ->
                // Reuse the genre vertex if it already exists, otherwise create it.
                def genreVertex = g.V().has('uid', 'g' + genre).tryNext().orElseGet {
                    graph.addVertex(label, 'genre', 'uid', 'g' + genre, 'name', genre)
                }
                movieVertex.addEdge('hasGenre', genreVertex)
            }
        }
        graph.tx().commit()
    }

    /** Creates one 'person' vertex per line of users.dat plus its 'hasOccupation' edge, then commits. */
    private static void loadUsers(final TitanGraph graph, def g, final String dataDirectory) {
        println 'Processing users.dat...'
        // UserID::Gender::Age::Occupation::Zip-code
        new File(dataDirectory + '/users.dat').eachLine { final String line ->
            def components = line.split("::")
            def userId = components[0].toInteger()
            def userGender = components[1]
            def userAge = components[2].toInteger()
            def occupationId = components[3].toInteger()
            def userZipcode = components[4]
            def userVertex = graph.addVertex(label, 'person', 'uid', 'u' + userId, 'userId', userId, 'gender', userGender, 'age', userAge, 'zipcode', userZipcode)
            // Reuse the occupation vertex if it already exists, otherwise create it.
            def occupationVertex = g.V().has('uid', 'o' + occupationId).tryNext().orElseGet {
                graph.addVertex(label, 'occupation', 'uid', 'o' + occupationId, 'jobId', occupationId, 'name', occupations.get(occupationId))
            }
            userVertex.addEdge('hasOccupation', occupationVertex)
        }
        graph.tx().commit()
    }

    /**
     * Creates one 'rated' edge (person -> movie) per line of ratings.dat, committing
     * in batches and once more at the end so the last partial batch is persisted.
     */
    private static void loadRatings(final TitanGraph graph, def g, final String dataDirectory) {
        // cnt: lines read since the last progress report; total: lines reported so far.
        // Both count lines read, including lines skipped because the user or movie
        // vertex was not found.
        int cnt = 0
        int total = 0
        println 'Processing ratings.dat...'
        // UserID::MovieID::Rating::Timestamp
        new File(dataDirectory + '/ratings.dat').eachLine { final String line ->
            def components = line.split("::")
            def userId = components[0].toInteger()
            def movieId = components[1].toInteger()
            def stars = components[2].toInteger()
            def time = components[3].toLong()
            def userTraversal = g.V().has('uid', 'u' + userId)
            def movieTraversal = g.V().has('uid', 'm' + movieId)
            // Only connect vertices that were actually loaded from the other two files.
            if (userTraversal.hasNext() && movieTraversal.hasNext()) {
                userTraversal.next().addEdge('rated', movieTraversal.next(), 'stars', stars, 'time', time)
            }
            // Commit periodically so the open transaction does not grow without bound.
            if (cnt > 1000) {
                total = total + cnt
                println ("added " + total + " ratings.")
                cnt = 0
                graph.tx().commit()
            }
            cnt++
        }
        // Commit the tail: edges added since the last periodic commit would otherwise
        // remain in the open transaction and never be persisted.
        graph.tx().commit()
        if (cnt > 0) {
            total = total + cnt
            println ("added " + total + " ratings.")
        }
    }

    /**
     * Defines the schema needed by {@link #parse} - a String property key 'uid' with
     * a unique composite vertex index named 'uidkeyidx1' - then runs the load and
     * prints the elapsed time in milliseconds.
     *
     * @param graph         the graph to define the schema on and load into
     * @param dataDirectory directory containing movies.dat, users.dat and ratings.dat
     */
    public static void load(final TitanGraph graph, final String dataDirectory) {
        def mgmt = graph.openManagement()
        def uidkey = mgmt.makePropertyKey("uid").dataType(String.class).make()
        // The index must be committed before loading, because every vertex lookup
        // in parse() is a has('uid', ...) query.
        mgmt.buildIndex("uidkeyidx1", Vertex.class).addKey(uidkey).unique().buildCompositeIndex()
        mgmt.commit()
        def start = System.currentTimeMillis()
        parse(graph, dataDirectory)
        println "Loading took (ms): " + (System.currentTimeMillis() - start)
    }
}
gremlin> :load ./ml.groovy
==><classpath information cut out here>
==>true
==>true
==>true
==>true
gremlin> graph=TitanFactory.open('./hbase.properties')
==>standardtitangraph[hbase:[10.20.30.88, 10.20.30.93]]
gremlin> MovieLensParser.load(graph, '/home/graphie/data/ml-1m')
Processing movies.dat...
Processing users.dat...
Processing ratings.dat...
added 1001 ratings.
added 2002 ratings.
......
added 999999 ratings.
Loading took (ms): 332876
==>null
@dkuppitz
Copy link

Nice one, but there should be another graph.tx().commit() between line 89 and 90.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment