Created January 17, 2017 11:39
Save agazzarini/864574e336ba5268e87882d16bbc222b to your computer and use it in GitHub Desktop.
Storm Spout Directory Reader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spt; | |
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class DirectoryReader extends BaseRichSpout { | |
static class FileData { | |
File file; | |
long size; | |
FileData(final File file) { | |
this.file = file; | |
this.size = file.length(); | |
} | |
boolean stillHasTheSameSize() { | |
return file.length() == size; | |
} | |
} | |
private static final long serialVersionUID = 8348075212268125112L; | |
private SpoutOutputCollector collector; | |
private Queue<FileData> queue; | |
@SuppressWarnings("rawtypes") | |
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { | |
this.collector = collector; | |
queue = new ConcurrentLinkedQueue<FileData>(); | |
} | |
@Override | |
public void ack(Object msgId) { | |
System.out.println(msgId.getClass()); | |
} | |
public void nextTuple() { | |
final FileData candidate = queue.poll(); | |
if (candidate != null) { | |
if (candidate.stillHasTheSameSize()) { | |
final File workingFile = new File( | |
candidate.file.getParent(), | |
candidate.file.getName() + ".ack"); | |
candidate.file.renameTo(workingFile); | |
collector.emit(new Values(workingFile)); | |
} else { | |
queue.offer(candidate); | |
} | |
} else { | |
try { | |
Files.walk(Paths.get(new File("<dir path>").toURI())) | |
.map(path -> path.toFile()) | |
.filter(File::isFile) | |
.filter(file -> !file.isHidden() && !file.getName().endsWith(".ack")) | |
.limit(100) | |
.forEach(file -> queue.offer(new FileData(file))); | |
Thread.sleep(2000); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public void declareOutputFields(final OutputFieldsDeclarer declarer) { | |
declarer.declare(new Fields("file")); | |
} | |
} |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.