Skip to content

Instantly share code, notes, and snippets.

@lotkowskim
Created November 25, 2019 11:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lotkowskim/76e8ff265493efd0b2b7175446805a82 to your computer and use it in GitHub Desktop.
Save lotkowskim/76e8ff265493efd0b2b7175446805a82 to your computer and use it in GitHub Desktop.
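Proposed InMemoryFileIndex changes. The patch does two things:

- It keys `cachedLeafDirToChildrenFiles` by the fully qualified root path whose string is a prefix of each file's path, instead of by each file's parent directory.
- It makes `PartitioningAwareFileIndex` derive the leaf directories used for partition inference from the distinct parent directories of the data files themselves.

Review note: as written, `fullyQualifiedRoots.find { ... }.get` throws `NoSuchElementException` when a file's path string starts with none of the qualified root strings. A plain string-prefix test also matches sibling directories that share a name prefix; for example, root `/data/a` matches `/data/ab/part-0`. Any other code that looks up `leafDirToChildrenFiles` by a leaf-directory path may need to be checked against the new root-path keys.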
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index dc5c2ff927..9e5511edd5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -91,7 +91,12 @@ class InMemoryFileIndex(
val files = listLeafFiles(rootPaths)
cachedLeafFiles =
new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
- cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
+ val fullyQualifiedRoots = rootPaths.map(rootPath => rootPath.getFileSystem(hadoopConf).makeQualified(rootPath))
+ cachedLeafDirToChildrenFiles = files.toArray.groupBy { p =>
+ fullyQualifiedRoots.find { r =>
+ p.getPath.toString.startsWith(r.toString)
+ }.get
+ }
cachedPartitionSpec = null
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index bb703ad0da..7ea43dc741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -122,7 +122,7 @@ abstract class PartitioningAwareFileIndex(
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
files.exists(f => isDataPath(f.getPath))
- }.keys.toSeq
+ }.flatMap { case (_, files) => files.map(_.getPath.getParent) }.toSeq.distinct
val caseInsensitiveOptions = CaseInsensitiveMap(parameters)
val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.