Skip to content

Instantly share code, notes, and snippets.

@alsritter
Created June 18, 2021 16:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alsritter/7097879a8e0d53172072432f6a38b73d to your computer and use it in GitHub Desktop.
Save alsritter/7097879a8e0d53172072432f6a38b73d to your computer and use it in GitHub Desktop.
自己写一个简单的 AQS 同步队列
/**
* @author alsritter
* @version 1.0
**/
public class MySimpleAQS {
// 因为是共享的变量,所以要保证可见性
private volatile Node head; /* 表示头元素 */
private volatile Node tail; /* 表示尾元素 */
private volatile int status; /* 资源状态(一把锁)1:表示锁被占用 0:表示锁空闲 */
private Thread exclusiveOwnerThread; /* 当前占有锁的线程对象 */
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
private static Unsafe unsafe;
private static long statusOffset; /* 记录资源状态的偏移量 */
private static long tailOffset; /* 记录队尾元素的偏移量 */
private static long headOffset; /* 记录队头元素的偏移量 */
private static long threadStatusOffset; /* 记录 Node 状态的偏移量 */
static {
// 因为这个 theUnsafe 是用的 private 修饰的,所以需要使用 setAccessible 打开
// 取得静态对象是用 null
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
unsafe = (Unsafe) field.get(null);
statusOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("status"));
tailOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("tail"));
headOffset = unsafe.objectFieldOffset(MySimpleAQS.class.getDeclaredField("head"));
threadStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("threadStatus"));
} catch (IllegalAccessException | NoSuchFieldException e) {
e.printStackTrace();
}
}
/**
* 释放锁
*
* @param arg 理论上传入的参数都是 1
*/
public void release(int arg) {
if (tryRelease(arg)) { // 如果尝试释放成功,则唤醒下一个线程
// 唤醒下一个线程
unparkSuccessor(head);
}
}
/**
* 唤醒 head 的下一个线程
*
* @param head 头
*/
private void unparkSuccessor(Node head) {
// 修改当前锁的状态为 0(释放锁)
compareAndSetState(getStatus(), 0);
// 获取 head 元素的下一个元素
// 下一个元素是
Node next = null; // 这是要唤醒的元素
if (head != null) { // 健壮性判断
next = head.next;
}
// 如果下一个元素是 null,或者下一个元素的状态不是 wait,则从列表的最后一个元素开始往前找
// 找到最后一个状态不是 CANCELLED(取消)的作为 next
if (next == null || next.threadStatus != Node.WAIT) {
for (Node t = tail; t != null && t != head; t = t.prev) {
if (t.threadStatus != Node.CANCELLED) {
next = t;
}
}
}
// 如果 next 元素找到了,则直接唤醒
if (next != null) {
// 唤醒线程
unsafe.unpark(next.thread);
System.out.println("唤醒了线程:" + Thread.currentThread().getName());
// 修改状态
compareAndSetNodeStatus(next,next.threadStatus,Node.DEFAULT);
}
}
/**
* 释放锁
*
* @param arg 一般传入的都是 1
* @return 是否释放成功
*/
private boolean tryRelease(int arg) {
// 计算状态
int c = getStatus() - arg; // 1
// 判断释放锁的线程是否是当前占用锁的线程
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new RuntimeException("释放锁的线程并非当前占用锁的线程");
}
if (c == 0) {
// 是要释放锁,先将当前持有锁的线程对象置空
setExclusiveOwnerThread(null);
System.out.println(Thread.currentThread().getName() + ":释放了锁");
return true;
}
// 将新的状态设置回去
setStatus(c);
return false;
}
/**
* 获取锁
*
* @param arg
*/
public final void acquire(int arg) {
/*
1. 注意看这里把 tryAcquire 放在第一位,当他返回 true 表示获取锁成功,就无需执行后面的判断语句(创建队列,创建 Node元素,排队,线程中断)
2. 当他返回 false 说明锁被占用了,就需要执行后面的判断语句(创建队列,创建 Node元素,排队,线程中断)
*/
if (!tryAcquire(arg)
&& acquireQueued(addWaiter(), arg)) {
Thread.currentThread().interrupt();
}
}
/**
* 判断传入的 Node 是否需要排队,如果要排队,则开始排队,如果不需要则直接获取锁
* <p>
* 这里之所以使用 boolean 返回类型的原因就是因为可能在刚插入队列这会线程就把锁释放了,无需等待
*
* @param node addWaiter 入队的那个 Node
* @param arg
* @return 如果排队成功(线程开始等待)返回 true,如果返回 false 表示就在丢进队列这会,线程把锁释放了,无需等待
*/
private boolean acquireQueued(Node node, int arg) {
// 自旋操作
while (true) {
// 判断上一个元素是否是 head 元素
if (node.prev == head && tryAcquire(arg)) {
// 如果上个元素是 head 元素,则尝试获取锁,如果获取成功
setHead(node); // 将当前元素设置为头元素,之前的头元素就被排除在外(被 GC 回收了)
// 修改当前 node 的状态(原子操作)
compareAndSetNodeStatus(node, node.threadStatus, Node.DEFAULT);
return false; // 最终出口
}
// 判断是否需要排队等待,这里用 && 连接,表示如果需要排队,则执行后面的 parkAndCheckInterrupt 方法中断线程
if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt(node)) {
// 一定不会返回 true,因为 parkAndCheckInterrupt 一定返回 false(具体原因看注释)
return true; // 写上只是为了占位(idea的警告好烦)
}
}
}
/**
* 让当前线程排队(中断线程)
*
* @param node 当前 node
* @return 让线程等待成功返回 true,否则 false
*/
private boolean parkAndCheckInterrupt(Node node) {
// 将当前的线程状态修改为等待状态
compareAndSetNodeStatus(node, node.threadStatus, Node.WAIT);
// 中断线程
unsafe.park(false, 0);
// 这里返回的一定是 false,因为上面调用 park 已经让线程中断,所以这个 isInterrupted 方法已经执行不了,如果能执行就表示这个线程从中断恢复了
return Thread.currentThread().isInterrupted();
}
/**
* 判断当前线程是否需要排队等待
* 如果上一个元素是头元素,是不需要等待(不过前面已经判断了,能进这里表示一定不是头元素,所以这里就不判断了)
* 如果上一个元素本身都是等待状态,表示当前线程也要排队等待
* 如果上一个元素不是头元素,也不是等待状态,也没有获取锁(说明上一个线程是马上就要拿到锁的状态),当前线程排队等待
*
* @param node 当前 node
* @return 返回当前线程是否需要排队
*/
private boolean shouldParkAfterFailedAcquire(Node node) {
// 首先获取上一个元素
Node prev = node.prev;
// 上个元素不为空,不为头,且没有被取消,这时表示上个 node 要么在排队,要么在尝试获取锁,所以当前 node 一定是等待
if (prev != null && prev != head && prev.threadStatus != Node.CANCELLED) {
return true;
}
// 判断上一个元素是否被取消,如果被取消则将上一个元素从当前的队列排除(因为可能有多个被取消的,所以这里丢进 do while 循环)
if (prev != null && prev.threadStatus == Node.CANCELLED) {
do {
prev = prev.prev;
node.prev = prev;
} while (prev.threadStatus == Node.CANCELLED);
prev.next = node;
}
return false;
}
/**
* 原子操作修改 Node的状态
*
* @param node 需要修改的 node
* @param threadStatus 期望的 threadStatus
* @param arg 修改为的状态
*/
private void compareAndSetNodeStatus(Node node, int threadStatus, int arg) {
unsafe.compareAndSwapInt(node, threadStatusOffset, threadStatus, arg);
}
/**
* 头节点都是空的,因为线程已经拿到锁了,所以没必要还在队列里
*
* @param node 当前线程
*/
private void setHead(Node node) {
this.head = node;
node.thread = null;
node.prev = null;
}
/**
* 根据当前线程创建一个 Node 对象,并将其入队
*
* @return 返回当前入队的 Node
*/
private Node addWaiter() {
// 根据当前的线程创建一个 Node
Node node = new Node(Thread.currentThread());
// 取出队尾元素
Node prev = this.tail; // 新元素的上一个元素就是队尾元素
if (prev != null) {
// 队尾元素存在
node.prev = prev;
// 替换队尾元素(原子操作)
if (compareAndSetTail(prev, node)) {
prev.next = node;
return node;
}
}
// 执行这个方法,说明入队失败(1、队列不存在 2、队列正在初始化,还没有队尾元素)
return enq(node);
}
/**
* 自旋的完成入队操作
* 如果是一个线程来创建队列,则
* 循环第一次:创建了队头元素
* 循环第二次:创建了第二个元素,第二个元素就是新元素,那么新元素就会入队
*
* @param node 需要插入队里面的元素
* @return 返回 node
*/
private Node enq(Node node) {
while (true) {
// 获取队尾元素
Node t = this.tail;
if (t == null) { // 说明队列还没有初始化 或者 队列正在初始化(可能有别的线程在初始化)
// 如果队列还不存在需要创建新的队列,先放队头元素
// 因为可能有其他的线程也在初始化这个队列,所以这里需要使用原子操作
if (compareAndSetHead(new Node())) // 创建一个新的节点(里面三个属性都是 null)
this.tail = this.head; // 如果创建成功,让 head 和 tail 指向同一个元素
} else {
// 说明上面已经初始化了(队里初始只有一个空元素),这时直接把 node 入队就行了
// 这里的 t 经过第二轮自旋,已经实际指向 tail 了,compareAndSetTail 主要就是把 tail 的位置换成 node(注意看偏移量)
// 而 t 则是指向上面的那个 new Node() 所以,这个 t 就是 head
if (compareAndSetTail(t, node)) {
node.prev = t;
t.next = node;
return node;
}
}
}
}
/**
* 原子操作,创建队头元素
*
* @param node 创建的队头元素
* @return 是否创建成功
*/
private boolean compareAndSetHead(Node node) {
// 队头必须是 null 才能执行,所以这里期望值传入 null
return unsafe.compareAndSwapObject(this, headOffset, null, node);
}
/**
* 原子操作,实现队尾元素的替换
*
* @param prev 上个元素
* @param node 需要替换的元素
* @return 是否替换成功
*/
private boolean compareAndSetTail(Node prev, Node node) {
return unsafe.compareAndSwapObject(this, tailOffset, prev, node);
}
/**
* 尝试获取锁
*
* @param arg 需要修改为的状态
* @return 如果获取到锁返回 true,否则返回 false
*/
private boolean tryAcquire(int arg) {
// 获取当前的线程对象
Thread thread = Thread.currentThread();
// 获取当前锁的状态
int state = getStatus();
if (state == 0) { //当前锁处于释放状态
// 还需要判断自己是否需要排队,如果不需要排队就直接获取锁(毕竟不能插队)
// 注意这个判断,如果第一个方法返回 true,后面那个方法就不用执行了(因为是 && 连接的)
if (!hasQueuedPredecessors() && compareAndSetState(0, arg)) {
// 获取锁成功,设置占用锁的线程为当前线程
setExclusiveOwnerThread(thread);
System.out.println(thread.getName() + ":获取锁成功!");
return true;
}
} else if (thread == getExclusiveOwnerThread()) { // 判断当前尝试获取锁的线程是否就是占有锁的线程(重入锁)
// 如果有重入锁,状态需要 加一状态
state = getStatus() + arg;
setStatus(state);
return true;
}
return false;
}
/**
* 获取锁 因为这里可能会有多个线程同时操作,所以需要使用原子操作完成状态的修改
*
* @param expected 预期对象
* @param arg 需要修改为的状态
* @return 获取锁,成功返回 true
*/
private boolean compareAndSetState(int expected, int arg) {
return unsafe.compareAndSwapInt(this, statusOffset, expected, arg);
}
/**
* @return 判断是否需要排队
*/
private boolean hasQueuedPredecessors() {
/*
1. 队列为空,队列只有一个元素,不需要排队
2. 队列正在初始化,排队
3. 队列有元素,当前线程正好是 head元素的下一个元素,不需要排队
*/
Node h = head; /* 头元素 */
Node t = tail; /* 头元素 */
Node s; /* 头元素的下一个元素 */
/*
h == t
1. h = null, t = null 队列还没创建
2. h = t 队列中只有一个元素。尝试获取锁
h != t
1. h != null, t = null 队列真正初始化
2. h 1= null, t != null 虽然有队列了,但是当前线程还不在队列里面
*/
return h != t && // 这里的判断使用的是 && 连接,表示出现一个 false 就返回 false
// h.next == null 表示还在初始化
// s.thread == Thread.currentThread() 表示头元素的下一个元素就是当前线程
((s = h.next) == null || s.thread != Thread.currentThread()); // 这里的判断使用的是 || 连接,表示出现一个 true 就返回 true
}
static class Node {
Node prev; /* 指向上一个元素 */
Node next; /* 指向下一个元素 */
Thread thread; /* 当前元素维护的线程 */
int threadStatus; /* 状态(1:运行 2:等待 3:取消) */
// 定义几个状态常量
/**
* 默认状态
*/
static final int DEFAULT = 0;
/**
* 等待状态
*/
static final int WAIT = 1;
/**
* 取消状态
*/
static final int CANCELLED = -1;
// 构造方法
public Node() {
}
public Node(Thread thread) {
this.thread = thread;
}
public Node(Thread thread, int threadStatus) {
this(thread);
this.threadStatus = threadStatus;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment