跳到主要内容

前言

BlockingQueue直译为阻塞队列,他的阻塞主要体现在两个方面,分别是出队的时候和入队的时候。

  • 出队

出队的时候如果阻塞队列是空的,那么将被阻塞

  • 入队

入队的时候如果阻塞队列是满的,那么将被阻塞

需要注意的是我们阻塞的都是针对于线程进行阻塞的,其实可以把我们的阻塞队列看成一个消息队列,他们作用其实 是相通的。

image.png

通常我们使用的场景也是,一个线程生产对象,另一个线程消费对象

我们可以看到它也是在一个队列里面

image.png

我们看到BlockingQueue的类图,它是一个接口,继承于Queue,所以他也是一个接口,他也有队列的所有特性,也就是先进先出

核心方法

image.png

我们可以看到他有很多方法,并且有的方法是继承于其父接口的

  • 添加

boolean add(E);

这个方法继承于父接口Queue,作用和普通队列的add方法一致

boolean offer(E e);

这个方法继承于父接口Queue,作用和普通队列的offer方法一致

void put(E e) throws InterruptedException;

将元素添加到队列中,如果队列满了,等待有空间再放入(也就是阻塞线程)。

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;

将元素添加到队列中,如果队列满了,等待有空间再放入(阻塞线程,如果超过等待时间,那么返回fasle)

  • 删除

boolean remove(Object o);

继承与父接口Collection,作用删除一个元素。

E take() throws InterruptedException;

移除队列头元素,如果队列中没有元素,那么等待有元素再去移出(阻塞线程)

E poll(long timeout, TimeUnit unit) throws InterruptedException;

移除队列头元素,如果队列中没有元素,那么等待指定时间有元素再去移出(阻塞线程)

实现类

image.png 我们可以看到它有很多实现类,今天我们主要看ArrayBlockingQueueLinkedBlockingQueue

ArrayBlockingQueue

image.png

我们可以看到ArrayBlockingQueue继承于AbstractQueue同时实现了BlockingQueue,所以它具有一般队列的性质,也同时具有阻塞队列的性质

ArrayBlockingQueue底层使用的是数组,并且它是一个有界队列,简单来说就是只能放置指定元素,我们使用的时候需要手动指定界限

成员变量

    // 序列化ID
private static final long serialVersionUID = -817911632652898426L;
// 存储元素的实现
final Object[] items;
// 已经取出元素的索引,用于下一个poll或者take(出队索引)
int takeIndex;
// 已经插入元素的索引,用于下一个put或者offer(入队索引)
int putIndex;
// 元素个数
int count;
// 用于控制访问的主要锁
final ReentrantLock lock;
// 用于出队的条件
private final Condition notEmpty;
// 用于入队的条件
private final Condition notFull;
// 内部维护的是一个链表,是一个通道,用于在删除的时候修改队列以更新迭代器
transient Itrs itrs = null;

构造方法

一共有三种构造方法,都是必须要指定容量的,我们可以看到第一种和第三种,都调用了第二种构造, 所以核心参数有两个,一个是capacity也就是容量,一个是fair,这个参数是传给ReentrantLock的,用于ReentrantLock的实例化,也就是创建公平锁或者非公平锁,true为公平锁,false为非公平锁,这里就不再叙述ReentrantLock,我之前的文章里面说过

    public ArrayBlockingQueue(int capacity) {
//调用第二种构造方法
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
//容量小于0直接抛出异常
if (capacity <= 0)
throw new IllegalArgumentException();
//进行初始化数组
this.items = new Object[capacity];
//实例化ReentrantLock(公平锁/非公平锁)
lock = new ReentrantLock(fair);
//实例化获取队列元素条件
notEmpty = lock.newCondition();
//实例化插入队列元素条件
notFull = lock.newCondition();
}
//这里看到有第三个参数,这个参数是为了给阻塞队列初始化元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
//调用第二种构造方法
this(capacity, fair);
//拿到lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
int i = 0;
try {
//遍历集合c把元素添加到阻塞队列中
for (E e : c) {
//判断元素e是否为null,如果为null,直接抛出异常
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
//设置插入元素的索引,也就是当前容量,因为已经进行了初始化
putIndex = (i == capacity) ? 0 : i;
} finally {
//释放锁
}
}

