@emres
Last active May 8, 2018 12:16
Spark Streaming Unit Test Printing Word Counts
import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.*;
import java.io.*;
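public class StarterAppTest implements Serializable {

    JavaStreamingContext ssc;
    File tempDir;

    @Before
    public void setUp() {
        // Local streaming context: two threads, 500 ms batch interval.
        ssc = new JavaStreamingContext("local[2]", "test", new Duration(500));
        tempDir = Files.createTempDir();
        tempDir.deleteOnExit();
    }

    @After
    public void tearDown() {
        ssc.stop();
        ssc = null;
    }

    @Test
    public void testInitialization() {
        Assert.assertNotNull(ssc.sc());
    }

    @Test
    public void testCountWords() {
        // StarterApp (not included in this gist) provides countWords().
        StarterApp starterApp = new StarterApp();
        try {
            // Watch tempDir; files that appear after start() are read as new input.
            JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
            JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);
            // Output operation registered before start(): prints the first ten elements of every batch.
            wordCounts.print();
            ssc.start();
            Thread.sleep(1000);

            // Write the input file only after the stream has started so it is detected as new.
            File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
            tmpFile.deleteOnExit(); // registered after tempDir, so it is deleted first
            PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
            writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
            writer.close();
            Thread.sleep(6000);

            System.err.println("===== Word Counts =======");
            // In Spark 2.x this call throws IllegalStateException: output
            // operations cannot be added after ssc.start().
            wordCounts.print();
            System.err.println("===== Word Counts =======");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // Placeholder: the counts are only printed; nothing about them is asserted.
        Assert.assertTrue(true);
    }
}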
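The test prints the counts and ends with Assert.assertTrue(true), so it passes whatever countWords returns. StarterApp is not part of this gist, so the expected result is never stated. As a minimal sketch, assuming countWords splits each line on whitespace and counts case-sensitive tokens with punctuation kept (a hypothetical reconstruction, not the author's code), the plain-Java version below computes the counts a real assertion could check for the line the test writes. WordCountSketch is an illustrative name.

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Hypothetical stand-in for what StarterApp.countWords presumably does to
    // one batch: split on runs of whitespace and count each token
    // (case-sensitive, punctuation kept, so "8-Dec-2014:" is one token).
    public static Map<String, Integer> countWords(String line) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) { // a blank line splits into a single empty token
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The exact line that testCountWords writes to tmp.txt.
        Map<String, Integer> counts =
            countWords("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
        System.out.println(counts);
    }
}
```

Under that assumption the batch should contain Emre=3, Ergin=3 and 8-Dec-2014:=1. If the real countWords lowercases or strips punctuation, the expected keys change accordingly.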
@bjacques

bjacques commented Mar 3, 2017

Hi Emre, I landed here from your Stack Overflow question. I'm new to Spark Streaming and copied your example above. When I run it with Spark 2.1.0, the second call to wordCounts.print(), made after the streaming context has started, fails with java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported. Do you get the same error, and did you figure out how to fix it?

@mlos100

mlos100 commented May 8, 2018

I found that the IllegalStateException goes away when I delete the second call to wordCounts.print() (line 59). I am using Apache Spark 2.3.0.
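Both comments describe the same rule. Once ssc.start() has been called, no further output operations such as print() or foreachRDD can be registered. The first wordCounts.print() already prints every batch, so dropping the second call loses nothing. To assert on the counts instead of only printing them, a common pattern is to register a collecting output operation before start() (in Spark, typically a foreachRDD that gathers each batch on the driver) and to wait for it with a timeout rather than a fixed Thread.sleep. The sketch below models that pattern in plain Java so it runs without Spark. ToyStreamingContext is a hypothetical class that mimics only the "no new outputs after start" check. It is not Spark's API.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical toy model, NOT Spark's API: it mimics only the rule that output
// operations must be registered before start(), and it delivers each batch to
// every registered output on a background thread.
public class ToyStreamingContext<T> {
    private final List<Consumer<T>> outputs = new CopyOnWriteArrayList<>();
    private volatile boolean started = false;

    public void addOutput(Consumer<T> output) {
        if (started) {
            throw new IllegalStateException(
                "Adding new output operations after starting a context is not supported");
        }
        outputs.add(output);
    }

    // Marks the context as started and pushes each batch to all outputs on a worker thread.
    public Thread start(List<T> batches) {
        started = true;
        Thread worker = new Thread(() -> {
            for (T batch : batches) {
                for (Consumer<T> output : outputs) {
                    output.accept(batch);
                }
            }
        });
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        ToyStreamingContext<Map<String, Integer>> ctx = new ToyStreamingContext<>();
        List<Map<String, Integer>> collected = new CopyOnWriteArrayList<>();
        CountDownLatch firstBatch = new CountDownLatch(1);

        // Register every output before start(): one prints (like print()),
        // one collects results for assertions (like a driver-side foreachRDD).
        ctx.addOutput(batch -> System.out.println("batch: " + batch));
        ctx.addOutput(batch -> {
            collected.add(batch);
            firstBatch.countDown();
        });

        Map<String, Integer> counts = new TreeMap<>();
        counts.put("Emre", 3);
        counts.put("Ergin", 3);
        ctx.start(Collections.singletonList(counts));

        // Wait for a result with a timeout rather than a fixed Thread.sleep.
        boolean arrived = firstBatch.await(5, TimeUnit.SECONDS);
        System.out.println("arrived=" + arrived + ", collected=" + collected);

        // A late registration is rejected, like the second wordCounts.print().
        try {
            ctx.addOutput(batch -> System.out.println(batch));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The latch makes the wait end as soon as the first batch arrives, while the timeout still bounds a test that never receives input. In a real Spark test, the collected list would need to be thread-safe for the same reason it is here.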
