Skip to content

Instantly share code, notes, and snippets.

@ivantopo
Created April 23, 2014 02:07
Show Gist options
  • Save ivantopo/11200612 to your computer and use it in GitHub Desktop.
Save ivantopo/11200612 to your computer and use it in GitHub Desktop.
import akka.util.Timeout
import ChunkingActorCassandraIterator.{Click, Interval}
import akka.actor._
import akka.io.Tcp
import java.util.Date
import kamon.trace.TraceRecorder
import org.json4s.DefaultFormats
import scala.concurrent.ExecutionContext
import scala.reflect.ClassTag
import scala.util.Try
import spray.http.{ContentTypes, HttpEntity, MessageChunk, ChunkedMessageEnd}
import spray.httpx.Json4sSupport
import spray.httpx.marshalling.{MarshallingContext, Marshaller}
import spray.routing.SimpleRoutingApp
import akka.pattern.ask
import scala.concurrent.duration._
object IteratorExample extends App with SimpleRoutingApp with Json4sSupport {
implicit val system = ActorSystem("iterator-example")
implicit val ec: ExecutionContext = system.dispatcher
implicit val timeout = Timeout(10 seconds)
implicit val json4sFormats = DefaultFormats
implicit def long2DateConvertible(value: Long) = new {
def toDate: Date = new Date(value)
}
implicit def cassandraIterableMarshaller[T](implicit marshaller: Marshaller[T], refFactory: ActorRefFactory) =
Marshaller[CassandraPaginateIterator[T]] {
(value, ctx) ⇒
// This happens as a callback of a future created inside the `complete` directive in the route, so it captures
// the same `TraceContext` available at that moment. When the value (the `CassandraPaginateIterator`) is sent
// to the `ChunkingActorCassandraIterator` that same `TraceContext` is available and attached to the message.
refFactory.actorOf(Props.apply(new ChunkingActorCassandraIterator(marshaller, ctx))) ! value
}
startServer(interface = "0.0.0.0", port = 9090) {
path("clicks") {
get {
parameters('start.as[Long], 'end.as[Long]) { (start, end) =>
val tryInterval = Try {
(start.toDate, end.toDate)
}
validate(tryInterval.isSuccess, "Url params must be Date") {
val (intervalStart, intervalEnd) = tryInterval.get
//time("clicks") { // Probably you won't need this, Kamon will give you the metrics! :)
complete {
(businessActor ? Interval(intervalStart, intervalEnd)).mapTo[CassandraPaginateIterator[Click]]
}
//}
}
}
}
}
}
val businessActor = system.actorOf(Props(new Actor {
def receive: Actor.Receive = {
case Interval(_, _) =>
sender ! new CassandraPaginateIterator[Click](List(
List(Click("kamon.io", 100), Click("akka.io", 100)),
List(Click("spray.io", 100), Click("google.com", 100))
).iterator)
}
}))
}
class CassandraPaginateIterator[T : ClassTag](data: Iterator[Iterable[T]]) extends Iterator[Iterable[T]] {
override def next(): Iterable[T] = data.next()
override def hasNext: Boolean = data.hasNext
}
object ChunkingActorCassandraIterator {
case class ACK(iterator: CassandraPaginateIterator[_])
case object ClosedBracket
//added by ivantopo
case class Interval(start: Date, end: Date)
case class Click(page: String, count: Int)
}
//Not handling any error, which erros are possible?
class ChunkingActorCassandraIterator[T <: AnyRef](marshaller: Marshaller[T], ctx: MarshallingContext) extends Actor { // with PlatformFormats { //with Logging {
import ChunkingActorCassandraIterator._
import org.json4s.native.Serialization._
import context._
implicit val formats = DefaultFormats
var connectionActor: ActorRef = _
def receive = firstChunk
def firstChunk: Receive = {
// When this message is received there is a `TraceContext` attached to it, so it becomes available during the processing
// of this message. Then, the call to `ctx.startChunkedMessage` ends up sending a message with your `ACK` internally,
// and that message, of course, carries the `TraceContext` with it. When the spray-can pipeline is processing this
// message the `TraceContext` is available and when it sends the `ACK` back to you, the context comes back too..
// this is how the context is being propagated back and forth in this example.
// Note: Be careful with pattern matching on parametrized types, erasure is playing against you!
case cassieIt: CassandraPaginateIterator[T] ⇒
if (!cassieIt.hasNext) {
sendEmptyIterator()
} else {
val page: Iterable[T] = cassieIt.next()
val itemsStr = makeChunk("[", page)
val headers = List.empty
val entity = makeEntity(itemsStr)
if (itemsStr == "") {
self ! cassieIt
} else {
become(remainingChunks)
connectionActor = ctx.startChunkedMessage(entity, Some(ACK(cassieIt)), headers)
}
}
case _: Tcp.ConnectionClosed ⇒
context.stop(self)
case ClosedBracket =>
connectionActor ! ChunkedMessageEnd.withAck(ChunkedMessageEnd)
case ChunkedMessageEnd => context.stop(self)
}
def remainingChunks: Receive = {
case ACK(cassieIt: CassandraPaginateIterator[T]) ⇒
if (!cassieIt.hasNext) {
finalMessage()
}
else {
val page: Iterable[T] = cassieIt.next()
val chunk = makeChunk(",",page)
if (chunk == "") {
self ! ACK(cassieIt)
} else {
val message = MessageChunk(chunk).withAck(ACK(cassieIt))
connectionActor ! message
}
}
case ClosedBracket => connectionActor ! ChunkedMessageEnd.withAck(ChunkedMessageEnd)
case ChunkedMessageEnd => context.stop(self)
case _: Tcp.ConnectionClosed ⇒
context.stop(self)
}
private def sendEmptyIterator() = {
val entity = HttpEntity(ContentTypes.`application/json`, "[]")
connectionActor = ctx.startChunkedMessage(entity, Some(ClosedBracket))
}
private def makeChunk(prefix: String, page: Iterable[T]): String = {
if (page.isEmpty) {
""
} else {
val items = page.map(item => write(item))
val result = s"$prefix${items.mkString(",")}"
result
}
}
private def finalMessage() = connectionActor ! MessageChunk("]").withAck(ClosedBracket)
private def makeEntity(content: String) = HttpEntity(ContentTypes.`application/json`, content)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment