Skip to content

Instantly share code, notes, and snippets.

@huntc
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save huntc/4c25821b0dfb24fe18fc to your computer and use it in GitHub Desktop.
Save huntc/4c25821b0dfb24fe18fc to your computer and use it in GitHub Desktop.
Initial attempt at marshalling a bundle request given multipart formdata
package com.typesafe.reactiveruntime.net
import akka.actor.{ActorRef, Actor}
import akka.http.Unmarshal
import akka.http.model.HttpMethods._
import akka.http.model.{ HttpEntity, HttpRequest, HttpResponse, MultipartFormData, StatusCodes }
import akka.http.unmarshalling.Unmarshalling
import akka.pattern.ask
import akka.stream.scaladsl.{ ImplicitFlowMaterializer, Flow }
import akka.util.{ Timeout, ByteString }
import com.typesafe.reactiveruntime.load.LoadScheduler
import com.typesafe.reactiveruntime.load.LoadScheduler.BundleLoaded
import com.typesafe.reactiveruntime.start.StartScheduler.ScheduleBundle
import com.typesafe.reactiveruntime.{ Digest, _ }
import com.typesafe.reactiveruntime.net.Router._
import java.util.UUID
import org.reactivestreams.Publisher
import scala.collection.immutable.Set
import scala.concurrent.Future
/**
* Handles requests w.r.t. bundle lifecycle commands.
*/
trait BundleReqMarshalling extends ImplicitFlowMaterializer {
this: Actor =>
protected val loadScheduler: ActorRef
protected val startScheduler: ActorRef
private type Requirement = (Double, Long, Long, Set[String])
val bundleReqRoutes: PartialFunction[HttpRequest, Future[HttpResponse]] = {
case HttpRequest(POST, p"/bundles", _, entity, _) => loadBundle(entity)
case HttpRequest(POST, p"/bundles/$id/started", _, entity, _) => startBundle(id)
}
private val CpusRequiredField = "cpusRequired"
private val MemoryRequiredField = "memoryRequired"
private val TotalFileSizeField = "totalFileSize"
private val RolesField = "roles"
private val RequirementFields = Set(
CpusRequiredField,
MemoryRequiredField,
TotalFileSizeField,
RolesField)
private val settings = Settings(context.system)
private implicit val loaderBundleRetrieveTimeout = Timeout(settings.loaderBundleRetrieveTimeout)
private def loadBundle(entity: HttpEntity): Future[HttpResponse] = {
import context.dispatcher
val marshalled: Future[(Requirement, Publisher[(String, Digest, Publisher[ByteString])])] =
Unmarshal(entity).to[MultipartFormData] flatMap {
case Unmarshalling.Success(multiPartFormData) =>
(Flow(multiPartFormData.parts) prefixAndTail RequirementFields.size).toFuture() flatMap {
case (prefixBodyParts, tailBodyParts) =>
val prefixBodyPartsMappings = prefixBodyParts map { prefixBodyPart =>
val name = prefixBodyPart.name.getOrElse("")
val textValue = Unmarshal(prefixBodyPart.entity).to[String] map {
case Unmarshalling.Success(value) =>
value
case _ =>
throw new IllegalArgumentException("There was a problem marshalling the form data")
}
textValue map (v => name -> v)
}
val requirements = Future.sequence(prefixBodyPartsMappings) map (_.toMap) map {
case r if r.keySet == RequirementFields => (
r.get(CpusRequiredField).get.toDouble,
r.get(MemoryRequiredField).get.toLong,
r.get(TotalFileSizeField).get.toLong,
(r.get(RolesField).get split ' ').toSet)
case _ =>
throw new IllegalArgumentException("There was a problem marshalling the form data - missing field.")
}
val fileSources = (Flow(tailBodyParts) collect {
case part if (part.name == Some("bundle") || part.name == Some("config")) && part.filename.isDefined =>
val hash = part.filename.get.reverse.dropWhile(_ != '.').takeWhile(_ != '-').reverse
(part.name.get, Digest("SHA-256", hash.toByteString), part.entity.dataBytes(flowMaterializer))
}).toPublisher()
requirements map { requirement =>
(requirement, fileSources)
}
}
case e =>
throw new IllegalArgumentException(s"There was a problem marshalling the multipart form data: $e")
}
val reply = for {
((cpusRequired, memoryRequired, totalFileSize, roles), sources) <- marshalled
bundleLoaded <- (loadScheduler ? LoadScheduler.LoadBundle(
UUID.randomUUID(),
sources,
cpusRequired,
memoryRequired,
totalFileSize,
roles)).mapTo[BundleLoaded]
} yield HttpResponse(entity = bundleLoaded.bundleId.toString)
reply recover {
case e: IllegalArgumentException =>
HttpResponse(StatusCodes.BadRequest, entity = e.getMessage)
case e =>
HttpResponse(StatusCodes.InternalServerError, entity = e.getMessage)
}
}
private def startBundle(id: String): Future[HttpResponse] = {
startScheduler ! ScheduleBundle(UUID.randomUUID(), BundleId(id), 1)
Future.successful(HttpResponse(entity = "Scheduled"))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment