Condition
有面试官让手撸代码来来来你手写个阻塞队列,我说用到ReentrantLock的Condition,他说可以,写出来,然后谈谈你为什么要使用这个。我说使用Condition因为阻塞队列有要求,写满无法再写,队列为空无法继续读。又问那你可以使用Synchronized实现吗。我说可以,然后配合2个变量用wait和notifyAll来实现。然后他让我写个双检锁。
class BlockQueen<T>{
private final ReentrantLock lock=new ReentrantLock();
private volatile int modCount=0;
private volatile int cap=10;
private final Condition Write = lock.newCondition();
private final Condition Read = lock.newCondition();
private List<T> list;
public BlockQueen(){ }
public BlockQueen(int caplity){
list=new ArrayList<>(caplity);
cap=caplity;
}
public void put(T t){
lock.lock();
try{
if (modCount==cap){
System.out.println("队列已满,写请求阻塞"+Thread.currentThread().getName());
Write.await();
}
list.add(t);
System.out.println("写入队列"+t+Thread.currentThread().getName());
modCount++;
Read.signal();
// System.out.println("唤醒读");
}catch (Exception e){}finally {
lock.unlock();
}
}
public T pop(T t){
lock.lock();
try{
if (modCount==0){
System.out.println("队列为空,读请求阻塞"+Thread.currentThread().getName());
Read.await();
}
if (list.contains(t)){
list.remove(t);
System.out.println("读出队列"+t+Thread.currentThread().getName());
}
modCount--;
Write.signal();
// System.out.println("唤醒写");
}catch (Exception e){}finally {
lock.unlock();
}
return t;
}
}
双检锁
为啥使用双重检查呢,首先我们看到和普通的单例多了volatile关键字和synchronized关键字,加synchronized是保证在多线程环境下的访问,加volatile,可以避免指令的重排序,这里的重排序它体现在testSingle成员变量的构造过程,首先在创建引用变量,在堆中分配内存,然后把引用变量关联到堆中的内存。这三步在多线程环境下由于指令重排序,就可能有的线程看到了该实例已创建的问题,从而导致该线程拿到的是空对象进而导致错误。使用volatile在这里利用了是其禁止指令重排序,有的博客将是使用了可见性,有点扯淡了,synchronized难道不保证可见性吗?
class TestSingle{
private static volatile TestSingle testSingle=null;
private TestSingle(){}
public static TestSingle getInstance(){
if(testSingle==null){
synchronized (TestSingle.class){
if (testSingle==null){
testSingle=new TestSingle();
}
}
}
return TestSingle.testSingle;
}
}
面试回来。。。说年限太低需要综合几个候选人在考虑下,让回来等消息。心塞中。。。
正好分析一下这个Condition的源码,深入学习一下。首先顺着AQS展开来看
,AbstractQueuedSynchronizer位于java.util.concurrent.locks
之下所有对他的使用均通过继承来使用,可参考ReentrantLock
的Sync
内部类。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable{
static final class Node {
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
}
}
首先可以看到AbstractQueuedSynchronizer
中的静态内部类Node
我先介绍下对应的含义
- prev:前继节点;
- next:后继节点;
- nextWaiter:存储condition队列中的后继节点;
- thread:当前线程。
- waitStatus 这个字段标识同步队列中节点的状态或者是条件队列中节点的状态
同步队列的话默认状态为0,条件队列默认为CONDITION,节点状态的变更通过该方法
compareAndSetWaitStatus
把状态更新为-1。下列是该状态的所有对应值的解释。
- CANCELLED:值为1,表示当前节点被取消;
- SIGNAL:值为-1,表示当前节点的的后继节点将要或者已经被阻塞,在当前节点释放的时候需要unpark后继节点;
- CONDITION:值为-2,表示当前节点在等待condition,即在condition队列中;
- PROPAGATE:值为-3,表示releaseShared需要被传播给后续节点(仅在共享模式下使用);
- 0:无状态,表示当前节点在队列中等待获取锁。
---
接着看我们的阻塞队列的await方法。
首先创建一个条件节点(当前线程,CONDITION)增加到队列中,在添加到队列方法中,先判断队尾的节点不为空并且状态不是等待条件状态,那么直接设置为尾节点,如果队尾也是等待状态,那么久执行入队操作。然后释放当前线程的锁,释放完毕后,遍历AQS的队列,看当前节点是否在队列中,如果不在,说明它还没有竞争锁的资格,所以继续将自己沉睡。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}