博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
CountDownLatch的await和countDown方法简单分析
阅读量:6037 次
发布时间:2019-06-20

本文共 7685 字,大约阅读时间需要 25 分钟。

await

调用sync.acquireSharedInterruptibly

public void await() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}

sync.acquireSharedInterruptibly

调用tryAcquireShared方法返回<0执行doAcquireSharedInterruptibly

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}

tryAcquireShared

尝试获取共享锁,获取成功返回1,否则-1

protected int tryAcquireShared(int acquires) {    return (getState() == 0) ? 1 : -1;}

doAcquireSharedInterruptibly

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        for (;;) {            final Node p = node.predecessor();            //如果前一个node为队头,则通过tryAcquireShared尝试获取共享锁            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                //获取到锁执行                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())                throw new InterruptedException();        }    } finally {        //产生异常执行        if (failed)            cancelAcquire(node);    }}

addWaiter

调用addWaiter方法把队尾设置为当前node;如果队尾为空或者设置失败则调用enq方法

private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    enq(node);    return node;}

enq

调用enq方法队尾为空则创建空的队尾和队头,否则重新设置队尾为当前node,设置成功返回。enq和addWaiter方法不同在于enq循环执行一定会执行成功,不存在失败情况

private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) { // Must initialize            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

predecessor

调用predecessor方法获取前一个node

final Node predecessor() throws NullPointerException {    Node p = prev;    if (p == null)        throw new NullPointerException();    else        return p;}static final int CANCELLED = 1; //取消 static final int SIGNAL = -1; //下个节点需要被唤醒 static final int CONDITION = -2; //线程在等待条件触发static final int PROPAGATE = -3; //(共享锁)状态需要向后传播

shouldParkAfterFailedAcquire

获取当前node的前一个note的线程等待状态,如果为SIGNAL,那么返回true,大于0通过循环将当前节点之前所有取消状态的节点移出队列;其他状时,利用compareAndSetWaitStatus使前节点的状态为-1;如果是第一次await时ws状态是0,多次await时ws状态是0,最后肯定返回true

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        return true;    if (ws > 0) {        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

parkAndCheckInterrupt

调用park并返回线程是否已经中断

private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}

park

调用UNSAFE.park阻塞当前线程

public static void park(Object blocker) {    Thread t = Thread.currentThread();    setBlocker(t, blocker);    UNSAFE.park(false, 0L);    setBlocker(t, null);}

setBlocker

在当前线程t的parkBlockerOffset位置设置blocker的引用

private static void setBlocker(Thread t, Object arg) {    // Even though volatile, hotspot doesn't need a write barrier here.    UNSAFE.putObject(t, parkBlockerOffset, arg);}

UNSAFE.park

/** * 阻塞一个线程直到unpark出现、线程 * 被中断或者timeout时间到期。如果一个unpark调用已经出现了, * 这里只计数。timeout为0表示永不过期.当isAbsolute为true时, * timeout是相对于新纪元之后的毫秒。否则这个值就是超时前的纳秒数。这个方法执行时 * 也可能不合理地返回(没有具体原因) *  * @param isAbsolute true if the timeout is specified in milliseconds from *                   the epoch. *                   如果为true timeout的值是一个相对于新纪元之后的毫秒数 * @param time either the number of nanoseconds to wait, or a time in *             milliseconds from the epoch to wait for. *             可以是一个要等待的纳秒数,或者是一个相对于新纪元之后的毫秒数直到 *             到达这个时间点 */UNSAFE.park(false, 0L);

countDown

调用sync.releaseShared

public void countDown() {    sync.releaseShared(1);}

releaseShared

执行tryReleaseShared成功后执行doReleaseShared

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

tryReleaseShared

更新state值为state-1,如果state新值为0返回true,否则false

protected boolean tryReleaseShared(int releases) {    // Decrement count; signal when transition to zero    for (;;) {        int c = getState();        if (c == 0)            return false;        int nextc = c-1;        if (compareAndSetState(c, nextc))            return nextc == 0;    }}

doReleaseShared

只要等待队列有数据,获取队头等待状态,队头状态=-1其他node为等待时,则把队头等待状态置为初始,且调用unparkSuccessor方法;队头状态=0时,把队头状态置为-3传播到下一node

private void doReleaseShared() {    /*     * Ensure that a release propagates, even if there are other     * in-progress acquires/releases.  This proceeds in the usual     * way of trying to unparkSuccessor of head if it needs     * signal. But if it does not, status is set to PROPAGATE to     * ensure that upon release, propagation continues.     * Additionally, we must loop in case a new node is added     * while we are doing this. Also, unlike other uses of     * unparkSuccessor, we need to know if CAS to reset status     * fails, if so rechecking.     */    for (;;) {        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            if (ws == Node.SIGNAL) {                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                unparkSuccessor(h);            }            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }        if (h == head)                   // loop if head changed            break;    }}

unparkSuccessor

上面调用unparkSuccessor时,node的状态已经更改为0,且node.next存在,执行unpark方法

private void unparkSuccessor(Node node) {    /*     * If status is negative (i.e., possibly needing signal) try     * to clear in anticipation of signalling.  It is OK if this     * fails or if status is changed by waiting thread.     */    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    /*     * Thread to unpark is held in successor, which is normally     * just the next node.  But if cancelled or apparently null,     * traverse backwards from tail to find the actual     * non-cancelled successor.     */    Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        LockSupport.unpark(s.thread);}

unpark

unpark执行完之后是如何更改head的?

public static void unpark(Thread thread) {    if (thread != null)        UNSAFE.unpark(thread);}

UNSAFE.unpark

/** * Releases the block on a thread created by  * park.  This method can also be used * to terminate a blockage caused by a prior call to park. * This operation is unsafe, as the thread must be guaranteed to be * live.  This is true of Java, but not native code. * 释放被park创建的在一个线程上的阻塞.这个 * 方法也可以被使用来终止一个先前调用park导致的阻塞. * 这个操作操作时不安全的,因此线程必须保证是活的.这是java代码不是native代码。 * @param thread the thread to unblock. *           要解除阻塞的线程 */UNSAFE.unpark(thread);

转载地址:http://kslhx.baihongyu.com/

你可能感兴趣的文章
Effective C++ 的52个条款列表
查看>>
c#读取ini文件
查看>>
一阶微分方程的求解
查看>>
其它 Helper
查看>>
监控利器Prometheus初探
查看>>
foreach遍历打印表格
查看>>
Oracle笔记(中) 多表查询
查看>>
Delphi 中的 XMLDocument 类详解(5) - 获取元素内容
查看>>
差异分析定位Ring 3保护模块
查看>>
2013年7月12日“修复 Migration 测试发现的 Bug”
查看>>
vim文本编辑器详解
查看>>
学习vue中遇到的报错,特此记录下来
查看>>
CentOS7 编译安装 Mariadb
查看>>
32位系统和64位系统的选择
查看>>
01配置管理过程指南
查看>>
jstl格式化时间
查看>>
一则关于运算符的小例
查看>>
centos7 ambari2.6.1.5+hdp2.6.4.0 大数据集群安装部署
查看>>
cronexpression 详解
查看>>
一周小程序学习 第1天
查看>>