Skip to content

Instantly share code, notes, and snippets.

@mitchi
Created October 1, 2019 12:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mitchi/edd9637687cf47fac2616bb72932f8e7 to your computer and use it in GitHub Desktop.
Save mitchi/edd9637687cf47fac2616bb72932f8e7 to your computer and use it in GitHub Desktop.
Problem with RDD.cache()
/*
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
*/
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
{
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
output
}
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
}
}
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
Random.setSeed(System.nanoTime())
tiebreakers = Random.shuffle(tiebreakers)
tiebreakers
}
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph)
loop
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = graphRDD.map(n =>
{
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
}
}
}
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
}
else {
n._2.color += 1
}
n
//return the modified object
})
graphRDD.cache()
println("Check pour fin")
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
})
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
loop
}
//Return the colors of the graph
graphRDD.map(node => node._2.color).collect()
}
}
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
})
println
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
}
/*
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
*/
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
{
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
output
}
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
}
}
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
Random.setSeed(System.nanoTime())
tiebreakers = Random.shuffle(tiebreakers)
tiebreakers
}
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph).cache()
loop
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = graphRDD.map(n =>
{
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
}
}
}
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
}
else {
n._2.color += 1
}
n
//return the modified object
})
println("Check pour fin")
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
})
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
loop
}
//Return the colors of the graph
graphRDD.map(node => node._2.color).collect()
}
}
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
.setMaster("local[*]")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
})
println
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
}
/*
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
*/
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
{
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
output
}
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
}
}
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
Random.setSeed(System.nanoTime())
tiebreakers = Random.shuffle(tiebreakers)
tiebreakers
}
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph,1)
loop
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = graphRDD.map(n =>
{
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
}
}
}
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
}
else {
n._2.color += 1
}
n
//return the modified object
})
println("Check pour fin")
graphRDD.cache()
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
})
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
loop
}
//Return the colors of the graph
graphRDD.map(node => node._2.color).collect()
}
}
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
.setMaster("local[1]")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
})
println
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
}
/*
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
*/
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
{
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
output
}
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
}
}
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
Random.setSeed(System.nanoTime())
tiebreakers = Random.shuffle(tiebreakers)
tiebreakers
}
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph,1).cache()
loop
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = graphRDD.map(n =>
{
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
}
}
}
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
}
else {
n._2.color += 1
}
n
//return the modified object
})
println("Check pour fin")
//graphRDD.cache()
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
})
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
loop
}
//Return the colors of the graph
graphRDD.map(node => node._2.color).collect()
}
}
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
.setMaster("local[1]")
val sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("ERROR")
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
})
println
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment