队列同步器

模板方法模式 中提到了 AQS 同步器,实际上 AQS 同步器框架利用模板方法模式方便了锁的实现者,简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

AQS 是队列同步器 AbstractQueuedSynchronizer 的简称,是用来构建锁或者其他同步组件的基础框架,使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作。

同步器中一些模板方法定义了算法骨架,模板方法中包含了一些未被实现的抽象方法。所以同步器的使用需要子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改。在 AQS 中就是状态变量 state,许多同步器类都使用这个变量来保存一些所需的状态。如:

  • ReentrantLock 用它表示所有者线程已经重复获取该锁的次数
  • ReentrantReadWriteLock 将状态分为前 16 位标识读锁的计数,后 16 位标识写锁的计数
  • Semaphore 用它表示剩余的许可数量
  • CountDownLatch 用它表示当前的计数值
  • FutureTask 用它表示任务的状态(尚未开始、正在运行、已经完成以及已取消),jdk1.8 之前

对于继承同步器的子类实现抽象方法对同步状态进行访问时需要使用下面三个方法以保证安全的更改同步状态:

  • getState():获取当前同步状态
  • setState(int newState):设置当前同步状态
  • compareAndSetState(int expect,int update):使用 CAS 设置当前状态,该方法能够保证状态设置的原子性

子类推荐被定义为自定义同步组件的静态内部类,同步器定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。

下面是模板方法中未是实现的抽象方法:

// 判断线程是否独占资源,仅在 ConditionObject 方法内部调用,如果不使用条件,不需要定义
protected boolean isHeldExclusively() { throw new UnsupportedOperationException();}
// 尝试以独占模式获取资源。 该方法应该查询对象的状态是否允许以独占模式获取,如果是,则获取它。
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
// 尝试设置状态以独占模式释放资源。
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
// 尝试以共享模式获取。 该方法应该查询对象的状态是否允许在共享模式下获取该对象,如果是这样,就可以获取它。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
// 尝试设置状态以共享模式释放资源。
protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException();}

下面是同步器提供了模板方法,自定义的同步组件将调用以下的方法然后间接调用子类实现的抽象方法,同步器提供的模板方法分为三类:

  • 独占式获取与释放同步状态
  • 共享式获取与释放同步状态
  • 查询同步队列中的等待线程情况

队列同步器的实现

接下来分析这些同步器模板方法以及模板方法涉及的数据结构。

  • 同步队列

同步器依赖同步 FIFO 队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,然后再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。

static final class Node {

    static final Node SHARED = new Node();

    static final int CANCELLED =  1;

    static final int SIGNAL    = -1;

    static final int CONDITION = -2;

    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;
}

下面是对 Node 的属性描述:

节点是构成同步队列的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部。

在上图中同步器包含同步器包含了两个节点引用,一个指向头节点,一个指向尾节点。当有线程获取了同步状态,其他线程无法获取时将通过 compareAndSetTail(Node expect, Node update) 方法加入队列。

在同步队列中,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。如下代码:

private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
 }

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用 CAS 来保证。如下代码:

for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
}

// 该方法在当线程无法获取到 state 时调用,此时进入阻塞状态
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  • 独占式同步状态获取与释放

通过调用同步器的 acquire(int arg) 方法可以获取同步状态,当获取失败时,构造一个 Node,加入阻塞队列并阻塞线程,如果被唤醒则通过自旋再次重试,如下代码:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

首先调用自定义同步器实现的 tryAcquire(int arg) 方法,该方法保证线程安全的获取同步状态(getState()),如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过 addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用 acquireQueued(Node node,int arg) 方法,使得该节点以不断轮询的方式获取同步状态,如果被阻塞则被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

下面的代码是节点的构造和加入同步队列尾部:

// 节点的构造,并加入队列尾部,返回构造好的节点
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 快速尝试在尾部添加
    Node pred = tail;
     if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// 加入同步队列尾部
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
        if (compareAndSetHead(new Node()))
            tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

上面代码通过 compareAndSetTail(Node expect,Node update) 方法来确保节点能够被线程安全添加。

enq(final Node node) 方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过 CAS 将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。所以, enq(final Node node) 方法将并发添加节点的请求通过 CAS “串行化”了。

节点进入同步队列之后,就进入了一个自旋的过程,每个节点(每个线程)不断尝试,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则节点线程依然被阻塞并留在这个自旋过程中。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued(final Node node,int arg) 方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态:

头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。下图是节点自旋获取同步状态的行为:

由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点。

下图为独占式同步状态获取流程:acquire(int arg),当当前线程从acquire(int arg)方法返回时,对于锁这种并发组件而言,代表着当前线程获取了锁。

当通过调用同步器的 release(int arg) 方法时可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点,使后继节点重新尝试获取同步状态。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
        return true;
    }
    return false;
}

该方法执行时,会唤醒头节点的后继节点线程,对应于上文的 unparkSuccessor(Node node) 方法。

对于独占式同步状态获取与释放,在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;如果是前驱节点为头节点且成功获取了同步状态则可以停止自旋并尝试获取同步状态并移出队列。在释放同步状态时,调用 tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。

  • 共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。写操作要求对资源的独占式访问,而读操作可以是共享式访问。

通过调用同步器的 acquireShared(int arg) 方法可以共享式地获取同步状态。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

acquireShared(int arg) 方法中,同步器调用 tryAcquireShared(int arg)方法尝试获取同步状态, tryAcquireShared(int arg) 方法返回值为 int 类型,当返回值大于等于 0 时,表示能够获取到同步状态。

通过上面代码,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是tryAcquireShared(int arg) 方法返回值大于等于 0。

另外,共享式获取通过调用 releaseShared(int arg) 方法可以释放同步状态。

public final boolean releaseShared(int arg) {
     if (tryReleaseShared(arg)) {
         doReleaseShared();
         return true;
     }
     return false;
 }

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。如果是支持多个读线程并发访问的同步组件,可以使用循环 + CAS 来保证多个线程同时释放同步状态的操作(同步状态保存了多个读线程的数量),这里也是通过 CAS 将多个并行操作转为串行操作,如果某个线程由于其他线程的影响而 CAS 失败则重新执行直到成功。如:Semaphore:

public class Semaphore implements java.io.Serializable {

    ...
    // 模板方法中的抽象方法步骤实现
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }
    ...

}
  • 独占式超时获取同步状态

通过调用同步器的 doAcquireNanos(int arg,long nanosTimeout) 方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回 true,否则,返回 false。

在 JVM 自带的锁机制 synchronized 中,当一个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在 synchronized 上,等待着获取锁。在 线程中断操作 中,线程有两种方式对中断操作进行响应,而 synchronized 没有类似在可中断方法上方法签名有 InterruptedException 的标识,并且在 BLOCKED 状态下也无法通过轮询来获取当前线程是否被中断的信息,所以在 synchronized 是无法响应中断的。

而在 JUC 并发包中,同步器提供了 acquireInterruptibly(int arg) 方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出 InterruptedException。

在超时获取同步状态过程中,doAcquireNanos(int arg,long nanosTimeout) 方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔 nanosTimeout,为了防止过早通知,nanosTimeout 计算公式为:nanosTimeout-=now-lastTime,其中 now 为当前唤醒时间,lastTime 为上次唤醒时间,如果 nanosTimeout 大于 0 则表示超时时间未到,需要继续睡眠 nanosTimeout 纳秒,反之,表示已经超时。如下同步器的 doAcquireNanos 方法代码:

private boolean doAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

代码中,该方法在自旋过程中获取成功和独占式同步获取的过程类似,但是在同步状态获取失败时,需要判断是否超时(nanosTimeout 小于等于 0 表示已经超时),如果没有超时,重新计算超时间隔 nanosTimeout,然后使当前线程等待 nanosTimeout 纳秒,如果已经到达超时时间,线程会从LockSupport.parkNanos(Object blocker,long nanos)

如果 nanosTimeout 小于等于 spinForTimeoutThreshold(1000 纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程,继续 for (;;) 流程。因为非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让 nanosTimeout 的超时从整体上表现出响应不及时的情况。

下面是独占式超时获取同步态的流程:独占式超时获取同步状态 doAcquireNanos(int arg,long nanosTimeout) 和独占式获取同步状态 acquire(int args) 类似,acquire(int args) 在未获取到同步状态时,当前线程会不断等待-阻塞-唤醒-重试,而 doAcquireNanos(int arg,long nanosTimeout) 会使当前线程等待 nanosTimeout 纳秒,如果当前线程在 nanosTimeout 纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。

自定义同步组件

设计一个同步工具:该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞,我们将这个同步工具命名为 TwinsLock。

  • 确定访问模式

TwinsLock 能够在同一时刻支持多个线程的访问,所以是共享式访问,所以 TwinsLock 必须重写 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法,保证同步器的共享式同步状态的获取与释放方法得以执行。

  • 定义资源数

TwinsLock 在同一时刻允许至多两个线程的同时访问,表明同步资源数 state 为 2,这样可以设置初始状态 state 为 2,当一个线程进行获取,state 减 1,该线程释放时,则 status 加 1,状态的合法范围为 0、1 和 2,0 表示当前已经有两个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。

在同步状态更改时,需要使用 compareAndSet(int expect,int update) 方法做原子性保障。

  • 组合自定义同步器

自定义同步组件通过组合自定义同步器来完成同步功能,一般情况下自定义同步器会被定义为自定义同步组件的静态内部类。

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must largethan zero.");
            }
        setState(count);
    }
    public int tryAcquireShared(int reduceCount) {
        for (;;) {
            int current = getState();
            int newCount = current - reduceCount;
            if (newCount < 0 || compareAndSetState(current,
                newCount)) {
                return newCount;
            }
        }
    }
    public boolean tryReleaseShared(int returnCount) {
        for (;;) {
            int current = getState();
            int newCount = current + returnCount;
            if (compareAndSetState(current, newCount)) {
                return true;
            }
        }
    }
}
    public void lock() {
        sync.acquireShared(1);
    }
    public void unlock() {
        sync.releaseShared(1);
    }
    // 其他接口方法略
}

TwinsLock 实现了 Lock 接口,提供了面向使用者的接口,使用者调用 lock() 方法获取锁,随后调用 unlock() 方法释放锁,而同一时刻只能有两个线程同时获取到锁。

TwinsLock 同时包含了一个自定义同步器 Sync,面向线程访问和同步状态控制。例如:当 tryAcquireShared(int reduceCount) 方法返回值大于等于 0 时,当前线程才获取同步状态,对于上层的 TwinsLock 而言,则表示当前线程获得了锁。

同步器屏蔽了底层 state 同步、队列节点轮询等待的复杂细节,使得开发者更容易定义出不同的并发组件。

下面是一个测试用例:

public void test() {
    final Lock lock = new TwinsLock();
        class Worker extends Thread {
            public void run() {
            while (true) {
                lock.lock();
                    try {
                        Thread.sleep(100);
                        System.out.println(Thread.currentThread().getName());
                        Thread.sleep(100);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }
        // 启动 10 个线程
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.start();
        }
    }
}

打印结果为两个线程名称成对输出,也就是在同一时刻只有两个线程能够获取到锁。

参考

最后修改日期: 2019年10月5日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。