Created
December 24, 2014 16:31
-
-
Save lossyrob/2219a363818e30a1f33a to your computer and use it in GitHub Desktop.
Attribute Catalog classes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package geotrellis.spark.io.accumulo | |
import geotrellis.spark._ | |
import geotrellis.spark.io._ | |
import spray.json._ | |
import scala.collection.JavaConversions._ | |
import org.apache.spark.Logging | |
import org.apache.accumulo.core.client.Connector | |
import org.apache.accumulo.core.security.Authorizations | |
import org.apache.accumulo.core.data._ | |
import org.apache.hadoop.io.Text | |
/**
 * [[AttributeCatalog]] that stores layer attributes as JSON in an Accumulo table.
 *
 * Layout used by this class: the row is `layerId.toString`, the column family is the
 * attribute name, the column qualifier is empty, and the cell value is the compact
 * JSON rendering of the attribute.
 *
 * @param connector    Accumulo connector used to scan and write the catalog table
 * @param catalogTable name of the Accumulo table holding the attributes
 */
class AccumuloAttributeCatalog(connector: Connector, val catalogTable: String) extends AttributeCatalog with Logging {
  type ReadableWritable[T] = RootJsonFormat[T]

  /**
   * Reads every cell stored for `attributeName` in the row of `layerId`.
   *
   * The result is materialized into a strict `List` before the scanner is closed,
   * so nothing read lazily can outlive the scanner.
   */
  private def fetch(layerId: LayerId, attributeName: String): List[Value] = {
    val scanner = connector.createScanner(catalogTable, new Authorizations())
    try {
      scanner.setRange(new Range(new Text(layerId.toString)))
      scanner.fetchColumnFamily(new Text(attributeName))
      scanner.iterator.toList.map(_.getValue)
    } finally {
      // Release the scanner's resources even when the scan fails.
      scanner.close()
    }
  }

  /**
   * Loads a single attribute of a layer and decodes it from JSON.
   *
   * @param layerId       layer whose attribute is requested
   * @param attributeName name of the attribute (the column family)
   * @tparam T            attribute type; needs a `RootJsonFormat` in implicit scope
   * @return the decoded attribute value
   * @throws RuntimeException (via `sys.error`) when no value, or more than one value,
   *                          is stored for the attribute
   */
  def load[T: RootJsonFormat](layerId: LayerId, attributeName: String): T =
    fetch(layerId, attributeName) match {
      case Nil =>
        sys.error(s"Attribute $attributeName not found for layer $layerId")
      case single :: Nil =>
        single.toString.parseJson.convertTo[T]
      case _ =>
        sys.error(s"Multiple attributes found for $attributeName for layer $layerId")
    }

  /**
   * Encodes `value` as compact JSON and writes it under the row of `layerId`.
   *
   * @param layerId       layer the attribute belongs to
   * @param attributeName name of the attribute (the column family)
   * @param value         attribute value to store
   * @tparam T            attribute type; needs a `RootJsonFormat` in implicit scope
   */
  def save[T: RootJsonFormat](layerId: LayerId, attributeName: String, value: T): Unit = {
    // The row must be the same key that `fetch` scans for, otherwise a saved
    // attribute can never be found again by `load`.
    val mutation = new Mutation(new Text(layerId.toString))
    mutation.put(
      new Text(attributeName), new Text(), System.currentTimeMillis(),
      // Encode explicitly as UTF-8 instead of relying on the platform default charset.
      new Value(value.toJson.compactPrint.getBytes("UTF-8"))
    )
    // NOTE(review): `write` is not declared in this file; presumably an extension
    // method on Connector supplied by one of the wildcard imports — confirm.
    connector.write(catalogTable, mutation)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Storage abstraction for named, per-layer attributes.
 *
 * Implementations pick the type class used to (de)serialize attribute values by
 * defining [[ReadableWritable]].
 */
trait AttributeCatalog {

  /** Type class an attribute type `T` must provide to be loaded and saved. */
  type ReadableWritable[T]

  /**
   * Retrieves the attribute named `attributeName` of the layer `layerId`.
   *
   * @tparam T type the stored attribute is read as
   */
  def load[T: ReadableWritable](layerId: LayerId, attributeName: String): T

  /**
   * Stores `value` as the attribute named `attributeName` of the layer `layerId`.
   *
   * @tparam T type of the attribute being written
   */
  def save[T: ReadableWritable](layerId: LayerId, attributeName: String, value: T): Unit
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package geotrellis.spark.io.hadoop | |
import geotrellis.spark._ | |
import geotrellis.spark.io._ | |
import spray.json._ | |
import org.apache.hadoop.fs.Path | |
import org.apache.spark._ | |
import java.io.PrintWriter | |
/**
 * [[AttributeCatalog]] that stores each layer attribute as a JSON file on a Hadoop
 * file system, at `<catalogRoot>/<layerDataDir(layerId)>/<attributeName>.json`.
 *
 * @param sc               Spark context; its Hadoop configuration is used for file access
 * @param catalogRoot      root directory of the catalog
 * @param layerDataDir     maps a layer id to its directory, relative to `catalogRoot`
 * @param metaDataFileName not used by any code in this class
 */
class HadoopAttributeCatalog(sc: SparkContext, catalogRoot: Path, layerDataDir: LayerId => String, metaDataFileName: String) extends AttributeCatalog {
  type ReadableWritable[T] = RootJsonFormat[T]

  /** File system that `catalogRoot` resolves to under the Spark context's Hadoop configuration. */
  val fs = catalogRoot.getFileSystem(sc.hadoopConfiguration)

  /** Location of the JSON file holding `attributeName` for `layerId`. */
  def attributePath(layerId: LayerId, attributeName: String): Path = {
    val layerDir = new Path(catalogRoot, layerDataDir(layerId))
    new Path(layerDir, s"${attributeName}.json")
  }

  /**
   * Reads the attribute file and decodes its content from JSON.
   *
   * @param layerId       layer whose attribute is requested
   * @param attributeName name of the attribute
   * @tparam T            attribute type; needs a `RootJsonFormat` in implicit scope
   * @return the decoded attribute value
   * @throws LayerNotFoundError when `HdfsUtils.getLineScanner` yields no scanner for the path
   */
  def load[T: RootJsonFormat](layerId: LayerId, attributeName: String): T = {
    val path = attributePath(layerId, attributeName)
    val txt = HdfsUtils.getLineScanner(path, sc.hadoopConfiguration) match {
      case Some(in) =>
        // Always release the scanner, even if reading fails.
        try in.mkString
        finally in.close
      case None =>
        throw new LayerNotFoundError(layerId)
    }
    txt.parseJson.convertTo[T]
  }

  /**
   * Writes `value` as JSON to the attribute file, replacing any existing file.
   *
   * @param layerId       layer the attribute belongs to
   * @param attributeName name of the attribute
   * @param value         attribute value to store
   * @tparam T            attribute type; needs a `RootJsonFormat` in implicit scope
   * @throws java.io.IOException when writing or closing the file failed
   */
  def save[T: RootJsonFormat](layerId: LayerId, attributeName: String, value: T): Unit = {
    val path = attributePath(layerId, attributeName)
    if (fs.exists(path)) {
      fs.delete(path, false)
    }

    val fdos = fs.create(path)
    // NOTE(review): PrintWriter(OutputStream) encodes with the platform default
    // charset; left as is because the reader's charset is not visible here.
    val out = new PrintWriter(fdos)
    try {
      out.println(value.toJson)
    } finally {
      out.close()
      fdos.close()
    }

    // PrintWriter never throws IOException; it only records the failure. Checking
    // after close() surfaces errors from the write, the flush and the close
    // instead of silently leaving a missing or truncated attribute file behind.
    if (out.checkError()) {
      throw new java.io.IOException(s"Failed to write attribute $attributeName for layer $layerId to $path")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment