Last active
August 29, 2015 14:13
-
-
Save jhu-chang/1ee5b0788c7479414eeb to your computer and use it in GitHub Desktop.
FileDStreamIssue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.spark.streaming | |
import org.apache.spark.SparkConf | |
import org.apache.spark.streaming.StreamingContext._ | |
import org.apache.spark.streaming.dstream._ | |
import org.apache.hadoop.io.{LongWritable, Text} | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat | |
import org.apache.hadoop.fs.Path | |
object FileDStreamTest {

  /**
   * Reproduces a FileInputDStream behavior change between Spark 1.1 and 1.2.
   *
   * With `newFilesOnly = false`, Spark 1.1 picks up files already present in the
   * monitored directory on the first batch, so the stateful DStream is seeded
   * with (id001,1) and (id002,2). In 1.2 the same file is rejected because its
   * modification time falls outside the "remember" window (see the 1.2 log:
   * "ignored as mod time ... <= ignore time ..."); the related setting is
   * spark.streaming.minRememberDuration — TODO confirm against the Spark version in use.
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("FileDStreamTest")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("checkpoint")

    // The directory contains one file with content like:
    //   id001,1
    //   id002,2
    val directory = """G:\ScalaWorkSpace\FileIssue\customer\"""

    // newFilesOnly = false: pre-existing files are expected on the first batch.
    val fileLines = ssc
      .fileStream[LongWritable, Text, TextInputFormat](directory, (_: Path) => true, newFilesOnly = false)
      .map(_._2.toString)

    // Expected: state seeded with (id001,1) and (id002,2).
    // Correct in 1.1; in 1.2 we get nothing (see logs below).
    val stateful = fileLines
      .map { line =>
        val fields = line.split(",")
        (fields(0), fields(1).toInt)
      }
      .updateStateByKey { (newValues: Seq[Int], state: Option[Int]) =>
        // No new values this batch: carry the existing state forward unchanged.
        if (newValues.isEmpty) state
        else Some(newValues.sum + state.getOrElse(0))
      }

    stateful.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
------------------------------------1.1------------------------ | |
Here is part of the log produced with Spark 1.1: | |
15/01/21 16:10:32 [main] INFO org.apache.spark.streaming.scheduler.JobGenerator: Started JobGenerator at 1421827835000 ms | |
15/01/21 16:10:32 [main] INFO org.apache.spark.streaming.scheduler.JobScheduler: Started JobScheduler | |
15/01/21 16:10:35 [RecurringTimer - JobGenerator] DEBUG org.apache.spark.streaming.util.RecurringTimer: Callback for JobGenerator called at time 1421827835000 | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.scheduler.JobGenerator: Got event GenerateJobs(1421827835000 ms) | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.DStreamGraph: Generating jobs for time 1421827835000 ms | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.StateDStream: Time 1421827835000 ms is valid | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.StateDStream: Time 1421827830000 ms is invalid as zeroTime is 1421827830000 ms and slideDuration is 5000 ms and difference is 0 ms | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827835000 ms is valid | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827835000 ms is valid | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Time 1421827835000 ms is valid | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Trying to get new files for time 1421827835000 | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Mod time for file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv is 1421826082540 | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Accepted file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 23 ms | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: # cached file times = 1 | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: New files at time 1421827835000 ms: | |
file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.storage.MemoryStore: ensureFreeSpace(164241) called with curMem=0, maxMem=740889722 | |
15/01/21 16:10:35 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.4 KB, free 706.4 MB) | |
----------------------------------------1.2---------------------------------------- | |
Here is part of the log produced with Spark 1.2: | |
15/01/21 16:13:10 [RecurringTimer - JobGenerator] DEBUG org.apache.spark.streaming.util.RecurringTimer: Callback for JobGenerator called at time 1421827990000 | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.scheduler.JobGenerator: Got event GenerateJobs(1421827990000 ms) | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.DStreamGraph: Generating jobs for time 1421827990000 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.StateDStream: Time 1421827990000 ms is valid | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.StateDStream: Time 1421827985000 ms is invalid as zeroTime is 1421827985000 ms and slideDuration is 5000 ms and difference is 0 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827990000 ms is valid | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.MappedDStream: Time 1421827990000 ms is valid | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Time 1421827990000 ms is valid | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: Getting new files for time 1421827990000, ignoring files older than 1421827930000 | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: file:/G:/ScalaWorkSpace/FileIssue/customer/customer.csv ignored as mod time 1421826082540 <= ignore time 1421827930000 | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: Finding new files took 2 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.FileInputDStream: # cached file times = 1 | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.dstream.FileInputDStream: New files at time 1421827990000 ms: | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.dstream.StateDStream: Persisting RDD 4 for time 1421827990000 ms to StorageLevel(false, true, false, false, 1) | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.DStreamGraph: Generated 1 jobs for time 1421827990000 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.scheduler.JobScheduler: Added jobs for time 1421827990000 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] DEBUG org.apache.spark.streaming.scheduler.JobGenerator: Got event DoCheckpoint(1421827990000 ms) | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.scheduler.JobGenerator: Checkpointing graph for time 1421827990000 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-2] INFO org.apache.spark.streaming.DStreamGraph: Updating checkpoint data for time 1421827990000 ms | |
15/01/21 16:13:10 [sparkDriver-akka.actor.default-dispatcher-5] INFO org.apache.spark.streaming.scheduler.JobScheduler: Starting job streaming job 1421827990000 ms.0 from job set of time 1421827990000 ms |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
id001,1 | |
id002,2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment