Monoid
// -*- compile-command: "scalafmt Code.scala && amm Code.scala" -*-
/*
Code for the Java-Magazin Article
Author: Markus Hauck, codecentric AG
You can execute this using ammonite (http://ammonite.io/#Ammonite-REPL)
> amm Code.scala
Alternatively, you can take all $ivy imports and build an sbt project yourself
*/
import $ivy.`org.apache.spark:spark-core_2.11:2.2.1`
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
case class Config(one: String, two: String)
object Listings extends App {
  // Serializable so instances can be captured in Spark closures and
  // shipped to executors in the RDD examples further below
  trait Monoid[A] extends Serializable {
    def empty: A
    def combine(lhs: A, rhs: A): A
  }
  object Monoid {
    def apply[A: Monoid]: Monoid[A] = implicitly
  }
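  // Not in the original listing: a small sanity check for the monoid laws.
  // `empty` must be a left and right identity and `combine` must be
  // associative; every instance below should pass this for arbitrary values.
  def monoidLawsHold[A: Monoid](x: A, y: A, z: A): Boolean = {
    val m = Monoid[A]
    m.combine(m.empty, x) == x &&
      m.combine(x, m.empty) == x &&
      m.combine(m.combine(x, y), z) == m.combine(x, m.combine(y, z))
  }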
  implicit val stringMonoid: Monoid[String] = new Monoid[String] {
    override val empty: String =
      "" // the empty string is the identity for concatenation; " " would violate the laws
    override def combine(lhs: String, rhs: String) = lhs + rhs
  }
  implicit def listMonoid[A]: Monoid[List[A]] = new Monoid[List[A]] {
    override val empty: List[A] = List()
    override def combine(xs: List[A], ys: List[A]): List[A] =
      xs ++ ys
  }
  object Examples1 {
    val list = Monoid[List[Int]].combine(List(1, 2, 3), List(4, 5, 6))
    val str = Monoid[String].empty
    val str2 = Monoid[String].combine("Hello, ", "World!")
  }
  implicit class MonoidOps[A](self: A) {
    def |+|(other: A)(implicit m: Monoid[A]) = m.combine(self, other)
  }
  object Examples2 {
    val lst = List(1) |+| List(2, 3)
    val str = "Hello, " |+| "World!"
  }
  def functionMonoid[A, B: Monoid]: Monoid[A => B] = new Monoid[A => B] {
    override val empty: A => B = _ => Monoid[B].empty
    override def combine(f: A => B, g: A => B): A => B =
      input => Monoid[B].combine(f(input), g(input))
  }
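  // Not in the original listing: a usage sketch for functionMonoid. Two
  // String => String functions are combined point-wise, concatenating
  // their results via the String monoid.
  object FunctionExample {
    implicit val strFunMonoid: Monoid[String => String] =
      functionMonoid[String, String]
    val greet: String => String = name => s"Hello, $name"
    val exclaim: String => String = name => s" and welcome, $name!"
    val both: String => String = greet |+| exclaim
    // both("Scala") == "Hello, Scala and welcome, Scala!"
  }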
  def futureMonoid[A: Monoid](
      implicit ec: ExecutionContext): Monoid[Future[A]] =
    new Monoid[Future[A]] {
      override val empty: Future[A] = Future.successful(Monoid[A].empty)
      override def combine(lhs: Future[A], rhs: Future[A]): Future[A] =
        for {
          (x, y) <- lhs.zip(rhs)
        } yield Monoid[A].combine(x, y)
    }
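  // Not in the original listing: a usage sketch for futureMonoid. zip runs
  // both Futures and the results are combined once both have completed.
  object FutureExample {
    import scala.concurrent.ExecutionContext.Implicits.global
    implicit val futureStringMonoid: Monoid[Future[String]] =
      futureMonoid[String]
    val combined: Future[String] = Future("Hello, ") |+| Future("World!")
    // combined eventually completes with "Hello, World!"
  }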
  object Patterns2 {
    def sum(xs: Seq[Int]): Int = xs.foldLeft(0)(_ + _)
    def product(xs: Seq[Int]): Int = xs.foldLeft(1)(_ * _)
    def concat(xs: Seq[String]): String = xs.foldLeft("")(_ + _)
    def fold[A: Monoid](xs: Seq[A]) =
      xs.foldLeft(Monoid[A].empty)(Monoid[A].combine)
  }
  implicit def tupleMonoid[A: Monoid, B: Monoid]: Monoid[(A, B)] =
    new Monoid[(A, B)] {
      override val empty = (Monoid[A].empty, Monoid[B].empty)
      override def combine(lhs: (A, B), rhs: (A, B)): (A, B) =
        (lhs._1 |+| rhs._1, lhs._2 |+| rhs._2)
    }
  object Examples3 {
    import Patterns2._
    implicit val intMonoid: Monoid[Int] = new Monoid[Int] {
      override val empty = 0
      override def combine(x: Int, y: Int): Int = x + y
    }
    val result = fold(Seq(1 -> "a", 2 -> "b", 3 -> "c")) // (6, "abc")
  }
  case class Add(value: Int) extends AnyVal
  object Add {
    implicit val addMonoid: Monoid[Add] = new Monoid[Add] {
      override val empty = Add(0)
      override def combine(lhs: Add, rhs: Add) =
        Add(lhs.value + rhs.value)
    }
  }
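  // Not in the original listing: Add turns Int addition into a monoid, so
  // the generic fold from Patterns2 sums without any special casing.
  object AddExample {
    val total: Add = Patterns2.fold(Seq(Add(1), Add(2), Add(3))) // Add(6)
  }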
  {
    import scala.concurrent.ExecutionContext.Implicits.global
    val m1 = (config: Config) =>
      Future {
        Map(1 -> ("eins", config.one))
      }
    val m2 = (config: Config) =>
      Future {
        Map(
          1 -> ("one", s"${config.one}!"),
          2 -> ("zwei", config.two) // same value type as m1 so the maps can be combined
        )
      }
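    // Not in the original listing: a sketch of the point of this block.
    // Stacking the function, Future and Map monoids yields a monoid for the
    // whole shape, so m1 and m2 combine without any manual plumbing.
    implicit val stacked: Monoid[Config => Future[Map[Int, (String, String)]]] =
      functionMonoid(futureMonoid[Map[Int, (String, String)]])
    val m3 = m1 |+| m2
    // m3(Config("a", "b")) eventually contains both keys; the two values
    // under key 1 are merged by the tuple-of-String monoid.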
}
  sealed trait Max
  final case object NegInf extends Max
  final case class Maximum(value: Int) extends Max
  object Max {
    implicit val maxMonoid: Monoid[Max] = new Monoid[Max] {
      override val empty = NegInf
      override def combine(lhs: Max, rhs: Max) = (lhs, rhs) match {
        case (NegInf, y) => y
        case (x, NegInf) => x
        case (Maximum(x), Maximum(y)) => Maximum(x.max(y))
      }
    }
  }
  sealed trait Min
  final case object PosInf extends Min
  final case class Minimum(value: Int) extends Min
  object Min {
    implicit val minMonoid: Monoid[Min] = new Monoid[Min] {
      override val empty = PosInf
      override def combine(lhs: Min, rhs: Min) = (lhs, rhs) match {
        case (PosInf, y) => y
        case (x, PosInf) => x
        case (Minimum(x), Minimum(y)) => Minimum(x.min(y)) // min, not max
      }
    }
  }
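  // Not in the original listing: NegInf/PosInf give Max and Min a proper
  // identity, so even a fold over an empty sequence is well-defined.
  object MinMaxExample {
    import Patterns2.fold
    val longest: Max = fold(Seq[Max](Maximum(3), Maximum(7), Maximum(5))) // Maximum(7)
    val shortest: Min = fold(Seq[Min](Minimum(3), Minimum(7), Minimum(5))) // Minimum(3)
    val none: Max = fold(Seq.empty[Max]) // NegInf
  }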
  implicit def tuple4Monoid[A: Monoid, B: Monoid, C: Monoid, D: Monoid]
    : Monoid[(A, B, C, D)] =
    new Monoid[(A, B, C, D)] {
      override val empty =
        (Monoid[A].empty, Monoid[B].empty, Monoid[C].empty, Monoid[D].empty)
      override def combine(lhs: (A, B, C, D), rhs: (A, B, C, D)): (A, B, C, D) =
        (lhs._1 |+| rhs._1,
         lhs._2 |+| rhs._2,
         lhs._3 |+| rhs._3,
         lhs._4 |+| rhs._4)
    }
  implicit def tuple5Monoid[A: Monoid, B: Monoid, C: Monoid, D: Monoid, E: Monoid]
    : Monoid[(A, B, C, D, E)] =
    new Monoid[(A, B, C, D, E)] {
      override val empty =
        (Monoid[A].empty,
         Monoid[B].empty,
         Monoid[C].empty,
         Monoid[D].empty,
         Monoid[E].empty)
      override def combine(lhs: (A, B, C, D, E),
                           rhs: (A, B, C, D, E)): (A, B, C, D, E) =
        (lhs._1 |+| rhs._1,
         lhs._2 |+| rhs._2,
         lhs._3 |+| rhs._3,
         lhs._4 |+| rhs._4,
         lhs._5 |+| rhs._5)
    }
  implicit def mapMonoid[A, B: Monoid]: Monoid[Map[A, B]] =
    new Monoid[Map[A, B]] {
      override val empty = Map()
      override def combine(lhs: Map[A, B], rhs: Map[A, B]): Map[A, B] = {
        // merge the maps, combining values under a shared key with B's monoid
        lhs.foldLeft(rhs) {
          case (acc, (k, v)) =>
            acc.updated(
              k,
              acc.get(k).map(vv => Monoid[B].combine(v, vv)).getOrElse(v))
        }
      }
    }
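  // Not in the original listing: a usage sketch for mapMonoid. Values that
  // share a key are merged with the value monoid, here Add.
  object MapExample {
    val counts: Map[String, Add] =
      Map("a" -> Add(1), "b" -> Add(2)) |+| Map("a" -> Add(3))
    // counts == Map("a" -> Add(4), "b" -> Add(2))
  }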
  implicit class MonoidRDD[T](val rdd: RDD[T]) {
    // fold the whole RDD with the monoid; empty is the neutral element for
    // each partition and for the final merge, so the result is well-defined
    def combineAll(implicit M: Monoid[T]): T =
      rdd.fold(M.empty)(M.combine(_, _))
  }
  object SparkExample extends App {
    val conf = new SparkConf() // not shown: how to set up the config
    val sc: SparkContext = new SparkContext(conf)
    def expand(word: String): (Max, Min, Add, Add) = {
      (Maximum(word.length), Minimum(word.length), Add(word.length), Add(1))
    }
    val input = "Textanalyse mit Monoiden macht Spaß!".split("""\s+""") // (1)
    val (max, min, chars, words) = // (2)
      sc.parallelize(input).flatMap(_.split("""\s+""")).map(expand).combineAll
    println(s"""Finished calculation:
               | - max word length: ${max}
               | - min word length: ${min}
               | - total characters: ${chars.value}
               | - total words: ${words.value}
               | - average word length: ${chars.value.toDouble / words.value}
               |""".stripMargin)
  }
  object SparkExample2 extends App {
    val conf = new SparkConf() // not shown: how to set up the config
    val sc: SparkContext = new SparkContext(conf)
    def expand(word: String): (Max, Min, Add, Add, Map[String, Add]) = { // (1)
      (Maximum(word.length),
       Minimum(word.length),
       Add(word.length),
       Add(1),
       Map(word -> Add(1)))
    }
    val input = "Textanalyse mit Monoiden macht Spaß!".split("""\s+""")
    val (max, min, chars, words, wordMap) = // (2)
      sc.parallelize(input).flatMap(_.split("""\s+""")).map(expand).combineAll
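    // Not in the original listing: report the word frequencies so the
    // extended example actually uses the Map it computes.
    println(s"Word frequencies: ${wordMap.map { case (w, n) => (w, n.value) }}")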
}
}