Created
August 9, 2017 08:51
-
-
Save trK54Ylmz/e80ee74ef974fbb2951d8c9fb211fcf1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.androgeus.utils | |
import java.net.URI | |
import com.androgeus.config.AConfig | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs.{FileSystem, Path} | |
/**
 * File-system helpers for building Hive-style partition paths and inspecting
 * partition directories through the Hadoop [[FileSystem]] API.
 */
object IOUtils {

  /**
   * Appends partition columns (`time=...` and optionally `hour=...`) to the given path.
   *
   * The values are read reflectively from the config object's public, zero-argument
   * `getTime` / `getHour` methods (declared on the class itself or inherited).
   * A getter that is missing, returns `null`, or returns an empty string is treated
   * as "not set".
   *
   * Examples, with `path = "/data/events"`:
   *  - time = "2017-08-09", hour not set => `/data/events/time=2017-08-09`
   *  - time = "2017-08-09", hour = "08"  => `/data/events/time=2017-08-09/hour=08`
   *
   * @param path   the base path the partition columns are appended to
   * @param config instance of application configuration object
   * @tparam T     concrete config type; inspected via reflection at runtime
   * @return the path with partition columns appended
   * @throws RuntimeException   if time is not set (hour alone is rejected, because the
   *                            `hour=` column is nested under the `time=` column)
   * @throws ClassCastException if `getTime` / `getHour` returns a non-`String` value
   */
  def formatPath[T <: AConfig](path: String, config: T): String = {
    // [[java.beans.PropertyDescriptor]] could be an alternative to this by-name lookup
    val time = readStringGetter(config, "getTime")
    val hour = readStringGetter(config, "getHour")

    (time, hour) match {
      case (Some(t), Some(h)) => s"$path/time=$t/hour=$h"
      // sometimes we need only the time field
      case (Some(t), None) => s"$path/time=$t"
      case (None, None) =>
        throw new RuntimeException("Time and/or hour information must be entered")
      case (None, Some(_)) =>
        // without this guard the result would contain a literal "time=null" segment
        throw new RuntimeException("Hour information requires time information to be entered")
    }
  }

  /**
   * Invokes the public zero-argument method named `getter` on `config`, if there is one.
   *
   * `Class.getMethod` is used for both the existence check and the invocation, so
   * public methods inherited from superclasses/interfaces are found as well.
   *
   * @param config the object to read from
   * @param getter the method name, e.g. `"getTime"`
   * @return `Some(value)` for a non-empty string result; `None` when the method does
   *         not exist or returns `null` or `""`
   * @throws ClassCastException if the method returns a non-`String` value
   */
  private def readStringGetter[T <: AConfig](config: T, getter: String): Option[String] = {
    val method =
      try Some(config.getClass.getMethod(getter))
      catch { case _: NoSuchMethodException => None }

    method.flatMap { m =>
      m.invoke(config) match {
        case null                   => None
        case s: String if s.isEmpty => None
        case s: String              => Some(s)
        case other =>
          throw new ClassCastException(
            s"$getter returned ${other.getClass.getName}, expected java.lang.String")
      }
    }
  }

  /**
   * Returns the number of ORC files directly under a partition path.
   *
   * Please note that this method doesn't scan the content of the files with
   * [[org.apache.hadoop.hive.ql.io.orc.OrcFile]]. Every visible, non-directory entry
   * is counted as an ORC file. Entries whose names start with `.` or `_` (for
   * example marker files such as `_SUCCESS`) are skipped.
   *
   * The file system is resolved from the path itself:
   *  - `s3://bucket/key`, `file:///home/...` and `hdfs://nn:8020/...` use their own
   *    scheme and authority.
   *  - Scheme-less paths such as `/user/hadoop/...` resolve against `fs.defaultFS` in `conf`.
   *
   * @param path the partition path
   * @param conf the hadoop configuration
   * @return the count of ORC files, or `-1` if `path` does not exist
   * @throws IllegalArgumentException if `path` is empty (raised by Hadoop [[Path]])
   */
  def getNumberOfOrcFiles(path: String, conf: Configuration): Int = {
    val fsPath = new Path(path)
    // Path.getFileSystem honours the path's own scheme/authority and falls back to
    // fs.defaultFS for scheme-less paths; no hand-built URI prefix is needed
    val fs = fsPath.getFileSystem(conf)

    if (fs.exists(fsPath)) {
      fs.listStatus(fsPath).count { status =>
        val name = status.getPath.getName
        // sub-directories are not data files; hidden/marker entries are ignored
        !status.isDirectory && !name.startsWith(".") && !name.startsWith("_")
      }
    } else {
      -1
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment