Skip to content

Instantly share code, notes, and snippets.

@bblfish
Created January 31, 2012 23:07
Show Gist options
  • Save bblfish/1713663 to your computer and use it in GitHub Desktop.
Save bblfish/1713663 to your computer and use it in GitHub Desktop.
A test of fetching content asynchronously using the Aalto parser.
/*
* Copyright (c) 2012 Henry Story (bblfish.net)
* under the MIT licence defined at
* http://www.opensource.org/licenses/mit-license.html
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of
* this software and associated documentation files (the "Software"), to deal in the
* Software without restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
* PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
* OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.w3c.readwriteweb.cache
import com.ning.http.client.AsyncHandler.STATE
import com.ning.http.client.AsyncHandler
import com.ning.http.client.AsyncHttpClient
import com.ning.http.client.HttpResponseBodyPart
import com.ning.http.client.HttpResponseHeaders
import com.ning.http.client.HttpResponseStatus
import java.net.URL
import org.w3.readwriteweb.Lang
import com.weiglewilczek.slf4s.Logging
import com.hp.hpl.jena.rdf.model.{ModelFactory, Model}
import scalaz.Zero
import java.util.Collections
import com.hp.hpl.jena.sparql.function.library.e
import com.fasterxml.aalto.stax.InputFactoryImpl
import com.fasterxml.aalto.{AsyncXMLStreamReader, AsyncInputFeeder}
import com.hp.hpl.jena.rdf.arp.{StAX2SAX, SAX2Model}
import org.xml.sax.ContentHandler
import javax.xml.stream.{XMLInputFactory, XMLStreamReader}
import patch.AsyncJenaParser
/**
 * An AsyncHttpClient [[AsyncHandler]] that streams the body of an HTTP response
 * into the Aalto non-blocking XML parser and builds a Jena [[Model]] from it.
 *
 * Body parts are pushed to the parser as they arrive rather than buffering the
 * whole response. The body is currently always parsed as RDF/XML, whatever its
 * Content-Type. Responses whose status is outside 200-204 are not parsed, and
 * `onCompleted` then returns an empty model.
 *
 * @param url the URL being fetched; it resolves a relative `Content-Location`
 *            header and, without one, provides the base URI of the document
 * @author bblfish
 * @created 27/01/2012
 */
class URLFetcher(url: URL) extends AsyncHandler[Model] with Logging {
  import scala.collection.JavaConverters._

  /** Status line of the response; set by `onStatusReceived`. */
  var status: HttpResponseStatus = _
  /** Number of body parts received so far (used only for logging). */
  var counter = 0
  /** Base URI handed to the RDF/XML parser; set once the headers are received. */
  var base: String = _
  /** Parser feeding `model`; left null when the response status is not a success. */
  var asyncParser: AsyncJenaParser = _
  lazy val asyncReader: AsyncXMLStreamReader = new InputFactoryImpl().createAsyncXMLStreamReader()
  lazy val feeder: AsyncInputFeeder = asyncReader.getInputFeeder()
  lazy val model: Model = ModelFactory.createDefaultModel()

  /** Logs a failure of the request, keeping the stack trace. */
  def onThrowable(t: Throwable): Unit = {
    logger.error(t.getMessage, t)
  }

  /**
   * Feeds one chunk of the body to the non-blocking XML reader and lets the
   * parser consume whatever events have become available.
   */
  def onBodyPartReceived(bodyPart: HttpResponseBodyPart): STATE = {
    logger.info("body part n."+counter)
    counter += 1
    if (asyncParser == null) {
      // No parser was set up (non-success status, see onHeadersReceived):
      // ignore the body instead of failing with a NullPointerException.
      STATE.CONTINUE
    } else {
      val bytes = bodyPart.getBodyPartBytes
      if (feeder.needMoreInput()) {
        logger.info("giving feeder more input. n of bytes is "+bytes.length)
        feeder.feedInput(bytes, 0, bytes.length)
      } else {
        // These bytes are NOT given to the parser and are lost. This only happens
        // if asyncParser.parse() left previously fed input unconsumed.
        // NOTE(review): confirm AsyncJenaParser.parse() drains the reader until it needs input.
        logger.warn("feeder don't want more input")
      }
      asyncParser.parse()
      STATE.CONTINUE
    }
  }

