Skip to content

Instantly share code, notes, and snippets.

@khotyn
Created October 18, 2012 14:00
Show Gist options
  • Save khotyn/3912000 to your computer and use it in GitHub Desktop.
Save khotyn/3912000 to your computer and use it in GitHub Desktop.
一个无限容量的用于 Work Steeling 的 DEQueue 的实现
package com.khotyn.test;
/**
* Created with IntelliJ IDEA.
* User: khotyn
* Date: 12-10-18
* Time: 下午9:14
* To change this template use File | Settings | File Templates.
*/
public class CircularArray {
private int logCapacity;
private Runnable[] currentTasks;
CircularArray(int myLogCapacity) {
logCapacity = myLogCapacity;
currentTasks = new Runnable[1 << logCapacity];
}
int capacity() {
return 1 << logCapacity;
}
Runnable get(int i) {
return currentTasks[i % capacity()];
}
void put(int i, Runnable task) {
currentTasks[i % capacity()] = task;
}
CircularArray resize(int bottom, int top) {
CircularArray newTasks = new CircularArray(logCapacity);
for(int i = top; i < bottom; i++) {
newTasks.put(i, get(i));
}
return newTasks;
}
}
package com.khotyn.test;
import java.util.concurrent.atomic.AtomicReference;
/**
* User: khotyn
* Date: 12-10-18
* Time: 下午9:18
* 一个无限容量的用于 Work Steeling 的 DEQueue 的实现,代码来自于 The Art of Multiprocessor Programming.
*/
public class UnboundedDEQueue {
private final static int LOG_CAPACITY = 4;
private volatile CircularArray tasks;
volatile int bottom;
AtomicReference<Integer> top;
public UnboundedDEQueue(int LOG_CAPACITY) {
tasks = new CircularArray(LOG_CAPACITY);
top = new AtomicReference<Integer>(0);
bottom = 0;
}
boolean isEmpty() {
int localTop = top.get();
int localBottom = bottom;
return localBottom <= localTop;
}
public void pushBottom(Runnable r) {
int oldBottom = bottom;
int oldTop = top.get();
CircularArray currentTasks = tasks;
int size = oldBottom - oldTop;
if (size >= currentTasks.capacity() - 1) { // Bottom 快领先 Top 一圈,可以马上进行扩容了。
currentTasks = currentTasks.resize(oldBottom, oldTop);
tasks = currentTasks;
}
tasks.put(oldBottom, r);
bottom = oldBottom + 1;
}
public Runnable popTop() {
int oldTop = top.get();
int newTop = oldTop + 1;
int oldBottom = bottom;
CircularArray currentTasks = tasks; // 为什么加上这么一句?这个变量是 volatile 的,是为了在这里放一个屏障吗?前后的顺序不会乱掉?
int size = oldBottom - oldTop;
if (size <= 0) { // size 小于 0,DEQueue 已经空了,直接返回 null
return null;
}
Runnable r = tasks.get(oldTop);
if (top.compareAndSet(oldTop, newTop)) {
return r;
}
return null; // CAS 操作失败,直接返回 null
}
public Runnable popBottom() {
CircularArray currentTasks = tasks;
bottom--; // bottom 先减掉,上面的 popTop 方法如果正在执行,那么就马上可以看见了。
int oldTop = top.get();
int newTop = oldTop + 1;
int size = bottom - oldTop;
if (size < 0) { // bottom 比 oldTop 小,DEQueue 已经空了,直接返回 null
bottom = oldTop;
return null;
}
Runnable r = tasks.get(bottom);
if (size > 0) { // size 大于 0,表示 bottom 和 top 的距离还比较远,可以放心操作,直接返回 r
return r;
}
if (!top.compareAndSet(oldTop, newTop)) { // size 等于 0 的情况,top 和 bottom 跑到一块儿去了,用 CAS 操作保证安全
r = null;
}
bottom = oldTop + 1; // 无论上面的 CAS 操作是否成功(竞争的双方总有一个人成功了,这意味着 top 增加了 1,bottom 已经比 top 小 1,所以这里把它设置成和 top 一样大小。)
return r;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment