Skip to content

Instantly share code, notes, and snippets.

@GEOFBOT
Created July 17, 2016 13:16
Show Gist options
  • Save GEOFBOT/041d76b47f08919305493f57ebdde0f7 to your computer and use it in GitHub Desktop.
Save GEOFBOT/041d76b47f08919305493f57ebdde0f7 to your computer and use it in GitHub Desktop.
package org.apache.flink;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.com.google.common.io.Files;
import org.apache.flink.util.Collector;
import java.io.*;
import java.util.*;
/**
 * Tests creating and reading from the DistributedCache on the local machine.
 * Based on DistributedCacheTest.
 *
 * <p>The program writes a small word list to a temp file, registers that file
 * in the distributed cache under the name {@code "cache_test"}, reads the same
 * file as job input, and prints every input word that is also found in the
 * cached copy.
 */
public class DistCacheTest {

    /** Newline-separated word list used both as job input and as cached file content. */
    public static final String data
            = "machen\n"
            + "zeit\n"
            + "heerscharen\n"
            + "keiner\n"
            + "meine\n";

    /** URI string of the temp file created by {@link #main(String[])}. */
    protected static String textPath;

    /** Top-level temp files/directories created by this program; removed at the end of main. */
    private static final List<File> tempFiles = new ArrayList<File>();

    /**
     * Emits each incoming word that is contained in the word list loaded from
     * the cached file registered as {@code "cache_test"}.
     */
    public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
        private static final long serialVersionUID = 1L;

        /** Words read from the cached file in {@link #open(Configuration)}. */
        private final Set<String> wordList = new HashSet<>();

        /**
         * Loads every line of the cached file into {@link #wordList}.
         *
         * @param conf configuration passed in by the runtime (unused here)
         * @throws IOException if the cached file cannot be opened or read
         */
        @Override
        public void open(Configuration conf) throws IOException {
            File file = getRuntimeContext().getDistributedCache().getFile("cache_test");
            // The file is written as UTF-8 by createTempFile, so decode it the same way
            // instead of relying on the platform default charset. try-with-resources
            // guarantees the reader is closed even when readLine() fails.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(file), Charsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    wordList.add(line);
                }
            }
        }

        /**
         * Forwards {@code word} wrapped in a {@link Tuple1} if it is in the word list.
         *
         * @param word the input word to check
         * @param out collector receiving matching words
         */
        @Override
        public void flatMap(String word, Collector<Tuple1<String>> out) throws Exception {
            if (wordList.contains(word)) {
                out.collect(new Tuple1<>(word));
            }
        }
    }

    //
    // Program
    //

    /**
     * Creates the temp input file, registers it as a cached file, runs the job
     * and finally removes the temp files that were created.
     *
     * @param args command-line arguments (ignored)
     * @throws Exception if creating the temp file or running the job fails
     */
    public static void main(String[] args) throws Exception {
        try {
            textPath = createTempFile("count.txt", data);

            // set up the execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.registerCachedFile(textPath, "cache_test");

            // NOTE(review): there is no explicit env.execute(); this relies on print()
            // triggering execution itself - confirm for the Flink version in use.
            env
                    .readTextFile(textPath)
                    .flatMap(new WordChecker())
                    .print();
        } finally {
            // Nothing else in this program runs the job after this point, so the
            // temp files are no longer needed once we get here.
            deleteAllTempFiles();
        }
    }

    // --------------------------------------------------------------------------------------------
    //  Temporary File Utilities
    // --------------------------------------------------------------------------------------------

    /**
     * Creates a registered temp file and writes {@code contents} to it as UTF-8.
     *
     * @param fileName name of the file, relative to the system temp directory
     * @param contents text to write into the file
     * @return the URI of the created file, as a string
     * @throws IOException if the file cannot be created or written
     */
    public static String createTempFile(String fileName, String contents) throws IOException {
        File f = createAndRegisterTempFile(fileName);
        Files.write(contents, f, Charsets.UTF_8);
        return f.toURI().toString();
    }

    /**
     * Resolves {@code "-" + fileName} below {@code java.io.tmpdir}, removes any
     * existing file or directory at that location, creates missing parent
     * directories and records the top-most created path for later cleanup.
     *
     * @param fileName name of the file, relative to the system temp directory
     * @return the (not yet created) temp file
     * @throws IOException if the temp directory is not an ancestor of the file or
     *                     the parent directories cannot be created
     */
    public static File createAndRegisterTempFile(String fileName) throws IOException {
        File baseDir = new File(System.getProperty("java.io.tmpdir"));
        File f = new File(baseDir, "-" + fileName);

        if (f.exists()) {
            deleteRecursively(f);
        }

        // Walk up until the direct child of the temp dir is found; that is the
        // path which has to be deleted to remove everything created for this file.
        File parentToDelete = f;
        while (true) {
            File parent = parentToDelete.getParentFile();
            if (parent == null) {
                throw new IOException("Missed temp dir while traversing parents of a temp file.");
            }
            if (parent.equals(baseDir)) {
                break;
            }
            parentToDelete = parent;
        }

        Files.createParentDirs(f);
        tempFiles.add(parentToDelete);
        return f;
    }

    /**
     * Best-effort removal of everything registered in {@link #tempFiles}.
     * Failures are reported on stderr and never thrown, so that cleanup cannot
     * mask an exception raised by the job itself.
     */
    private static void deleteAllTempFiles() {
        for (File f : tempFiles) {
            try {
                if (f.exists()) {
                    deleteRecursively(f);
                }
            } catch (IOException e) {
                System.err.println("Failed to delete file " + f.getAbsolutePath() + ": " + e);
            }
        }
        tempFiles.clear();
    }

    /**
     * Deletes a directory with all its contents, or a single file. A plain file
     * that cannot be deleted is reported on stderr instead of raising an error.
     *
     * @param f file or directory to delete
     * @throws IOException if deleting a directory tree fails
     */
    private static void deleteRecursively(File f) throws IOException {
        if (f.isDirectory()) {
            FileUtils.deleteDirectory(f);
        } else if (!f.delete()) {
            System.err.println("Failed to delete file " + f.getAbsolutePath());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment