Skip to content

Instantly share code, notes, and snippets.

@Arnauld
Created May 15, 2014 20:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Arnauld/e0dd490671a46694ce92 to your computer and use it in GitHub Desktop.
Save Arnauld/e0dd490671a46694ce92 to your computer and use it in GitHub Desktop.
package sequencer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
 * Runs dispatched tasks strictly one at a time, in dispatch order, on threads borrowed
 * from an {@link ExecutorService}.
 *
 * <p>Each task is submitted to the executor individually, so consecutive tasks may run on
 * different threads, but the next task is only submitted once the previous one has finished.
 * An {@link Exception} thrown by a task does not stop the sequence: it is handed to the
 * running thread's uncaught-exception handler and the next task is scheduled.
 *
 * @author <a href="http://twitter.com/aloyer">@aloyer</a>
 */
public class Sequencer {
    private final ExecutionUnit executionUnit;

    /**
     * Creates a sequencer.
     *
     * @param executor supplies the executor; it is queried every time a task is scheduled
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    public Sequencer(Supplier<ExecutorService> executor) {
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        this.executionUnit = new ExecutionUnit(executor);
    }

    /**
     * Appends a task to the sequence and schedules it if nothing is currently scheduled.
     *
     * <p>If the executor refuses the submission, the exception thrown by
     * {@code ExecutorService.submit} propagates to the caller. The task stays queued and runs
     * once a later call manages to schedule work.
     *
     * @param runnable the task to run after all previously dispatched tasks
     * @throws NullPointerException if {@code runnable} is {@code null}
     */
    public void dispatch(Runnable runnable) {
        if (runnable == null) {
            throw new NullPointerException("runnable");
        }
        executionUnit.dispatch(runnable);
    }

    /**
     * Holds the pending tasks and submits itself to the executor once per task, so that at
     * most one submission is outstanding at any time.
     */
    private static final class ExecutionUnit implements Runnable {
        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
        private final Supplier<ExecutorService> executor;

        /** True while a submission to the executor is outstanding; guarded by {@code this}. */
        private boolean isRunning;

        ExecutionUnit(Supplier<ExecutorService> executor) {
            this.executor = executor;
        }

        /** Enqueues the task and starts draining the queue if no submission is outstanding. */
        synchronized void dispatch(Runnable runnable) {
            queue.add(runnable);
            if (!isRunning) {
                dispatchNext();
            }
        }

        /** Marks the current task as finished and schedules the next one, if any. */
        private synchronized void taskDone() {
            isRunning = false;
            dispatchNext();
        }

        /**
         * Submits this unit to the executor when at least one task is pending.
         *
         * <p>If the submission fails, {@code isRunning} is cleared again before the exception
         * propagates; otherwise the flag would stay set with nothing scheduled and every
         * later dispatch would only enqueue, stalling the sequence forever.
         */
        private synchronized void dispatchNext() {
            if (queue.isEmpty()) {
                return;
            }
            isRunning = true;
            boolean submitted = false;
            try {
                executor.get().submit(this);
                submitted = true;
            } finally {
                if (!submitted) {
                    isRunning = false;
                }
            }
        }

        /** Runs the task at the head of the queue, then schedules the following one. */
        @Override
        public void run() {
            try {
                Runnable task = queue.poll();
                if (task != null) {
                    task.run();
                }
            } catch (Exception ex) {
                exceptionPolicyHandle(ex);
            } finally {
                // Always release the slot, even if the task or the handler failed.
                taskDone();
            }
        }

        /**
         * Reports a task failure instead of dropping it silently: the exception is passed to
         * the current thread's uncaught-exception handler. The sequence then continues with
         * the next task.
         */
        private void exceptionPolicyHandle(Exception ex) {
            Thread current = Thread.currentThread();
            current.getUncaughtExceptionHandler().uncaughtException(current, ex);
        }
    }
}
@Arnauld
Copy link
Author

Arnauld commented May 15, 2014

Basic testing, nbThreads=4

