Skip to content

Instantly share code, notes, and snippets.

@mimiyan
Created August 3, 2018 14:03
Show Gist options
  • Save mimiyan/031f3c25455792d92a01f83ac415a94f to your computer and use it in GitHub Desktop.
Save mimiyan/031f3c25455792d92a01f83ac415a94f to your computer and use it in GitHub Desktop.
This test code triggers the Spark warning "Managed memory leak detected".
package test
import com.holdenkarau.spark.testing.{ DataFrameSuiteBase }
import org.apache.spark.sql.{ DataFrame }
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{ array, col, collect_set, udf }
import org.graphframes.GraphFrame
import org.graphframes.lib.AggregateMessages
import org.scalatest.FunSuite
/**
 * Helpers that build a small GraphFrame and run two rounds of message
 * aggregation over it.
 *
 * The gist author uses this code to reproduce the Spark warning
 * "Managed memory leak detected" (see [[getEgo]]).
 */
object TestMemoryTest {
  // Local-mode session; the number of shuffle partitions is fixed to 8.
  val spark = SparkSession.builder.master("local").config("spark.sql.shuffle.partitions", 8).getOrCreate()
  //val spark = SparkSession.builder.getOrCreate()
  val sparkContext = spark.sparkContext

  // Needed for `toDF` on a local Seq of tuples.
  import spark.implicits._

  /**
   * Creates the edges DataFrame.
   *
   * @return a DataFrame with two string columns, `src` and `dst`, holding the
   *         eight hard-coded directed edges
   */
  def createEdgesDF(): DataFrame = {
    val edges = Seq(
      ("u1", "u2"),
      ("u1", "u3"),
      ("u3", "u6"),
      ("u6", "u7"),
      ("u4", "u5"),
      ("u5", "u7"),
      ("u5", "u6"),
      ("u6", "u1")
    )
    edges.toDF("src", "dst")
  }

  /**
   * Builds a GraphFrame from an edges DataFrame.
   *
   * The vertex set is the distinct union of all `src` and `dst` values, exposed
   * as column `id`. Each vertex also gets an `egoSubgraph` column initialised
   * to a one-element array containing its own id.
   *
   * @param edgesDF edges with columns `src` and `dst`
   * @return a GraphFrame over the derived vertices and the given edges
   */
  def getGraphFrame(edgesDF: DataFrame): GraphFrame = {
    val sourceIds = edgesDF.withColumnRenamed("src", "id").drop("dst")
    val targetIds = edgesDF.withColumnRenamed("dst", "id").drop("src")
    val vertices = sourceIds
      .union(targetIds)
      .distinct()
      .withColumn("egoSubgraph", array(col("id")))
    GraphFrame(vertices, edgesDF)
  }

  /**
   * Runs two rounds of message aggregation and returns the resulting vertices.
   *
   * In every round each edge sends the destination vertex's `egoSubgraph` to
   * its source vertex; the messages received by a vertex are collected into a
   * set, flattened and de-duplicated, and become that vertex's new
   * `egoSubgraph`. The aggregated DataFrame is then used as the vertex set of
   * the graph for the next round, together with the unchanged edges.
   *
   * According to the original author, if there is an empty partition in the
   * edges or vertices, this code leads to a Spark warning such as:
   * {{{
   * org.apache.spark.executor.Executor....[Executor task launch worker for task 100] Managed memory leak detected; size = 262144 bytes, TID = 100
   * }}}
   *
   * @return the vertices DataFrame of the graph after the second round
   */
  def getEgo(): DataFrame = {
    val initialGraph = getGraphFrame(createEdgesDF())
    val flattenUdf = udf((x: Seq[Seq[String]]) => x.flatten.distinct)
    val msgToSrc1 = AggregateMessages.dst("egoSubgraph")

    // Two rounds (0 and 1); the fold threads the graph through the rounds so
    // no mutable or null-initialised state is needed.
    val finalGraph = (0 to 1).foldLeft(initialGraph) { (g, _) =>
      val agg = g.aggregateMessages
        .sendToSrc(msgToSrc1)
        .agg(flattenUdf(collect_set(AggregateMessages.msg)).as("egoSubgraph"))
      GraphFrame(agg, g.edges)
    }
    finalGraph.vertices
  }
}
/**
 * Test suite that evaluates the DataFrame returned by `TestMemoryTest.getEgo()`.
 *
 * The test makes no assertion on the result; it only forces the computation,
 * which the original author reports is enough to produce the
 * "Managed memory leak detected" warning in the Spark log.
 */
class TestMemoryTest extends FunSuite with DataFrameSuiteBase {
  test("test managed memory leakage") {
    // count() is the action that triggers execution of the whole plan.
    TestMemoryTest.getEgo().count()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment