-
-
Save alsritter/7097879a8e0d53172072432f6a38b73d to your computer and use it in GitHub Desktop.
自己写一个简单的 AQS 同步队列
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* @author alsritter | |
* @version 1.0 | |
**/ | |
public class MySimpleAQS { | |
// 因为是共享的变量,所以要保证可见性 | |
private volatile Node head; /* 表示头元素 */ | |
private volatile Node tail; /* 表示尾元素 */ | |
private volatile int status; /* 资源状态(一把锁)1:表示锁被占用 0:表示锁空闲 */ | |
private Thread exclusiveOwnerThread; /* 当前占有锁的线程对象 */ | |
public Thread getExclusiveOwnerThread() { | |
return exclusiveOwnerThread; | |
} | |
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) { | |
this.exclusiveOwnerThread = exclusiveOwnerThread; | |
} | |
public int getStatus() { | |
return status; | |
} | |
public void setStatus(int status) { | |
this.status = status; | |
} | |
private static Unsafe unsafe; | |
private static long statusOffset; /* 记录资源状态的偏移量 */ | |
private static long tailOffset; /* 记录队尾元素的偏移量 */ | |
private static long headOffset; /* 记录队头元素的偏移量 */ | |
private static long threadStatusOffset; /* 记录 Node 状态的偏移量 */ | |
static { | |
// 因为这个 theUnsafe 是用的 private 修饰的,所以需要使用 setAccessible 打开 | |
// 取得静态对象是用 null | |
try { | |
Field field = Unsafe.class.getDeclaredField("theUnsafe"); | |
field.setAccessible(true); | |
unsafe = (Unsafe) field.get(null); | |
statusOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("status")); | |
tailOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("tail")); | |
headOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("head")); | |
threadStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("threadStatus")); | |
} catch (IllegalAccessException | NoSuchFieldException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* 释放锁 | |
* | |
* @param arg 理论上传入的参数都是 1 | |
*/ | |
public void release(int arg) { | |
if (tryRelease(arg)) { // 如果尝试释放成功,则唤醒下一个线程 | |
// 唤醒下一个线程 | |
unparkSuccessor(head); | |
} | |
} | |
/** | |
* 唤醒 head 的下一个线程 | |
* | |
* @param head 头 | |
*/ | |
private void unparkSuccessor(Node head) { | |
// 修改当前锁的状态为 0(释放锁) | |
compareAndSetState(getStatus(), 0); | |
// 获取 head 元素的下一个元素 | |
// 下一个元素是 | |
Node next = null; // 这是要唤醒的元素 | |
if (head != null) { // 健壮性判断 | |
next = head.next; | |
} | |
// 如果下一个元素是 null,或者下一个元素的状态不是 wait,则从列表的最后一个元素开始往前找 | |
// 找到最后一个状态不是 CANCELLED(取消)的作为 next | |
if (next == null || next.threadStatus != Node.WAIT) { | |
for (Node t = tail; t != null && t != head; t = t.prev) { | |
if (t.threadStatus != Node.CANCELLED) { | |
next = t; | |
} | |
} | |
} | |
// 如果 next 元素找到了,则直接唤醒 | |
if (next != null) { | |
// 唤醒线程 | |
unsafe.unpark(next.thread); | |
System.out.println("唤醒了线程:" + Thread.currentThread().getName()); | |
// 修改状态 | |
compareAndSetNodeStatus(next,next.threadStatus,Node.DEFAULT); | |
} | |
} | |
/** | |
* 释放锁 | |
* | |
* @param arg 一般传入的都是 1 | |
* @return 是否释放成功 | |
*/ | |
private boolean tryRelease(int arg) { | |
// 计算状态 | |
int c = getStatus() - arg; // 1 | |
// 判断释放锁的线程是否是当前占用锁的线程 | |
if (Thread.currentThread() != getExclusiveOwnerThread()) { | |
throw new RuntimeException("释放锁的线程并非当前占用锁的线程"); | |
} | |
if (c == 0) { | |
// 是要释放锁,先将当前持有锁的线程对象置空 | |
setExclusiveOwnerThread(null); | |
System.out.println(Thread.currentThread().getName() + ":释放了锁"); | |
return true; | |
} | |
// 将新的状态设置回去 | |
setStatus(c); | |
return false; | |
} | |
/** | |
* 获取锁 | |
* | |
* @param arg | |
*/ | |
public final void acquire(int arg) { | |
/* | |
1. 注意看这里把 tryAcquire 放在第一位,当他返回 true 表示获取锁成功,就无需执行后面的判断语句(创建队列,创建 Node元素,排队,线程中断) | |
2. 当他返回 false 说明锁被占用了,就需要执行后面的判断语句(创建队列,创建 Node元素,排队,线程中断) | |
*/ | |
if (!tryAcquire(arg) | |
&& acquireQueued(addWaiter(), arg)) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
/** | |
* 判断传入的 Node 是否需要排队,如果要排队,则开始排队,如果不需要则直接获取锁 | |
* <p> | |
* 这里之所以使用 boolean 返回类型的原因就是因为可能在刚插入队列这会线程就把锁释放了,无需等待 | |
* | |
* @param node addWaiter 入队的那个 Node | |
* @param arg | |
* @return 如果排队成功(线程开始等待)返回 true,如果返回 false 表示就在丢进队列这会,线程把锁释放了,无需等待 | |
*/ | |
private boolean acquireQueued(Node node, int arg) { | |
// 自旋操作 | |
while (true) { | |
// 判断上一个元素是否是 head 元素 | |
if (node.prev == head && tryAcquire(arg)) { | |
// 如果上个元素是 head 元素,则尝试获取锁,如果获取成功 | |
setHead(node); // 将当前元素设置为头元素,之前的头元素就被排除在外(被 GC 回收了) | |
// 修改当前 node 的状态(原子操作) | |
compareAndSetNodeStatus(node, node.threadStatus, Node.DEFAULT); | |
return false; // 最终出口 | |
} | |
// 判断是否需要排队等待,这里用 && 连接,表示如果需要排队,则执行后面的 parkAndCheckInterrupt 方法中断线程 | |
if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt(node)) { | |
// 一定不会返回 true,因为 parkAndCheckInterrupt 一定返回 false(具体原因看注释) | |
return true; // 写上只是为了占位(idea的警告好烦) | |
} | |
} | |
} | |
/** | |
* 让当前线程排队(中断线程) | |
* | |
* @param node 当前 node | |
* @return 让线程等待成功返回 true,否则 false | |
*/ | |
private boolean parkAndCheckInterrupt(Node node) { | |
// 将当前的线程状态修改为等待状态 | |
compareAndSetNodeStatus(node, node.threadStatus, Node.WAIT); | |
// 中断线程 | |
unsafe.park(false, 0); | |
// 这里返回的一定是 false,因为上面调用 park 已经让线程中断,所以这个 isInterrupted 方法已经执行不了,如果能执行就表示这个线程从中断恢复了 | |
return Thread.currentThread().isInterrupted(); | |
} | |
/** | |
* 判断当前线程是否需要排队等待 | |
* 如果上一个元素是头元素,是不需要等待(不过前面已经判断了,能进这里表示一定不是头元素,所以这里就不判断了) | |
* 如果上一个元素本身都是等待状态,表示当前线程也要排队等待 | |
* 如果上一个元素不是头元素,也不是等待状态,也没有获取锁(说明上一个线程是马上就要拿到锁的状态),当前线程排队等待 | |
* | |
* @param node 当前 node | |
* @return 返回当前线程是否需要排队 | |
*/ | |
private boolean shouldParkAfterFailedAcquire(Node node) { | |
// 首先获取上一个元素 | |
Node prev = node.prev; | |
// 上个元素不为空,不为头,且没有被取消,这时表示上个 node 要么在排队,要么在尝试获取锁,所以当前 node 一定是等待 | |
if (prev != null && prev != head && prev.threadStatus != Node.CANCELLED) { | |
return true; | |
} | |
// 判断上一个元素是否被取消,如果被取消则将上一个元素从当前的队列排除(因为可能有多个被取消的,所以这里丢进 do while 循环) | |
if (prev != null && prev.threadStatus == Node.CANCELLED) { | |
do { | |
prev = prev.prev; | |
node.prev = prev; | |
} while (prev.threadStatus == Node.CANCELLED); | |
prev.next = node; | |
} | |
return false; | |
} | |
/** | |
* 原子操作修改 Node的状态 | |
* | |
* @param node 需要修改的 node | |
* @param threadStatus 期望的 threadStatus | |
* @param arg 修改为的状态 | |
*/ | |
private void compareAndSetNodeStatus(Node node, int threadStatus, int arg) { | |
unsafe.compareAndSwapInt(node, threadStatusOffset, threadStatus, arg); | |
} | |
/** | |
* 头节点都是空的,因为线程已经拿到锁了,所以没必要还在队列里 | |
* | |
* @param node 当前线程 | |
*/ | |
private void setHead(Node node) { | |
this.head = node; | |
node.thread = null; | |
node.prev = null; | |
} | |
/** | |
* 根据当前线程创建一个 Node 对象,并将其入队 | |
* | |
* @return 返回当前入队的 Node | |
*/ | |
private Node addWaiter() { | |
// 根据当前的线程创建一个 Node | |
Node node = new Node(Thread.currentThread()); | |
// 取出队尾元素 | |
Node prev = this.tail; // 新元素的上一个元素就是队尾元素 | |
if (prev != null) { | |
// 队尾元素存在 | |
node.prev = prev; | |
// 替换队尾元素(原子操作) | |
if (compareAndSetTail(prev, node)) { | |
prev.next = node; | |
return node; | |
} | |
} | |
// 执行这个方法,说明入队失败(1、队列不存在 2、队列正在初始化,还没有队尾元素) | |
return enq(node); | |
} | |
/** | |
* 自旋的完成入队操作 | |
* 如果是一个线程来创建队列,则 | |
* 循环第一次:创建了队头元素 | |
* 循环第二次:创建了第二个元素,第二个元素就是新元素,那么新元素就会入队 | |
* | |
* @param node 需要插入队里面的元素 | |
* @return 返回 node | |
*/ | |
private Node enq(Node node) { | |
while (true) { | |
// 获取队尾元素 | |
Node t = this.tail; | |
if (t == null) { // 说明队列还没有初始化 或者 队列正在初始化(可能有别的线程在初始化) | |
// 如果队列还不存在需要创建新的队列,先放队头元素 | |
// 因为可能有其他的线程也在初始化这个队列,所以这里需要使用原子操作 | |
if (compareAndSetHead(new Node())) // 创建一个新的节点(里面三个属性都是 null) | |
this.tail = this.head; // 如果创建成功,让 head 和 tail 指向同一个元素 | |
} else { | |
// 说明上面已经初始化了(队里初始只有一个空元素),这时直接把 node 入队就行了 | |
// 这里的 t 经过第二轮自旋,已经实际指向 tail 了,compareAndSetTail 主要就是把 tail 的位置换成 node(注意看偏移量) | |
// 而 t 则是指向上面的那个 new Node() 所以,这个 t 就是 head | |
if (compareAndSetTail(t, node)) { | |
node.prev = t; | |
t.next = node; | |
return node; | |
} | |
} | |
} | |
} | |
/** | |
* 原子操作,创建队头元素 | |
* | |
* @param node 创建的队头元素 | |
* @return 是否创建成功 | |
*/ | |
private boolean compareAndSetHead(Node node) { | |
// 队头必须是 null 才能执行,所以这里期望值传入 null | |
return unsafe.compareAndSwapObject(this, headOffset, null, node); | |
} | |
/** | |
* 原子操作,实现队尾元素的替换 | |
* | |
* @param prev 上个元素 | |
* @param node 需要替换的元素 | |
* @return 是否替换成功 | |
*/ | |
private boolean compareAndSetTail(Node prev, Node node) { | |
return unsafe.compareAndSwapObject(this, tailOffset, prev, node); | |
} | |
/** | |
* 尝试获取锁 | |
* | |
* @param arg 需要修改为的状态 | |
* @return 如果获取到锁返回 true,否则返回 false | |
*/ | |
private boolean tryAcquire(int arg) { | |
// 获取当前的线程对象 | |
Thread thread = Thread.currentThread(); | |
// 获取当前锁的状态 | |
int state = getStatus(); | |
if (state == 0) { //当前锁处于释放状态 | |
// 还需要判断自己是否需要排队,如果不需要排队就直接获取锁(毕竟不能插队) | |
// 注意这个判断,如果第一个方法返回 true,后面那个方法就不用执行了(因为是 && 连接的) | |
if (!hasQueuedPredecessors() && compareAndSetState(0, arg)) { | |
// 获取锁成功,设置占用锁的线程为当前线程 | |
setExclusiveOwnerThread(thread); | |
System.out.println(thread.getName() + ":获取锁成功!"); | |
return true; | |
} | |
} else if (thread == getExclusiveOwnerThread()) { // 判断当前尝试获取锁的线程是否就是占有锁的线程(重入锁) | |
// 如果有重入锁,状态需要 加一状态 | |
state = getStatus() + arg; | |
setStatus(state); | |
return true; | |
} | |
return false; | |
} | |
/** | |
* 获取锁 因为这里可能会有多个线程同时操作,所以需要使用原子操作完成状态的修改 | |
* | |
* @param expected 预期对象 | |
* @param arg 需要修改为的状态 | |
* @return 获取锁,成功返回 true | |
*/ | |
private boolean compareAndSetState(int expected, int arg) { | |
return unsafe.compareAndSwapInt(this, statusOffset, expected, arg); | |
} | |
/** | |
* @return 判断是否需要排队 | |
*/ | |
private boolean hasQueuedPredecessors() { | |
/* | |
1. 队列为空,队列只有一个元素,不需要排队 | |
2. 队列正在初始化,排队 | |
3. 队列有元素,当前线程正好是 head元素的下一个元素,不需要排队 | |
*/ | |
Node h = head; /* 头元素 */ | |
Node t = tail; /* 头元素 */ | |
Node s; /* 头元素的下一个元素 */ | |
/* | |
h == t | |
1. h = null, t = null 队列还没创建 | |
2. h = t 队列中只有一个元素。尝试获取锁 | |
h != t | |
1. h != null, t = null 队列真正初始化 | |
2. h 1= null, t != null 虽然有队列了,但是当前线程还不在队列里面 | |
*/ | |
return h != t && // 这里的判断使用的是 && 连接,表示出现一个 false 就返回 false | |
// h.next == null 表示还在初始化 | |
// s.thread == Thread.currentThread() 表示头元素的下一个元素就是当前线程 | |
((s = h.next) == null || s.thread != Thread.currentThread()); // 这里的判断使用的是 || 连接,表示出现一个 true 就返回 true | |
} | |
static class Node { | |
Node prev; /* 指向上一个元素 */ | |
Node next; /* 指向下一个元素 */ | |
Thread thread; /* 当前元素维护的线程 */ | |
int threadStatus; /* 状态(1:运行 2:等待 3:取消) */ | |
// 定义几个状态常量 | |
/** | |
* 默认状态 | |
*/ | |
static final int DEFAULT = 0; | |
/** | |
* 等待状态 | |
*/ | |
static final int WAIT = 1; | |
/** | |
* 取消状态 | |
*/ | |
static final int CANCELLED = -1; | |
// 构造方法 | |
public Node() { | |
} | |
public Node(Thread thread) { | |
this.thread = thread; | |
} | |
public Node(Thread thread, int threadStatus) { | |
this(thread); | |
this.threadStatus = threadStatus; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment