Created
September 20, 2014 14:53
-
-
Save cs224/019382d9ddb760267c97 to your computer and use it in GitHub Desktop.
Reimplementing the Matthias Mann Coroutine and CoIterator on top of Quasar.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package control.structures.continuations.quasarFiberChannelSameThread; | |
import java.io.Serializable; | |
import java.util.Iterator; | |
import java.util.NoSuchElementException; | |
import co.paralleluniverse.fibers.SuspendExecution; | |
import co.paralleluniverse.strands.Strand.State; | |
import co.paralleluniverse.strands.SuspendableCallable; | |
public abstract class CoIterator<E> implements Iterator<E>, Serializable { | |
private static final long serialVersionUID = 351278561539L; | |
private final Coroutine co; | |
private E element; | |
private boolean hasElement; | |
protected CoIterator() { | |
co = new Coroutine((SuspendableCallable<Void>)() -> {run(); return null;}); | |
} | |
public boolean hasNext() { | |
while(!hasElement && co.getState() != State.TERMINATED) { | |
co.run(); | |
} | |
return hasElement; | |
} | |
public E next() { | |
if(!hasNext()) { | |
throw new NoSuchElementException(); | |
} | |
E result = element; | |
hasElement = false; | |
element = null; | |
return result; | |
} | |
/** | |
* Always throws UnsupportedOperationException. | |
* @throws java.lang.UnsupportedOperationException always | |
*/ | |
public void remove() throws UnsupportedOperationException { | |
throw new UnsupportedOperationException("Not supported"); | |
} | |
/** | |
* Produces the next value to be returned by the {@link #next} method. | |
* | |
* @param element The value that should be returned by {@link #next} | |
* @throws de.matthiasmann.continuations.SuspendExecution This method will suspend the execution | |
*/ | |
protected void produce(E element) throws SuspendExecution { | |
if(hasElement) { | |
throw new IllegalStateException("hasElement = true"); | |
} | |
this.element = element; | |
hasElement = true; | |
co.yield(); | |
} | |
/** | |
* <p>This is the body of the Iterator. This method is executed as a | |
* {@link Coroutine} to {@link #produce} the values of the Iterator.</p> | |
* | |
* <p>Note that this method is suspended each time it calls produce. And if | |
* the consumer does not consume all values of the Iterator then this | |
* method does not get the change to finish it's execution. This also | |
* includes the finally blocks.</p> | |
* | |
* <p>This method must only suspend by calling produce. Any other reason | |
* for suspension will cause a busy loop in the Iterator.</p> | |
* | |
* @throws de.matthiasmann.continuations.SuspendExecution | |
*/ | |
protected abstract void run() throws SuspendExecution; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package control.structures.continuations.quasarFiberChannelSameThread; | |
import static org.junit.Assert.fail; | |
import java.io.Serializable; | |
import java.util.Iterator; | |
import java.util.concurrent.ExecutionException; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import co.paralleluniverse.fibers.SuspendExecution; | |
public class CoIteratorTest { | |
private static final Logger logger = LoggerFactory.getLogger(CoIteratorTest.class); | |
@Test | |
public void test() throws ExecutionException, InterruptedException { | |
Iterator<String> iter = new TestIterator(); | |
try { | |
while(iter.hasNext()) { | |
System.out.println(iter.next()); | |
} | |
} catch (Exception e) { | |
String msg = "Unexpected exception thrown."; | |
logger.error(msg, e); | |
fail(msg); | |
} | |
} | |
public static class TestIterator extends CoIterator<String> implements Serializable { | |
private static final long serialVersionUID = 1L; | |
@Override | |
protected void run() throws SuspendExecution { | |
produce("A"); | |
produce("B"); | |
for(int i = 0; i < 4; i++) { | |
produce("C" + i); | |
} | |
produce("D"); | |
produce("E"); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package control.structures.continuations.quasarFiberChannelSameThread; | |
import java.util.concurrent.ExecutorService; | |
import co.paralleluniverse.fibers.Fiber; | |
import co.paralleluniverse.fibers.FiberExecutorScheduler; | |
import co.paralleluniverse.fibers.FiberScheduler; | |
import co.paralleluniverse.fibers.SuspendExecution; | |
import co.paralleluniverse.strands.SuspendableCallable; | |
import co.paralleluniverse.strands.Strand.State; | |
import co.paralleluniverse.strands.channels.Channel; | |
import co.paralleluniverse.strands.channels.Channels; | |
public class Coroutine { | |
private final Fiber<Void> fiber; | |
private final Channel<Object> channel; | |
public Coroutine(SuspendableCallable<Void> proto) { | |
this(proto, Fiber.DEFAULT_STACK_SIZE); | |
} | |
public Coroutine(SuspendableCallable<Void> proto, int stackSize) { | |
channel = Channels.newChannel(1); | |
//ExecutorService myExecutor = Executors.newSingleThreadExecutor(); | |
ExecutorService myExecutor = new MyExecutor(); | |
FiberScheduler myFiberScheduler = new FiberExecutorScheduler("my-scheduler", myExecutor); | |
fiber = new Fiber<>(myFiberScheduler, () -> { channel.receive(); proto.run(); channel.close(); }); | |
//fiber = new Fiber<>(() -> { channel.receive(); proto.run(); channel.close();}); | |
fiber.start(); | |
} | |
protected void yield() throws SuspendExecution { | |
try { | |
channel.receive(); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
// as long as I am single threaded and as long as I am sticking to the protocol of alternating calls to yield() and run() | |
// I don't need to throw the SuspendExecution exception here, because this channel should never suspend. | |
protected void run() { | |
try { | |
channel.send("something"); | |
} catch (SuspendExecution | InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
public State getState() { | |
return fiber.getState(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package control.structures.continuations.quasarFiberChannelSameThread; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.AbstractExecutorService; | |
import java.util.concurrent.TimeUnit; | |
public class MyExecutor extends AbstractExecutorService { | |
@Override | |
public void shutdown() { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public List<Runnable> shutdownNow() { | |
// TODO Auto-generated method stub | |
return new ArrayList<Runnable>(); | |
} | |
@Override | |
public boolean isShutdown() { | |
return false; | |
} | |
@Override | |
public boolean isTerminated() { | |
return false; | |
} | |
@Override | |
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { | |
return false; | |
} | |
@Override | |
public void execute(Runnable command) { | |
command.run(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment