Skip to content

Instantly share code, notes, and snippets.

@agazzarini
Created January 17, 2017 11:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save agazzarini/864574e336ba5268e87882d16bbc222b to your computer and use it in GitHub Desktop.
Save agazzarini/864574e336ba5268e87882d16bbc222b to your computer and use it in GitHub Desktop.
Storm Spout Directory Reader
package spt;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Storm spout that scans a directory tree and emits one tuple per regular file found.
 *
 * <p>Each scan snapshots the size of every candidate file. A file is emitted only if,
 * when it is later polled from the queue, its size still matches that snapshot (a
 * simple "nobody is still writing to it" heuristic). Before being emitted the file is
 * renamed with an {@code .ack} suffix so that later scans skip it; the emitted tuple
 * carries the renamed {@link File} in the single field {@code "file"}.
 */
public class DirectoryReader extends BaseRichSpout {

    /** A file together with the size it had when it was discovered by a scan. */
    static class FileData {
        final File file;
        final long size;

        FileData(final File file) {
            this.file = file;
            this.size = file.length();
        }

        /** Returns {@code true} when the file's current length equals the snapshot. */
        boolean stillHasTheSameSize() {
            return file.length() == size;
        }
    }

    private static final long serialVersionUID = 8348075212268125112L;

    /** Directory that is scanned for incoming files (placeholder value). */
    private static final String DIRECTORY = "<dir path>";

    /** Suffix appended to a file once it has been picked up. */
    private static final String ACK_SUFFIX = ".ack";

    /** Maximum number of files queued by a single directory scan. */
    private static final int MAX_FILES_PER_SCAN = 100;

    /** Pause after each scan; also the minimum time a file's size must stay stable. */
    private static final long SCAN_PAUSE_MILLIS = 2000L;

    private SpoutOutputCollector collector;
    private Queue<FileData> queue;

    /**
     * Stores the collector and creates the empty queue of candidate files.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit tuples
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        this.collector = collector;
        queue = new ConcurrentLinkedQueue<FileData>();
    }

    /**
     * Prints the runtime class of the acknowledged message id to standard output.
     *
     * @param msgId the id of the acknowledged tuple
     */
    @Override
    public void ack(Object msgId) {
        System.out.println(msgId.getClass());
    }

    /**
     * Emits at most one file per call.
     *
     * <p>If the queue is empty the directory is rescanned and the method pauses for
     * {@link #SCAN_PAUSE_MILLIS}. Otherwise the next candidate is polled; if its size is
     * unchanged it is renamed with the {@code .ack} suffix and emitted. A candidate whose
     * size changed, or whose rename failed, is dropped from the queue: it still lacks the
     * {@code .ack} suffix, so a later scan can pick it up again with a fresh size snapshot.
     */
    @Override
    public void nextTuple() {
        final FileData candidate = queue.poll();
        if (candidate == null) {
            refillQueue();
            pauseAfterScan();
            return;
        }

        if (!candidate.stillHasTheSameSize()) {
            // Do not re-queue the stale snapshot: its recorded size would never match
            // again and the queue would never drain. The next scan re-snapshots the file.
            return;
        }

        final File workingFile = new File(
                candidate.file.getParent(),
                candidate.file.getName() + ACK_SUFFIX);
        if (candidate.file.renameTo(workingFile)) {
            // NOTE(review): the tuple is emitted without a message id; per Storm's spout
            // contract ack/fail are only invoked for tuples emitted with one - confirm
            // that this is intended.
            collector.emit(new Values(workingFile));
        } else {
            // Emitting here would hand downstream bolts a path that does not exist.
            System.err.println("Unable to rename " + candidate.file + " to " + workingFile
                    + "; the file will be retried on a later scan.");
        }
    }

    /**
     * Declares the single output field {@code "file"}.
     *
     * @param declarer declarer that receives the output schema
     */
    @Override
    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("file"));
    }

    /** Walks {@link #DIRECTORY} and queues up to {@link #MAX_FILES_PER_SCAN} visible, not-yet-picked-up files. */
    private void refillQueue() {
        // Files.walk holds open directory handles; the stream must be closed after use.
        try (Stream<Path> paths = Files.walk(Paths.get(new File(DIRECTORY).toURI()))) {
            paths.map(Path::toFile)
                    .filter(File::isFile)
                    .filter(file -> !file.isHidden() && !file.getName().endsWith(ACK_SUFFIX))
                    .limit(MAX_FILES_PER_SCAN)
                    .forEach(file -> queue.offer(new FileData(file)));
        } catch (IOException | RuntimeException e) {
            // Best effort: a failed scan must not kill the spout; the next call retries.
            e.printStackTrace();
        }
    }

    /** Sleeps for {@link #SCAN_PAUSE_MILLIS}, restoring the interrupt flag if interrupted. */
    private static void pauseAfterScan() {
        try {
            Thread.sleep(SCAN_PAUSE_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment