Skip to content

Instantly share code, notes, and snippets.

@trK54Ylmz
Created August 9, 2017 08:51
Show Gist options
  • Save trK54Ylmz/e80ee74ef974fbb2951d8c9fb211fcf1 to your computer and use it in GitHub Desktop.
package com.androgeus.utils
import java.net.URI
import com.androgeus.config.AConfig
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
object IOUtils {

  /**
   * Generates partition columns for the given path.
   *
   * @param path the format ready path
   * @param config instance of application configuration object
   * @tparam T generic class reference of config object
   * @return Returns path with partition columns
   */
  def formatPath[T <: AConfig](path: String, config: T): String = {
    // Invokes the time and hour getters via the Java reflection API.
    // [[java.beans.PropertyDescriptor]] could be better for the utilization.
    val methodNames = config.getClass.getDeclaredMethods.map(_.getName).toSet

    // Reflectively reads a String getter; Option(...) maps a null result to None.
    def readField(getter: String): Option[String] =
      if (methodNames.contains(getter))
        Option(config.getClass.getMethod(getter).invoke(config).asInstanceOf[String])
      else
        None

    val time = readField("getTime")
    val hour = readField("getHour")

    // Bug fix: the original only threw when BOTH fields were missing, so a
    // missing time with a present hour produced a literal "time=null" path
    // segment. A time value is always required for a well-formed path.
    time match {
      case None =>
        throw new RuntimeException("Time and/or hour information must be entered")
      case Some(t) =>
        // sometimes we need only the time field
        hour match {
          case None    => "%s/time=%s".format(path, t)
          case Some(h) => "%s/time=%s/hour=%s".format(path, t, h)
        }
    }
  }

  /**
   * Returns the count of ORC files under the given partition path.
   * Please note that this method doesn't scan the content of files
   * with [[org.apache.hadoop.hive.ql.io.orc.OrcFile]].
   *
   * @param path the each partition path
   * @param conf the hadoop configuration
   * @return the count of ORC files, or -1 when the path does not exist
   */
  def getNumberOfOrcFiles(path: String, conf: Configuration): Int = {
    val parts = path.split("/")
    // Detects the file system scheme from the shape of the path.
    val prefix =
      if (path.startsWith("/")) {
        // generates hdfs:/[path names], e.g. hdfs:/user/hadoop/...
        "hdfs:/" + parts(1)
      } else if (path.startsWith("file:///")) {
        // generates file:///[local disc path names], e.g. file:///home/admin/...
        "file:///" + parts(3)
      } else {
        // generates [schema]://[path names], e.g. s3://bucket/key/...
        parts(0) + "//" + parts(2)
      }
    // Resolves the file system implementation for the detected scheme
    // using the supplied hadoop configuration.
    val fs = FileSystem.get(new URI(prefix), conf)
    val fsPath = new Path(path)
    if (fs.exists(fsPath)) {
      // Counts the ORC files, skipping hidden (".") and metadata ("_")
      // entries such as _SUCCESS markers. (The original comment said
      // "parquet files" — this method counts ORC files.)
      fs.listStatus(fsPath)
        .map(_.getPath.getName)
        .count(name => !name.startsWith(".") && !name.startsWith("_"))
    } else {
      // Sentinel kept for backward compatibility: -1 means the path is missing.
      -1
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment