@lossyrob
Created December 24, 2014 16:31
Attribute Catalog classes
package geotrellis.spark.io.accumulo
import geotrellis.spark._
import geotrellis.spark.io._
import spray.json._
import scala.collection.JavaConversions._
import org.apache.spark.Logging
import org.apache.accumulo.core.client.{BatchWriterConfig, Connector}
import org.apache.accumulo.core.security.Authorizations
import org.apache.accumulo.core.data._
import org.apache.hadoop.io.Text
class AccumuloAttributeCatalog(connector: Connector, val catalogTable: String) extends AttributeCatalog with Logging {
  type ReadableWritable[T] = RootJsonFormat[T]

  // Attributes are stored one per Accumulo entry:
  // row = layerId.toString, column family = attribute name, value = compact JSON.
  private def fetch(layerId: LayerId, attributeName: String): List[Value] = {
    val scanner = connector.createScanner(catalogTable, new Authorizations())
    scanner.setRange(new Range(new Text(layerId.toString)))
    scanner.fetchColumnFamily(new Text(attributeName))
    scanner.iterator.toList.map(_.getValue)
  }

  def load[T: RootJsonFormat](layerId: LayerId, attributeName: String): T = {
    val values = fetch(layerId, attributeName)
    if(values.size == 0) {
      sys.error(s"Attribute $attributeName not found for layer $layerId")
    } else if(values.size > 1) {
      sys.error(s"Multiple attributes found for $attributeName for layer $layerId")
    } else {
      values.head.toString.parseJson.convertTo[T]
    }
  }

  def save[T: RootJsonFormat](layerId: LayerId, attributeName: String, value: T): Unit = {
    // The mutation's row must match the row scanned by fetch: layerId.toString.
    val mutation = new Mutation(new Text(layerId.toString))
    mutation.put(
      new Text(attributeName), new Text(), System.currentTimeMillis(),
      new Value(value.toJson.compactPrint.getBytes)
    )

    // Connector has no write method of its own; flush the mutation through a BatchWriter.
    val writer = connector.createBatchWriter(catalogTable, new BatchWriterConfig())
    try {
      writer.addMutation(mutation)
    } finally {
      writer.close()
    }
  }
}
trait AttributeCatalog {
  type ReadableWritable[T]

  def load[T: ReadableWritable](layerId: LayerId, attributeName: String): T
  def save[T: ReadableWritable](layerId: LayerId, attributeName: String, value: T): Unit
}
package geotrellis.spark.io.hadoop
import geotrellis.spark._
import geotrellis.spark.io._
import spray.json._
import org.apache.hadoop.fs.Path
import org.apache.spark._
import java.io.PrintWriter
class HadoopAttributeCatalog(sc: SparkContext, catalogRoot: Path, layerDataDir: LayerId => String, metaDataFileName: String) extends AttributeCatalog {
  type ReadableWritable[T] = RootJsonFormat[T]

  val fs = catalogRoot.getFileSystem(sc.hadoopConfiguration)

  // Each attribute lives in its own JSON file under the layer's directory.
  def attributePath(layerId: LayerId, attributeName: String): Path =
    new Path(new Path(catalogRoot, layerDataDir(layerId)), s"${attributeName}.json")

  def load[T: RootJsonFormat](layerId: LayerId, attributeName: String): T = {
    val path = attributePath(layerId, attributeName)
    val txt = HdfsUtils.getLineScanner(path, sc.hadoopConfiguration) match {
      case Some(in) =>
        try {
          in.mkString
        } finally {
          in.close()
        }
      case None =>
        throw new LayerNotFoundError(layerId)
    }
    txt.parseJson.convertTo[T]
  }

  def save[T: RootJsonFormat](layerId: LayerId, attributeName: String, value: T): Unit = {
    val path = attributePath(layerId, attributeName)
    if(fs.exists(path)) {
      fs.delete(path, false)
    }

    val fdos = fs.create(path)
    val out = new PrintWriter(fdos)
    try {
      out.println(value.toJson)
    } finally {
      out.close()
      fdos.close()
    }
  }
}
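The same round trip against the HDFS-backed catalog, reusing the hypothetical LayerBounds format from the sketch above; the catalog root and directory-layout function here are assumptions.

// Hypothetical usage; assumes a live SparkContext `sc`.
val catalog = new HadoopAttributeCatalog(
  sc,
  new Path("/geotrellis/catalog"),
  { id => s"${id.name}/${id.zoom}" },  // layer directory layout
  "metadata.json")

val layerId = LayerId("nlcd-2011", 10)

// Writes /geotrellis/catalog/nlcd-2011/10/bounds.json
catalog.save(layerId, "bounds", LayerBounds(0, 12))
val bounds = catalog.load[LayerBounds](layerId, "bounds")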