Created
November 26, 2013 13:17
-
-
Save samklr/7658116 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package nodescala | |
import com.sun.net.httpserver._ | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import ExecutionContext.Implicits.global | |
import scala.async.Async.{async, await} | |
import scala.collection._ | |
import scala.collection.JavaConversions._ | |
import java.util.concurrent.{Executor, ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue} | |
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer} | |
import java.net.InetSocketAddress | |
/** Contains utilities common to the NodeScala© framework.
 *
 *  Implementations supply a port and a listener factory; `start` then serves
 *  the requests arriving on that listener until the returned subscription is
 *  unsubscribed.
 */
trait NodeScala {
  import NodeScala._

  /** The port associated with this server. */
  def port: Int

  /** Creates the listener that accepts requests on `relativePath`. */
  def createListener(relativePath: String): Listener

  /** Writes the response back through the exchange, one chunk at a time.
   *
   *  The cancellation token is checked before every chunk so that a very long
   *  response does not keep running after the server has been stopped. The
   *  exchange is always closed, even when writing a chunk fails.
   *
   *  @param exchange the exchange used to write the response back
   *  @param token    the cancellation token that signals the server was stopped
   *  @param response the response to write back
   */
  private def respond(exchange: Exchange, token: CancellationToken, response: Response): Unit =
    try {
      while (response.hasNext && token.nonCancelled) exchange.write(response.next())
    } finally {
      exchange.close()
    }

  /** Runs the handler for a single request and writes its response asynchronously.
   *
   *  A handler that throws still gets its exchange closed, and the failure is
   *  reported to the execution context instead of being silently dropped.
   */
  private def process(
      request: Request,
      exchange: Exchange,
      token: CancellationToken,
      handler: Request => Response): Unit = {
    val served = Future {
      val response =
        try handler(request)
        catch {
          case scala.util.control.NonFatal(e) =>
            // `respond` never runs in this case, so release the exchange here.
            exchange.close()
            throw e
        }
      respond(exchange, token, response)
    }
    served.failed.foreach(e => implicitly[ExecutionContext].reportFailure(e))
  }

  /** A server:
   *  1) creates and starts an http listener
   *  2) creates a cancellation token via `Future.run`
   *  3) as long as the token is not cancelled, waits (without blocking a
   *     thread) for the next request from the listener and processes it
   *     asynchronously using the `respond` method
   *
   *  Only one `nextRequest` is outstanding at any time: the next one is
   *  requested after the previous one has delivered a request.
   *
   *  @param relativePath a relative path on which to start listening
   *  @param handler      a function mapping a request to a response
   *  @return a subscription that can stop the server and all its asynchronous operations *entirely*.
   */
  def start(relativePath: String)(handler: Request => Response): Subscription = {
    val listener = createListener(relativePath)
    val listenerSubscription = listener.start()
    val requestSubscription = Future.run() { ct =>
      // Asynchronous accept loop: each step completes when a request arrives,
      // hands it off for processing and only then asks for the next one.
      def serveNext(): Future[Unit] =
        if (ct.nonCancelled)
          listener.nextRequest().flatMap { case (request, exchange) =>
            process(request, exchange, ct, handler)
            serveNext()
          }
        else Future.successful(())
      serveNext()
    }
    Subscription(listenerSubscription, requestSubscription)
  }
}
object NodeScala { | |
/** A request is a multimap of headers: each header name is mapped to the
 *  list of string values given for it.
 */
type Request = Map[String, List[String]]
/** A response consists of a potentially long string (e.g. a data file).
 *  To be able to process this string in parts, the response is encoded
 *  as an iterator over consecutive pieces of the response string.
 */
type Response = Iterator[String]
/** A single request/response interaction: exposes the incoming request and
 *  is used to write the response to it.
 */
trait Exchange {

  /** The request this exchange belongs to. */
  def request: Request

  /** Writes `s` to the output stream of the exchange. */
  def write(s: String): Unit

  /** Signals that the response is complete; no further writes will follow. */
  def close(): Unit
}
object Exchange {

  /** Adapts a JDK `HttpExchange` to the framework's `Exchange`.
   *
   *  Response headers (status 200, length 0) are sent as soon as the adapter
   *  is created, before the response body stream is obtained, which is the
   *  order the `HttpExchange` documentation requires.
   *
   *  @param exchange the underlying JDK exchange
   *  @return an `Exchange` that writes UTF-8 encoded text to the response body
   */
  def apply(exchange: HttpExchange): Exchange = new Exchange {
    exchange.sendResponseHeaders(200, 0L)
    private val os = exchange.getResponseBody()

    // Encode explicitly: the no-argument getBytes uses the platform default
    // charset, which makes the bytes on the wire machine-dependent.
    def write(s: String): Unit = os.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))

    def close(): Unit = os.close()

    /** Copies the request headers into an immutable multimap. */
    def request: Request = {
      // Explicit converters instead of the implicit JavaConversions views.
      import scala.collection.JavaConverters._
      val raw: java.util.Map[String, java.util.List[String]] = exchange.getRequestHeaders
      raw.asScala.map { case (name, values) => name -> values.asScala.toList }.toMap
    }
  }
}
/** Accepts HTTP requests on a port and relative path, one request at a time. */
trait Listener {

  def port: Int

  def relativePath: String

  /** Starts listening; the returned subscription stops it again. */
  def start(): Subscription

  /** Installs `handler` to be invoked for exchanges arriving on `relativePath`. */
  def createContext(handler: Exchange => Unit): Unit

  /** Removes the handler installed by `createContext`. */
  def removeContext(): Unit

  /** Waits for the next request:
   *  1) constructs an uncompleted promise
   *  2) installs a request handler using `createContext` that, when a
   *     request arrives, first deregisters itself using `removeContext`
   *     and then completes the promise with that request
   *  3) returns the future of the promise
   *
   *  Deregistration happens inside the handler, before the promise is
   *  completed, so a caller reacting to the future can safely call
   *  `nextRequest` again. Should the handler still be invoked for a further
   *  exchange, that exchange is closed right away rather than completing the
   *  promise a second time.
   *
   *  @return the future holding the pair of a request and an exchange object
   */
  def nextRequest(): Future[(Request, Exchange)] = {
    val promise = Promise[(Request, Exchange)]()
    // Guards the one-shot section: only the first exchange may deregister
    // the context and complete the promise.
    val claimed = new java.util.concurrent.atomic.AtomicBoolean(false)
    createContext { exchange =>
      if (claimed.compareAndSet(false, true)) {
        removeContext()
        promise.success((exchange.request, exchange))
      } else {
        // Nobody will ever respond to this exchange; do not leave it hanging.
        exchange.close()
      }
    }
    promise.future
  }
}
object Listener {

  /** Listener backed by the JDK `HttpServer`, bound to `port` and dispatching
   *  its handlers on a dedicated single-threaded executor.
   */
  class Default(val port: Int, val relativePath: String) extends Listener {

    private val server = HttpServer.create(new InetSocketAddress(port), 0)

    // Exactly one worker thread, fed from an unbounded queue.
    private val worker =
      new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])

    server.setExecutor(worker)

    /** Starts the server; unsubscribing stops it immediately and shuts the worker down. */
    def start(): Subscription = {
      server.start()
      new Subscription {
        def unsubscribe(): Unit = {
          server.stop(0)
          worker.shutdown()
        }
      }
    }

    /** Registers `handler` for `relativePath`, wrapping each JDK exchange in an `Exchange`. */
    def createContext(handler: Exchange => Unit): Unit = {
      val adapter = new HttpHandler {
        def handle(httpExchange: HttpExchange): Unit = handler(Exchange(httpExchange))
      }
      server.createContext(relativePath, adapter)
      ()
    }

    /** Removes the context registered for `relativePath`. */
    def removeContext(): Unit = server.removeContext(relativePath)
  }
}
/** The standard server implementation: serves on `port` using the default
 *  `HttpServer`-backed listener.
 */
class Default(val port: Int) extends NodeScala {

  /** Builds a `Listener.Default` for this server's port and the given path. */
  def createListener(relativePath: String): Listener.Default = {
    val listener = new Listener.Default(port, relativePath)
    listener
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment