-
-
Save emres/67b4eae86fa92df69f61 to your computer and use it in GitHub Desktop.
Spark Streaming Unit Test Printing Count Words
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.io.Files; | |
import org.apache.spark.streaming.Duration; | |
import org.apache.spark.streaming.api.java.JavaDStream; | |
import org.apache.spark.streaming.api.java.JavaPairDStream; | |
import org.apache.spark.streaming.api.java.JavaStreamingContext; | |
import org.junit.*; | |
import java.io.*; | |
public class StarterAppTest implements Serializable { | |
JavaStreamingContext ssc; | |
File tempDir; | |
@Before | |
public void setUp() { | |
ssc = new JavaStreamingContext("local[2]", "test", new Duration(500)); | |
tempDir = Files.createTempDir(); | |
tempDir.deleteOnExit(); | |
} | |
@After | |
public void tearDown() { | |
ssc.stop(); | |
ssc = null; | |
} | |
@Test | |
public void testInitialization() { | |
Assert.assertNotNull(ssc.sc()); | |
} | |
@Test | |
public void testCountWords() { | |
StarterApp starterApp = new StarterApp(); | |
try { | |
JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath()); | |
JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines); | |
wordCounts.print(); | |
ssc.start(); | |
Thread.sleep(1000); | |
File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt"); | |
PrintWriter writer = new PrintWriter(tmpFile, "UTF-8"); | |
writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin"); | |
writer.close(); | |
Thread.sleep(6000); | |
System.err.println("===== Word Counts ======="); | |
wordCounts.print(); | |
System.err.println("===== Word Counts ======="); | |
} catch (FileNotFoundException e) { | |
e.printStackTrace(); | |
} catch (UnsupportedEncodingException e) { | |
e.printStackTrace(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
Assert.assertTrue(true); | |
} | |
} |
I found that the IllegalStateException goes away when I delete the second call to wordCounts.print() (line 59). I am using Apache Spark 2.3.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Emre, I landed here from your Stackoverflow question. I'm new to spark streaming and copied your example above. When running with Spark version 2.1.0 I get the error
java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
when the second call towordCounts.print();
is made after the streaming session is started. Do you have the same error, and did you figure out how to fix it?