package sequencer;

import org.junit.Test;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Checks that a {@link Sequencer} backed by a multi-threaded pool still runs its tasks one
 * after another, in dispatch order.
 *
 * <p>Every simulated task advances a per-sequencer counter with a compare-and-set from
 * {@code value - 1} to {@code value}; a task that runs out of order fails that step and
 * records an exception in {@link #raised}.
 */
public class SequencerTest {

    private static final Random random = new Random(17L);

    /** Set by {@link #simulateWork} when a task observes an unexpected counter value. */
    private static volatile Exception raised;

    /** One sequencer on a 4-thread pool: 500 tasks must leave the counter at 499. */
    @Test(timeout = 30_000)
    public void exec_single_sequencer() {
        raised = null; // do not inherit a failure recorded by another test
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            final CyclicBarrier barrier = new CyclicBarrier(2);

            SeqContext context = new SeqContext(executor);
            context.dispatchRandomTasks(500);
            // Last task of the sequence: meets the test thread at the barrier.
            context.dispatch(() -> await(barrier));

            await(barrier);
            assertThat(raised).isNull();
            assertThat(context.seq()).isEqualTo(499);
        } finally {
            // Release the pool's non-daemon threads whatever the outcome.
            executor.shutdownNow();
        }
    }

    /** Four sequencers sharing a 4-thread pool: each one must keep its own order. */
    @Test(timeout = 30_000)
    public void exec_4_sequencers() {
        raised = null; // do not inherit a failure recorded by another test
        int nbThreads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(nbThreads);
        try {
            final CyclicBarrier begBarrier = new CyclicBarrier(nbThreads + 1);
            final CyclicBarrier endBarrier = new CyclicBarrier(nbThreads + 1);

            SeqContext[] contexts = new SeqContext[nbThreads];
            for (int i = 0; i < nbThreads; i++) {
                SeqContext context = new SeqContext(executor);
                // First task blocks until all sequencers and the test thread are ready.
                context.dispatch(() -> await(begBarrier));
                context.dispatchRandomTasks(500);
                context.dispatch(() -> await(endBarrier));

                contexts[i] = context;
            }

            await(begBarrier);
            await(endBarrier);
            assertThat(raised).isNull();
            for (int i = 0; i < nbThreads; i++) {
                assertThat(contexts[i].seq()).isEqualTo(499);
            }
        } finally {
            // Release the pool's non-daemon threads whatever the outcome.
            executor.shutdownNow();
        }
    }

    /**
     * Advances {@code seq} from {@code value - 1} to {@code value}, recording a failure in
     * {@link #raised} if the counter held anything else, then sleeps for 0-9 ms.
     */
    private static void simulateWork(AtomicInteger seq, int value) {
        if (!seq.compareAndSet(value - 1, value)) {
            System.err.println("..." + value);
            raised = new RuntimeException("task " + value + " ran out of order");
        }
        int millis = random.nextInt(10);
        doSleep(millis);
    }

    /** Sleeps for {@code millis} ms; an interrupt is restored and rethrown unchecked. */
    private static void doSleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /** Waits on the barrier, converting any failure into an unchecked exception. */
    private static void await(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** A sequencer paired with the counter its simulated tasks advance. */
    public static class SeqContext {

        private final Sequencer sequencer;
        private final AtomicInteger seq = new AtomicInteger(-1);

        public SeqContext(ExecutorService executor) {
            sequencer = new Sequencer(() -> executor);
        }

        /** Dispatches {@code nb} tasks expecting to run as numbers 0 to {@code nb - 1}. */
        public void dispatchRandomTasks(int nb) {
            for (int i = 0; i < nb; i++) {
                final int ref = i;
                sequencer.dispatch(() -> simulateWork(seq, ref));
            }
        }

        public void dispatch(Runnable runnable) {
            sequencer.dispatch(runnable);
        }

        /** Returns the number of the last task that advanced the counter, or -1 if none. */
        public int seq() {
            return seq.get();
        }
    }

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment