Created
March 11, 2018 18:34
-
-
Save markus1189/e4c8719f217c928e298f5f194087504c to your computer and use it in GitHub Desktop.
Monoid
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// -*- compile-command: "scalafmt Code.scala && amm Code.scala" -*- | |
/* | |
Code for the Java-Magazin Article | |
Author: Markus Hauck, codecentric AG | |
You can execute this using ammonite (http://ammonite.io/#Ammonite-REPL) | |
> amm Code.scala | |
Alternatively, you can take all $ivy imports and build an sbt project yourself | |
*/ | |
import $ivy.`org.apache.spark:spark-core_2.11:2.2.1` | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext | |
import org.apache.spark.SparkConf | |
import org.apache.spark.SparkContext | |
import org.apache.spark.rdd.RDD | |
// Configuration record consumed by the Future examples further below (two opaque string settings).
case class Config(one: String, two: String)
object Listings extends App {

  /** Type class for types that have an associative binary operation with an
    * identity element.
    *
    * Laws (not enforced by the compiler):
    *   - identity:      combine(empty, a) == a == combine(a, empty)
    *   - associativity: combine(a, combine(b, c)) == combine(combine(a, b), c)
    */
  trait Monoid[A] {
    /** The identity element. */
    def empty: A

    /** The associative binary operation. */
    def combine(lhs: A, rhs: A): A
  }

  object Monoid {
    /** Summon the implicit instance for `A`, e.g. `Monoid[String].empty`. */
    def apply[A](implicit instance: Monoid[A]): Monoid[A] = instance
  }
/** Monoid for `String` under concatenation.
  *
  * BUG FIX: the original `empty` was a single space (" "), which violates the
  * monoid identity law — combine(empty, s) yielded " " + s, not s. The identity
  * for string concatenation is the empty string "".
  */
implicit val stringMonoid: Monoid[String] = new Monoid[String] {
  override val empty: String = ""
  override def combine(lhs: String, rhs: String) = lhs + rhs
}
/** Monoid for `List[A]` under concatenation; the identity is the empty list. */
implicit def listMonoid[A]: Monoid[List[A]] = new Monoid[List[A]] {
  override val empty: List[A] = Nil
  override def combine(lhs: List[A], rhs: List[A]): List[A] = lhs ::: rhs
}
/** Basic usage: summon instances via the `Monoid` companion and call them directly. */
object Examples1 {
  val list: List[Int] = Monoid[List[Int]].combine(List(1, 2, 3), List(4, 5, 6))
  val str: String = Monoid[String].empty
  val str2: String = Monoid[String].combine("Hello, ", "World!")
}
// Syntax enrichment: gives any A with a Monoid instance an infix `|+|` operator.
implicit class MonoidOps[A](self: A) {
  // `a |+| b` delegates to the implicit Monoid's combine.
  def |+|(other: A)(implicit m: Monoid[A]) = m.combine(self, other)
}
// Demonstrates the `|+|` operator from MonoidOps on lists and strings.
object Examples2 {
  val lst = List(1) |+| List(2, 3)
  val str = "Hello, " |+| "World!"
}
/** Monoid for functions `A => B` when `B` is a monoid: results are combined
  * pointwise, and the identity is the function that always returns `B`'s empty.
  */
def functionMonoid[A, B: Monoid]: Monoid[A => B] = new Monoid[A => B] {
  // Resolve the B instance once instead of on every call.
  private val mb = Monoid[B]
  override val empty: A => B = _ => mb.empty
  override def combine(f: A => B, g: A => B): A => B =
    a => mb.combine(f(a), g(a))
}
/** Monoid for `Future[A]`: zip the two futures and combine their results once
  * both complete. The ExecutionContext is needed for the `map` on the zipped future.
  */
def futureMonoid[A: Monoid](
    implicit ec: ExecutionContext): Monoid[Future[A]] =
  new Monoid[Future[A]] {
    override val empty: Future[A] = Future.successful(Monoid[A].empty)
    override def combine(lhs: Future[A], rhs: Future[A]): Future[A] =
      lhs.zip(rhs).map { case (x, y) => Monoid[A].combine(x, y) }
  }
/** The same fold pattern written three times concretely, then once generically:
  * `fold` works for any element type with a Monoid instance.
  */
object Patterns2 {
  def sum(xs: Seq[Int]): Int = xs.foldLeft(0)((acc, x) => acc + x)
  def product(xs: Seq[Int]): Int = xs.foldLeft(1)((acc, x) => acc * x)
  def concat(xs: Seq[String]): String = xs.foldLeft("")((acc, s) => acc + s)
  /** Fold any sequence whose elements form a monoid. */
  def fold[A: Monoid](xs: Seq[A]) = {
    val m = Monoid[A]
    xs.foldLeft(m.empty)(m.combine)
  }
}
/** Monoid for pairs: each component is combined with its own monoid. */
implicit def tupleMonoid[A: Monoid, B: Monoid]: Monoid[(A, B)] =
  new Monoid[(A, B)] {
    override val empty: (A, B) = (Monoid[A].empty, Monoid[B].empty)
    override def combine(lhs: (A, B), rhs: (A, B)): (A, B) = {
      val (a1, b1) = lhs
      val (a2, b2) = rhs
      (a1 |+| a2, b1 |+| b2)
    }
  }
/** Folding a sequence of pairs in one pass: tupleMonoid combines the Int and
  * String components with their respective monoids.
  */
object Examples3 {
  import Patterns2._
  // Additive monoid on Int, local to this example.
  implicit val intMonoid: Monoid[Int] = new Monoid[Int] {
    override val empty: Int = 0
    override def combine(lhs: Int, rhs: Int): Int = lhs + rhs
  }
  // Sums the Ints and concatenates the Strings component-wise.
  val result: (Int, String) = fold(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
}
/** Int wrapper whose monoid is addition (identity 0); a value class, so no
  * runtime boxing in most positions.
  */
case class Add(value: Int) extends AnyVal
object Add {
  implicit val addMonoid: Monoid[Add] = new Monoid[Add] {
    override val empty: Add = Add(0)
    override def combine(lhs: Add, rhs: Add): Add = Add(lhs.value + rhs.value)
  }
}
// Demo: two asynchronous "module" functions from Config to a Map payload.
// The block's value is discarded — m1 and m2 only illustrate the shape of the data.
{
  import scala.concurrent.ExecutionContext.Implicits.global
  val m1 = (config: Config) =>
    Future {
      Map(1 -> ("eins", config.one))
    }
  val m2 = (config: Config) =>
    Future {
      // NOTE(review): the two entries mix (String, String) and (String, Some(...)),
      // so the Map's value type widens to a common supertype — presumably intended
      // for the article's narrative, but worth confirming.
      Map(
        1 -> ("one", s"${config.one}!"),
        2 -> ("zwei", Some(config.two))
      )
    }
}
/** Maximum of Ints, extended with a bottom element (NegInf) so that the
  * monoid has an identity.
  */
sealed trait Max
case object NegInf extends Max
final case class Maximum(value: Int) extends Max
object Max {
  implicit val maxMonoid: Monoid[Max] = new Monoid[Max] {
    override val empty: Max = NegInf
    override def combine(lhs: Max, rhs: Max): Max = (lhs, rhs) match {
      case (NegInf, other)          => other
      case (other, NegInf)          => other
      case (Maximum(x), Maximum(y)) => Maximum(math.max(x, y))
    }
  }
}
/** Minimum of Ints, extended with a top element (PosInf) so that the monoid
  * has an identity.
  */
sealed trait Min
final case object PosInf extends Min
final case class Minimum(value: Int) extends Min
object Min {
  implicit val minMonoid: Monoid[Min] = new Monoid[Min] {
    override val empty = PosInf
    override def combine(lhs: Min, rhs: Min) = (lhs, rhs) match {
      case (PosInf, y) => y
      case (x, PosInf) => x
      // BUG FIX: the original used x.max(y), which computed the MAXIMUM inside
      // the Min monoid (making it a duplicate of Max); a Min monoid keeps the
      // smaller value.
      case (Minimum(x), Minimum(y)) => Minimum(x.min(y))
    }
  }
}
/** Monoid for 4-tuples: combine component-wise with each component's monoid. */
implicit def tuple4Monoid[A: Monoid, B: Monoid, C: Monoid, D: Monoid]
  : Monoid[(A, B, C, D)] =
  new Monoid[(A, B, C, D)] {
    override val empty: (A, B, C, D) =
      (Monoid[A].empty, Monoid[B].empty, Monoid[C].empty, Monoid[D].empty)
    override def combine(lhs: (A, B, C, D), rhs: (A, B, C, D)): (A, B, C, D) = {
      val (a1, b1, c1, d1) = lhs
      val (a2, b2, c2, d2) = rhs
      (a1 |+| a2, b1 |+| b2, c1 |+| c2, d1 |+| d2)
    }
  }
/** Monoid for 5-tuples: combine component-wise with each component's monoid. */
implicit def tuple5Monoid[A: Monoid, B: Monoid, C: Monoid, D: Monoid, E: Monoid]
  : Monoid[(A, B, C, D, E)] =
  new Monoid[(A, B, C, D, E)] {
    override val empty: (A, B, C, D, E) =
      (Monoid[A].empty,
       Monoid[B].empty,
       Monoid[C].empty,
       Monoid[D].empty,
       Monoid[E].empty)
    override def combine(lhs: (A, B, C, D, E),
                         rhs: (A, B, C, D, E)): (A, B, C, D, E) = {
      val (a1, b1, c1, d1, e1) = lhs
      val (a2, b2, c2, d2, e2) = rhs
      (a1 |+| a2, b1 |+| b2, c1 |+| c2, d1 |+| d2, e1 |+| e2)
    }
  }
/** Monoid for maps: the union of both key sets; a key present in both maps has
  * its values combined with the value monoid (the left map's value is the left
  * operand of combine, so non-commutative value monoids behave as expected).
  */
implicit def mapMonoid[A, B: Monoid]: Monoid[Map[A, B]] =
  new Monoid[Map[A, B]] {
    override val empty: Map[A, B] = Map.empty
    override def combine(lhs: Map[A, B], rhs: Map[A, B]): Map[A, B] =
      lhs.foldLeft(rhs) { case (acc, (key, value)) =>
        val merged = acc.get(key) match {
          case Some(existing) => Monoid[B].combine(value, existing)
          case None           => value
        }
        acc.updated(key, merged)
      }
  }
/** Enrichment for RDDs: fold a whole RDD with a Monoid, using `empty` as the
  * zero value and `combine` as the operation. Spark's fold requires the zero
  * to be a true identity and the op to be associative — exactly the monoid laws.
  */
implicit class MonoidRDD[T](val rdd: RDD[T]) {
  def combineAll(implicit M: Monoid[T]): T =
    rdd.fold(M.empty)((acc, x) => M.combine(acc, x))
}
// Word statistics over a sentence, computed in ONE distributed pass: each word
// is mapped to a 4-tuple of monoidal summaries, then everything is folded with
// tuple4Monoid via combineAll.
object SparkExample extends App {
  val conf = new SparkConf() // not shown: how to setup config
  val sc: SparkContext = new SparkContext(conf)
  // Each word contributes: its length as a Max candidate, its length as a Min
  // candidate, its character count (Add), and a word count of 1 (Add).
  def expand(word: String): (Max, Min, Add, Add) = {
    (Maximum(word.length), Minimum(word.length), Add(word.length), Add(1))
  }
  val input = "Textanalyse mit Monoiden macht Spaß!".split("""\s+""") // (1)
  val (max, min, chars, words) = // (2)
    sc.parallelize(input).flatMap(_.split("""\s+""")).map(expand).combineAll
  // NOTE(review): chars.value / words.value is INTEGER division, so the average
  // is truncated — and it divides by zero if the input has no words; fine for
  // this fixed demo sentence, but worth confirming if the input ever changes.
  println(s"""Finished calculation:
             | - max word length: ${max}
             | - min word length: ${min}
             | - total characters: ${chars.value}
             | - total words: ${words.value}
             | - average word length: ${chars.value / words.value}
             |""".stripMargin)
}
// Extends SparkExample with a per-word frequency map (Map[String, Add]) as a
// fifth component — still a single distributed pass, via tuple5Monoid + mapMonoid.
object SparkExample2 extends App {
  val conf = new SparkConf() // not shown: how to setup config
  val sc: SparkContext = new SparkContext(conf)
  // (1) Each word additionally contributes a singleton frequency map; mapMonoid
  // sums the Add counts per key when the tuples are folded together.
  def expand(word: String): (Max, Min, Add, Add, Map[String, Add]) = { // (1)
    (Maximum(word.length),
     Minimum(word.length),
     Add(word.length),
     Add(1),
     Map(word -> Add(1)))
  }
  val input = "Textanalyse mit Monoiden macht Spaß!".split("""\s+""")
  // (2) One fold produces all five statistics at once.
  val (max, min, chars, words, wordMap) = // (2)
    sc.parallelize(input).flatMap(_.split("""\s+""")).map(expand).combineAll
}
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment