FileDStreamIssue
Gist by @jhu-chang (last active August 29, 2015)

fileStream with newFilesOnly = false stopped picking up pre-existing files in Spark 1.2; the same code works in 1.1. Reproduction below, followed by logs from both versions.
package org.apache.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path

object FileDStreamTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FileDStreamTest")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("checkpoint")

    // There is a pre-existing file in this directory with content like:
    //   id001,1
    //   id002,2
    val directory = """G:\ScalaWorkSpace\FileIssue\customer\"""

    // newFilesOnly = false, so files already in the directory should be read.
    // Equivalent to:
    //   new FileInputDStream[LongWritable, Text, TextInputFormat](ssc, directory, _ => true, false)
    val fileDStream = ssc
      .fileStream[LongWritable, Text, TextInputFormat](directory, (_: Path) => true, newFilesOnly = false)
      .map(_._2.toString)

    // Expected: the stateful DStream starts out with (id001,1) and (id002,2).
    // The behavior is correct in 1.1, but in 1.2 we get nothing.
    val stateful = fileDStream
      .map { v =>
        val as = v.split(",")
        (as(0), as(1).toInt)
      }
      .updateStateByKey { (si: Seq[Int], lv: Option[Int]) =>
        if (si.isEmpty) lv
        else Some(si.sum + lv.getOrElse(0))
      }

    stateful.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
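For reference, under 1.1 the first batch should print the two seeded pairs; under 1.2 the batch comes up empty. A sketch of the expected print() output under 1.1 (the batch timestamp will differ per run; this is not a captured run):

-------------------------------------------
Time: 1421827835000 ms
-------------------------------------------
(id001,1)
(id002,2)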
---------------------------------------- Spark 1.1 ----------------------------------------
Here is part of the log with Spark 1.1:
15/01/21 16:10:32 [main] INFO org.apache.spark.streaming.scheduler.JobGenerator: Started JobGenerator at 1421827835000 ms
15/01/21 16:10:32 [main] INFO org.apache.spark.streaming.scheduler.JobScheduler: Started JobScheduler
15/01/21 16:10:35 [RecurringTimer - JobGenerator] DEBUG org.apache.spark.streaming.util.RecurringTimer: Callback for JobGenerator called at time 1421827835000
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.scheduler.JobGenerator: Got event GenerateJobs(1421827835000 ms)
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.DStreamGraph: Generating jobs for time 1421827835000 ms
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.StateDStream: Time 1421827835000 ms is valid
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.StateDStream: Time 1421827830000 ms is invalid as zeroTime is 1421827830000 ms and slideDuration is 5000 ms and difference is 0 ms
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827835000 ms is valid
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827835000 ms is valid
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Time 1421827835000 ms is valid
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Trying to get new files for time 1421827835000
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Mod time for file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv is 1421826082540
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Accepted file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 23 ms
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: # cached file times = 1
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: New files at time 1421827835000 ms:
file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.storage.MemoryStore: ensureFreeSpace(164241) called with curMem=0, maxMem=740889722
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.4 KB, free 706.4 MB)
---------------------------------------- Spark 1.2 ----------------------------------------
Here is part of the log with Spark 1.2:
15/01/21 16:13:10 [RecurringTimer - JobGenerator] DEBUG org.apache.spark.streaming.util.RecurringTimer: Callback for JobGenerator called at time 1421827990000
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.scheduler.JobGenerator: Got event GenerateJobs(1421827990000 ms)
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.DStreamGraph: Generating jobs for time 1421827990000 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.StateDStream: Time 1421827990000 ms is valid
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.StateDStream: Time 1421827985000 ms is invalid as zeroTime is 1421827985000 ms and slideDuration is 5000 ms and difference is 0 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827990000 ms is valid
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827990000 ms is valid
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Time 1421827990000 ms is valid
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Getting new files for time 1421827990000, ignoring files older than 1421827930000
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv ignored as mod time 1421826082540 <= ignore time 1421827930000
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 2 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: # cached file times = 1
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: New files at time 1421827990000 ms:
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.StateDStream: Persisting RDD 4 for time 1421827990000 ms to StorageLevel(false, true, false, false, 1)
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.DStreamGraph: Generated 1 jobs for time 1421827990000 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.scheduler.JobScheduler: Added jobs for time 1421827990000 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.scheduler.JobGenerator: Got event DoCheckpoint(1421827990000 ms)
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.scheduler.JobGenerator: Checkpointing graph for time 1421827990000 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.DStreamGraph: Updating checkpoint data for time 1421827990000 ms
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-5] INFO org.apache.spark.streaming.scheduler.JobScheduler: Starting job streaming job 1421827990000 ms.0 from job set of time 1421827990000 ms
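The two logs show where the behavior diverges. In 1.1 the old file is accepted outright ("Accepted file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv"). In 1.2, FileInputDStream applies an ignore threshold ("ignoring files older than 1421827930000"), which is exactly 60 seconds before the batch time 1421827990000, so any file whose modification time predates that window is dropped, even with newFilesOnly = false. A minimal Scala sketch of the check the 1.2 log implies (the names and the 60-second window are inferred from the log values, not taken from the Spark source):

// Sketch of the 1.2 file-selection check as the log describes it.
// The 60 s window is inferred from 1421827990000 - 1421827930000 = 60000 ms.
def isAccepted(modTime: Long, batchTime: Long, rememberWindowMs: Long = 60000L): Boolean = {
  val ignoreTime = batchTime - rememberWindowMs
  // the log reports "ignored as mod time ... <= ignore time ..."
  modTime > ignoreTime
}

// With the values from the 1.2 log:
isAccepted(1421826082540L, 1421827990000L)  // false, so customer.csv is ignored

That accounts for the empty "New files at time 1421827990000 ms:" line and, in turn, the empty initial state.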