阻塞队列

Scroll Down

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

  • 当队列满了的时候无法进行入队列操作
  • 当队列空了的时候无法进行出队列操作
public class MoodBlockQueue<T> {

    int currentSize;
    int maxSize;
    volatile Object[] elements;
    ReentrantLock lock;
    Condition writeLock;
    Condition readLock;

    public MoodBlockQueue(int maxSize) {
        this.currentSize = 0;
        lock = new ReentrantLock();
        writeLock = lock.newCondition();
        readLock = lock.newCondition();
        elements = new Object[maxSize];
        this.maxSize=maxSize;
    }
    public String findQueueData(){
        lock.lock();
        StringBuffer stringBuilder=new StringBuffer("{ ");
        try {
            for (Object o:elements){
                stringBuilder.append(o!=null?o.toString()+",":"");
            }
        } finally {
            lock.unlock();
        }
        stringBuilder.append(" }");
        return stringBuilder.toString();
    }
    public void offer(T t) {
        lock.lock();
        try {
            if (currentSize >= maxSize) {
                System.out.println(Thread.currentThread().getName()+"队列满了无法写入 阻塞:当前队列数据: "+findQueueData());
                writeLock.await();
            }
            elements[currentSize++] = t;
            readLock.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public T poll() {
        lock.lock();
        T t = null;
        try {
            if (currentSize == 0) {
                System.out.println(Thread.currentThread().getName()+"队列空无法读取 阻塞");
                readLock.await();
            }
            t = (T) elements[0];
            Object[] newElements = new Object[maxSize];
            System.arraycopy(elements, 1, newElements, 0, newElements.length-1);
            elements=newElements;
            currentSize--;
            writeLock.signal();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return t;
    }
    public static void main(String[] args) throws InterruptedException {
        MoodBlockQueue moodBlockQueue=new MoodBlockQueue(10);
        ExecutorService executor=Executors.newFixedThreadPool(5);
        executor.execute(()->{
            try {
                Thread.sleep(3000);
                while(1==1){
                    System.out.println(Thread.currentThread().getName()+"读取的数据"+moodBlockQueue.poll());
                    System.out.println("当前队列剩余数据: "+moodBlockQueue.findQueueData());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        for(int i=1;i<=5;i++){
            int finalI = i;
            moodBlockQueue.offer(finalI);
        }

        for(int i=8;i<=20;i++){
            int finalI = i;
            moodBlockQueue.offer(finalI);
        }
    }
}