Skip to content

Instantly share code, notes, and snippets.

View pchlupacek's full-sized avatar

Pavel Chlupacek pchlupacek

View GitHub Profile
package deiko
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scalaz.concurrent.Task
import scalaz.stream._
import play.api.libs.iteratee._
import Process._
object Conversion {
@pchlupacek
pchlupacek / ProcessQueue.scala
Last active December 20, 2015 21:39
Attempt to integrate process with outside world in nonblocking way
package com.spinoco.util.streams
import java.util.concurrent.{LinkedBlockingQueue, ConcurrentLinkedQueue}
import scalaz.stream.Process
import scalaz.stream.processes._
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scalaz.concurrent.Task
import scalaz.\/
@pchlupacek
pchlupacek / gist:7395481
Created November 10, 2013 08:34
Memory leak example
package scalaz.stream
import org.scalacheck.Prop._
import org.scalacheck.Properties
import scala.concurrent.SyncVar
import scalaz.\/
import scalaz.stream.Process._
import scalaz.stream.async.mutable.Queue
import java.util.concurrent.{TimeUnit, Executors}
package scalaz.stream
import scalaz.stream.Process._
import scalaz.stream.These.{This, That}
sealed trait ReceiveThese[+A,+B]
case class Receive[+A,+B](t:These[A,B]) extends ReceiveThese[A,B]
@pchlupacek
pchlupacek / gist:7813959
Last active December 30, 2015 10:09
file parse example with scalaz-stream
import scalaz._
import scalaz.stream.Process._
import scalaz.stream.Process
import scalaz.stream.io
import scalaz.stream.process1
import scala.util.matching.Regex
def matchRegex(pattern:Regex) : Process1[String,String] =
receive1[String,String]({
@pchlupacek
pchlupacek / gist:7826399
Last active December 30, 2015 11:59
StateTopic
/**
* Topic that wrap state processor to produce
* - discrete stream of states that was created by publishing `A`
* - continuous stream of states that was created by publishing `A`
* - subscription to states and updates (S,A)
*/
class StateTopic[S, A](stateProcessor: Process1[A, (S, A)], strategy: Strategy) {
val topic: Topic[(A or (S, A))] = async.topic[(A or (S, A))](strategy).journal(
collect[(A or (S, A)), A]({ case a: A => a }) |>
package com.spinoco
import scalaz.stream.{Exchange, process1, Process}
import scalaz.stream.Process._
import scalaz.concurrent.Task
import scalaz.{-\/, \/-, \/}
/**
* Created by pach on 18/01/14.
*/
@pchlupacek
pchlupacek / gist:9298516
Created March 1, 2014 22:32
Process2 with Append
package scalaz.stream
import scalaz._
import scalaz.concurrent.Task
import scala.annotation.tailrec
/**
* Created by pach on 01/03/14.
*/
/** Feed a sequence of inputs to a `Process1`. */
def feed[I, O](i: Seq[I])(p: Process1[I, O]): Process1[I, O] = {
@tailrec
def go(
in: Seq[I], out: Vector[O]
, cur: Process1[I, O]
, stack: Vector[Throwable => Trampoline[Process1[I, O]]]
): Process1[I, O] = {
if (in.nonEmpty) {
@pchlupacek
pchlupacek / ExperimentSpec.scala
Last active August 29, 2015 14:01
issues with object
package scalaz.stream2
import Process._
import org.scalacheck.Prop._
import org.scalacheck.Properties
import scalaz.concurrent.{Strategy, Task}
import scalaz.{\/, stream2, \/-, -\/}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.SyncVar
import scala.concurrent.duration._