@pchlupacek
Created January 18, 2014 11:39
package com.spinoco

import scalaz.stream.{Exchange, process1, Process}
import scalaz.stream.Process._
import scalaz.concurrent.Task
import scalaz.{-\/, \/-, \/}

/**
 * Created by pach on 18/01/14.
 */
package object http {

  // A message is a stream in which the header travels on the left and body elements on the right.
  type HttpMessage[H <: HttpMessageHeader, A] = Process[Task, H \/ A]
  type HttpRequest[A] = HttpMessage[HttpRequestHeader, A]
  type HttpResponse[A] = HttpMessage[HttpResponseHeader, A]
  type HttpChannel[A, B] = Channel[Task, HttpRequest[A], HttpResponse[B]]
  type WebSocket[A, B] = Exchange[HttpRequest[A], HttpResponse[B]]

  implicit class HttpRequestSyntax[A, H <: HttpMessageHeader](val p: HttpMessage[H, A]) {

    /** Maps over body elements only; see [[scalaz.stream.Process.map]]. */
    def map[B](f: A => B): HttpMessage[H, B] =
      p.map(_.map(f))

    /** Flat-maps over body elements only; see [[scalaz.stream.Process.flatMap]]. */
    def flatMap[B](f: A => Process[Task, B]): HttpMessage[H, B] =
      p.flatMap {
        case \/-(a) => f(a).map(b => \/-(b))
        case -\/(h) => emit(-\/(h))
      }

    /** Pipes every `A` received through the supplied Process1 to produce `B`. */
    def pipe[B](p1: Process1[A, B]): HttpMessage[H, B] =
      pipeHeader(_ => p1)

    /** Once the header is known, runs it through `f` to get the Process1 that every `A` is piped through. */
    def pipeHeader[B](f: H => Process1[A, B]): HttpMessage[H, B] = {
      // Body phase: feed each `A` to the current Process1 and emit whatever it produces.
      def go(cur: Process1[A, B]): Process1[H \/ A, H \/ B] = {
        receive1[H \/ A, H \/ B]({
          case \/-(a) => cur.feed1(a).unemit match {
            case (xb, hlt @ Halt(rsn)) => emitSeq(xb.map(\/-(_))) fby hlt
            case (xb, next) => emitSeq(xb.map(\/-(_))) fby go(next)
          }
          case -\/(h) => Halt(new Exception("Only one header in message, got second one: " + h))
        })
      }

      // Header phase: the first element must be the header.
      def awaitH: Process1[H \/ A, H \/ B] =
        receive1[H \/ A, H \/ B]({
          case \/-(a) => Halt(new Exception("Header must be first in message, got: " + a))
          case -\/(h) => go(f(h))
        })

      p.pipe(awaitH)
    }
  }
}
@pchiusano

I think it is a little weird that the header can be emitted at any time (according to the type), when in fact the header always has to come first. You could make the input:

type HTTPMessage[H,A] = Process[Task, (H, Process[Task, A])]
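
A minimal sketch of what that shape could look like in use, assuming the same scalaz-stream version and imports as the gist. The object name `HeaderFirst`, the class `HTTPMessageSyntax`, and the method `mapBody` are hypothetical names chosen for illustration; only `map` and `pipe` on `Process` are used.

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process
import scalaz.stream.Process._

object HeaderFirst {
  // The header is paired with the body stream, so it is available
  // before any body element by construction.
  type HTTPMessage[H, A] = Process[Task, (H, Process[Task, A])]

  // Hypothetical enrichment, not part of scalaz-stream.
  implicit class HTTPMessageSyntax[H, A](val self: HTTPMessage[H, A]) {

    /** Transforms body elements and leaves the header untouched. */
    def mapBody[B](f: A => B): HTTPMessage[H, B] =
      self.map { case (h, body) => (h, body.map(f)) }

    /** Picks a Process1 from the header and pipes the body through it. */
    def pipeHeader[B](f: H => Process1[A, B]): HTTPMessage[H, B] =
      self.map { case (h, body) => (h, body.pipe(f(h))) }
  }
}
```

With this encoding `pipeHeader` needs no runtime checks for "header must be first" or "only one header", because a single message cannot express either situation.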

Aside: I don't know if the H <: HttpMessageHeader bound is buying you anything. Just let HTTPMessage be fully polymorphic in H and use implicit enrichment ("pimp my library") if you wish to expose different functionality depending on the H.
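
One way this could be sketched: the message type carries no bound on H, and separate implicit classes add operations only when H is a particular header type. `RequestHeader`, `ResponseHeader`, `onlyMethod`, and `successful` are hypothetical names for illustration and do not come from the gist; the sketch assumes `Process.filter` from the same scalaz-stream version.

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

object PolymorphicHeader {
  // No upper bound on H: the message type is fully polymorphic.
  type HTTPMessage[H, A] = Process[Task, (H, Process[Task, A])]

  // Hypothetical header types, for illustration only.
  final case class RequestHeader(method: String, path: String)
  final case class ResponseHeader(status: Int)

  // Only available when the header is a RequestHeader.
  implicit class RequestSyntax[A](val self: HTTPMessage[RequestHeader, A]) {
    def onlyMethod(m: String): HTTPMessage[RequestHeader, A] =
      self.filter { case (h, _) => h.method == m }
  }

  // Only available when the header is a ResponseHeader.
  implicit class ResponseSyntax[A](val self: HTTPMessage[ResponseHeader, A]) {
    def successful: HTTPMessage[ResponseHeader, A] =
      self.filter { case (h, _) => h.status >= 200 && h.status < 300 }
  }
}
```

Calling `successful` on a request message, or `onlyMethod` on a response message, would then fail to compile, which gives the same separation as the bound without constraining the alias itself.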

The HTTPMessage type above is actually not specific to HTTP at all; it's just a structured stream with two typed parts, a bit like a session type.

Also, somewhat related: for streaming HTTP, I've used SSE (server-sent events) streams with scalaz-stream to get streaming from the server to the client. You often only need streaming in that direction; in the other direction (client to server) you can make ordinary requests.
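
For readers unfamiliar with SSE, a rough sketch of the framing involved, using only the Scala standard library. `SseFrame` and `encode` are hypothetical names; the sketch covers only the `data:` field of the event-stream format and is not taken from the commenter's code.

```scala
object SseFrame {
  /** Encodes one payload as a server-sent event: every line of the payload
    * becomes a `data:` field, and a blank line terminates the event. */
  def encode(payload: String): String =
    payload.split("\n", -1).map(line => s"data: $line\n").mkString + "\n"
}
```

A server-to-client stream of strings could then be framed by mapping `SseFrame.encode` over it and serving the result with content type `text/event-stream`.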
