Paul Chiusano pchiusano

@pchiusano
pchiusano / Signals.scala
Created January 30, 2014 22:32
Killing a running `Process` asynchronously
import scala.concurrent.duration._
import scalaz.stream.{async, Process, wye}
val p1 = Process.awakeEvery(3.seconds)
val alive = async.signal[Unit]
alive.set(()).run
// this could be a generic combinator - zip takes the 'shorter' stream, so
// when 'alive' cuts out, that will kill off `p1`
val killableP1 = alive.continuous.zip(p1).map(_._2)
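The zip-with-a-liveness-stream idea can also be sketched without scalaz-stream. This is a library-free analogue, not the gist's code: `KillableSketch`, `killable`, and the `AtomicBoolean` standing in for `async.signal` are assumptions made for illustration, and a plain `Iterator` stands in for `Process`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object KillableSketch {
  // Hypothetical helper: pair `source` with a stream of "still alive" ticks.
  // `zip` ends as soon as either side ends, so setting `alive` to false
  // terminates the zipped iterator.
  def killable[A](alive: AtomicBoolean, source: Iterator[A]): Iterator[A] =
    Iterator
      .continually(alive.get) // re-read the flag each time an element is requested
      .takeWhile(identity)    // the "alive" side cuts out once the flag is false
      .zip(source)
      .map(_._2)              // keep only the source's elements
}
```

Because iterators are pulled, the flag is only consulted when the consumer asks for the next element, so this sketch does not interrupt a consumer that is already blocked inside `source`.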
pchiusano / Nonblocking.scala
Created February 2, 2014 21:34
A Par type for representing parallel computations, without blocking, developed in chapter 7 of FP in Scala
package fpinscala.parallelism
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService}
import java.util.concurrent.atomic.AtomicReference
object Nonblocking {
  trait Future[+A] {
    private[parallelism] def apply(k: A => Unit): Unit
  }
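The preview stops at the callback-based `Future`. A minimal sketch of how such a `Future` might support a non-blocking `Par` follows, assuming the shape used in chapter 7 of FP in Scala. The object name `NonblockingSketch` is hypothetical, `apply` is made public so the block stands alone, and only `unit`, `fork`, and `run` are shown.

```scala
import java.util.concurrent.{Callable, CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicReference

object NonblockingSketch {
  // A Future here is just "something that accepts a callback".
  trait Future[+A] {
    def apply(k: A => Unit): Unit
  }

  type Par[+A] = ExecutorService => Future[A]

  // Pass `a` straight to the callback; no thread is needed.
  def unit[A](a: A): Par[A] =
    _ => new Future[A] { def apply(k: A => Unit): Unit = k(a) }

  // Evaluate `p` on the executor; the submitting thread does not wait for it.
  def fork[A](p: => Par[A]): Par[A] =
    es => new Future[A] {
      def apply(k: A => Unit): Unit = {
        es.submit(new Callable[Unit] { def call(): Unit = p(es)(k) })
        ()
      }
    }

  // The only blocking point: wait for the callback at the very end.
  def run[A](es: ExecutorService)(p: Par[A]): A = {
    val ref = new AtomicReference[A]()
    val latch = new CountDownLatch(1)
    p(es) { a => ref.set(a); latch.countDown() }
    latch.await()
    ref.get
  }
}
```

Since `fork` never waits on a result, this sketch can run a forked computation on a pool with a single thread, for example `NonblockingSketch.run(Executors.newFixedThreadPool(1))(NonblockingSketch.fork(NonblockingSketch.unit(42)))`.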
pchiusano / prefetch.scala
Created February 10, 2014 15:31
Prefetching combinator for converting a `Tee` to a `Wye`, allowing for bounded nondeterminism
/**
* Convert the given `Tee` to a `Wye`, by allowing for bounded nondeterminism.
* That is, when the `Tee` requests from the left, as long as fewer than `maxBuffer`
* elements are enqueued on the right, convert this to an `AwaitBoth`. Likewise
* when the `Tee` requests from the right. Thus, elements are fed to the `Tee`
* in the order it demands, but we allow for nondeterminism in fetching the
* elements demanded.
*/
def prefetch[A,B,C](maxBuffer: Int)(tee: Tee[A,B,C]): Wye[A,B,C] = {
  import scalaz.stream.tee.{AwaitL,AwaitR}
pchiusano / reduceBalanced.scala
Created February 14, 2014 01:27
Balanced reduction algorithm
/**
* Do a 'balanced' reduction of `v`. Provided `f` is associative, this
* returns the same result as `v.reduceLeft(f)`, but uses a balanced
* tree of concatenations, which is more efficient for operations that
* must copy both `A` values to combine them in `f` - O(n*log n) rather than
* quadratic.
*
* Implementation uses a stack that combines the top two elements of the
* stack using `f` if the top element is more than half the size of the
* element below it.
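The preview ends before the implementation. The stack-based description above could be realised roughly as follows. This is a sketch under stated assumptions, not the gist's code: the object name `ReduceBalancedSketch`, the `Iterable` input, and the explicit `size` function are all assumed here.

```scala
object ReduceBalancedSketch {
  // `size` (assumed parameter) reports how large a value is, so the stack
  // can decide when two neighbours are close enough in size to combine.
  def reduceBalanced[A](v: Iterable[A])(size: A => Int)(f: (A, A) => A): Option[A] = {
    // Stack entries are (value, size), most recently pushed first.
    @annotation.tailrec
    def fixup(stack: List[(A, Int)]): List[(A, Int)] = stack match {
      // `top` came after `below` in `v`, so `below` goes on the left of `f`.
      case (top, n) :: (below, m) :: rest if n > m / 2 =>
        fixup((f(below, top), m + n) :: rest)
      case _ => stack
    }

    val stack = v.foldLeft(List.empty[(A, Int)]) { (acc, a) =>
      fixup((a, size(a)) :: acc)
    }

    // Collapse whatever remains, again keeping the original left-to-right order.
    stack
      .reduceLeftOption((later, earlier) => (f(earlier._1, later._1), earlier._2 + later._2))
      .map(_._1)
  }
}
```

For an associative `f` such as string concatenation, `ReduceBalancedSketch.reduceBalanced(xs)(_.length)(_ + _)` should agree with `xs.reduceLeftOption(_ + _)`.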
pchiusano / caseclasses.scala
Last active August 29, 2015 13:56
Default `unapply` function generated by case classes in Scala has the 'wrong' type
// I think I understand why this is, but it's an unfortunate consequence
// of encoding data constructors as subtypes - the extractors have an overly
// specific type (they take the subclass rather than the actual type you care about).
scala> trait Super
defined trait Super
scala> case class Duper(i: Int) extends Super
defined class Duper
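One way to read the complaint: the generated `Duper.unapply` accepts only a `Duper`, so it cannot be passed around as a function over `Super`. A hand-written extractor with the wider parameter type might look like this. The names `ExtractorSketch`, `DuperOf`, and `describe` are hypothetical and not from the gist.

```scala
object ExtractorSketch {
  trait Super
  case class Duper(i: Int) extends Super

  // Hypothetical extractor that accepts any `Super`, unlike the
  // generated `Duper.unapply`, whose parameter type is `Duper`.
  object DuperOf {
    def unapply(s: Super): Option[Int] = s match {
      case Duper(i) => Some(i)
      case _        => None
    }
  }

  // The wider extractor works in patterns exactly like the generated one.
  def describe(s: Super): String = s match {
    case DuperOf(i) => s"Duper($i)"
    case _          => "something else"
  }
}
```

Ordinary pattern matching on a `Super` with `case Duper(i)` still works, because the compiler inserts a type test first; the narrower type only matters when the extractor is used as a plain function.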
pchiusano / Proc2.scala
Created February 19, 2014 04:16
Trampolined process representation
package scalaz.stream
import scalaz.concurrent.Task
import scalaz.\/
trait Proc2[+F[_],+O] {
  import Proc2._
  def flatMap[F2[x]>:F[x],O2](f: O => Proc2[F2,O2]): Proc2[F2,O2] =
    this match {
pchiusano / typetag.scala
Created March 4, 2014 21:51
TypeTag sometimes prints qualified names, and other times not.
scala> import scala.reflect.runtime.universe._
import scala.reflect.runtime.universe._
scala> typeTag[List[Double]]
res0: reflect.runtime.universe.TypeTag[List[Double]] = TypeTag[scala.List[Double]]
scala> typeTag[List[Double]].tpe.toString
res1: String = scala.List[Double]
scala> def foo[A,B](f: A => B)(implicit T: TypeTag[B]) = T.tpe.toString
pchiusano / available.markdown
Created March 7, 2014 15:16
Fetching available elements in scalaz-stream
  case class Env[-I,-I2]() {
    sealed trait Y[-X] {
      def tag: Int
      def fold[R](l: => R, r: => R, both: => R): R
    }
    sealed trait T[-X] extends Y[X]
    sealed trait Is[-X] extends T[X]
    case object Left extends Is[I] {
      def tag = 0
pchiusano / streamingdecoding.scala
Created March 19, 2014 19:56
Streaming binary decoding example
trait StreamDecoder[+A] {
  import scodec.stream.{decode => D}
  def mapEither[B](f: A => String \/ B): StreamDecoder[B] =
    flatMap { a => f(a).fold(D.fail, D.emit) }
}
val packets: StreamDecoder[MpegPacket] = pcapRecordStreamDecoder mapEither { record =>
  val datagramPayloadBits = record.data.drop(22 * 8).drop(20 * 8)
  decode.many[MpegPacket].decode(datagramPayloadBits)
}
pchiusano / unframing.scala
Last active August 29, 2015 13:57
Break a binary input stream along frame boundaries using a pure `Process1`
import scodec.codecs
import scodec.bits.{BitVector,ByteVector}
import scalaz.stream.{Process1,Process}
object Unframing {
/**
* Break an input bytestream along frame boundaries. Input consists of a stream of frames,
* where each frame is just a number of bytes, encoded as an int32, followed by a packet of
* that many bytes. End of stream is indicated with a frame of size <= 0. Output stream is