Skip to content

Instantly share code, notes, and snippets.

@anandnalya
Last active June 13, 2016 01:33
Show Gist options
  • Save anandnalya/36f6a47c50e515f1ee1a4f7d1b32b37f to your computer and use it in GitHub Desktop.
Save anandnalya/36f6a47c50e515f1ee1a4f7d1b32b37f to your computer and use it in GitHub Desktop.
# Build a Spark configuration with the "spark.sql.tungsten.enabled" option
# set to "false", then hand it to the getOrCreateSparkContext helper.
conf = pyspark.SparkConf().set("spark.sql.tungsten.enabled", "false")
sc = getOrCreateSparkContext(conf)
from __future__ import print_function
import os
import graphframes
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
from pyspark.sql.window import Window
def graph_coalesce(g, numPartitions):
    """Return a new GraphFrame with coalesced vertex and edge DataFrames.

    :param g: graph object exposing ``vertices`` and ``edges`` DataFrames.
    :param numPartitions: partition count passed to ``coalesce`` for both
        the vertices and the edges.
    :return: a ``graphframes.GraphFrame`` built from the coalesced frames.
    """
    return graphframes.GraphFrame(
        g.vertices.coalesce(numPartitions),
        g.edges.coalesce(numPartitions)
    )
def load_graphframe(sqlContext, dir_name, numPartitions=None):
    """Load a GraphFrame from two parquet datasets stored under ``dir_name``.

    The vertices are read from ``<dir_name>/vertices.parquet`` and the edges
    from ``<dir_name>/edges.parquet``.

    :param sqlContext: context whose ``read.parquet`` is used for loading.
    :param dir_name: directory containing the two parquet datasets.
    :param numPartitions: when not ``None``, the loaded graph is passed
        through ``graph_coalesce`` with this partition count.
    :return: the loaded (and optionally coalesced) ``graphframes.GraphFrame``.
    """
    fn_vertices = os.path.join(dir_name, 'vertices.parquet')
    fn_edges = os.path.join(dir_name, 'edges.parquet')
    vertices = sqlContext.read.parquet(fn_vertices)
    edges = sqlContext.read.parquet(fn_edges)
    ret = graphframes.GraphFrame(vertices, edges)
    if numPartitions is not None:
        ret = graph_coalesce(ret, numPartitions)
    return ret
def getOrCreateSparkContext(conf=None):
    """Return the active SparkContext, creating one first if none exists.

    Based on
    http://www.eecs.berkeley.edu/~jegonzal/pyspark/_modules/pyspark/context.html
    The pyspark version that we currently use (1.5) doesn't provide this
    method, so it is implemented here.

    Note: if we use ``with pyspark.SparkContext._lock:``, as in the linked
    code, the program freezes indefinitely. Right now we don't create threads
    within the main script, so this code seems to be pretty safe. In the
    future we will have to deal with the locking issue.

    :param conf: optional ``pyspark.SparkConf``; a default ``SparkConf()`` is
        used when it is ``None``. It is only consulted when a new context has
        to be created.
    :return: the value of ``pyspark.SparkContext._active_spark_context``.
    """
    if pyspark.SparkContext._active_spark_context is None:
        # The constructed context is not bound to a name here; the function
        # returns whatever the class-level _active_spark_context attribute
        # holds after construction.
        pyspark.SparkContext(conf=conf or pyspark.SparkConf())
    return pyspark.SparkContext._active_spark_context
# Obtain the SparkContext and wrap it in a HiveContext for DataFrame access.
sc = getOrCreateSparkContext()
sqlContext = pyspark.HiveContext(sc)

# Load the graph from parquet, coalescing vertices and edges to 128 partitions.
path_input_graph = '/user/anandnalya/network'
grph = load_graphframe(sqlContext, path_input_graph, numPartitions=128)

# Add a 'cost' column: pagerank ** -1 for vertices, count ** -1 for edges,
# then rebuild the graph from the augmented DataFrames.
vertices = grph.vertices.withColumn('cost', F.pow(F.col('pagerank'), -1.0))
edges = grph.edges.withColumn('cost', F.pow(F.col('count'), -1.0))
grph = graphframes.GraphFrame(vertices, edges)

# Motif pattern handed to GraphFrame.find.
path_search_query = '''(u0)-[likes_post00]->(p0);
(a0)-[is_author0]->(p0);
(u1)-[likes_post10]->(p0);
(u1)-[likes_post11]->(p1);
(a1)-[is_author1]->(p1)
'''

# SQL predicate applied to the motif matches.
path_search_filter_statement = """u0.id IN ('1,2,3,...') AND
is_author0.edge_type = 'IS_AUTHOR' AND
is_author1.edge_type = 'IS_AUTHOR' AND
likes_post00.edge_type = 'LIKES_POST' AND
likes_post10.edge_type = 'LIKES_POST' AND
likes_post11.edge_type = 'LIKES_POST' AND
a0.node_type = 'USER' AND
a1.node_type = 'USER' AND
u0.node_type = 'USER' AND
u1.node_type = 'USER' AND
p0.node_type = 'POST' AND
p1.node_type = 'POST' AND
a0.id != u0.id AND
a0.id != u1.id AND
a1.id != u0.id AND
a1.id != u1.id AND
a0.id != a1.id AND
p0.id != p1.id AND
a0.id != 'USER__' AND
a1.id != 'USER__'"""

# Find the motif matches and keep only those satisfying the predicate.
path_search = grph.find(path_search_query).filter(path_search_filter_statement)
package com.automattic.network
import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext
import org.graphframes._
/**
 * Spark driver that loads a graph from parquet, attaches a `cost` column to
 * vertices and edges, searches for a fixed motif, and writes the resulting
 * rows (with a summed path cost) back to parquet.
 *
 * The entry point is an explicit `main` method rather than `extends App`:
 * the Spark documentation advises defining `main()` because subclasses of
 * `scala.App` may not work correctly.
 */
object Main {

  /** Returns a GraphFrame whose vertex and edge DataFrames are both coalesced to `numPartitions`. */
  def graph_coalesce(g: GraphFrame, numPartitions: Int): GraphFrame =
    GraphFrame(g.vertices.coalesce(numPartitions), g.edges.coalesce(numPartitions))

  /**
   * Loads a GraphFrame from `dir_name/vertices.parquet` and `dir_name/edges.parquet`.
   *
   * @param sqlContext    context used to read the parquet datasets
   * @param dir_name      directory containing the two parquet datasets
   * @param numPartitions when defined, the loaded graph is coalesced to this many partitions
   * @return the loaded (and optionally coalesced) graph
   */
  def load_graphframe(sqlContext: SQLContext, dir_name: String, numPartitions: Option[Int]): GraphFrame = {
    val vertices = sqlContext.read.parquet(s"${dir_name}/vertices.parquet")
    val edges = sqlContext.read.parquet(s"${dir_name}/edges.parquet")
    val ret = GraphFrame(vertices, edges)
    // fold avoids the isDefined / get pair: None keeps the graph as loaded.
    numPartitions.fold(ret)(graph_coalesce(ret, _))
  }

  /** Sums the six cost values passed in; used as the body of the path-cost UDF. */
  def _path_cost_user(likes_post_00: Double, p0: Double, likes_post_10: Double, u1: Double, likes_post11: Double, p1: Double): Double =
    likes_post_00 + p0 + likes_post_10 + u1 + likes_post11 + p1

  def main(args: Array[String]): Unit = {
    // Setup Spark
    val conf = new SparkConf().setAppName("anand network test")
    val sc = new SparkContext(conf)
    try {
      val sqlContext = new HiveContext(sc)

      val path_input_graph = "/user/anandnalya/network"
      val loaded = load_graphframe(sqlContext, path_input_graph, Some(128))

      // cost = pagerank ^ -1 for vertices and count ^ -1 for edges.
      val vertices = loaded.vertices.withColumn("cost", pow("pagerank", -1.0))
      val edges = loaded.edges.withColumn("cost", pow("count", -1.0))
      val grph = GraphFrame(vertices, edges)

      // Motif pattern handed to GraphFrame.find.
      val path_search_query =
        """(u0)-[likes_post00]->(p0);
          |(a0)-[is_author0]->(p0);
          |(u1)-[likes_post10]->(p0);
          |(u1)-[likes_post11]->(p1);
          |(a1)-[is_author1]->(p1)
          |""".stripMargin

      // SQL predicate applied to the motif matches.
      val path_search_filter_statement =
        """u0.id IN ('1,2,3,...') AND
          |is_author0.edge_type = 'IS_AUTHOR' AND
          |is_author1.edge_type = 'IS_AUTHOR' AND
          |likes_post00.edge_type = 'LIKES_POST' AND
          |likes_post10.edge_type = 'LIKES_POST' AND
          |likes_post11.edge_type = 'LIKES_POST' AND
          |a0.node_type = 'USER' AND
          |a1.node_type = 'USER' AND
          |u0.node_type = 'USER' AND
          |u1.node_type = 'USER' AND
          |p0.node_type = 'POST' AND
          |p1.node_type = 'POST' AND
          |a0.id != u0.id AND
          |a0.id != u1.id AND
          |a1.id != u0.id AND
          |a1.id != u1.id AND
          |a0.id != a1.id AND
          |p0.id != p1.id AND
          |a0.id != 'USER__' AND
          |a1.id != 'USER__'""".stripMargin

      val path_search = grph.find(path_search_query).filter(path_search_filter_statement)

      val path_cost_function =
        udf(_path_cost_user(_: Double, _: Double, _: Double, _: Double, _: Double, _: Double))

      val rec_paths = path_search.select(
        path_search("u0.id").alias("user_id"),
        path_search("p1.id").alias("post_recommended"),
        // Element at index 1 of the post id split on '_' or '-'.
        split(path_search("p1.id"), "[_-]")(1).alias("blog_recommended"),
        path_search("a1.id").alias("author_recommended"),
        path_cost_function(
          path_search("likes_post00.cost"),
          path_search("p0.cost"),
          path_search("likes_post10.cost"),
          path_search("u1.cost"),
          path_search("likes_post11.cost"),
          path_search("p1.cost")
        ).alias("path_cost")
      ).cache() // cached because count, show and write each evaluate it

      println(rec_paths.count())
      rec_paths.show()
      rec_paths.write.parquet("/user/anandnalya/network_out")
    } finally {
      // Stop the context whether or not the job succeeded.
      sc.stop()
    }
  }
}
Job aborted due to stage failure: Task 57 in stage 19.0 failed 4 times, most recent failure: Lost task 57.3 in stage 19.0 (TID 1545, node-A16-R14-33.hadoop.dfw.wordpress.com): java.io.IOException: Unable to acquire 16777216 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:249)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:208)
def _path_cost_user(likes_post_00_cost, p0_cost, likes_post_10_cost, u1_cost, likes_post11_cost, p1_cost):
ret = likes_post_00_cost + p0_cost + likes_post_10_cost + u1_cost + likes_post11_cost + p1_cost
return ret
# Wrap the plain-Python cost sum as a Spark SQL UDF declared to return FloatType.
path_cost_function = F.udf(_path_cost_user, T.FloatType())

# Project each row of path_search onto the recommendation columns.
rec_paths = path_search.select(
    F.col('u0.id').alias('user_id'),
    F.col('p1.id').alias('post_recommended'),
    # Element at index 1 of the post id split on '_' or '-'.
    F.split(F.col('p1.id'), '[_-]')[1].alias('blog_recommended'),
    F.col('a1.id').alias('author_recommended'),
    # Fixed: the original read `.alias'path_cost')`, which is a syntax error.
    path_cost_function(
        'likes_post00.cost', 'p0.cost', 'likes_post10.cost',
        'u1.cost', 'likes_post11.cost', 'p1.cost'
    ).alias('path_cost')
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment