Skip to content

Instantly share code, notes, and snippets.

@cs224
Created September 20, 2014 14:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cs224/019382d9ddb760267c97 to your computer and use it in GitHub Desktop.
Save cs224/019382d9ddb760267c97 to your computer and use it in GitHub Desktop.
Reimplementing the Matthias Mann Coroutine and CoIterator on top of Quasar.
package control.structures.continuations.quasarFiberChannelSameThread;
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.Strand.State;
import co.paralleluniverse.strands.SuspendableCallable;
/**
 * Base class for iterators whose elements are generated by a coroutine body.
 *
 * <p>Subclasses implement {@link #run()} and hand every value to the consumer by
 * calling {@link #produce(Object)}. The body is wrapped in a {@link Coroutine}
 * which {@link #hasNext()} resumes until it has either produced an element or
 * terminated.</p>
 *
 * <p>NOTE(review): this class is declared {@link Serializable} but holds a
 * non-transient {@link Coroutine} field, and {@code Coroutine} is not declared
 * serializable - confirm whether serialization is really expected to work.</p>
 *
 * @param <E> the type of the elements returned by this iterator
 */
public abstract class CoIterator<E> implements Iterator<E>, Serializable {
    private static final long serialVersionUID = 351278561539L;

    /** Coroutine executing {@link #run()}; resumed once per {@code co.run()} call. */
    private final Coroutine co;
    /** The value most recently handed over by {@link #produce(Object)}, if any. */
    private E element;
    /** {@code true} while {@link #element} holds a value not yet returned by {@link #next()}. */
    private boolean hasElement;

    /**
     * Creates the iterator and the coroutine that will execute {@link #run()}.
     */
    protected CoIterator() {
        co = new Coroutine((SuspendableCallable<Void>)() -> {run(); return null;});
    }

    /**
     * Resumes the coroutine until it has produced an element or has terminated.
     *
     * @return {@code true} if an element is available for {@link #next()}
     */
    @Override
    public boolean hasNext() {
        while(!hasElement && co.getState() != State.TERMINATED) {
            co.run();
        }
        return hasElement;
    }

    /**
     * Returns the element most recently produced by the coroutine body.
     *
     * @return the next element
     * @throws NoSuchElementException if the body terminated without producing another element
     */
    @Override
    public E next() {
        if(!hasNext()) {
            throw new NoSuchElementException();
        }
        E result = element;
        hasElement = false;
        // Drop the reference so the iterator does not keep a consumed element alive.
        element = null;
        return result;
    }

    /**
     * Always throws UnsupportedOperationException.
     * @throws java.lang.UnsupportedOperationException always
     */
    @Override
    public void remove() throws UnsupportedOperationException {
        throw new UnsupportedOperationException("Not supported");
    }

    /**
     * Produces the next value to be returned by the {@link #next} method.
     *
     * @param element The value that should be returned by {@link #next}
     * @throws SuspendExecution This method will suspend the execution
     * @throws IllegalStateException if the previously produced element has not been consumed yet
     */
    protected void produce(E element) throws SuspendExecution {
        if(hasElement) {
            throw new IllegalStateException("hasElement = true");
        }
        this.element = element;
        hasElement = true;
        // Hand control back to the consumer; execution continues here on the next resume.
        co.yield();
    }

    /**
     * <p>This is the body of the Iterator. This method is executed as a
     * {@link Coroutine} to {@link #produce} the values of the Iterator.</p>
     *
     * <p>Note that this method is suspended each time it calls produce. And if
     * the consumer does not consume all values of the Iterator then this
     * method does not get the chance to finish its execution. This also
     * includes the finally blocks.</p>
     *
     * <p>This method must only suspend by calling produce. Any other reason
     * for suspension will cause a busy loop in the Iterator.</p>
     *
     * @throws SuspendExecution when the body is suspended
     */
    protected abstract void run() throws SuspendExecution;
}
package control.structures.continuations.quasarFiberChannelSameThread;
import static org.junit.Assert.fail;
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import co.paralleluniverse.fibers.SuspendExecution;
/**
 * Smoke test that drains a {@link CoIterator} implementation and prints every
 * value it yields.
 */
public class CoIteratorTest {
    private static final Logger logger = LoggerFactory.getLogger(CoIteratorTest.class);

    /**
     * Walks the whole {@link TestIterator}, printing each element to standard
     * output. Any exception raised while iterating is logged and turned into
     * a test failure.
     */
    @Test
    public void test() throws ExecutionException, InterruptedException {
        Iterator<String> values = new TestIterator();
        try {
            for (; values.hasNext(); ) {
                String value = values.next();
                System.out.println(value);
            }
        } catch (Exception failure) {
            String msg = "Unexpected exception thrown.";
            logger.error(msg, failure);
            fail(msg);
        }
    }

    /**
     * Iterator fixture that yields "A", "B", "C0" through "C3", "D" and "E",
     * in that order.
     */
    public static class TestIterator extends CoIterator<String> implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Coroutine body: every {@code produce} call hands one value to the consumer. */
        @Override
        protected void run() throws SuspendExecution {
            produce("A");
            produce("B");
            int index = 0;
            while (index < 4) {
                produce("C" + index);
                index++;
            }
            produce("D");
            produce("E");
        }
    }
}
package control.structures.continuations.quasarFiberChannelSameThread;
import java.util.concurrent.ExecutorService;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.FiberExecutorScheduler;
import co.paralleluniverse.fibers.FiberScheduler;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableCallable;
import co.paralleluniverse.strands.Strand.State;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
/**
 * A minimal coroutine built from a Quasar {@link Fiber} and a {@link Channel}.
 *
 * <p>The fiber is scheduled on a {@link MyExecutor}, which runs every submitted
 * task directly inside {@code execute}, i.e. on the thread that triggered it.
 * Control is passed back and forth with tokens on {@link #channel}: the caller
 * sends one token per {@link #run()} call, and the fiber waits for a token
 * before starting the body and on each {@link #yield()}.</p>
 *
 * <p>The intended protocol is strictly alternating calls to {@link #run()}
 * (from the consumer) and {@link #yield()} (from inside the body).</p>
 */
public class Coroutine {
    /** Fiber executing the wrapped body. */
    private final Fiber<Void> fiber;
    /** Carries the "resume" tokens from {@link #run()} to the fiber. */
    private final Channel<Object> channel;

    /**
     * Creates and starts a coroutine using {@link Fiber#DEFAULT_STACK_SIZE}.
     *
     * @param proto the body to execute
     */
    public Coroutine(SuspendableCallable<Void> proto) {
        this(proto, Fiber.DEFAULT_STACK_SIZE);
    }

    /**
     * Creates and starts a coroutine. The fiber first waits for a token on the
     * channel, so the body does not begin before the first {@link #run()} call.
     *
     * @param proto     the body to execute
     * @param stackSize the initial fiber stack size, passed on to the {@link Fiber}
     */
    public Coroutine(SuspendableCallable<Void> proto, int stackSize) {
        channel = Channels.newChannel(1);
        // Same-thread executor: the fiber runs on whichever thread resumes it.
        ExecutorService myExecutor = new MyExecutor();
        FiberScheduler myFiberScheduler = new FiberExecutorScheduler("my-scheduler", myExecutor);
        // Pass stackSize through; previously the parameter was silently ignored.
        fiber = new Fiber<>((String) null, myFiberScheduler, stackSize,
                () -> { channel.receive(); proto.run(); channel.close(); });
        fiber.start();
    }

    /**
     * Suspends the body until the next {@link #run()} call supplies a token.
     * Must be called from inside the coroutine body.
     *
     * @throws SuspendExecution when the fiber is suspended
     */
    protected void yield() throws SuspendExecution {
        try {
            channel.receive();
        } catch (InterruptedException e) {
            // NOTE(review): the interruption is only printed; the body then continues
            // without having been resumed by the consumer - confirm this is acceptable.
            e.printStackTrace();
        }
    }

    // as long as I am single threaded and as long as I am sticking to the protocol of alternating calls to yield() and run()
    // I don't need to throw the SuspendExecution exception here, because this channel should never suspend.
    /**
     * Resumes the coroutine by sending one token to the fiber.
     */
    protected void run() {
        try {
            channel.send("something");
        } catch (SuspendExecution | InterruptedException e) {
            // Deliberately best-effort: the failure is printed and the caller decides
            // (via getState()) whether to resume again.
            e.printStackTrace();
        }
    }

    /**
     * Returns the current state of the underlying fiber.
     *
     * @return the fiber state, e.g. {@code State.TERMINATED} once the fiber has ended
     */
    public State getState() {
        return fiber.getState();
    }
}
package control.structures.continuations.quasarFiberChannelSameThread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * An executor that runs every task synchronously on the calling thread.
 *
 * <p>The lifecycle methods are stubs: the executor never reports itself as
 * shut down or terminated, and there is never a queue of pending tasks.</p>
 */
public class MyExecutor extends AbstractExecutorService {

    /**
     * Runs the task immediately, inside this call.
     *
     * @param command the task to run
     */
    @Override
    public void execute(Runnable command) {
        command.run();
    }

    /** Does nothing; this executor holds no resources to release. */
    @Override
    public void shutdown() {
        // Intentionally empty.
    }

    /**
     * Does nothing, since tasks are never queued.
     *
     * @return a new, empty, mutable list
     */
    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> neverStarted = new ArrayList<>();
        return neverStarted;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isShutdown() {
        return false;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isTerminated() {
        return false;
    }

    /**
     * Returns immediately without waiting.
     *
     * @param timeout ignored
     * @param unit    ignored
     * @return always {@code false}
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return false;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment