Skip to content

Instantly share code, notes, and snippets.

@roryokane
Created March 17, 2014 19:13
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roryokane/9606238 to your computer and use it in GitHub Desktop.
Save roryokane/9606238 to your computer and use it in GitHub Desktop.
Java generic pipe and filter classes, plus examples
package pipe_foundations.example;
import pipe_foundations.Pipe;
import pipe_foundations.SimpleFilter;
public class ExampleFilter extends SimpleFilter<Integer, String> {
public ExampleFilter(Pipe<Integer> input, Pipe<String> output) {
super(input, output);
}
@Override
protected String transformOne(Integer in) {
String out = Integer.toString(in);
System.out.println("filtered " + Integer.toString(in) + " to " + out);
delayForDebug(100);
return out;
}
}
package pipe_foundations.example;
import pipe_foundations.Generator;
import pipe_foundations.Pipe;
public class ExampleGenerator extends Generator<Integer> {
public ExampleGenerator(Pipe<Integer> output) {
super(output);
}
@Override
public void generateInto(Pipe<Integer> pipe) {
for (int i = 1; i <= 10; i++) {
pipe.put(new Integer(i));
System.out.println("generated " + Integer.toString(i));
delayForDebug(200);
}
pipe.closeForWriting();
System.out.println("generator finished");
}
}
package pipe_foundations.example;
import pipe_foundations.*;
public class ExampleRunner {
public static void main(String[] args) {
// create pipes
final Pipe<Integer> genToFilter = new PipeImpl<Integer>();
final Pipe<String> filterToOut = new PipeImpl<String>();
// create components that use the pipes
final Generator<Integer> generator = new ExampleGenerator(genToFilter);
final Filter<Integer, String> filter = new ExampleFilter(genToFilter, filterToOut);
final Sink<String> sink = new ExampleSink(filterToOut);
// start all components
generator.start();
filter.start();
sink.start();
System.out.println("runner finished");
}
}
package pipe_foundations.example;
import pipe_foundations.Pipe;
import pipe_foundations.Sink;
public class ExampleSink extends Sink<String> {
public ExampleSink(Pipe<String> input) {
super(input);
}
@Override
public void takeFrom(Pipe<String> pipe) {
try {
String in;
while ((in = pipe.nextOrNullIfEmptied()) != null) {
System.out.println(in);
delayForDebug(300);
}
System.out.println("sink finished");
} catch (InterruptedException e) {
System.err.println("interrupted");
e.printStackTrace();
} finally {
System.out.close();
}
}
}
package pipe_foundations;
public abstract class Filter<I, O> extends ThreadedRunner {
protected Pipe<I> input;
protected Pipe<O> output;
public Filter(Pipe<I> input, Pipe<O> output) {
this.input = input;
this.output = output;
}
@Override
public void run() {
transformBetween(input, output);
}
protected abstract void transformBetween(Pipe<I> input, Pipe<O> output);
}
package pipe_foundations;
public abstract class Generator<T> extends ThreadedRunner {
protected Pipe<T> output;
public Generator(Pipe<T> output) {
this.output = output;
}
@Override
public void run() {
generateInto(output);
}
public abstract void generateInto(Pipe<T> pipe);
}
package pipe_foundations;
public interface Pipe<T> {
public boolean put(T obj);
public T nextOrNullIfEmptied() throws InterruptedException;
public void closeForWriting();
}
package pipe_foundations;
import java.io.IOException;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;
// replaceable with BlockingQueue<T>? That doesn't support closing, though
public class PipeImpl<T> implements Pipe<T> {
private Queue<T> buffer = new LinkedList<T>();
private boolean isOpenForWriting = true;
private boolean hasReadLastObject = false;
@Override
public synchronized boolean put(T obj) {
if (!isOpenForWriting) {
throw new RuntimeException(new IOException("pipe is closed; cannot write to it"));
} else if (obj == null) {
throw new IllegalArgumentException("cannot put null in pipe; null is reserved for pipe-empty sentinel value");
}
boolean wasAdded = buffer.add(obj);
notify();
//System.out.println("added to pipe: " + (obj==null?"<null>":obj.toString()));
return wasAdded;
}
@Override
// not using next() and willHaveNext() because a currently-empty pipe might be
// closed after the willHaveNext() check, causing next() to wait forever
// not using an exception because would require consumers to write unidiomatic `while(true)`
// not using an Option because there is no standard Option and reimplementing it is too annoying
public synchronized T nextOrNullIfEmptied() throws InterruptedException {
if (hasReadLastObject) {
throw new NoSuchElementException("pipe is closed and empty; will never contain any further values");
}
while (buffer.isEmpty()) {
wait(); // pipe empty - wait
}
T obj = buffer.remove();
if (obj == null) { // will be null if it's the last element
hasReadLastObject = true;
}
return obj;
}
@Override
public synchronized void closeForWriting() {
isOpenForWriting = false;
buffer.add(null);
notify();
}
}
package pipe_foundations;
public abstract class SimpleFilter<I, O> extends Filter<I, O> {
public SimpleFilter(Pipe<I> input, Pipe<O> output) {
super(input, output);
}
@Override
protected void transformBetween(Pipe<I> input, Pipe<O> output) {
try {
I in;
while ((in = input.nextOrNullIfEmptied()) != null) {
O out = transformOne(in);
output.put(out);
}
} catch (InterruptedException e) {
// TODO handle properly, using advice in http://www.ibm.com/developerworks/java/library/j-jtp05236/
System.err.println("interrupted");
e.printStackTrace();
return;
}
output.closeForWriting();
}
protected abstract O transformOne(I in);
}
package pipe_foundations;
public abstract class SimpleSink<T> extends Sink<T> {
public SimpleSink(Pipe<T> input) {
super(input);
}
@Override
public void takeFrom(Pipe<T> pipe) {
try {
// any required setup can be done in the constructor, so not providing a hook
T in;
while ((in = pipe.nextOrNullIfEmptied()) != null) {
handle(in);
}
} catch (InterruptedException e) {
// TODO handle properly, using advice in http://www.ibm.com/developerworks/java/library/j-jtp05236/
System.err.println("interrupted");
e.printStackTrace();
} finally {
cleanup();
}
}
protected abstract void handle(T in);
/**
* override this if you need to do anything, like closing an opened file
*/
protected void cleanup() {
}
}
package pipe_foundations;
public abstract class Sink<T> extends ThreadedRunner {
protected Pipe<T> input;
public Sink(Pipe<T> input) {
this.input = input;
}
@Override
public void run() {
takeFrom(input);
}
public abstract void takeFrom(Pipe<T> pipe);
}
package pipe_foundations;
public abstract class ThreadedRunner implements Runnable {
private boolean isStarted = false;
@Override
abstract public void run();
public void start(){
if(! isStarted){
isStarted = true;
Thread thread = new Thread(this);
thread.start();
}
}
public void stop(){
isStarted = false;
}
/**
* make your thread sleep so you can confirm that other threads still run in the meantime
* for debugging purposes only
*/
protected void delayForDebug(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@brendalf
Copy link

brendalf commented Jul 8, 2019

How can I execute this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment