@jfrazee
Last active September 5, 2019 12:28
Spark job to read gzip files, ignoring corrupted files
import java.io._
import scala.io._
import java.util.zip._

// Spark
import org.slf4j.Logger
import org.apache.spark.{ SparkConf, SparkContext, Logging }

// Hadoop
import org.apache.hadoop.io.compress.GzipCodec

object FilterBadGzipFiles extends Logging {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    // binaryFiles yields (path, PortableDataStream) pairs, so each file's
    // bytes are only read inside the task, where failures can be caught.
    val files = sc.binaryFiles(args(0))

    val lines =
      files.flatMap {
        case (path, stream) =>
          try {
            // Wrap .gz files in a GZIPInputStream; read anything else raw.
            val is =
              if (path.toLowerCase.endsWith(".gz"))
                new GZIPInputStream(stream.open)
              else
                stream.open
            try {
              Source.fromInputStream(is).getLines.toList
            } finally {
              try { is.close } catch { case _: Throwable => }
            }
          } catch {
            // A corrupted or truncated file throws while being read; log it
            // and contribute no lines so the job keeps running.
            case e: Throwable =>
              log.warn(s"error reading from ${path}: ${e.getMessage}", e)
              List.empty[String]
          }
      }

    lines.saveAsTextFile(args(1), classOf[GzipCodec])
  }
}
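
This targets Spark 1.x; args(0) is the input path or glob and args(1) is the output directory. Note that org.apache.spark.Logging was removed from the public API in Spark 2.0, so on newer versions the job needs its own logger. A minimal sketch of that variant, assuming slf4j is on the classpath (the object name FilterBadGzipFiles2 is illustrative; the logic is unchanged from the gist):

import java.util.zip.GZIPInputStream
import scala.io.Source
import org.slf4j.LoggerFactory
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.hadoop.io.compress.GzipCodec

object FilterBadGzipFiles2 {
  // slf4j logger in place of the removed Logging trait
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val lines = sc.binaryFiles(args(0)).flatMap { case (path, stream) =>
      try {
        val is =
          if (path.toLowerCase.endsWith(".gz")) new GZIPInputStream(stream.open)
          else stream.open
        try Source.fromInputStream(is).getLines.toList
        finally { try is.close() catch { case _: Throwable => } }
      } catch {
        case e: Throwable =>
          log.warn(s"error reading from $path: ${e.getMessage}", e)
          List.empty[String]
      }
    }
    lines.saveAsTextFile(args(1), classOf[GzipCodec])
  }
}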
@raj638111

Works like a charm, thanks.

@zhangnew

zhangnew commented Sep 6, 2017

Thanks.

@stdnt-xiao

Your method can fail when you deal with big data (Source.fromInputStream(is).getLines.toList materializes each whole file in executor memory).

You can also use this conf to do it:
.set("spark.files.ignoreCorruptFiles", "true")

@sylvinho81

Hi,

That option works, but is there any way to know which files are corrupted, rather than just ignoring them?

Thanks,
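
One way to get that list, sketched with the same binaryFiles approach as the gist (untested; ReportCorruptGzipFiles and the buffer size are illustrative): attempt to fully decompress each file inside a Try and collect the paths that fail, since gzip CRC errors only surface once the stream has been read to the end.

import java.util.zip.GZIPInputStream
import scala.util.Try
import org.apache.spark.{ SparkConf, SparkContext }

object ReportCorruptGzipFiles {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf())
    val corrupt = sc.binaryFiles(args(0))
      .map { case (path, stream) =>
        val ok = Try {
          val is = new GZIPInputStream(stream.open)
          try {
            val buf = new Array[Byte](64 * 1024)
            while (is.read(buf) != -1) () // drain fully so CRC errors surface
          } finally is.close()
        }.isSuccess
        (path, ok)
      }
      .filter { case (_, ok) => !ok }
      .map(_._1)
    corrupt.collect().foreach(path => println(s"corrupt: $path"))
  }
}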
