Skip to content

Instantly share code, notes, and snippets.

@davidhoyt
Last active August 29, 2015 14:14
Show Gist options
  • Save davidhoyt/f2c81f6fe8dcb06f7448 to your computer and use it in GitHub Desktop.
Save davidhoyt/f2c81f6fe8dcb06f7448 to your computer and use it in GitHub Desktop.
Attempts to take an HList such as String :: Int :: HNil, zip it with the resolved type class instances to produce (String, Marshaller[String]) :: (Int, Marshaller[Int]) :: HNil, and then emit one HTTP chunk per pair via spray.
// http://scastie.org/8089
/***
scalaVersion := "2.11.4"
libraryDependencies ++= Seq(
"io.spray" %% "spray-routing-shapeless2" % "1.3.2"
exclude("com.chuusai", "shapeless"),
"com.chuusai" %% "shapeless" % "2.1.0-RC2",
"com.typesafe.akka" %% "akka-actor" % "2.3.9"
)
*/
package playground.shapeless
import spray.routing.HttpServiceActor
/**
 * Experimental support for marshalling a shapeless `HList` as a chunked spray response,
 * one chunk per element.
 *
 * Each element is paired with the `Marshaller` resolved for its static type; the pairs are
 * then streamed by a short-lived actor that waits for the ack of each chunk before sending
 * the next one.
 */
object ChunkedHList {
  import akka.actor._
  import akka.io.Tcp
  import scala.util.control.NonFatal
  import shapeless._
  import ops.hlist._
  import Poly._
  import HList._
  import Zipper._
  import spray._
  import spray.http._
  import httpx._
  import marshalling._

  /** An HList element paired with its marshaller, after the element types have been widened away. */
  type MarshallableChunk = (Any, Marshaller[Any])

  type HListMarshaller[L] = Marshaller[L] // with HListMarshaller0

  /** Marker mixed into marshallers produced by [[marshallerForHList]]. */
  trait HListMarshaller0

  /** Polymorphic function mapping a value of type `T` to the implicit `Marshaller[T]` in scope. */
  object marshaller extends Poly1 {
    implicit def caseMarshaller[T](implicit marshaller: Marshaller[T]) = at[T](_ => marshaller)
  }

  /**
   * Zips every element of `h` with the marshaller resolved for its type.
   *
   * @param h the HList whose elements should be paired with marshallers
   * @return an HList of `(element, marshaller)` pairs
   */
  def marshallersFor[L <: HList, M <: HList](h: L)(implicit mapper: Mapper.Aux[marshaller.type, L, M], zip: Zip[L :: M :: HNil]): Zip[L :: M :: HNil]#Out =
    h.zip(h.map(marshaller))

  /**
   * Builds a marshaller that emits the elements of an HList as successive chunks.
   *
   * The declared result type keeps the [[HListMarshaller0]] marker visible so the returned
   * value satisfies the implicit parameter of [[toResponseMarshallable]]; it still conforms
   * to `HListMarshaller[L]`.
   *
   * @param hlist     only used to drive type inference of `L`
   * @param marshalTo content types this marshaller offers to the marshalling context
   */
  implicit def marshallerForHList[L <: HList, M <: HList]
    (hlist: L, marshalTo: ContentType*)
    (implicit actorRefFactory: ActorRefFactory,
     mapper: Mapper.Aux[marshaller.type, L, M],
     zip: Zip[L :: M :: HNil],
     toList: ToList[Zip[L :: M :: HNil]#Out, MarshallableChunk]): Marshaller[L] with HListMarshaller0 =
    new Marshaller[L] with HListMarshaller0 {

      /** Negotiates the content type and, when accepted, starts chunked marshalling of `value`. */
      def apply(value: L, ctx: MarshallingContext): Unit =
        try {
          ctx.tryAccept(marshalTo) match {
            case Some(contentType) =>
              forHList(value, contentType, ctx)
            case None =>
              ctx.rejectMarshalling(marshalTo)
          }
        } catch {
          case NonFatal(e) => ctx.handleError(e)
        }

      /** Spawns an actor that marshals the elements of `value` one at a time into `ctx`. */
      def forHList(value: L, contentType: ContentType, ctx: MarshallingContext): Unit = {
        // shapeless' `::` shadows the List cons cell, so the latter is imported under another name.
        import scala.collection.immutable.{:: => !::}

        /** Ack message: the previous chunk has been sent, `remaining` still has to be marshalled. */
        case class Continue(remaining: List[MarshallableChunk])

        class ChunkingHListActor extends Actor with ActorLogging {
          // Set once the first element has started the chunked response.
          private var responder: Option[ActorRef] = None

          /** Terminates the chunked response (if one was started) and stops this actor. */
          def stop(): Unit = {
            responder.foreach(_ ! ChunkedMessageEnd)
            context.stop(self)
          }

          def receive = {
            case Continue((current, chunkMarshaller) !:: remaining) =>
              val chunkContext = new DelegatingMarshallingContext(ctx) {
                // The first entity opens the chunked response; every later one is sent as a chunk.
                // Either way the ack brings the remaining elements back to this actor.
                override def marshalTo(entity: HttpEntity, headers: HttpHeader*): Unit =
                  responder match {
                    case Some(ref) =>
                      ref ! MessageChunk(entity.data).withAck(Continue(remaining))
                    case None =>
                      responder = Some(ctx.startChunkedMessage(entity, Some(Continue(remaining)), headers))
                  }

                override def handleError(error: Throwable): Unit = {
                  log.error(error, "Failed to marshal HList element, aborting chunked response")
                  // Nothing has been sent yet, so the outer context can still report the failure.
                  if (responder.isEmpty) ctx.handleError(error)
                  stop()
                }

                // Element marshallers must not start a nested chunked response.
                override def startChunkedMessage(entity: HttpEntity, sentAck: Option[Any], headers: Seq[HttpHeader])(implicit sender: ActorRef): ActorRef = {
                  log.error("Cannot marshal this")
                  throw new UnsupportedOperationException("HList elements cannot start their own chunked message")
                }
              }
              chunkMarshaller(current, chunkContext)

            case Continue(Nil) =>
              // An empty HList never starts a response; answer with an empty entity rather than nothing.
              if (responder.isEmpty) ctx.marshalTo(HttpEntity.Empty)
              stop()

            case _: Tcp.ConnectionClosed =>
              context.stop(self)

            case unknown =>
              log.error("Received unknown message in ChunkingHListActor: {}", unknown)
              stop()
          }
        }

        val chunks: List[MarshallableChunk] = marshallersFor(value).toList[MarshallableChunk]
        // The actor is a local class closing over `ctx`, so it has to be created through a
        // creator block; reflective `Props[ChunkingHListActor]` finds no usable constructor.
        actorRefFactory.actorOf(Props(new ChunkingHListActor)) ! Continue(chunks)
      }
    }

  /**
   * Lifts an HList into a `ToResponseMarshallable`, using a marshaller that carries the
   * [[HListMarshaller0]] marker.
   *
   * @param hlist      the value to marshal, evaluated when the response is marshalled
   * @param marshaller marshaller for `L`
   */
  implicit def toResponseMarshallable[L <: HList](hlist: => L)(implicit marshaller: Marshaller[L] with HListMarshaller0): ToResponseMarshallable =
    new ToResponseMarshallable {
      override def marshal(ctx: ToResponseMarshallingContext): Unit = {
        val rm = ToResponseMarshaller.fromMarshaller[L]()(marshaller)
        rm.apply(hlist, ctx)
      }
    }

  // def completeHList[L <: HList](hlist: => L)(implicit marshaller: HListMarshaller[L]) = {
  // new StandardRoute {
  // override def apply(ctx: RequestContext): Unit =
  // ctx.complete(hlist)(marshaller)
  // }
  // }
}
/** Service actor exposing a single `GET /test` route that completes with a one-element HList. */
class ShapelessSpray extends HttpServiceActor {
  import spray.routing._
  import Directives._

  def receive: Receive = runRoute(myRoute)

  /** Route answering `GET /test` through the HList marshalling helpers of `ChunkedHList`. */
  def myRoute: Route =
    (get & path("test")) {
      import ChunkedHList._
      import shapeless._

      val payload = "A" :: HNil
      // NOTE(review): no content types are passed to marshallerForHList here - confirm that
      // content negotiation can succeed with an empty list.
      implicit val payloadMarshaller = marshallerForHList(payload)
      complete(toResponseMarshallable(payload))
    }
}
/**
 * Demo entry point: sends one `GET /test` request straight to a [[ShapelessSpray]] actor,
 * prints the entity of the reply and then shuts the actor system down.
 */
object ShapelessSpray extends App {
  import akka.actor._
  import akka.pattern.ask
  import akka.util.Timeout
  import scala.concurrent.Await
  import scala.concurrent.duration._
  import spray.http._

  implicit val timeout: Timeout = Timeout(1.hour)
  implicit val system: ActorSystem = ActorSystem("ShapelessSpray")
  val ref = system.actorOf(Props[ShapelessSpray])

  try {
    val reply = (ref ? HttpRequest(HttpMethods.GET, Uri("http://foobar.com/test"))).mapTo[HttpResponse]
    println(Await.result(reply, 1.hour).entity.asString)
  } finally {
    // Shutdown has to be requested before waiting for termination, otherwise the wait never
    // returns; `finally` also releases the system when the request fails.
    system.shutdown()
    system.awaitTermination()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment