Skip to content

Instantly share code, notes, and snippets.

@marmbrus
Last active September 17, 2015 14:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marmbrus/fff0b058f134fa7752fe to your computer and use it in GitHub Desktop.
Save marmbrus/fff0b058f134fa7752fe to your computer and use it in GitHub Desktop.
Spark Hadoop Filesystem Textfile Iterator
import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.{Charset, StandardCharsets}
import java.util.zip.GZIPInputStream

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
/**
 * Returns an iterator over the lines of a single (optionally gzip-compressed) text file,
 * read through the Hadoop file system API.
 *
 * Comparison to `sparkContext.textFile`:
 *  - Handles large numbers of S3 files without blocking the driver forever while
 *    retrieving metadata.
 *  - Gives access to whole files, which is useful when a single record can span more
 *    than one line.
 *  - Doesn't split large text files.
 *  - Probably missing all kinds of retry logic.
 *
 * Behavior:
 *  - Lines are split on '\n' only; a '\r' before the newline (CRLF input) stays in the line.
 *  - NUL characters (code point 0) are dropped and do not count toward the length limit.
 *  - A line with more than `maxLineLength` characters keeps only the first
 *    `maxLineLength` of them and gets "[TRUNCATED]" appended; the rest of that line is
 *    consumed and discarded.
 *  - A newline at the very end of the file does not produce an extra empty line.
 *  - Failures are best-effort. If the file cannot be opened, the iterator is empty.
 *    If a read fails, the partial line read so far is returned and iteration then stops.
 *    Each failure prints a "FAILURE ..." message to stdout.
 *  - The underlying stream is closed (once) when the end of the file is reached.
 *
 * NOTE(review): the class is declared `Serializable`, but it opens the file eagerly in
 * its constructor and holds Hadoop/IO objects; presumably instances are meant to be
 * created where they are consumed (e.g. inside a `flatMap`) rather than shipped — confirm.
 *
 * @param fileLocation  path or URI of the file; a name ending in "gz" is decompressed
 *                      with a `GZIPInputStream`.
 * @param maxLineLength maximum number of characters kept per line.
 * @param charset       character encoding used to decode the file (UTF-8 by default).
 */
class ReadLinesSafe(
    fileLocation: String,
    val maxLineLength: Int = 10000000,
    charset: Charset = StandardCharsets.UTF_8)
  extends Iterator[String] with Serializable {

  /** Line buffered by `hasNext` and not yet returned by `next()`; null when none is buffered. */
  var currentLine: StringBuilder = null

  /** True once the end of the file has been reached or the file could not be opened. */
  var eof = false

  // Whether characters of `currentLine` were dropped because it exceeded `maxLineLength`.
  private var truncated = false

  // Set after a read error so the broken stream is never read from again.
  private var readFailed = false

  // Ensures the stream is closed at most once.
  private var closed = false

  val path = new Path(fileLocation)

  // Resolve the file system from the path itself so scheme-qualified paths (e.g. s3://...)
  // are opened by the matching implementation rather than the default file system.
  val fs: FileSystem = path.getFileSystem(new Configuration())

  val inputStream = {
    try {
      val raw = fs.open(path)
      if (fileLocation.endsWith("gz")) {
        // Don't leak the raw stream if the gzip header cannot be read.
        try new GZIPInputStream(raw) catch {
          case NonFatal(e) =>
            raw.close()
            throw e
        }
      } else {
        raw
      }
    } catch {
      case NonFatal(e) =>
        // Best-effort: an unreadable file behaves like an empty one, but leave a trace.
        println(s"FAILURE opening $fileLocation: $e")
        eof = true
        null
    }
  }

  val reader =
    if (inputStream == null) null
    else new BufferedReader(new InputStreamReader(inputStream, charset))

  /** Last character read by `readNext()`, or -1 at end of file / after a read error. */
  var nextChar: Int = _

  /** Reads one character into `nextChar`; yields -1 at end of file or once a read has failed. */
  def readNext(): Unit = {
    nextChar =
      if (reader == null || readFailed) -1
      else {
        try reader.read() catch {
          case NonFatal(e) =>
            println(s"FAILURE reading $fileLocation: $e")
            readFailed = true
            -1
        }
      }
  }

  def hasNext: Boolean = {
    if (currentLine == null && !eof) bufferNextLine()
    currentLine != null
  }

  def next(): String = {
    if (!hasNext) throw new NoSuchElementException(s"No more lines in $fileLocation")
    val line = currentLine.toString
    val result = if (truncated) line + "[TRUNCATED]" else line
    currentLine = null
    truncated = false
    result
  }

  // Reads the next line into `currentLine`, or marks end of file and closes the stream
  // when no characters remain.
  private def bufferNextLine(): Unit = {
    readNext()
    if (nextChar == -1) {
      eof = true
      closeQuietly()
    } else {
      val line = new StringBuilder()
      var overflow = false
      while (nextChar != -1 && nextChar != '\n') {
        if (nextChar != 0) {
          if (line.length < maxLineLength) line.append(nextChar.toChar)
          else overflow = true
        }
        readNext()
      }
      currentLine = line
      truncated = overflow
    }
  }

  // Closes the reader (and with it the underlying stream) at most once. A close error is
  // reported but not rethrown, so reaching the end of the file never throws.
  private def closeQuietly(): Unit = {
    if (!closed && reader != null) {
      closed = true
      try reader.close() catch {
        case NonFatal(e) => println(s"FAILURE closing $fileLocation: $e")
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment