AQS之Condition

Scroll Down

Condition

有面试官让手撸代码来来来你手写个阻塞队列,我说用到ReentrantLock的Condition,他说可以,写出来,然后谈谈你为什么要使用这个。我说使用Condition因为阻塞队列有要求,写满无法再写,队列为空无法继续读。又问那你可以使用Synchronized实现吗。我说可以,然后配合2个变量用wait和notifyAll来实现。然后他让我写个双检锁。

class BlockQueen<T>{
    private final ReentrantLock lock=new ReentrantLock();
    private volatile int modCount=0;
    private volatile int cap=10;
    private final Condition Write = lock.newCondition();
    private final Condition Read = lock.newCondition();
    private List<T> list;
    public BlockQueen(){ }
    public BlockQueen(int caplity){
        list=new ArrayList<>(caplity);
        cap=caplity;
    }
    public void put(T t){
    lock.lock();
        try{
            if (modCount==cap){
                System.out.println("队列已满,写请求阻塞"+Thread.currentThread().getName());
                Write.await();
            }

            list.add(t);
            System.out.println("写入队列"+t+Thread.currentThread().getName());
            modCount++;
            Read.signal();
//            System.out.println("唤醒读");
        }catch (Exception e){}finally {
            lock.unlock();
        }
    }
    public T pop(T t){
     lock.lock();
        try{
            if (modCount==0){
                System.out.println("队列为空,读请求阻塞"+Thread.currentThread().getName());
                Read.await();
            }
            if (list.contains(t)){
                list.remove(t);
                System.out.println("读出队列"+t+Thread.currentThread().getName());
            }
            modCount--;
            Write.signal();
//            System.out.println("唤醒写");
        }catch (Exception e){}finally {
            lock.unlock();
        }
        return t;
    }

}

双检锁

为啥使用双重检查呢,首先我们看到和普通的单例多了volatile关键字和synchronized关键字,加synchronized是保证在多线程环境下的访问,加volatile,可以避免指令的重排序,这里的重排序它体现在testSingle成员变量的构造过程,首先在创建引用变量,在堆中分配内存,然后把引用变量关联到堆中的内存。这三步在多线程环境下由于指令重排序,就可能有的线程看到了该实例已创建的问题,从而导致该线程拿到的是空对象进而导致错误。使用volatile在这里利用了是其禁止指令重排序,有的博客将是使用了可见性,有点扯淡了,synchronized难道不保证可见性吗?

class TestSingle{
    private static volatile TestSingle testSingle=null;
    private TestSingle(){}
    public static TestSingle getInstance(){
        if(testSingle==null){
            synchronized (TestSingle.class){
                if (testSingle==null){
                    testSingle=new TestSingle();
                }
            }
        }
        return TestSingle.testSingle;
    }
}

面试回来。。。说年限太低需要综合几个候选人在考虑下,让回来等消息。心塞中。。。


正好分析一下这个Condition的源码,深入学习一下。首先顺着AQS展开来看
,AbstractQueuedSynchronizer位于java.util.concurrent.locks之下所有对他的使用均通过继承来使用,可参考ReentrantLockSync内部类。

 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable{
    static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    }
 }

首先可以看到AbstractQueuedSynchronizer中的静态内部类Node我先介绍下对应的含义

  • prev:前继节点;
  • next:后继节点;
  • nextWaiter:存储condition队列中的后继节点;
  • thread:当前线程。
  • waitStatus 这个字段标识同步队列中节点的状态或者是条件队列中节点的状态

同步队列的话默认状态为0,条件队列默认为CONDITION,节点状态的变更通过该方法compareAndSetWaitStatus把状态更新为-1。下列是该状态的所有对应值的解释。

- CANCELLED:值为1,表示当前节点被取消;
- SIGNAL:值为-1,表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点;
- CONDITION:值为-2,表示当前节点在等待condition,即在condition队列中;
- PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用);
- 0:无状态,表示当前节点在队列中等待获取锁。

---

接着看我们的阻塞队列的await方法。

首先创建一个条件节点(当前线程,CONDITION)增加到队列中,在添加到队列方法中,先判断队尾的节点不为空并且状态不是等待条件状态,那么直接设置为尾节点,如果队尾也是等待状态,那么久执行入队操作。然后释放当前线程的锁,释放完毕后,遍历AQS的队列,看当前节点是否在队列中,如果不在,说明它还没有竞争锁的资格,所以继续将自己沉睡。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter(); 
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }