Skip to content

Instantly share code, notes, and snippets.

@rnorth
Created October 17, 2013 06:23
Show Gist options
  • Save rnorth/7019921 to your computer and use it in GitHub Desktop.
Save rnorth/7019921 to your computer and use it in GitHub Desktop.
Quick example of using Quasar channels. Must run using the Quasar JVM agent. https://github.com/puniverse/quasar/ Example output: 'Hello 1' -> 'Hello 2' -> 'Hello 3' -> 'Hello 4' -> 'Hello 5' -> 'Hello 6' -> 'Hello 7' -> 'Hello 8' -> -> 'Hello 1' 'Hello 9' -> -> 'Hello 2' 'Hello 10' -> -> 'Hello 3' 'Hello 11' -> -> 'Hello 4' 'Hello 12' -> -> 'He…
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.SuspendableRunnable;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import jsr166e.ForkJoinPool;
import java.util.concurrent.ExecutionException;
import static java.lang.String.format;
import static java.lang.System.out;
/**
 * Minimal demonstration of two Quasar fibers communicating over a bounded channel.
 *
 * <p>A "sender" fiber pushes a formatted message into the channel every
 * {@value #SEND_INTERVAL_MILLIS} ms, while a "receiver" fiber pulls one message every
 * {@value #RECEIVE_INTERVAL_MILLIS} ms. Because the sender is ten times faster than the
 * receiver and the channel only buffers {@value #MAILBOX_SIZE} messages, the printed output
 * shows how the producer behaves once the buffer is full.
 *
 * <p>Must be run with the Quasar JVM agent (see https://github.com/puniverse/quasar/).
 *
 * @author richardnorth
 */
public class QuasarChannelsExample {

    /** Number of messages the channel buffers. */
    private static final int MAILBOX_SIZE = 5;

    /** Parallelism level passed to the fork/join pool that schedules both fibers. */
    private static final int POOL_PARALLELISM = 4;

    /** Pause, in milliseconds, before the sender emits each message. */
    private static final long SEND_INTERVAL_MILLIS = 200L;

    /** Pause, in milliseconds, before the receiver consumes each message. */
    private static final long RECEIVE_INTERVAL_MILLIS = 2000L;

    /**
     * Creates the channel shared by the two fibers.
     *
     * <p>The channel is created with a buffer of {@value #MAILBOX_SIZE} messages, the
     * {@code BLOCK} overflow policy, and both the single-producer and single-consumer flags
     * set, matching the one-sender/one-receiver setup in {@link #main(String[])}.
     * NOTE(review): {@code BLOCK} presumably suspends the sender while the buffer is full —
     * confirm against the Quasar documentation.
     *
     * @param <T> type of the messages carried by the channel
     * @return a new channel configured as described above
     */
    private static <T> Channel<T> newChannel() {
        int mailboxSize = MAILBOX_SIZE;
        Channels.OverflowPolicy policy = Channels.OverflowPolicy.BLOCK;
        boolean singleProducer = true;
        boolean singleConsumer = true;
        return Channels.newChannel(mailboxSize, policy, singleProducer, singleConsumer);
    }

    /**
     * Starts the sender and receiver fibers and then waits on both of them.
     *
     * <p>Both fibers loop without an exit condition, so this method does not return normally;
     * it only ends if a join is interrupted or a fiber terminates with a failure.
     *
     * @param args command-line arguments (unused)
     * @throws ExecutionException   if joining a fiber reports that the fiber failed
     * @throws InterruptedException if the main thread is interrupted while joining
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final Channel<String> ch = newChannel();

        // Arguments: parallelism, worker-thread factory, uncaught-exception handler (none),
        // and asyncMode = true.
        final ForkJoinPool forkJoinPool = new ForkJoinPool(
                POOL_PARALLELISM, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);

        // Producer: emits "Hello <n>" with an ever-increasing counter.
        final Fiber<Void> sender = new Fiber<Void>("sender", forkJoinPool, new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                for (int i = 1; ; i++) {
                    Fiber.sleep(SEND_INTERVAL_MILLIS);
                    String message = format("Hello %4d", i);
                    ch.send(message);
                    // Printed only after send() has returned, i.e. once the message was accepted.
                    out.printf("'%s' ->\n", message);
                }
            }
        }).start();

        // Consumer: deliberately slower than the producer so the channel buffer fills up.
        final Fiber<Void> receiver = new Fiber<Void>("receiver", forkJoinPool, new SuspendableRunnable() {
            @Override
            public void run() throws SuspendExecution, InterruptedException {
                while (true) {
                    Fiber.sleep(RECEIVE_INTERVAL_MILLIS);
                    String s = ch.receive();
                    out.printf(" -> '%s'\n", s);
                }
            }
        }).start();

        sender.join();
        receiver.join();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment