ArrayBlockingQueue
有界阻塞队列,初始化的时候必须要指定队列长度,且指定长度之后不允许进行修改,内部使用数组持有元素。
它的主要属性如下
// 存储队列元素的数组,是个循环数组
final Object[] items;
// 拿数据的索引,用于take,poll,peek,remove方法
int takeIndex;
// 放数据的索引,用于put,offer,add方法
int putIndex;
// 元素个数
int count;
// 可重入锁
final ReentrantLock lock;
// notEmpty条件对象,由lock创建
private final Condition notEmpty;
// notFull条件对象,由lock创建
private final Condition notFull;
可见,ArrayBlockingQueue只有1个锁,同一时刻,要么添加数据,要么删除数据,两者不能并行执行。
插入元素
offer()方法:如果队列已满,直接返回fasle
public boolean offer(E e) {
checkNotNull(e); // 不允许元素为空
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁,保证只有1个线程进行offer
try {
if (count == items.length) // 如果队列已满
return false; // 直接返回false,添加失败
else {
insert(e); // 数组没满的话调用insert方法
return true; // 返回true,添加成功
}
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用offer方法
}
}
其中的insert方法如下:
private void insert(E x) {
items[putIndex] = x; // 元素添加到数组里
putIndex = inc(putIndex); // 放数据索引+1,当索引满了变成0
++count; // 元素个数+1
notEmpty.signal(); // 使用条件对象notEmpty通知,比如使用take方法的时候队列里没有数据,被阻塞。这个时候队列insert了一条数据,需要调用signal进行通知
}
add()方法:如果队列已满,抛出IllegalStateException("Queue full")异常;
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
put()方法:如果队列已满,当前线程会一直阻塞,直到队列中出现空位或响应中断退出
public void put(E e) throws InterruptedException {
checkNotNull(e); // 不允许元素为空
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁(但可相应中断),保证调用put方法的时候只有1个线程
try {
while (count == items.length) // 如果队列满了,阻塞当前线程,并加入到条件对象notFull的等待队列里
notFull.await(); // 线程阻塞并被挂起,同时释放锁
insert(e); // 调用insert方法
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用put方法
}
}
移除元素
poll()方法:如果队列为空,直接返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁,保证调用poll方法的时候只有1个线程
try {
return (count == 0) ? null : extract(); // 如果队列里没元素了,返回null,否则调用extract方法
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用poll方法
}
}
poll()方法内部调用extract()方法:
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]); // 得到取索引位置上的元素
items[takeIndex] = null; // 对应取索引上的数据清空
takeIndex = inc(takeIndex); // 取数据索引+1,当索引满了变成0
--count; // 元素个数-1
notFull.signal(); // 使用条件对象notFull通知,比如使用put方法放数据的时候队列已满,被阻塞。这个时候消费了一条数据,队列没满了,就需要调用signal进行通知
return x; // 返回元素
}
take()方法:如果队列为空,当前线程会一直阻塞,直到队列中有元素插入或响应中断退出;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); // 加锁,保证调用take方法的时候只有1个线程
try {
while (count == 0) // 如果队列空,阻塞当前线程,并加入到条件对象notEmpty的等待队列里
notEmpty.await(); // 线程阻塞并被挂起,同时释放锁
return extract(); // 调用extract方法
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用take方法
}
}
remove()方法:如果队列为空,则抛出NoSuchElementException异常
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock(); // 加锁,保证调用remove方法的时候只有1个线程
try {
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { // 遍历元素
if (o.equals(items[i])) { // 两个对象相等的话
removeAt(i); // 调用removeAt方法
return true; // 删除成功,返回true
}
}
return false; // 删除成功,返回false
} finally {
lock.unlock(); // 释放锁,让其他线程可以调用remove方法
}
}
内容来源:Java阻塞队列ArrayBlockingQueue和LinkedBlockingQueue实现原理分析