  /** Remembers the status line so that `onHeadersReceived` can inspect it. */
  def onStatusReceived(responseStatus: HttpResponseStatus): STATE = {
    status = responseStatus
    STATE.CONTINUE
  }

  /**
   * Works out the base URI from the headers and creates the parser for a
   * successful (200-204) response. Other responses are left unparsed.
   */
  def onHeadersReceived(headers: HttpResponseHeaders): STATE = {
    if (!isSuccessStatus) {
      logger.warn("not parsing body of " + url + ", status is " +
        (if (status == null) "unknown" else status.getStatusCode.toString))
      STATE.CONTINUE
    } else {
      val typeHdr = nullSquisher(headers.getHeaders.get("Content-Type")).asScala
      logger.info("Content-Types ➤ " + typeHdr.mkString(" ➤ "))
      // Not used yet: see the TODO below about choosing the parser from the mime type.
      val mime = typeHdr.flatMap(mime => Lang(mime.split(";")(0))).headOption
      val locHdr = nullSquisher(headers.getHeaders.get("Content-Location")).asScala
      logger.info("Content-Location ➤ " + locHdr.mkString(" ➤ "))
      val location = locHdr.headOption match {
        case Some(loc) => new URL(url, loc)
        // Without Content-Location, use the request URL minus its query and fragment.
        // getHost (not getAuthority): this constructor appends the port itself, and
        // passing "host:port" as the host would produce a malformed URL.
        case None => new URL(url.getProtocol, url.getHost, url.getPort, url.getPath)
      }
      base = location.toString
      // TODO: currently we assume rdf/xml; eventually select the reader from the mime type:
      // val lang = mime getOrElse Lang.default
      // reader = model.getReader(lang.jenaLang)
      asyncParser = new AsyncJenaParser(SAX2Model.create(base, model), asyncReader)
      STATE.CONTINUE
    }
  }

  /**
   * Signals end of input to the XML feeder and returns the model built so far.
   * NOTE(review): the parser is not run again after endOfInput; confirm whether
   * AsyncJenaParser needs a final parse() to emit the end-of-document events.
   */
  def onCompleted(): Model = {
    feeder.endOfInput()
    model
  }

  /** True when the response status is one of the success codes we parse (200-204). */
  private def isSuccessStatus: Boolean =
    status != null && status.getStatusCode >= 200 && status.getStatusCode <= 204

  /**
   * Evaluates `body` and returns `z.zero` when the result is null or when a
   * (non-fatal) exception is thrown. Exceptions are logged at warn level, while
   * JVM errors still propagate.
   */
  def nullSquisher[T](body: => T)(implicit z: Zero[T]): T =
    try {
      val res = body
      if (res == null) z.zero else res
    } catch {
      case e: Exception =>
        logger.warn("squished an exception to null", e)
        z.zero
    }

  /** Zero for Java lists: the immutable empty list. */
  implicit def JavaListZero[A]: Zero[java.util.List[A]] = new Zero[java.util.List[A]] { val zero = Collections.emptyList[A]() }
}
/**
 * Small entry point for trying out [[URLFetcher]] against live URLs.
 */
object ModelCache {
  /** A sample RDF/XML profile that is handy for manual tests. */
  lazy val url: String = "http://bblfish.net/people/henry/card.rdf"

  /** Shared HTTP client, created on first use. */
  lazy val client: AsyncHttpClient = new AsyncHttpClient

  /**
   * Starts an asynchronous GET of `url`, following redirects, and parses the
   * body with a fresh [[URLFetcher]].
   *
   * @param url absolute URL to fetch
   * @return the client's future for the resulting Jena model
   */
  def response(url: String) = {
    val request = client.prepareGet(url).setFollowRedirects(true)
    request.execute(new URLFetcher(new URL(url)))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment