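Workaround: disable Tungsten before the SparkContext is created, to sidestep the UnsafeExternalSorter allocation failure shown in the stack trace further below. getOrCreateSparkContext is defined in the full script that follows.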
import pyspark

conf = pyspark.SparkConf()
conf.set("spark.sql.tungsten.enabled", "false")  # must be set before the context is created
sc = getOrCreateSparkContext(conf)
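A quick sanity check (a sketch, not part of the gist) reads the setting back from the SparkConf:

print(conf.get("spark.sql.tungsten.enabled"))  # expect 'false'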
from __future__ import print_function
import os
import graphframes
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
from pyspark.sql.window import Window


def graph_coalesce(g, numPartitions):
    # Coalesce both underlying DataFrames so the whole graph ends up
    # with the requested number of partitions.
    return graphframes.GraphFrame(
        g.vertices.coalesce(numPartitions),
        g.edges.coalesce(numPartitions)
    )
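coalesce only ever narrows the partition count and avoids a full shuffle, at the price of possibly uneven partitions. If an even rebalance were wanted instead, a variant built on repartition would look like this (a sketch, not part of the gist; graph_repartition is a hypothetical name):

def graph_repartition(g, numPartitions):
    # Hypothetical variant: repartition shuffles the data, trading
    # extra I/O for evenly sized partitions.
    return graphframes.GraphFrame(
        g.vertices.repartition(numPartitions),
        g.edges.repartition(numPartitions)
    )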
def load_graphframe(sqlContext, dir_name, numPartitions=None):
    # A graph is persisted as two parquet files under one directory:
    # vertices.parquet and edges.parquet.
    fn_vertices = os.path.join(dir_name, 'vertices.parquet')
    fn_edges = os.path.join(dir_name, 'edges.parquet')
    vertices = sqlContext.read.parquet(fn_vertices)
    edges = sqlContext.read.parquet(fn_edges)
    ret = graphframes.GraphFrame(vertices, edges)
    if numPartitions is not None:
        ret = graph_coalesce(ret, numPartitions)
    return ret
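The loader assumes a layout produced by a matching save step. A counterpart sketch under the same convention (save_graphframe is a hypothetical helper, not in the gist):

def save_graphframe(g, dir_name):
    # Hypothetical counterpart to load_graphframe: persist both
    # DataFrames under the layout the loader expects.
    g.vertices.write.parquet(os.path.join(dir_name, 'vertices.parquet'))
    g.edges.write.parquet(os.path.join(dir_name, 'edges.parquet'))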
def getOrCreateSparkContext(conf=None):
    # Based on
    # http://www.eecs.berkeley.edu/~jegonzal/pyspark/_modules/pyspark/context.html
    # The pyspark version we currently use (1.5) doesn't provide this
    # method, so we implement it here.
    # Note: guarding this with `with pyspark.SparkContext._lock:`, as the
    # linked code does, deadlocks the program. We don't create threads in
    # the main script, so going without the lock is safe for now; the
    # locking issue will have to be dealt with eventually.
    if pyspark.SparkContext._active_spark_context is None:
        pyspark.SparkContext(conf=conf or pyspark.SparkConf())
    return pyspark.SparkContext._active_spark_context


sc = getOrCreateSparkContext()
sqlContext = pyspark.HiveContext(sc)

path_input_graph = '/user/anandnalya/network'
grph = load_graphframe(sqlContext, path_input_graph, 128)

# Cost is the reciprocal of popularity: high-pagerank vertices and
# frequently-traversed edges are cheap to pass through.
vertices = grph.vertices.withColumn('cost', F.pow(F.col('pagerank'), -1.0))
edges = grph.edges.withColumn('cost', F.pow(F.col('count'), -1.0))
grph = graphframes.GraphFrame(vertices, edges)
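The cost columns are plain reciprocals: pow(x, -1.0) is 1/x, so a vertex with pagerank 4.0 costs 0.25 to pass through while one with pagerank 0.5 costs 2.0. A throwaway check (sketch only, with made-up values):

toy = sqlContext.createDataFrame([('a', 4.0), ('b', 0.5)], ['id', 'pagerank'])
toy.withColumn('cost', F.pow(F.col('pagerank'), -1.0)).show()
# id  pagerank  cost
# a   4.0       0.25
# b   0.5       2.0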
# Motif: u0 likes post p0, which was written by a0; user u1 likes both
# p0 and a second post p1, written by a1. Each match is a candidate
# recommendation of p1 for u0.
path_search_query = '''(u0)-[likes_post00]->(p0);
                       (a0)-[is_author0]->(p0);
                       (u1)-[likes_post10]->(p0);
                       (u1)-[likes_post11]->(p1);
                       (a1)-[is_author1]->(p1)
                    '''
# The IN list below is a placeholder for the real seed user ids.
path_search_filter_statement = """u0.id IN ('1,2,3,...') AND
                                  is_author0.edge_type = 'IS_AUTHOR' AND
                                  is_author1.edge_type = 'IS_AUTHOR' AND
                                  likes_post00.edge_type = 'LIKES_POST' AND
                                  likes_post10.edge_type = 'LIKES_POST' AND
                                  likes_post11.edge_type = 'LIKES_POST' AND
                                  a0.node_type = 'USER' AND
                                  a1.node_type = 'USER' AND
                                  u0.node_type = 'USER' AND
                                  u1.node_type = 'USER' AND
                                  p0.node_type = 'POST' AND
                                  p1.node_type = 'POST' AND
                                  a0.id != u0.id AND
                                  a0.id != u1.id AND
                                  a1.id != u0.id AND
                                  a1.id != u1.id AND
                                  a0.id != a1.id AND
                                  p0.id != p1.id AND
                                  a0.id != 'USER__' AND
                                  a1.id != 'USER__'"""
path_search = grph.find(
    path_search_query
).filter(
    path_search_filter_statement
)
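The filter pins down node and edge types, forces the endpoints apart where it matters, and excludes the literal 'USER__' id (apparently a degenerate user node). The Scala port below counts and prints its matches; the same sanity check works here (a sketch mirroring those lines):

print(path_search.count())
path_search.limit(5).show()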
package com.automattic.network

import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.graphframes._

object Main extends App {
  def graph_coalesce(g: GraphFrame, numPartitions: Int) =
    GraphFrame(g.vertices.coalesce(numPartitions), g.edges.coalesce(numPartitions))

  def load_graphframe(sqlContext: SQLContext, dir_name: String, numPartitions: Option[Int]) = {
    val fn_vertices = dir_name + "/vertices.parquet"
    val fn_edges = dir_name + "/edges.parquet"
    val vertices = sqlContext.read.parquet(fn_vertices)
    val edges = sqlContext.read.parquet(fn_edges)
    val ret = GraphFrame(vertices, edges)
    if (numPartitions.isDefined)
      graph_coalesce(ret, numPartitions.get)
    else ret
  }

  // Set up Spark
  val conf = new SparkConf
  conf.setAppName("anand network test")
  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  val path_input_graph = "/user/anandnalya/network"
  var grph = load_graphframe(sqlContext, path_input_graph, Option(128))

  // Cost is the reciprocal of popularity, as in the Python version.
  val vertices = grph.vertices.withColumn("cost", pow("pagerank", -1.0))
  val edges = grph.edges.withColumn("cost", pow("count", -1.0))
  grph = GraphFrame(vertices, edges)

  // Same motif and filter as the Python version.
  val path_search_query = """(u0)-[likes_post00]->(p0);
                             (a0)-[is_author0]->(p0);
                             (u1)-[likes_post10]->(p0);
                             (u1)-[likes_post11]->(p1);
                             (a1)-[is_author1]->(p1)
                          """
  val path_search_filter_statement = """u0.id IN ('1,2,3,...') AND
                                        is_author0.edge_type = 'IS_AUTHOR' AND
                                        is_author1.edge_type = 'IS_AUTHOR' AND
                                        likes_post00.edge_type = 'LIKES_POST' AND
                                        likes_post10.edge_type = 'LIKES_POST' AND
                                        likes_post11.edge_type = 'LIKES_POST' AND
                                        a0.node_type = 'USER' AND
                                        a1.node_type = 'USER' AND
                                        u0.node_type = 'USER' AND
                                        u1.node_type = 'USER' AND
                                        p0.node_type = 'POST' AND
                                        p1.node_type = 'POST' AND
                                        a0.id != u0.id AND
                                        a0.id != u1.id AND
                                        a1.id != u0.id AND
                                        a1.id != u1.id AND
                                        a0.id != a1.id AND
                                        p0.id != p1.id AND
                                        a0.id != 'USER__' AND
                                        a1.id != 'USER__'"""
  val path_search = grph.find(
    path_search_query
  ).filter(
    path_search_filter_statement
  )

  // Path cost is the sum of the edge and intermediate-vertex costs.
  def _path_cost_user(likes_post_00: Double, p0: Double, likes_post_10: Double, u1: Double, likes_post11: Double, p1: Double) =
    likes_post_00 + p0 + likes_post_10 + u1 + likes_post11 + p1

  val path_cost_function = spark.sql.functions.udf(_path_cost_user(_: Double, _: Double, _: Double, _: Double, _: Double, _: Double))

  val rec_paths = path_search.select(
    path_search("u0.id").alias("user_id"),
    path_search("p1.id").alias("post_recommended"),
    split(path_search("p1.id"), "[_-]")(1).alias("blog_recommended"),
    path_search("a1.id").alias("author_recommended"),
    path_cost_function(
      path_search("likes_post00.cost"),
      path_search("p0.cost"),
      path_search("likes_post10.cost"),
      path_search("u1.cost"),
      path_search("likes_post11.cost"),
      path_search("p1.cost")
    ).alias("path_cost")
  ).cache()

  println(rec_paths.count())
  rec_paths.show()
  rec_paths.write.parquet("/user/anandnalya/network_out")
}
Job aborted due to stage failure: Task 57 in stage 19.0 failed 4 times, most recent failure: Lost task 57.3 in stage 19.0 (TID 1545, node-A16-R14-33.hadoop.dfw.wordpress.com): java.io.IOException: Unable to acquire 16777216 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
    at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
    at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
    at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
    at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
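This is the failure the first snippet works around: the Tungsten code path's UnsafeExternalSorter cannot acquire a 16 MB page while preparing a sort, and the task is retried four times before the stage is aborted. Besides turning Tungsten off, plausible knobs to try (hypothetical values, not settings from the gist) are the executor memory and, on Spark 1.x, the shuffle memory fraction:

conf = pyspark.SparkConf()
conf.set("spark.sql.tungsten.enabled", "false")  # the workaround used above
conf.set("spark.executor.memory", "8g")          # hypothetical value
conf.set("spark.shuffle.memoryFraction", "0.5")  # Spark 1.x knob; hypothetical value
sc = getOrCreateSparkContext(conf)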
def _path_cost_user(likes_post_00_cost, p0_cost, likes_post_10_cost, u1_cost, likes_post11_cost, p1_cost):
    # Path cost is the sum of the edge and intermediate-vertex costs.
    return likes_post_00_cost + p0_cost + likes_post_10_cost + u1_cost + likes_post11_cost + p1_cost


path_cost_function = F.udf(_path_cost_user, T.FloatType())

rec_paths = path_search.select(
    F.col('u0.id').alias('user_id'),
    F.col('p1.id').alias('post_recommended'),
    F.split(F.col('p1.id'), '[_-]')[1].alias('blog_recommended'),
    F.col('a1.id').alias('author_recommended'),
    path_cost_function(
        'likes_post00.cost',
        'p0.cost',
        'likes_post10.cost',
        'u1.cost',
        'likes_post11.cost',
        'p1.cost'
    ).alias('path_cost')
)
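Two notes on this version (sketches, not from the gist). The cost is a plain sum, so the Python UDF, which ships every row through a Python worker, could be replaced by native column arithmetic that stays in the JVM; and the unused Window import at the top of the script suggests the intended next step, keeping the cheapest few recommendations per user (the cutoff of 10 is made up):

# Native-column alternative to the UDF; use path_cost_col.alias('path_cost')
# in the select above.
path_cost_col = (F.col('likes_post00.cost') + F.col('p0.cost') +
                 F.col('likes_post10.cost') + F.col('u1.cost') +
                 F.col('likes_post11.cost') + F.col('p1.cost'))

# Cheapest 10 recommendations per user. Spark 1.5 spells the function
# rowNumber(); newer releases rename it row_number().
w = Window.partitionBy('user_id').orderBy(F.col('path_cost'))
top_recs = (rec_paths
            .withColumn('rank', F.rowNumber().over(w))
            .filter(F.col('rank') <= 10))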