双缓冲队列

为什么要引入双缓冲队列

对前一节中介绍的几种方法,都存在一个问题:在同一时刻,队列的入队和出队是互斥的,即某一刻有且仅有一个操作!

java.util.concurrent.ArrayBlockingQueue:同一时刻只能读或写;

java.util.concurrent.LinkedBlockingQueue:内部虽然使用了读写锁,实现了读写操作的锁分离,但不能在同一时刻进行双写(入队/出队)

假如我们使用两个队列,一个队列专门用来读,另一个队列专门用来写,当读队列空或写队列满时将两个队列互换,这样就可以减少锁竞争,提升写效率。

双缓冲队列的Java实现

public class DoubleBufferQueue<T> extends AbstractQueue<T> implements Queue<T> {

    private Lock readLock = new ReentrantLock();
    private Lock writeLock = new ReentrantLock();

    /**
     * 读队列,出队的时候,从该队列取出元素
     */
    private LinkedList<T> readQueue = new LinkedList<T>();
    /**
     * 写队列,入队的时候,从该队列放入元素
     */
    private LinkedList<T> writeQueue = new LinkedList<T>();

    public DoubleBufferQueue() {
        super();
    }

    /**
     * 添加元素到队尾
     */
    @Override
    public boolean offer(T e) {
        writeLock.lock();
        try {
            return writeQueue.offer(e);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 移除并返问队头元素
     *
     * 当读队列为空时,交换队列
     */
    @Override
    public T poll() {
        readLock.lock();
        try {
            if (readQueue.size() == 0) {
                swap();
            }

            return readQueue.poll();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * 返回队头元素(不移除)
     */
    @Override
    public T peek() {
        readLock.lock();
        try {
            if (readQueue.size() == 0) {
                swap();
            }
            return readQueue.peek();
        } finally {
            readLock.unlock();
        }
    }

    /**
     * 增加元素到队尾
     */
    @Override
    public boolean add(T e) {
        writeLock.lock();
        try {
            return writeQueue.add(e);
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * 批量增加元素到队尾
     */
    @Override
    public boolean addAll(Collection<? extends T> c) {
        writeLock.lock();
        try {
            return writeQueue.addAll(c);
        } finally {
            writeLock.unlock();
        }
    }

    @Override
    public Iterator<T> iterator() {
        throw new NotImplementedException();
    }

    @Override
    public int size() {
        readLock.lock();
        writeLock.lock();
        try {
            return readQueue.size() + writeQueue.size();
        } finally {
            try {
                writeLock.unlock();
            } finally {
                readLock.unlock();
            }
        }
    }

    /**
     * 读队列和写队列交换
     */
    private void swap() {
        writeLock.lock();
        try {
            if (writeQueue.size() > 0) {
                LinkedList<T> tmp = readQueue;
                readQueue = writeQueue;
                writeQueue = tmp;
                tmp = null;
            }
        } finally {
            writeLock.unlock();
        }
    }
}

测试:

输出结果:

双缓冲队列的扩展

上述实现中,相当于在队列内部使用了两个容器,并使用两个锁来进行读写(这里的读是指移除元素)分离。但是,对于写/读操作而言,同一时刻,仅仅分别支持一个线程进行操作,即该双写队列的并发度为2,假如多个线程同时进行插入(或同时删除),仍然会产生大量的锁竞争。

在Java1.7版本的ConcurrentHashMap的实现中,有一种思想是分段锁。套用到这里,我们可以实现一种类似的分段队列:在队列内部,使用多个队列,其中一部分队列用于读,另一部分队列用于写,这样就可以提高同时插入或删除的线程并发数量。

参考

双缓冲队列

brucelt1993/companyCode代码来源

Last updated