Skip to content

Instantly share code, notes, and snippets.

@bblfish
Created January 29, 2012 22:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bblfish/1701141 to your computer and use it in GitHub Desktop.
Save bblfish/1701141 to your computer and use it in GitHub Desktop.
Starting to play with writing an RDF client using the async-http-client library: https://github.com/ning/async-http-client
package org.w3c.readwriteweb.cache
import com.ning.http.client.AsyncHandler.STATE
import com.ning.http.client.AsyncHandler
import com.ning.http.client.AsyncHttpClient
import com.ning.http.client.HttpResponseBodyPart
import com.ning.http.client.HttpResponseHeaders
import com.ning.http.client.HttpResponseStatus
import com.ning.http.client.Response
import java.util.concurrent.Future
import com.hp.hpl.jena.rdf.model.{ModelFactory, Model}
import java.net.URL
import org.w3.readwriteweb.Lang
import java.io._
import com.weiglewilczek.slf4s.Logging
import java.util.List
import specs2.html
/**
 * Asynchronous response handler that fetches `url` and turns the response body
 * into a Jena [[Model]].
 *
 * The body is buffered in memory while parts arrive and is parsed once, in
 * `onCompleted`. Parsing must not happen earlier: a parser that blocks inside
 * `onHeadersReceived` waiting for body bytes would stall the very callbacks
 * that deliver those bytes.
 *
 * Only responses whose status code lies in 200..204 are parsed; for any other
 * status `onCompleted` returns `result` untouched (i.e. unset).
 *
 * @param url the URL being fetched; it is also the base URI handed to the parser
 * @author bblfish
 * @created 27/01/2012
 */
class URLFetcher(url: URL) extends AsyncHandler[Model]() with Logging {
  import scala.collection.JavaConverters._

  /** Buffer collecting the raw response body as parts arrive. */
  val out = new ByteArrayOutputStream()

  /** The parsed model; assigned by `onCompleted` when a 200..204 response was parsed. */
  var result: Model = _

  /** The response status, recorded by `onStatusReceived`. */
  var status: HttpResponseStatus = _

  // Serialisation to parse the body with. Defined only after the headers of a
  // 200..204 response have been seen, so it doubles as the "should parse" flag.
  private var parseAs: Option[Lang] = None

  /** A fresh stream over the body bytes buffered so far. */
  def in: InputStream = new ByteArrayInputStream(out.toByteArray)

  /** Logs a failure reported by the HTTP client, including its stack trace. */
  def onThrowable(t: Throwable): Unit = {
    logger.error("error fetching " + url + ": " + t.getMessage, t)
  }

  /** Appends one chunk of the response body to the buffer. */
  def onBodyPartReceived(bodyPart: HttpResponseBodyPart): STATE = {
    bodyPart.writeTo(out)
    STATE.CONTINUE
  }

  /** Records the response status for the later header and completion callbacks. */
  def onStatusReceived(responseStatus: HttpResponseStatus): STATE = {
    status = responseStatus
    STATE.CONTINUE
  }

  /**
   * For a 200..204 response, picks the serialisation from the `Content-Type`
   * header (falling back to `Lang.default`) and remembers it for `onCompleted`.
   * Other statuses are let through without scheduling a parse.
   */
  def onHeadersReceived(headers: HttpResponseHeaders): STATE = {
    val code = status.getStatusCode
    if (code >= 200 && code <= 204) {
      val typeHdr = headerValues(headers, "Content-Type")
      logger.info("Content-Types ➤ " + typeHdr.mkString(" ➤ "))
      // Drop media-type parameters (";charset=...") before looking up the language.
      val mime = typeHdr.flatMap(mime => Lang(mime.split(";")(0).trim)).headOption

      // Content-Location is only logged; the base URI stays the request URL.
      val locHdr = headerValues(headers, "Content-Location")
      logger.info("Content-Location ➤ " + locHdr.mkString(" ➤ "))

      parseAs = Some(mime getOrElse Lang.default)
    }
    STATE.CONTINUE
  }

  /**
   * Parses the buffered body if a parse was scheduled by `onHeadersReceived`.
   *
   * @return the parsed model, or the unset `result` when nothing was parsed
   */
  def onCompleted(): Model = {
    parseAs.foreach { lang =>
      result = modelFromInputStream(in, url, lang)
    }
    result
  }

  /**
   * Reads an RDF serialisation from `is` into a new default model.
   *
   * @param is   the stream to read the serialisation from
   * @param base the base URI passed to the reader
   * @param lang the serialisation; its `jenaLang` selects the Jena reader
   * @return the populated model
   */
  def modelFromInputStream(is: InputStream,
                           base: URL,
                           lang: Lang): Model =
    try {
      val m = ModelFactory.createDefaultModel()
      m.getReader(lang.jenaLang).read(m, is, base.toString)
      m
    } catch {
      // Log and rethrow; Errors (OutOfMemoryError etc.) propagate untouched.
      case e: Exception =>
        logger.info("cought exception turning stream into model ", e)
        throw e
    }

  // All values of the named response header; empty when the lookup yields null
  // (which it may do for an absent header), so callers can use it without null checks.
  private def headerValues(headers: HttpResponseHeaders, name: String): Seq[String] =
    Option(headers.getHeaders.get(name)).map(_.asScala.toSeq).getOrElse(Seq.empty)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment