Skip to content

Instantly share code, notes, and snippets.

Created October 1, 2019 12:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mitchi/edd9637687cf47fac2616bb72932f8e7 to your computer and use it in GitHub Desktop.
Save mitchi/edd9637687cf47fac2616bb72932f8e7 to your computer and use it in GitHub Desktop.
Problem with RDD.cache()
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
tiebreakers = Random.shuffle(tiebreakers)
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph)
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = =>
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
else {
n._2.color += 1
//return the modified object
println("Check pour fin")
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
//Return the colors of the graph => node._2.color).collect()
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
val sc = SparkContext.getOrCreate(conf)
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
tiebreakers = Random.shuffle(tiebreakers)
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph).cache()
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = =>
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
else {
n._2.color += 1
//return the modified object
println("Check pour fin")
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
//Return the colors of the graph => node._2.color).collect()
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
val sc = SparkContext.getOrCreate(conf)
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
tiebreakers = Random.shuffle(tiebreakers)
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph,1)
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = =>
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
else {
n._2.color += 1
//return the modified object
println("Check pour fin")
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
//Return the colors of the graph => node._2.color).collect()
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
val sc = SparkContext.getOrCreate(conf)
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
Edmond La Chance UQAC 2019
Exemple algorithme Fast Coloring avec table, et broadcast variables
package coloring
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
case class node(var color: Int = 1, var tiebreakvalue: Int = 0, len: Int = 0)
var adjlist = new Array[Int](len)
def printadjlist: String = {
var output = "adjlist : "
adjlist.foreach(output += _)
override def toString: String = {
s"color : $color, tiebreakvalue : $tiebreakvalue"
case class edge_data(src: Long, dst: Long)
object coloring extends Serializable {
//Return infered
def randomTieBreakers(graph: Array[(Int,node)]) = {
var tiebreakers = ArrayBuffer[Int]()
var count = graph.size
//Generate normal IDs
for (i <- 0 until count) tiebreakers += i
//Generate tiebreakers
tiebreakers = Random.shuffle(tiebreakers)
//Color using the FC2 algorithm
def fc2(graph: Array[(Int,node)], sc: SparkContext) = {
//Step 1 : We generate tiebreakers
var tiebreakers = randomTieBreakers(graph)
//Apply tiebreakers
for (i <- 0 until tiebreakers.size)
graph(i)._2.tiebreakvalue = tiebreakers(i)
println("Printing after tiebreakers")
graph foreach println
//Execute the algorithm
var graphRDD = sc.makeRDD(graph,1).cache()
def loop(): Unit = {
//On broadcast les couleurs, tiebreakers à tout le monde
val essentiels = graphRDD.flatMap(node => {
Some((node._1, node._2.tiebreakvalue))
}).collect.sortBy(_._1).map(elem => elem._2)
val bcast = sc.broadcast(essentiels)
graphRDD = =>
var toBeKnight = true
for (i <- 0 until n._2.adjlist.length) {
var adjacent = n._2.adjlist(i)
if (adjacent == 1) {
val tieBreakerVoisin = bcast.value(i)
if (tieBreakerVoisin != -1) {
if (tieBreakerVoisin < n._2.tiebreakvalue) //-1 = knight
toBeKnight = false
if (toBeKnight == true) {
n._2.tiebreakvalue = -1
else {
n._2.color += 1
//return the modified object
println("Check pour fin")
//DEBUG on check les couleurs
// graphRDD.collect.foreach( println(_))
//On check pour la fin. Fin = tout le monde est un knight
val result = graphRDD.filter(node => {
if (node._2.tiebreakvalue == -1) false
else true
//If every node is a knight we can return
if (result.isEmpty()) return
//Else we restart the loop
//Return the colors of the graph => node._2.color).collect()
object testAlgo extends App {
val conf = new SparkConf()
.setAppName("Petersen Graph version Examen")
val sc = SparkContext.getOrCreate(conf)
var edges = Array(
edge_data(1L, 2L), edge_data(1L, 3L), edge_data(1L, 6L),
edge_data(2L, 7L), edge_data(2L, 8L),
edge_data(3L, 4L), edge_data(3L, 9L),
edge_data(4L, 5L), edge_data(4L, 8L),
edge_data(5L, 6L), edge_data(5L, 7L),
edge_data(6L, 10L),
edge_data(7L, 9L),
edge_data(8L, 10L),
edge_data(9L, 10L)
//Créer une collection avec des yield
var graph = for (i <- 1 to 10) yield Tuple2(i-1, new node(1, 0, 10))
//Fill the adjmatrix
edges.foreach(e => {
val src = e.src.toInt
val dst = e.dst.toInt
graph(src - 1)._2.adjlist(dst - 1) = 1
graph(dst - 1)._2.adjlist(src - 1) = 1
graph foreach( e=> print(e._2.printadjlist+"\n") )
graph foreach println
//Execute the algorithm
val results = coloring.fc2(graph.toArray, sc)
println("\nColors of the graph\n")
results.foreach(color => print(color + " "))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment