并发编程-AQS源码简单剖析

AQS简介

AQS维护了一个同步队列实现了同步锁,ReentrantLock,读写锁,闭锁等底层都是AQS实现的,搞懂了AQS会对后续这些工具类的使用有很大的帮助。此片文章只简单剖析一下独占式锁的获取以及释放过程。

节点的几种等待状态

在开始解读代码前我们先来看一下同步队列中节点的2种状态,后续会用到这2种状态

CANCELLED = 1:因为超时或者中断,结点会被设置为取消状态,被废弃的状态

SIGNAL = -1:表示这个结点的后续节点可以休息了

获取锁

acquire方法

    public final void acquire(int arg) {
//      AQS典型的模板设计模式 tryAcquire是需要自己实现的方法,参考ReentrantLock
        if (!tryAcquire(arg) &&
//             addWaiter方法是把线程封装成一个节点扔进同步队列 并返回给acquireQueued方法进行后续处理
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//          如果在获取锁的过程中,尝试中断线程,只有等获取成功才能中断(只是修改中断标志位哈)
            selfInterrupt();
    }

tryAcquire方法

AQS是典型的模板设计模式,而tryAcquire方法就是需要我们自己实现的方法之一,可以参考ReentrantLock的代码。这个方法主要做的事情就是利用CAS尝试获取一次锁,如果返回ture则代表拿到了锁。此时线程都不用被扔到同步队列中。

addWaiter方法

1.把线程封装成一个节点

2.利用CAS尝试把节点插入到队尾

3.如果竞争激烈可能插入队尾失败,此时调用enq方法自旋(死循环)插入队尾

4.返回当前节点给acquireQueued方法做后续处理

    private Node addWaiter(Node mode) {
//      把当前线程封装成一个节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
//      利用CAS尝试把节点插入队尾,注意如果tail为null是不进行插入的
//      也就是如果同步队列还没有初始化,是不在此进行插入队尾的,是在enq方法中插入的
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
//              插入成功直接返回当前节点
                return node;
            }
        }
//      如果插入失败则调用enq方法继续插入 
        enq(node);
        return node;
    }

enq方法

1.如果未初始化队列则对队列进行初始化

2.尝试把节点插入队尾,直到成功

    private Node enq(final Node node) {
//      自旋就是死循环 
        for (;;) {
            Node t = tail;
//          初始化队列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
//          把节点插入队尾,直至成功      
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

至此addWaiter方法执行完,并返回当前节点给acquireQueued方法执行后续操作。

acquireQueued方法

1.如果当前节点的前驱节点是头节点则尝试拿锁

2.拿锁成功则更新当前节点为头节点并删除之前的头节点

3.拿锁失败,或者当前节点的前驱节点不是头节则尝试进入阻塞状态

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
//          这个就是判断在获取锁的过程中是否修改了中断标志位
//          它的作用就是获取锁后再安全的中断
            boolean interrupted = false;
            for (;;) {
//              获取当前节点的前驱节点
                final Node p = node.predecessor();
//               如果前驱节点是头节点并且尝试拿锁成功
                if (p == head && tryAcquire(arg)) {
//                  设置当前节点为头节点
                    setHead(node);
//                  删除之前的头节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
//               shouldParkAfterFailedAcquire判断当前线程是否可以阻塞
//               parkAndCheckInterrupt 阻塞线程,并判断是否是中断被唤醒的
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法

1.如果前驱节点的waitStatus是-1则线程可以阻塞

2.如果前驱节点的waitStatus是1则代表线程可以被废弃了,则维护同步队列直到前驱的waitStatus不是1

3.如果前驱节点的waitStatus是其他值则设置为-1

这个方法被自旋包裹着,多次操作后到最终前驱的waitStatus都会是-1,然后当前线程进入阻塞状态,线程不能一直处于抢占cpu的状态那样性能会大大降低,阻塞线程,等需要的时候在唤醒线程。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
//       如果是-1则线程可以安心的进入休息状态
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
//      >0也就是cancelled状态,此时需要维护同步队列把前边是cancelled状态的节点全部移除掉
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
//      0是初始化的状态则把前驱的状态设置为-1  -3的情况不探讨属于共享锁
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt方法

1.阻塞当前线程

2.判断当前线程是否是中断被唤醒的,是则返回true

如果parkAndCheckInterrupt方法返回true,则acquireQueued方法返回true,说明在获取锁的过程中尝试了中断线程,所以获取锁后要把中断标志位更新为中断的状态。

至此获取锁的过程全部完成。

释放锁

释放成功则唤醒第一个节点

    public final boolean release(int arg) {
//      尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
//              释放成功则唤醒下一个节点  
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

以上为个人见解,不对请指出,谢谢!

 

 

 

 

 

© 版权声明
THE END
喜欢就支持以下吧
点赞1
分享
评论 抢沙发