添加(入队)

    
public boolean add(E e) {
//调用父类的add方法,没什么好说的
return super.add(e);
}
public void put(E e) throws InterruptedException {
//检查元素e是否为null
checkNotNull(e);
//拿到lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//队列满了,线程阻塞等待
while (count == items.length)
notFull.await();
//真正的元素入队方法
enqueue(e);
} finally {
//释放锁
lock.unlock();
}
}
public boolean offer(E e) {
//判断e是否为null
checkNotNull(e);
//获取lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//判断队列是否满了
if (count == items.length)
//队列满了返回fasle(也就是添加元素失败)
return false;
else {
//真正的元素入队方法
enqueue(e);
//队列没满返回ture(添加元素成功)
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
//需要入队的方法e,超时时间timeout,超时时间单位unit
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
//检查元素e是否为null
checkNotNull(e);
//获得超时时间(转换为纳秒)
long nanos = unit.toNanos(timeout);
//获得lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//队列满了,线程阻塞
while (count == items.length) {
//超时时间小于等于0直接返回false
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
//真正的元素入队的方法
enqueue(e);
return true;
} finally {
//释放锁
lock.unlock();
}
}
//真正的元素入队的方法
private void enqueue(E x) {
//先拿到真正存储元素的数组
final Object[] items = this.items;
//通过插入索引来赋值元素
items[putIndex] = x;
//如果索引加一已经达到数组长度,那么将插入索引修改为0,防止数组越界
if (++putIndex == items.length)
putIndex = 0;
//元素个数加一
count++;
notEmpty.signal();
}

删除(出队)

    //这个方法实现的是Queue接口的
public E poll() {
//拿到lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//判断元素个数是否为0,如果为0返回null,否则调用真正的出队方法
return (count == 0) ? null : dequeue();
} finally {
//释放锁
lock.unlock();
}
}
//超时时间timeout,超时时间单位unit
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//获取超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
//获得lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//如果队列中元素为0,线程阻塞
while (count == 0) {
//如果等待时间小于等于0那么返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//代码走到这里证明元素不为0,调用真正的出队方法
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
public E take() throws InterruptedException {
//获得lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lockInterruptibly();
try {
//如果队列元素为0,线程阻塞
while (count == 0)
notEmpty.await();
//元素已经不为0,调用真正的出队方法
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
public boolean remove(Object o) {
//如果o为null 直接返回false,因为队列中不存放null的元素
if (o == null) return false;
//获取真正存放元素的数组
final Object[] items = this.items;
//拿到lock锁
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//如果元素个数大于0,才去删除元素o
if (count > 0) {
//拿到添加元素的索引(入队索引)
final int putIndex = this.putIndex;
//拿到删除元素的索引(出队索引)
int i = takeIndex;
do {
//通过equals方法判断元素是否相等,如果相等就删除并返回true
if (o.equals(items[i])) {
removeAt(i);
return true;
}
//如果++i等于length证明到达数组尾部,从数组头部开始继续遍历
//注意这里数组尾部不一定是入队索引位置
if (++i == items.length)
i = 0;
//从出队索引位置开始遍历,直到到达入队索引位置
} while (i != putIndex);
}
//走到这里证明没有o这个元素,那么返回false
return false;
} finally {
//释放锁
lock.unlock();
}
}
//真正的出队方法
private E dequeue() {
//拿到存放元素的数组
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//拿到出队索引位置的元素
E x = (E) items[takeIndex];
//将出队索引位置置为null
items[takeIndex] = null;
//如果出队索引位置下一个为item.length,那么将其置为0,避免越界
if (++takeIndex == items.length)
takeIndex = 0;
//元素个数减一
count--;
//itrs主要用来记录集合中数据,方便集合在枚举过程中操作(删除)数据
//其实我并不是很了解itrs的作用,请教一下各位大佬!
if (itrs != null)
itrs.elementDequeued();
//通知等待入队的线程
notFull.signal();
//返回元素
return x;
}

小结

  • ArrayBlockingQueue有界阻塞队列,我们使用的时候必须指定一个队列长度,并且使用过程中队列长度不能改变,队列中不能存放null值的元素
  • 因为使用ReentrantLock来保证线程安全,所以支持两种形式的锁,也就是公平锁和非公平锁,操作的是一把锁,如果在高并发下可能会有性能问题
  • 队列满的情况下,会阻塞需要入队的线程,队列空的情况下,会阻塞出队的线程

LinkedBlockingQueue

image.png

我们可以看到LinkedBlockingQueue和ArrayBlockingQueue一样都继承于AbstractQueue并且实现了BlockingQueue接口

LinkedBlockingQueue底层实现方式是一个链表,他既可以是有界的也可以是无界的,因为我们知道链表和数组不同没有固定长度,并且的为了保证并发,使用了两个lock锁来控制

成员变量

    //序列化ID
private static final long serialVersionUID = -6903933977591709194L;
//队列容量,设置的话就是有界队列,不设置就是无界的,但是不能超过(Integer.MAX_VALUE)
private final int capacity;
//元素个数
private final AtomicInteger count = new AtomicInteger();
//链表头
transient Node<E> head;
//链表尾
private transient Node<E> last;
//出队锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队条件
private final Condition notEmpty = takeLock.newCondition();
//入队锁
private final ReentrantLock putLock = new ReentrantLock();
//入队条件
private final Condition notFull = putLock.newCondition();

构造方法

    public LinkedBlockingQueue() {
//调用有参构造,其实就是设置容量为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
//设置容量为capacity
public LinkedBlockingQueue(int capacity) {
//容量不能小于等于0
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
指定链表头和链表尾都是null
last = head = new Node<E>(null);
}
//将集合c的元素全部放入队列中
public LinkedBlockingQueue(Collection<? extends E> c) {
//调用有参构造,其实就是设置容量为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
//拿到入队锁
final ReentrantLock putLock = this.putLock;
//加锁
putLock.lock();
try {
//用于计算入队了多少个元素
int n = 0;
//遍历集合c
for (E e : c) {
//队列中不能存放null的元素,如果有null直接抛出空指针异常
if (e == null)
throw new NullPointerException();
//如果n已经等于capacity了证明队列满了
if (n == capacity)
throw new IllegalStateException("Queue full");
//真正的入队方法
enqueue(new Node<E>(e));
++n;
}
//元素个数加n
count.set(n);
} finally {
//释放锁
putLock.unlock();
}
}

添加(入队)

    //实现于Queue接口,需要入队的元素e
public boolean offer(E e) {
//如果e为null直接抛出空指针异常
if (e == null) throw new NullPointerException();
//拿到队列元素个数
final AtomicInteger count = this.count;
//如果队列元素个数等于队列容量大小,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
//实例化一个Node结点,用于添加到队列中
Node<E> node = new Node<E>(e);
//拿到入队锁
final ReentrantLock putLock = this.putLock;
//加锁
putLock.lock();
try {
//如果队列元素个数小于队列容量大小
if (count.get() < capacity) {
//执行真正的入队方法
enqueue(node);
//队列元素个数加一
c = count.getAndIncrement();
//如果队列个数加一仍然小于队列容量大小,那么就唤醒一个入队线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放锁
putLock.unlock();
}

if (c == 0)
//唤醒一个出队线程
signalNotEmpty();
return c >= 0;
}
//需要入队的元素e,超时时间timeout,超时时间单位unit
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
//如果e为null的话抛出空指针异常
if (e == null) throw new NullPointerException();
//获取超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
int c = -1;
//获取入队锁
final ReentrantLock putLock = this.putLock;
//获取队列元素个数
final AtomicInteger count = this.count;
//加锁
putLock.lockInterruptibly();
try {
//如果队列元素个数等于队列容量,队列满了阻塞线程
while (count.get() == capacity) {
//如果超时时间小于等于0证明已经超时,返回false
if (nanos <= 0)
return false;
//继续等待
nanos = notFull.awaitNanos(nanos);
}
//真正的入队方法
enqueue(new Node<E>(e));
//队列元素个数加一
c = count.getAndIncrement();
//队列元素个数加一仍然小于队列容量,那么唤醒一个入队线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
if (c == 0)
//唤醒一个出队线程
signalNotEmpty();
return true;
}
//需要添加的元素e
public void put(E e) throws InterruptedException {
//如果e为null那么直接抛出空指针异常
if (e == null) throw new NullPointerException();
int c = -1;
//创建一个Node结点用于添加
Node<E> node = new Node<E>(e);
//获取入队锁
final ReentrantLock putLock = this.putLock;
//获取队列元素个数
final AtomicInteger count = this.count;
//加锁
putLock.lockInterruptibly();
try {
//如果队列元素个数等于队列容量大小,入队线程阻塞
while (count.get() == capacity) {
notFull.await();
}
//走到这里证明没有被阻塞
//真正的入队方法
enqueue(node);
//队列元素个数加一
c = count.getAndIncrement();
//如果队列元素加一小于队列容量大小,那么唤醒一个入队线程
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
if (c == 0)
//唤醒一个出队线程
signalNotEmpty();
}
//真正的入队方法
private void enqueue(Node<E> node) {
last = last.next = node;
}

删除(出队)

    public E poll() {
//获取队列元素个数
final AtomicInteger count = this.count;
//判断如果元素个数等于0,那么直接返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
//获取出队锁
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lock();
try {
//如果队列元素个数大于0
if (count.get() > 0) {
//调用真正的出队方法
x = dequeue();
//队列元素个数减一
c = count.getAndDecrement();
//如果队列元素个数大于1,那么唤醒一个出队线程
if (c > 1)
notEmpty.signal();
}
} finally {
//释放锁
takeLock.unlock();
}
//如果当前队列元素个数等于队列容量大小
if (c == capacity)
//唤醒一个新的线程
signalNotFull();
return x;
}
//超时时间timemout,超时时间单位unit
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//声明一个引用
E x = null;
int c = -1;
//获取超时时间转换为纳秒
long nanos = unit.toNanos(timeout);
//获取队列元素个数
final AtomicInteger count = this.count;
//获取出队锁
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lockInterruptibly();
try {
//如果队列元素个数为0,通过自旋的方式阻塞线程
while (count.get() == 0) {
//如果超过超时时间,直接返回null
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
//真正的出队方法
x = dequeue();
//队列元素个数减一
c = count.getAndDecrement();
if (c > 1)
//唤醒一个出队线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
if (c == capacity)
//唤醒入队线程
signalNotFull();
return x;
}
public E take() throws InterruptedException {
//声明一个引用
E x;
int c = -1;
//获取队列元素个数
final AtomicInteger count = this.count;
//获取出队锁
final ReentrantLock takeLock = this.takeLock;
//加锁
takeLock.lockInterruptibly();
try {
//如果队列元素个数为0,通过自旋的方式阻塞线程
while (count.get() == 0) {
notEmpty.await();
}
//真正的出队方法
x = dequeue();
//队列元素个数减一
c = count.getAndDecrement();
if (c > 1)
//唤醒也给出队线程
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
if (c == capacity)
//唤醒一个入队线程
signalNotFull();
return x;
}
//删除指定元素
public boolean remove(Object o) {
//如果o为null,直接返回false,因为队列中没有值为null的元素
if (o == null) return false;
//加锁
//这个加锁是把入队锁和出队锁都加锁
fullyLock();
try {
//遍历链表,通过equalse方法找到对应的元素然后删除
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
//释放锁
fullyUnlock();
}
}
//真正的出队方法
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h;
head = first;
E x = first.item;
first.item = null;
return x;
}

小结

  • LinkedBlockingQueue,我们使用的时候,既可以有界也可以无界,因为它是一个单向链表,同样的,队列中也不能存放null值的元素
  • 使用俩把锁来保证线程安全,一个入队锁,一个出队锁,这样子做,他的并发量就比ArrayBlockingQueue并发量高很多
  • 他和有界阻塞队列不同,不能指定lock锁是非公平锁还是公平锁,他的入队锁和出队锁都是默认非公平锁