@hihellobolke
Last active April 12, 2019 07:08
Spark GraphX Pattern Matching
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
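import org.apache.spark.graphx.lib._
//NOTE: PatternMatching and EdgePattern (used below) are not part of Spark's stock graphx.lib;
//this gist does not say which library provides them.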
/*
* Alice, Dave and Bob are share traders.
* Each maintains at least one trading account at their bank.
* Trading accounts hold shares, which are bought/sold in a transaction.
*
* E.g.
* (Alice) --[has_account]--> (HSBC) --[has_shares]--> (APPL) --[sell]--> (Transaction) --[buy]--> (APPL) <--[has_shares]-- (GS) <--[has_account]-- (Dave)
*
*
*/
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
//Alice: her bank is HSBC, she holds APPL shares
(1000L, ("human", "Alice")),
(1001L, ("bank", "HSBC")),
(1002L, ("shares", "APPL")),
//Transaction vertices
(9001L, ("transaction", "t1")),
(9002L, ("transaction", "t2")),
//Dave and his Goldman Sachs & shares
(2000L, ("human", "Dave")),
(2001L, ("account", "GS")),
(2002L, ("shares", "MSFT")),
(2003L, ("shares", "APPL")),
(2004L, ("shares", "FB")),
//Transaction vertices (t2 / 9002L is already defined above; a duplicate vertex ID is dropped)
(9003L, ("transaction", "t3")),
(9004L, ("transaction", "t4")),
//Bob has two trading accounts,
//holding APPL & CSCO shares
(3000L, ("human", "Bob")),
(3001L, ("account", "BOA")),
(3002L, ("account", "GS")),
(3003L, ("shares", "APPL")),
(3004L, ("shares", "APPL")),
(3005L, ("shares", "CSCO"))
))
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
//Alice's account & the shares in it
Edge(1000L, 1001L, "has_account"), Edge(1001L, 1002L, "has_shares"),
//Dave holds several different shares
Edge(2000L, 2001L, "has_account"), Edge(2001L, 2002L, "has_shares"), Edge(2001L, 2003L, "has_shares"), Edge(2001L, 2004L, "has_shares"),
//Bob holds shares in multiple accounts
Edge(3000L, 3001L, "has_account"), Edge(3001L, 3003L, "has_shares"), Edge(3001L, 3004L, "has_shares"),
Edge(3000L, 3002L, "has_account"), Edge(3002L, 3005L, "has_shares"),
//Transactions
//Alice sells APPL shares to Dave - transact t1
Edge(1002L, 9001L, "sell"), Edge(9001L, 2003L, "buy"),
//Alice sells APPL shares to Bob - transact t2
Edge(1002L, 9002L, "sell"), Edge(9002L, 3004L, "buy"),
//Bob sells APPL shares to Dave - t3
Edge(3003L, 9003L, "sell"), Edge(9003L, 2003L, "buy"),
//Dave sells APPL shares to Alice - t4
Edge(2003L, 9004L, "sell"), Edge(9004L, 1002L, "buy")))
val graph = Graph(users, relationships)
val cs = PatternMatching.run(graph, List(
EdgePattern("has_account", false),
EdgePattern("has_shares", false),
EdgePattern("sell", false))
).collect.toList
println("List cs:\n" + cs)
println("Count cs:\n" + cs.length)
//This match runs continuously and never returns
val cs2 = PatternMatching.run(graph, List(
EdgePattern("has_account", false),
EdgePattern("has_shares", false),
EdgePattern("sell", false),
EdgePattern("buy", false))
).collect.toList
println("List cs2:\n" + cs2)
println("Count cs2:\n" + cs2.length)
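For comparison, here is a minimal plain-Scala sketch (no Spark, no `PatternMatching`) of what a label-sequence match over this data should produce. It assumes a pattern means "a directed path whose consecutive edge labels equal the given list, in order"; `LabelPathMatch` and `matchPaths` are hypothetical names, and the result format (lists of vertex IDs) is not necessarily what `PatternMatching.run` returns. Each extra label adds exactly one extension round, so the search is bounded by the pattern length even though the sell/buy edges form a cycle (1002 → 9001 → 2003 → 9004 → 1002).

```scala
// Plain-Scala sketch (no Spark, no PatternMatching): find directed paths whose
// consecutive edge labels equal a given list. Names are hypothetical.
object LabelPathMatch {
  // (src, dst, label), copied from the `relationships` RDD in the gist
  val edges: Seq[(Long, Long, String)] = Seq(
    (1000L, 1001L, "has_account"), (1001L, 1002L, "has_shares"),
    (2000L, 2001L, "has_account"), (2001L, 2002L, "has_shares"),
    (2001L, 2003L, "has_shares"), (2001L, 2004L, "has_shares"),
    (3000L, 3001L, "has_account"), (3001L, 3003L, "has_shares"),
    (3001L, 3004L, "has_shares"),
    (3000L, 3002L, "has_account"), (3002L, 3005L, "has_shares"),
    (1002L, 9001L, "sell"), (9001L, 2003L, "buy"),
    (1002L, 9002L, "sell"), (9002L, 3004L, "buy"),
    (3003L, 9003L, "sell"), (9003L, 2003L, "buy"),
    (2003L, 9004L, "sell"), (9004L, 1002L, "buy")
  )

  // Outgoing edges grouped by source vertex.
  private val bySrc: Map[Long, Seq[(Long, Long, String)]] = edges.groupBy(_._1)

  // A path is the list of vertex IDs it visits. Seed one path per edge carrying
  // the first label, then extend every path once per remaining label. The number
  // of rounds is fixed by the pattern length, so the search always terminates,
  // even though the sell/buy edges contain a cycle.
  def matchPaths(labels: List[String]): List[List[Long]] = labels match {
    case Nil => Nil
    case first :: rest =>
      val seeds = edges.filter(_._3 == first).map(e => List(e._1, e._2)).toList
      rest.foldLeft(seeds) { (paths, label) =>
        for {
          path <- paths
          e <- bySrc.getOrElse(path.last, Seq.empty)
          if e._3 == label
        } yield path :+ e._2
      }
  }

  def main(args: Array[String]): Unit = {
    val threeHop = matchPaths(List("has_account", "has_shares", "sell"))
    val fourHop = matchPaths(List("has_account", "has_shares", "sell", "buy"))
    // List(List(1000, 1001, 1002, 9001), List(1000, 1001, 1002, 9002),
    //      List(2000, 2001, 2003, 9004), List(3000, 3001, 3003, 9003))
    println(threeHop)
    println(fourHop.length) // 4
  }
}
```

On this edge list both the three-label and the four-label pattern yield four paths each, so the second query has a small, finite answer under this reading of "pattern".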
/* I wish EdgePattern could take a function;
* then one could match paths
* with conditions, e.g. where the transaction amount is > $1000,
* or where the source vertex attribute satisfies some condition...
*/
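A minimal sketch of the idea in the comment above, under stated assumptions: `Step` plays the role of an `EdgePattern` that takes a function, here a predicate over the source vertex attribute, the edge label, and the destination vertex attribute. This is plain Scala (no Spark); `PredicatePathMatch`, `Step`, and `matchPaths` are hypothetical names, not an existing API. For single-edge filtering in stock GraphX, `Graph.subgraph` already accepts an `EdgeTriplet` predicate (`epred`), but it filters edges one at a time rather than matching multi-edge paths.

```scala
// Hypothetical sketch of an "EdgePattern that takes a function": each pattern
// step is a predicate over (source vertex attr, edge label, destination vertex attr).
// Plain Scala, no Spark; Step and matchPaths are illustrative, not a real API.
object PredicatePathMatch {
  type Vertex = (String, String)                  // (kind, name), as in `users`
  type Step = (Vertex, String, Vertex) => Boolean // one step of a path pattern

  // Vertex attributes from the gist's `users` RDD (duplicate 9002 dropped).
  val vertices: Map[Long, Vertex] = Map(
    (1000L, ("human", "Alice")), (1001L, ("bank", "HSBC")), (1002L, ("shares", "APPL")),
    (2000L, ("human", "Dave")), (2001L, ("account", "GS")), (2002L, ("shares", "MSFT")),
    (2003L, ("shares", "APPL")), (2004L, ("shares", "FB")),
    (3000L, ("human", "Bob")), (3001L, ("account", "BOA")), (3002L, ("account", "GS")),
    (3003L, ("shares", "APPL")), (3004L, ("shares", "APPL")), (3005L, ("shares", "CSCO")),
    (9001L, ("transaction", "t1")), (9002L, ("transaction", "t2")),
    (9003L, ("transaction", "t3")), (9004L, ("transaction", "t4"))
  )

  // (src, dst, label), from the gist's `relationships` RDD
  val edges: Seq[(Long, Long, String)] = Seq(
    (1000L, 1001L, "has_account"), (1001L, 1002L, "has_shares"),
    (2000L, 2001L, "has_account"), (2001L, 2002L, "has_shares"),
    (2001L, 2003L, "has_shares"), (2001L, 2004L, "has_shares"),
    (3000L, 3001L, "has_account"), (3001L, 3003L, "has_shares"),
    (3001L, 3004L, "has_shares"),
    (3000L, 3002L, "has_account"), (3002L, 3005L, "has_shares"),
    (1002L, 9001L, "sell"), (9001L, 2003L, "buy"),
    (1002L, 9002L, "sell"), (9002L, 3004L, "buy"),
    (3003L, 9003L, "sell"), (9003L, 2003L, "buy"),
    (2003L, 9004L, "sell"), (9004L, 1002L, "buy")
  )

  private val bySrc: Map[Long, Seq[(Long, Long, String)]] = edges.groupBy(_._1)

  // True if edge e satisfies the given step.
  private def ok(step: Step, e: (Long, Long, String)): Boolean =
    step(vertices(e._1), e._3, vertices(e._2))

  // Bounded path extension: one round per step, predicates in place of labels.
  def matchPaths(steps: List[Step]): List[List[Long]] = steps match {
    case Nil => Nil
    case first :: rest =>
      val seeds = edges.filter(e => ok(first, e)).map(e => List(e._1, e._2)).toList
      rest.foldLeft(seeds) { (paths, step) =>
        for {
          path <- paths
          e <- bySrc.getOrElse(path.last, Seq.empty)
          if ok(step, e)
        } yield path :+ e._2
      }
  }

  // Example: Bob's accounts -> APPL holdings -> a sale transaction.
  val bobApplSales: List[Step] = List[Step](
    (src, label, _) => label == "has_account" && src._1 == "human" && src._2 == "Bob",
    (_, label, dst) => label == "has_shares" && dst._2 == "APPL",
    (_, label, dst) => label == "sell" && dst._1 == "transaction"
  )

  def main(args: Array[String]): Unit =
    println(matchPaths(bobApplSales)) // List(List(3000, 3001, 3003, 9003))
}
```

A transaction-amount condition such as "> $1000" would need an amount in the transaction vertex attributes, which this data does not carry, so the example filters on vertex kind and name instead.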
@ksemer

ksemer commented Sep 4, 2017

Where is PatternMatching.run() located?
