spark underscore

Spark silently ignores input paths whose names start with `_` or `.`. This gist traces where that filtering happens in the Spark source and shows that registering a permissive Hadoop PathFilter does not override it.
// https://github.com/apache/spark/blob/01dc1cb491aefd30ffde6b01416175e2ac1b881a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L765
  if (checkFilesExist) {
    val (filteredOut, filteredIn) = allPaths.partition { path =>
      HadoopFSUtils.shouldFilterOutPathName(path.getName)
    }
    if (filteredIn.isEmpty) {
      logWarning(
        s"All paths were ignored:\n ${filteredOut.mkString("\n ")}")
    } else {
      logDebug(
        s"Some paths were ignored:\n ${filteredOut.mkString("\n ")}")
    }
  }
  allPaths
}
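This is the check that produces the `All paths were ignored` warning seen in the spark-shell session below: when every supplied path matches shouldFilterOutPathName, Spark only logs a warning, and the read ends up with an empty result rather than an error.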
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L299
/** Checks if we should filter out this path name. */
def shouldFilterOutPathName(pathName: String): Boolean = {
  // We filter out the following paths:
  // 1. everything that starts with _ or ., except _common_metadata and _metadata,
  //    because Parquet needs to find those metadata files among the leaf files returned
  //    by this method. We should refactor this logic to not mix metadata files with data files.
  // 2. everything that ends with `._COPYING_`, because this is an intermediate state of a
  //    file; we should skip it to avoid reading the file twice.
  val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
    pathName.startsWith(".") || pathName.endsWith("._COPYING_")
  val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
  exclude && !include
}
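To see the rule in action, here is a minimal standalone sketch. It re-implements the predicate above rather than calling it (HadoopFSUtils is private[spark]) and evaluates a few sample names; the file names are made up for illustration:

object UnderscoreCheck {
  // Inlined copy of HadoopFSUtils.shouldFilterOutPathName, for demonstration only.
  def shouldFilterOut(name: String): Boolean = {
    val exclude = (name.startsWith("_") && !name.contains("=")) ||
      name.startsWith(".") || name.endsWith("._COPYING_")
    val include = name.startsWith("_common_metadata") || name.startsWith("_metadata")
    exclude && !include
  }

  def main(args: Array[String]): Unit = {
    Seq("_test.txt", ".hidden", "part-00000._COPYING_",
        "_metadata", "_common_metadata", "_year=2023", "data.txt")
      .foreach(n => println(f"$n%-22s filtered out: ${shouldFilterOut(n)}"))
  }
}

Running this prints `filtered out: true` for _test.txt, .hidden, and part-00000._COPYING_, and false for the rest; note the `=` exception, which keeps partition-style names like _year=2023 readable.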
scala> val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml

scala> val className = FileInputFormat.PATHFILTER_CLASS
className: String = mapreduce.input.pathFilter.class

scala> spark.sparkContext.hadoopConfiguration.setClass(FileInputFormat.PATHFILTER_CLASS, classOf[underscore.TmpFileFilter], classOf[org.apache.hadoop.fs.PathFilter])

scala> val t1 = spark.read.textFile("hdfs://hdfs/tmp/yash/underscore/_test.txt")
23/04/20 17:34:25 WARN DataSource: All paths were ignored:
  hdfs://hdfs/tmp/yash/underscore/_test.txt
t1: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val test = spark.read.text("hdfs://hdfs/tmp/yash/underscore/_test.txt")
23/04/20 17:53:50 WARN DataSource: All paths were ignored:
  hdfs://hdfs/tmp/yash/underscore/_test.txt
test: org.apache.spark.sql.DataFrame = [value: string]

scala> test.show(false)
+-----+
|value|
+-----+
+-----+
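Both reads silently return an empty result: the path never survives the DataSource check, so the configured PathFilter is irrelevant. If you control the files, the simplest workaround is to rename them so their names no longer begin with an underscore. A minimal sketch using the Hadoop FileSystem API, assuming write permission on the directory (the paths are the same ones used in the session above):

import org.apache.hadoop.fs.{FileSystem, Path}

val src = new Path("hdfs://hdfs/tmp/yash/underscore/_test.txt")
val dst = new Path("hdfs://hdfs/tmp/yash/underscore/test.txt")

// Resolve the FileSystem for this URI and drop the leading underscore.
val fs = src.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.rename(src, dst)

// The renamed file is no longer excluded by shouldFilterOutPathName.
spark.read.text(dst.toString).show(false)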
package underscore

import org.apache.hadoop.fs.{Path, PathFilter}

// A deliberately permissive PathFilter: it prints every path it is asked
// about and accepts all of them. Registered in the session above via
// mapreduce.input.pathFilter.class (FileInputFormat.PATHFILTER_CLASS).
class TmpFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = {
    println(path)
    true
  }
}
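Note that TmpFileFilter never gets a chance to accept `_test.txt`: the "All paths were ignored" warning in the session above comes from DataSource.checkAndGlobPathIfNecessary (first snippet), which applies HadoopFSUtils.shouldFilterOutPathName before file listing ever consults the configured PathFilter. To confirm the filter is at least registered, read a sibling file whose name does not start with an underscore (test.txt here is hypothetical); accept should then print the paths visited during listing:

scala> spark.read.text("hdfs://hdfs/tmp/yash/underscore/test.txt").show(false)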