Skip to content

Instantly share code, notes, and snippets.

@NicolasRouquette
Last active July 10, 2017 23:11
Show Gist options
  • Save NicolasRouquette/68e6dec26f5b2752612b42efc163d6fa to your computer and use it in GitHub Desktop.
Save NicolasRouquette/68e6dec26f5b2752612b42efc163d6fa to your computer and use it in GitHub Desktop.
http4s / fs2 version of Cafe examples
package cafe
import java.lang.{Integer,System}
import fs2._
import scala.{App,Long,StringContext}
/** Cafe example, step 1: ask the cafe for a fixed number of orders and print each one.
  *
  * The number of orders to request is read from the first command-line argument.
  */
object Main1 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // How many orders to take, parsed from args(0).
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order; the call is wrapped in a Task
  // in case getOrder() does not always return quickly.
  val askForOrder = Task(c.getOrder())

  // A stream that asks for an order over and over again.
  val askForOrders = Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders = askForOrders.take(norders)

  // Run the stream, collect every order, then print them one per line.
  val orderingResult = orders.runLog.unsafeRun()
  for (order <- orderingResult) System.out.println(s"order: $order")
}
package cafe
import java.lang.{Integer,System}
import fs2._
import scala.{App,Long,StringContext}
/** Cafe example, step 2: take a fixed number of orders and print the items of each order.
  *
  * The number of orders to request is read from the first command-line argument.
  */
object Main2 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // How many orders to take, parsed from args(0).
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order; the call is wrapped in a Task
  // in case getOrder() does not always return quickly.
  val askForOrder = Task(c.getOrder())

  // A stream that asks for an order over and over again.
  val askForOrders = Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders = askForOrders.take(norders)

  // Splitting an order means keeping only its items (the drinks).
  val drinksOnly = orders.map(order => order.items)

  // Run the stream, collect the per-order item lists, then print them one per line.
  val orderingResult = drinksOnly.runLog.unsafeRun()
  for (order <- orderingResult) System.out.println(s"order: $order")
}
package cafe
import java.lang.{Integer, System}
import fs2._
import scalaz._
import scala.collection.immutable.Seq
import scala.{App, Long, StringContext}
import scala.Predef.String
/** Cafe example, step 3: split every order into individual drinks, tag each drink as
  * cold (left) or hot (right), render it as a message and print the messages.
  *
  * The number of orders to request is read from the first command-line argument.
  */
object Main3 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S: fs2.Strategy =
    fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // How many orders to take, parsed from args(0).
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order; the call is wrapped in a Task
  // in case getOrder() does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that repeatedly asks for an order.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // Splitting an order means keeping only its items (the drinks).
  val drinksOnly: Stream[Task, Seq[Item]] =
    orders.map(_.items)

  // Each drink is emitted individually and tagged with a scalaz disjunction:
  // iced drinks go left (-\/), everything else goes right (\/-).
  val coldOrHotIndividualDrink: Stream[Task, \/[Item, Item]] =
    drinksOnly.flatMap { drinks =>
      Stream.emits(drinks.map { item =>
        if (item.iced) -\/(item)
        else \/-(item)
      })
    }

  // Renders a tagged drink as a "Left: ..." / "Right: ..." message.
  val printer: Pipe[Task, \/[Item, Item], String] =
    _.map {
      case -\/(i) => s"Left: $i"
      case \/-(i) => s"Right: $i"
    }

  // The pipe is applied to the whole stream with `through` rather than to a
  // freshly built one-element stream per drink.
  val items: Stream[Task, String] =
    coldOrHotIndividualDrink.through(printer)

  // Prints every message. The println is wrapped in Task.delay so the side
  // effect is evaluated as part of the stream instead of hiding inside map.
  val waiter: Sink[Task, String] =
    _.evalMap(message => Task.delay(System.out.println(message)))

  // Feed the messages to the waiter and run the stream for its effects only;
  // nothing is collected.
  val orderingResult = items.to(waiter)
  orderingResult.run.unsafeRun()
}
package cafe
import java.lang.{Integer, System}
import fs2._
import scalaz._
import scala.collection.immutable.Seq
import scala.{App, Long, StringContext}
import scala.Predef.String
/** Cafe example, step 4: same pipeline as before, but the cold/hot tagging is expressed
  * as a reusable `Pipe` that is attached with `through`.
  *
  * The number of orders to request is read from the first command-line argument.
  */
object Main4 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S: fs2.Strategy =
    fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // How many orders to take, parsed from args(0).
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order; the call is wrapped in a Task
  // in case getOrder() does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that repeatedly asks for an order.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // Splitting an order means keeping only its items (the drinks).
  val drinksOnly: Stream[Task, Seq[Item]] =
    orders.map(_.items)

  // Tags a single drink with a scalaz disjunction:
  // iced drinks go left (-\/), everything else goes right (\/-).
  val coldOrHotIndividualDrink: Pipe[Task, Item, \/[Item, Item]] =
    _.map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

  // Renders a tagged drink as a "Left: ..." / "Right: ..." message.
  val printer: Pipe[Task, \/[Item, Item], String] =
    _.map {
      case -\/(i) => s"Left: $i"
      case \/-(i) => s"Right: $i"
    }

  // Flatten the per-order item lists into single drinks, tag them, render them.
  // Both pipes are attached with `through`; `printer` is no longer applied to a
  // freshly built one-element stream per drink.
  val items: Stream[Task, String] =
    drinksOnly
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink)
      .through(printer)

  // Prints every message. The println is wrapped in Task.delay so the side
  // effect is evaluated as part of the stream instead of hiding inside map.
  val waiter: Sink[Task, String] =
    _.evalMap(message => Task.delay(System.out.println(message)))

  // Feed the messages to the waiter and run the stream for its effects only;
  // nothing is collected.
  val orderingResult = items.to(waiter)
  orderingResult.run.unsafeRun()
}
package cafe
import java.lang.{Integer, System}
import fs2._
import scalaz._
import scala.collection.immutable.Seq
import scala.{App, Long, StringContext}
import scala.Predef.String
/** Cafe example, step 5: every processing stage (item extraction, cold/hot tagging,
  * rendering) is a `Pipe`, and the pipeline is assembled with `through`.
  *
  * The number of orders to request is read from the first command-line argument.
  */
object Main5 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S: fs2.Strategy =
    fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // How many orders to take, parsed from args(0).
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order; the call is wrapped in a Task
  // in case getOrder() does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that repeatedly asks for an order.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // Splitting an order means keeping only its items (the drinks).
  val drinksOnly: Pipe[Task, Order, Seq[Item]] =
    _.map(_.items)

  // Tags a single drink with a scalaz disjunction:
  // iced drinks go left (-\/), everything else goes right (\/-).
  val coldOrHotIndividualDrink: Pipe[Task, Item, \/[Item, Item]] =
    _.map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

  // Renders a tagged drink as a "Left: ..." / "Right: ..." message.
  val printer: Pipe[Task, \/[Item, Item], String] =
    _.map {
      case -\/(i) => s"Left: $i"
      case \/-(i) => s"Right: $i"
    }

  // Orders -> item lists -> single drinks -> tagged drinks -> messages.
  // All pipes are attached with `through`; `printer` is no longer applied to a
  // freshly built one-element stream per drink.
  val items: Stream[Task, String] =
    orders
      .through(drinksOnly)
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink)
      .through(printer)

  // Prints every message. The println is wrapped in Task.delay so the side
  // effect is evaluated as part of the stream instead of hiding inside map.
  val waiter: Sink[Task, String] =
    _.evalMap(message => Task.delay(System.out.println(message)))

  // Feed the messages to the waiter and run the stream for its effects only;
  // nothing is collected.
  val orderingResult = items.to(waiter)
  orderingResult.run.unsafeRun()
}
package cafe
import java.lang.{Integer, System}
import fs2._
import scalaz._
import scala.collection.immutable.Seq
import scala.{App, Long, StringContext}
import scala.Predef.String
/** Cafe example, step 6: adds a barista stage that turns each tagged item into a
  * `Drink` before it is rendered and printed.
  *
  * The number of orders to request is read from the first command-line argument.
  */
object Main6 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S: fs2.Strategy =
    fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  // How many orders to take, parsed from args(0).
  val norders: Long = Integer.parseInt(args(0)).toLong

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order; the call is wrapped in a Task
  // in case getOrder() does not always return quickly.
  val askForOrder: Task[Order] =
    Task { c.getOrder() }

  // A stream that repeatedly asks for an order.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // Splitting an order means keeping only its items (the drinks).
  val drinksOnly: Pipe[Task, Order, Seq[Item]] =
    _.map(_.items)

  // Tags a single drink with a scalaz disjunction:
  // iced drinks go left (-\/), everything else goes right (\/-).
  val coldOrHotIndividualDrink: Pipe[Task, Item, \/[Item, Item]] =
    _.map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

  /** Builds the pipe in which `barista` prepares each tagged item:
    * left items via `prepareColdDrink`, right items via `prepareHotDrink`.
    */
  def createDrinks(barista: Barista): Pipe[Task, \/[Item, Item], Drink] =
    _.map {
      case -\/(item) => barista.prepareColdDrink(item)
      case \/-(item) => barista.prepareHotDrink(item)
    }

  // Renders a prepared drink through its toString.
  val printer: Pipe[Task, Drink, String] =
    in => in.map(_.toString)

  // Orders -> item lists -> single drinks -> tagged drinks -> drinks -> messages.
  // All pipes are attached with `through`; `printer` is no longer applied to a
  // freshly built one-element stream per drink.
  val items: Stream[Task, String] =
    orders
      .through(drinksOnly)
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink)
      .through(createDrinks(Barista()))
      .through(printer)

  // Prints every message. The println is wrapped in Task.delay so the side
  // effect is evaluated as part of the stream instead of hiding inside map.
  val waiter: Sink[Task, String] =
    _.evalMap(message => Task.delay(System.out.println(message)))

  // Feed the messages to the waiter and run the stream for its effects only;
  // nothing is collected.
  val orderingResult = items.to(waiter)
  orderingResult.run.unsafeRun()
}
package cafe
import java.lang.{Integer, System, Thread}
import fs2._
import fs2.async._
import scala.collection.immutable.Seq
import scala.{App, Int, Long, StringContext, Unit}
import scala.Predef.String
/** Cafe example, step 7: items are routed into a hot and a cold queue, several barista
  * workers drain those queues concurrently, and a waiter prints the finished drinks.
  *
  * Command-line arguments:
  *  - args(0): number of orders to request
  *  - args(1): capacity of the hot-drink queue
  *  - args(2): capacity of the cold-drink queue
  */
object Main7 extends App {

  // Execution strategy built from a fixed pool of 8 daemon threads named "worker".
  implicit val S: fs2.Strategy =
    fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")

  val norders: Long = Integer.parseInt(args(0)).toLong
  val hotQueueLength: Int = Integer.parseInt(args(1))
  val coldQueueLength: Int = Integer.parseInt(args(2))

  val c = Cafe()
  System.out.println(s"Cafe ${c.name} opened for business")

  // A task that asks for a single order (in case getOrder() does not always
  // return quickly) and logs which thread obtained it.
  val askForOrder: Task[Order] =
    Task {
      val o = c.getOrder()
      System.out.println(s"""${Thread.currentThread.getName} asking for $o""")
      o
    }

  // A stream that repeatedly asks for an order.
  val askForOrders: Stream[Task, Order] =
    Stream.repeatEval(askForOrder)

  // Normally ordering would end when the cafe closes. Here the stream is
  // cut off after `norders` orders, i.e. once take() reaches its limit.
  val orders: Stream[Task, Order] =
    askForOrders.take(norders)

  // Splitting an order means keeping only its items (the drinks).
  val drinksOnly: Pipe[Task, Order, Seq[Item]] =
    _.map(_.items)

  // Bounded queues that hold the items waiting for a barista.
  val hotDrinkQueue: Task[mutable.Queue[Task, Item]] =
    boundedQueue[Task, Item](maxSize = hotQueueLength)

  val coldDrinkQueue: Task[mutable.Queue[Task, Item]] =
    boundedQueue[Task, Item](maxSize = coldQueueLength)

  /** Routes every incoming item to a queue: iced items to `cq`, all others to `hq`.
    *
    * The enqueue task is sequenced into the stream with `evalMap`; it is not run
    * with a blocking `unsafeRun()` inside `map`.
    */
  def coldOrHotIndividualDrink(
      hq: mutable.Queue[Task, Item],
      cq: mutable.Queue[Task, Item]): Pipe[Task, Item, Unit] =
    in => in.evalMap { item =>
      if (item.iced) cq.enqueue1(item)
      else hq.enqueue1(item)
    }

  // Pipe variants of the barista stages; not referenced elsewhere in this object.
  def processHotDrinks(barista: Barista): Pipe[Task, Item, Drink] =
    _.map(barista.prepareHotDrink)

  def processColdDrinks(barista: Barista): Pipe[Task, Item, Drink] =
    _.map(barista.prepareColdDrink)

  /** Stream of hot drinks that `barista` prepares from the items dequeued from `q`.
    *
    * The former `q.size.discrete.flatMap { _ => ... }` wrapper was dropped: its
    * emitted sizes were ignored and only the inner `q.dequeue` stream did the work.
    */
  def hotBaristaWorker(q: mutable.Queue[Task, Item], barista: Barista): Stream[Task, Drink] =
    q.dequeue.map(barista.prepareHotDrink)

  /** Stream of cold drinks that `barista` prepares from the items dequeued from `q`. */
  def coldBaristaWorker(q: mutable.Queue[Task, Item], barista: Barista): Stream[Task, Drink] =
    q.dequeue.map(barista.prepareColdDrink)

  // Queue on which all baristas hand over their finished drinks.
  val joinDrinkQueue: Task[mutable.Queue[Task, Drink]] =
    boundedQueue[Task, Drink](maxSize = 3)

  /** Feeds every drink produced by `s` into the join queue `j`. */
  def joinBaristaWorker(s: Stream[Task, Drink], j: mutable.Queue[Task, Drink]): Stream[Task, Unit] =
    s.to(j.enqueue)

  // Renders a drink through its toString; not referenced elsewhere in this object.
  val printer: Pipe[Task, Drink, String] =
    in => in.map(_.toString)

  /** Producer side: orders -> item lists -> single items -> hot/cold queue. */
  def items(
      hq: mutable.Queue[Task, Item],
      cq: mutable.Queue[Task, Item]): Stream[Task, Unit] =
    orders
      .through(drinksOnly)
      .flatMap(Stream.emits)
      .through(coldOrHotIndividualDrink(hq, cq))

  /** Prints every drink dequeued from the join queue `j`.
    *
    * The println is wrapped in Task.delay so the side effect is evaluated as
    * part of the stream instead of hiding inside map.
    */
  def waiter(j: mutable.Queue[Task, Drink]): Stream[Task, Unit] =
    j.dequeue.evalMap(drink => Task.delay(System.out.println(drink)))

  val b1 = Barista("Jack")
  val b2 = Barista("Peter")
  val b3 = Barista("Jane")
  val b4 = Barista("Susan")

  // Materialise the three queues once so that all streams share them.
  val hq = hotDrinkQueue.unsafeRun()
  val cq = coldDrinkQueue.unsafeRun()
  val j = joinDrinkQueue.unsafeRun()

  // Three baristas work the hot queue, one works the cold queue.
  val h1 = hotBaristaWorker(hq, b1)
  val h2 = hotBaristaWorker(hq, b2)
  val h3 = hotBaristaWorker(hq, b3)
  val c1 = coldBaristaWorker(cq, b4)

  val i: Stream[Task, Unit] = items(hq, cq)
  val j1: Stream[Task, Unit] = joinBaristaWorker(h1, j)
  val j2: Stream[Task, Unit] = joinBaristaWorker(h2, j)
  val j3: Stream[Task, Unit] = joinBaristaWorker(h3, j)
  val j4: Stream[Task, Unit] = joinBaristaWorker(c1, j)
  val w: Stream[Task, Unit] = waiter(j)

  // Run producer, baristas and waiter concurrently.
  // NOTE(review): the barista and waiter streams keep dequeuing, so this merged
  // stream presumably never completes and unsafeRun() never returns once all
  // orders are served - confirm whether that is intended.
  i.merge(j1).merge(j2).merge(j3).merge(j4).merge(w).drain.run.unsafeRun()
}
@NicolasRouquette
Copy link
Author

These are fs2 versions of scalaz examples from this book:
https://aappddeevv.gitbooks.io/test_private_book/content/examples/cafe.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment