Created
March 17, 2014 19:13
-
-
Save roryokane/9606238 to your computer and use it in GitHub Desktop.
Java generic pipe and filter classes, plus examples
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations.example; | |
import pipe_foundations.Pipe; | |
import pipe_foundations.SimpleFilter; | |
public class ExampleFilter extends SimpleFilter<Integer, String> { | |
public ExampleFilter(Pipe<Integer> input, Pipe<String> output) { | |
super(input, output); | |
} | |
@Override | |
protected String transformOne(Integer in) { | |
String out = Integer.toString(in); | |
System.out.println("filtered " + Integer.toString(in) + " to " + out); | |
delayForDebug(100); | |
return out; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations.example; | |
import pipe_foundations.Generator; | |
import pipe_foundations.Pipe; | |
public class ExampleGenerator extends Generator<Integer> { | |
public ExampleGenerator(Pipe<Integer> output) { | |
super(output); | |
} | |
@Override | |
public void generateInto(Pipe<Integer> pipe) { | |
for (int i = 1; i <= 10; i++) { | |
pipe.put(new Integer(i)); | |
System.out.println("generated " + Integer.toString(i)); | |
delayForDebug(200); | |
} | |
pipe.closeForWriting(); | |
System.out.println("generator finished"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations.example; | |
import pipe_foundations.*; | |
public class ExampleRunner { | |
public static void main(String[] args) { | |
// create pipes | |
final Pipe<Integer> genToFilter = new PipeImpl<Integer>(); | |
final Pipe<String> filterToOut = new PipeImpl<String>(); | |
// create components that use the pipes | |
final Generator<Integer> generator = new ExampleGenerator(genToFilter); | |
final Filter<Integer, String> filter = new ExampleFilter(genToFilter, filterToOut); | |
final Sink<String> sink = new ExampleSink(filterToOut); | |
// start all components | |
generator.start(); | |
filter.start(); | |
sink.start(); | |
System.out.println("runner finished"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations.example; | |
import pipe_foundations.Pipe; | |
import pipe_foundations.Sink; | |
public class ExampleSink extends Sink<String> { | |
public ExampleSink(Pipe<String> input) { | |
super(input); | |
} | |
@Override | |
public void takeFrom(Pipe<String> pipe) { | |
try { | |
String in; | |
while ((in = pipe.nextOrNullIfEmptied()) != null) { | |
System.out.println(in); | |
delayForDebug(300); | |
} | |
System.out.println("sink finished"); | |
} catch (InterruptedException e) { | |
System.err.println("interrupted"); | |
e.printStackTrace(); | |
} finally { | |
System.out.close(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
public abstract class Filter<I, O> extends ThreadedRunner { | |
protected Pipe<I> input; | |
protected Pipe<O> output; | |
public Filter(Pipe<I> input, Pipe<O> output) { | |
this.input = input; | |
this.output = output; | |
} | |
@Override | |
public void run() { | |
transformBetween(input, output); | |
} | |
protected abstract void transformBetween(Pipe<I> input, Pipe<O> output); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
public abstract class Generator<T> extends ThreadedRunner { | |
protected Pipe<T> output; | |
public Generator(Pipe<T> output) { | |
this.output = output; | |
} | |
@Override | |
public void run() { | |
generateInto(output); | |
} | |
public abstract void generateInto(Pipe<T> pipe); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
/**
 * A channel that carries objects of type {@code T} from a writer to a reader
 * and can be closed for writing.
 *
 * @param <T> type of the objects carried
 */
public interface Pipe<T> {

    /**
     * Adds an object to the pipe.
     *
     * @param obj the object to add
     * @return a flag reported by the implementation for the insertion
     */
    boolean put(T obj);

    /**
     * Takes the next object from the pipe.
     *
     * @return the next object, or {@code null} as the signal that the pipe has
     *         been emptied
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting
     */
    T nextOrNullIfEmptied() throws InterruptedException;

    /** Declares that no further objects will be put into the pipe. */
    void closeForWriting();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
import java.io.IOException; | |
import java.util.LinkedList; | |
import java.util.NoSuchElementException; | |
import java.util.Queue; | |
// replaceable with BlockingQueue<T>? That doesn't support closing, though | |
public class PipeImpl<T> implements Pipe<T> { | |
private Queue<T> buffer = new LinkedList<T>(); | |
private boolean isOpenForWriting = true; | |
private boolean hasReadLastObject = false; | |
@Override | |
public synchronized boolean put(T obj) { | |
if (!isOpenForWriting) { | |
throw new RuntimeException(new IOException("pipe is closed; cannot write to it")); | |
} else if (obj == null) { | |
throw new IllegalArgumentException("cannot put null in pipe; null is reserved for pipe-empty sentinel value"); | |
} | |
boolean wasAdded = buffer.add(obj); | |
notify(); | |
//System.out.println("added to pipe: " + (obj==null?"<null>":obj.toString())); | |
return wasAdded; | |
} | |
@Override | |
// not using next() and willHaveNext() because a currently-empty pipe might be | |
// closed after the willHaveNext() check, causing next() to wait forever | |
// not using an exception because would require consumers to write unidiomatic `while(true)` | |
// not using an Option because there is no standard Option and reimplementing it is too annoying | |
public synchronized T nextOrNullIfEmptied() throws InterruptedException { | |
if (hasReadLastObject) { | |
throw new NoSuchElementException("pipe is closed and empty; will never contain any further values"); | |
} | |
while (buffer.isEmpty()) { | |
wait(); // pipe empty - wait | |
} | |
T obj = buffer.remove(); | |
if (obj == null) { // will be null if it's the last element | |
hasReadLastObject = true; | |
} | |
return obj; | |
} | |
@Override | |
public synchronized void closeForWriting() { | |
isOpenForWriting = false; | |
buffer.add(null); | |
notify(); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
public abstract class SimpleFilter<I, O> extends Filter<I, O> { | |
public SimpleFilter(Pipe<I> input, Pipe<O> output) { | |
super(input, output); | |
} | |
@Override | |
protected void transformBetween(Pipe<I> input, Pipe<O> output) { | |
try { | |
I in; | |
while ((in = input.nextOrNullIfEmptied()) != null) { | |
O out = transformOne(in); | |
output.put(out); | |
} | |
} catch (InterruptedException e) { | |
// TODO handle properly, using advice in http://www.ibm.com/developerworks/java/library/j-jtp05236/ | |
System.err.println("interrupted"); | |
e.printStackTrace(); | |
return; | |
} | |
output.closeForWriting(); | |
} | |
protected abstract O transformOne(I in); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
public abstract class SimpleSink<T> extends Sink<T> { | |
public SimpleSink(Pipe<T> input) { | |
super(input); | |
} | |
@Override | |
public void takeFrom(Pipe<T> pipe) { | |
try { | |
// any required setup can be done in the constructor, so not providing a hook | |
T in; | |
while ((in = pipe.nextOrNullIfEmptied()) != null) { | |
handle(in); | |
} | |
} catch (InterruptedException e) { | |
// TODO handle properly, using advice in http://www.ibm.com/developerworks/java/library/j-jtp05236/ | |
System.err.println("interrupted"); | |
e.printStackTrace(); | |
} finally { | |
cleanup(); | |
} | |
} | |
protected abstract void handle(T in); | |
/** | |
* override this if you need to do anything, like closing an opened file | |
*/ | |
protected void cleanup() { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
public abstract class Sink<T> extends ThreadedRunner { | |
protected Pipe<T> input; | |
public Sink(Pipe<T> input) { | |
this.input = input; | |
} | |
@Override | |
public void run() { | |
takeFrom(input); | |
} | |
public abstract void takeFrom(Pipe<T> pipe); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipe_foundations; | |
/**
 * Base class for a component whose {@link #run()} method is executed on a
 * thread of its own once {@link #start()} is called.
 */
public abstract class ThreadedRunner implements Runnable {

    private boolean isStarted = false;

    /** The work to perform on the spawned thread. */
    @Override
    abstract public void run();

    /**
     * Starts a new thread executing {@link #run()}. Does nothing if this
     * runner is already marked as started.
     */
    public void start() {
        if (!isStarted) {
            isStarted = true;
            Thread thread = new Thread(this);
            thread.start();
        }
    }

    /**
     * Clears the started flag so that {@link #start()} may be called again.
     * This does not interrupt or otherwise signal a thread that is already
     * running.
     */
    public void stop() {
        isStarted = false;
    }

    /**
     * make your thread sleep so you can confirm that other threads still run in the meantime
     * for debugging purposes only
     *
     * <p>If the sleep is interrupted, the stack trace is printed and the
     * thread's interrupt status is restored so callers can still observe it.
     *
     * @param millis how long to sleep, in milliseconds
     */
    protected void delayForDebug(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep clears the flag when it throws; set it again so the
            // interruption is not lost.
            Thread.currentThread().interrupt();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How can I execute this?