Skip to content

Instantly share code, notes, and snippets.

@hihellobolke
Last active August 29, 2015 14:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hihellobolke/c8e6c97cefed714258ad to your computer and use it in GitHub Desktop.
Save hihellobolke/c8e6c97cefed714258ad to your computer and use it in GitHub Desktop.
// Create vertices. Each vertex attribute is a pair: the first component is the
// kind of entity ("human", "bank", "shares", "transaction"), the second its label.
// Every vertex id is declared exactly once; transaction 9002 used to be listed
// twice, which forced the graph builder to pick one of the duplicates.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  // Alice: her bank is HSBC, she has APPL shares
  (1000L, ("human", "Alice")),
  (1001L, ("bank", "HSBC")),
  (1002L, ("shares", "APPL")),
  // Dave: his Goldman Sachs account and his MSFT, APPL and FB shares
  (2000L, ("human", "Dave")),
  (2001L, ("bank", "GS")),
  (2002L, ("shares", "MSFT")),
  (2003L, ("shares", "APPL")),
  (2004L, ("shares", "FB")),
  // Bob: two trading accounts (BOA and GS), APPL and CSCO shares
  (3000L, ("human", "Bob")),
  (3001L, ("bank", "BOA")),
  (3002L, ("bank", "GS")),
  (3003L, ("shares", "APPL")),
  (3004L, ("shares", "APPL")),
  (3005L, ("shares", "CSCO")),
  // Transaction vertices, shared between the trading parties
  (9001L, ("transaction", "t1")),
  (9002L, ("transaction", "t2")),
  (9003L, ("transaction", "t3")),
  (9004L, ("transaction", "t4"))))
// Create edges. Ownership edges run human -> bank -> shares; trade edges run
// from a shares vertex to the transaction vertex it takes part in.
val edges: RDD[Edge[String]] = {
  val holdings = Array(
    // Alice has one bank account holding her shares
    Edge(1000L, 1001L, "has_account"),
    Edge(1001L, 1002L, "has_shares"),
    // Dave has one account holding several different shares
    Edge(2000L, 2001L, "has_account"),
    Edge(2001L, 2002L, "has_shares"),
    Edge(2001L, 2003L, "has_shares"),
    Edge(2001L, 2004L, "has_shares"),
    // Bob holds shares across two accounts
    Edge(3000L, 3001L, "has_account"),
    Edge(3001L, 3003L, "has_shares"),
    Edge(3001L, 3004L, "has_shares"),
    Edge(3000L, 3002L, "has_account"),
    Edge(3002L, 3005L, "has_shares"))

  val trades = Array(
    // t1: Alice sells APPL shares to Dave
    Edge(1002L, 9001L, "sell"),
    Edge(2003L, 9001L, "buy"),
    // t2: Alice sells APPL shares to Bob
    Edge(1002L, 9002L, "sell"),
    Edge(3004L, 9002L, "buy"),
    // t3: Bob sells APPL shares to Dave
    Edge(3003L, 9003L, "sell"),
    Edge(2003L, 9003L, "buy"),
    // t4: Dave sells APPL shares to Alice
    Edge(2003L, 9004L, "sell"),
    Edge(1002L, 9004L, "buy"))

  // Ownership edges first, then trades: the same element order as a single literal.
  sc.parallelize(holdings ++ trades)
}
// Assemble the property graph: vertices carry a (String, String) pair,
// edges carry a String label.
val graph: Graph[(String, String), String] = Graph(users, edges)
/**
 * Builds a predicate over edge triplets, for use as one step of a path pattern.
 *
 * Each argument is either the wildcard "*" (accept anything) or a literal to
 * compare against the triplet (`e` below):
 *  - `edgeString` must equal the edge attribute `e.attr`;
 *  - `srcString` / `dstString` must equal one of the two components of the
 *    source / destination vertex's attribute pair (`e.srcAttr._1`,
 *    `e.dstAttr._1`).
 *
 * Earlier the vertex filters were compared with the first component only
 * (values such as "shares"), so a filter given a second-component value such
 * as "APPL" could never match. Accepting either component keeps first-component
 * filters working and makes second-component filters effective.
 *
 * @param srcString  literal required on the source vertex, or "*"
 * @param dstString  literal required on the destination vertex, or "*"
 * @param edgeString literal required on the edge attribute, or "*"
 * @return a function returning true when all three conditions hold
 */
def filterFuncGenerator(
    srcString: String = "*",
    dstString: String = "*",
    edgeString: String = "*") = {
  // The lambda deliberately references nothing but the three string parameters,
  // so the closure it forms carries only those strings.
  (e: EdgeTriplet[
    ((String, String), Set[PartialPathMatch[(String, String), String]]),
    String]) => {
    val (srcFirst, srcSecond) = e.srcAttr._1
    val (dstFirst, dstSecond) = e.dstAttr._1

    val edgeMatched =
      edgeString == "*" || edgeString == e.attr
    val vertexSrcMatched =
      srcString == "*" || srcString == srcFirst || srcString == srcSecond
    val vertexDstMatched =
      dstString == "*" || dstString == dstFirst || dstString == dstSecond

    edgeMatched && vertexSrcMatched && vertexDstMatched
  }
}
// The new Graph generated...
// Passes `graph`, an ordered list of six triplet patterns and the label
// "tradePartner" to MergePatternPath.run. MergePatternPath and TripletPattern
// are defined outside this snippet; judging by the sample output at the end of
// the script, the label presumably becomes the attribute of the edges in the
// resulting graph -- TODO confirm against their definitions.
val newGraph = MergePatternPath.run(graph, List(
// Step 1: any edge labelled "has_account".
TripletPattern(filterFuncGenerator(edgeString = "has_account")),
// Step 2: any edge labelled "has_shares".
TripletPattern(filterFuncGenerator(edgeString = "has_shares")),
// Step 3: a "sell" edge whose source vertex is selected by "APPL".
TripletPattern(filterFuncGenerator(edgeString = "sell", srcString = "APPL")),
// Step 4: any "buy" edge.
// NOTE(review): the positional `true` is presumably matchDstFirst, as named in
// the two steps below -- confirm against TripletPattern's parameter list.
TripletPattern(filterFuncGenerator(edgeString = "buy"), true),
// Step 5: a "has_shares" edge whose destination vertex is selected by "APPL",
// with matchDstFirst set.
TripletPattern(filterFuncGenerator(edgeString = "has_shares", dstString = "APPL"), matchDstFirst = true),
// Step 6: any "has_account" edge, with matchDstFirst set.
TripletPattern(filterFuncGenerator(edgeString = "has_account"), matchDstFirst = true)
),
"tradePartner")
// Bring the derived graph's triplets to the driver and print each one as
// "(source) --[edge attribute]--> (destination)", showing the second component
// of each vertex attribute.
for (et <- newGraph.triplets.collect()) {
  println(s"(${et.srcAttr._2}) --[${et.attr}]--> (${et.dstAttr._2})")
}
/*
Output recorded with the original script:
(Alice) --[tradePartner]--> (Dave)
(Alice) --[tradePartner]--> (Bob)
(Dave) --[tradePartner]--> (Alice)
(Bob) --[tradePartner]--> (Dave)
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment