Skip to content

Instantly share code, notes, and snippets.

@MansurAshraf
Last active August 29, 2015 14:21
Show Gist options
  • Save MansurAshraf/d8277a5bcb12c0a6f994 to your computer and use it in GitHub Desktop.
Save MansurAshraf/d8277a5bcb12c0a6f994 to your computer and use it in GitHub Desktop.
package com.twitter.algebird
import com.twitter.scalding.typed.TypedPipe
/**
 * Example wiring of a Semigroup-backed Aggregator into a Scalding TypedPipe.
 *
 * Each record is an `(id, Enum)` pair. Records are grouped by id, combined with
 * [[EnumSemiGroup]], and the combined value is presented as `(id, state)`:
 * `"stateOne"` when the combined enum value is exactly `"enumVal3"`, otherwise
 * `"stateTwo"`.
 *
 * @author Mansur Ashraf.
 */
class jankyMonoid {
  // Explicitly typed so implicit resolution does not depend on inferring the
  // type of this member.
  implicit val sg: EnumSemiGroup = new EnumSemiGroup

  // Combine with the implicit semigroup, then map the combined pair to a state label.
  val aggregator = Aggregator.fromSemigroup[(String, Enum)]
    .andThenPresent {
      case (id, Enum("enumVal3")) => (id, "stateOne")
      case (id, _) => (id, "stateTwo")
    }

  // Sample data. The enum values must match "enumVal3" exactly (no surrounding
  // whitespace), otherwise the "stateOne" branch above can never be taken.
  val input: List[(String, Enum)] = List(
    ("id1", Enum("enumVal1")),
    ("id1", Enum("enumVal2")),
    ("id2", Enum("enumVal2")),
    ("id3", Enum("enumVal1")),
    ("id3", Enum("enumVal2")),
    ("id3", Enum("enumVal3")))

  // Group the sample records by id and aggregate each group.
  // NOTE(review): the resulting pipe is not bound or written anywhere in this
  // class — presumably a demo; confirm whether it should be exposed as a val.
  TypedPipe.from(input)
    .groupBy(_._1)
    .aggregate(aggregator)
}
/**
 * Wrapper around a single string `value`.
 *
 * Equality and pattern matching are by exact string value, so surrounding
 * whitespace is significant.
 *
 * @param value the wrapped string
 */
case class Enum(
  value: String
)
/**
 * Semigroup over `(id, Enum)` pairs.
 *
 * `plus` returns the left operand when its enum value is `"enumVal3"`;
 * in every other case it returns the right operand.
 */
class EnumSemiGroup extends Semigroup[(String, Enum)] {
  /**
   * Combines two `(id, Enum)` pairs.
   *
   * @param left  the left operand
   * @param right the right operand
   * @return a pair equal to `left` if the left enum value is `"enumVal3"`,
   *         otherwise a pair equal to `right`
   */
  override def plus(left: (String, Enum), right: (String, Enum)): (String, Enum) = {
    // Destructure both operands up front, as the original match did.
    val (leftId, Enum(leftValue)) = left
    val (rightId, Enum(rightValue)) = right

    if (leftValue == "enumVal3") (leftId, Enum(leftValue))
    else (rightId, Enum(rightValue))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment