阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
- 当队列满了的时候无法进行入队列操作
- 当队列空了的时候无法进行出队列操作
public class MoodBlockQueue<T> {
int currentSize;
int maxSize;
volatile Object[] elements;
ReentrantLock lock;
Condition writeLock;
Condition readLock;
public MoodBlockQueue(int maxSize) {
this.currentSize = 0;
lock = new ReentrantLock();
writeLock = lock.newCondition();
readLock = lock.newCondition();
elements = new Object[maxSize];
this.maxSize=maxSize;
}
public String findQueueData(){
lock.lock();
StringBuffer stringBuilder=new StringBuffer("{ ");
try {
for (Object o:elements){
stringBuilder.append(o!=null?o.toString()+",":"");
}
} finally {
lock.unlock();
}
stringBuilder.append(" }");
return stringBuilder.toString();
}
public void offer(T t) {
lock.lock();
try {
if (currentSize >= maxSize) {
System.out.println(Thread.currentThread().getName()+"队列满了无法写入 阻塞:当前队列数据: "+findQueueData());
writeLock.await();
}
elements[currentSize++] = t;
readLock.signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public T poll() {
lock.lock();
T t = null;
try {
if (currentSize == 0) {
System.out.println(Thread.currentThread().getName()+"队列空无法读取 阻塞");
readLock.await();
}
t = (T) elements[0];
Object[] newElements = new Object[maxSize];
System.arraycopy(elements, 1, newElements, 0, newElements.length-1);
elements=newElements;
currentSize--;
writeLock.signal();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
return t;
}
public static void main(String[] args) throws InterruptedException {
MoodBlockQueue moodBlockQueue=new MoodBlockQueue(10);
ExecutorService executor=Executors.newFixedThreadPool(5);
executor.execute(()->{
try {
Thread.sleep(3000);
while(1==1){
System.out.println(Thread.currentThread().getName()+"读取的数据"+moodBlockQueue.poll());
System.out.println("当前队列剩余数据: "+moodBlockQueue.findQueueData());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
for(int i=1;i<=5;i++){
int finalI = i;
moodBlockQueue.offer(finalI);
}
for(int i=8;i<=20;i++){
int finalI = i;
moodBlockQueue.offer(finalI);
}
}
}