type
status
date
slug
summary
tags
category
icon
password
notion image
先来看一下Queue的定义:
public interface Queue<E> extends Collection<E> { //插入成功返回true,如果容量不足,抛出异常 boolean add(E e); //插入成功返回true,否则返回false,对于容量有限的队列,建议使用add(E e) boolean offer(E e); //获取并移除队列头,如果队列为空,则抛出NoSuchElementException异常 E remove(); //获取并移除队列头,如果队列为空,则返回null E poll(); //获取但不移除队列头,如果队列为空,则抛出NoSuchElementException异常 E element(); //获取但不移除队列头,如果队列为空,则返回null E peek(); }
这些方法的区别:
操作
抛出异常
返回具体值
插入
add(e)
offer(e)
移除
remove()
poll()
检查(只查看)
element()
peek()

BlockingQueue

BlockingQueue也是一个接口,继承自Queue。
public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); // 阻塞插入,等待队列有剩余容量才插入 void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; // 阻塞获取,一直等到数列不为空才出队 E take() throws InterruptedException; E poll(long timeout, TimeUnit unit) throws InterruptedException; // 获取剩余容量,但是在多线程的情况下不准 int remainingCapacity(); boolean remove(Object o); public boolean contains(Object o); //移除队列中所有的元素,并添加到指定的集合中 int drainTo(Collection<? super E> c); //从队列中最多移除maxElements个元素,并添加到指定集合中 int drainTo(Collection<? super E> c, int maxElements); }

如何理解这个“blocking”?

  • *对于插入(入队)操作:**如果是无边界的队列,则直接插入;如果是有边界的队列,则需要等待队列中又空余容量的时候才能插入。
  • *对于出队操作:**当队列不为空时,才会出队。
BlockingQueue还有如下特点:
  • 线程安全
  • 不允许null值

LinkedBlockingQueue

这是基于链表的BlockingQueue实现。
这里还有一个吞吐量的概念:队列每秒可以处理的消息数量,包括两个方面,一方面是发送的数量,一方面是接收的数量。
可以在构造方法传入一个容量,用来避免队列过度扩大,如果不指定容量,默认容量就位Integer.MAX_VALUE
通过ReentranceLockCondition来保证线程安全。
先来看一下链表节点:
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
还有一些主要的fields:
/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); /** * Head of linked list. * Invariant: head.item == null */ transient Node<E> head; /** * Tail of linked list. * Invariant: last.next == null */ private transient Node<E> last; /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
由此知道LinkedBlockingQueue中包含了元素个数,容量边界,以及头尾节点等filed,还包含了用来保证线程安全性的takeLock,putLock,以及条件队列notEmpty和notFull。

插入

public void put(E e) throws InterruptedException { // 不允许null值 if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node<E>(e); // 线程安全操作 final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //如果队列已满,则阻塞 notFull.await(); } // 入队操作 enqueue(node); // 获取当前元素个数 c = count.getAndIncrement(); //如果队列还未满,则唤醒其他线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; }
可以看到,队列不允许null值,并且通过ReentrantLock和Condition保证线程安全。

移除

public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; } private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
操作和插入操作差不多。

ArrayBlockingQueue

基于数组实现的BlockingQueue。
属于典型的有界缓冲区,有固定的容量,一旦创建,容量无法修改。
在构造方法中必须传入一个容量参数,还可以传入一个参数用来指示队列的公平性访问问题;
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
可以看到,传入的fair参数实际上是用在ReentrantLock上的,用来表示用的是公平锁,还是非公平锁。
如果一个线程申请一把公平锁,那么当锁释放的时候,先申请的线程先得到锁,非常公平;
如果一个线程申请的是一把非公平锁,那么当锁释放的时候,后申请的线程可能会先得到锁,有随机性;
使用非公平锁的队列的吞吐量比使用公平锁的对垒的吞吐量要大,通常情况下都是优先使用非公平锁。
对于synchronized关键字来说,它也是一种非公平锁,而且没有任何方式可以将它变成公平锁。

插入

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 队列满则阻塞 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

移除

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }

SynchronousQueue

SynchronousQueue也是一种BlockingQueue,但是它比较特殊,它有如下特点:
  • 并不存储任何元素,即容量为0,数据直接在配对的生产者/消费者之间传递,不会输入到缓冲队列中;
  • 入队和出队线程必须一一匹配,要不然先到的线程会阻塞;
  • 支持公平,非公平策略,默认为非公平策略。公平策略,基于内部的TransferStack结构实现,非公平策略基于内部的TransferQueue结构实现;
  • 基于一种无锁算法实现
从构造方法中可以看出其公平策略不同导致实现的不同:
public SynchronousQueue() { this(false); } public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }
这两个数据结构均是内部抽象类Transfer的实现:
abstract static class Transferer<E> { /** * 执行put和take操作 * * @param e 如果不为空,则交由消费者处理; * 如果为空,请求返回生产者添加的一个item. * @param timed 是否支持超时 * @param nanos 超时时间,纳秒 * @return 如果不为空,表示值被提供或者被接收了; 如果为空, * 则表示操作由于超时失败,或者线程调用Thread.interrupted方法 */ abstract E transfer(E e, boolean timed, long nanos); }
再来看一下它的size方法:
public int size() { return 0; }
不会存储任何元素,容量为0.

插入

// 入队e,如果没有另一个线程接收(出队),则阻塞 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }

移除

// 出队,如果没有另一个线程入队,则阻塞 public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }

PriorityQueue

基于数组(平衡二叉堆,优先堆)实现,元素是有序的,你需要实现Comparable,以方便元素进行比较,如果不实现,则按照自然顺序,如果队列不为空,则第一个元素为最小的元素。
这是一个无界的队列,默认容量大小为11,添加元素的时候会进行扩容。
总结一下特点吧:
  • 基于数组实现,因而可以自由扩容,属于无界队列
  • 有序(根据优先级,而优先级是通过Comparable实现进行比较),小的元素再前面
  • 线程不安全
  • 不支持null值

插入

public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) grow(i + 1); size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e); return true; }
可以看到其中有扩容操作,还有对堆的操作:

扩容方式

private void grow(int minCapacity) { int oldCapacity = queue.length; // 如果之前容量小于64,则容量翻倍; 否则增长50% int newCapacity = oldCapacity + ((oldCapacity < 64) ? (oldCapacity + 2) : (oldCapacity >> 1)); // overflow-conscious code if (newCapacity - MAX_ARRAY_SIZE > 0) newCapacity = hugeCapacity(minCapacity); queue = Arrays.copyOf(queue, newCapacity); }

移除

public E poll() { if (size == 0) return null; int s = --size; modCount++; E result = (E) queue[0]; E x = (E) queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); return result; }
依然是对数组的处理,关键点在处理堆的过程上。

PriorityBlockingQueue

PriorityBlockingQueue是一个阻塞队列,既然是阻塞队列,那么肯定有阻塞操作put和take。同时也是线程安全的。
它也是基于数组实现的,和PriorityQueue的存储结构无异,区别有以下几点:
  • PriorityBlockingQueue是线程安全的(ReentrantLock保证);
  • 支持阻塞操作put和take

DelayQueue

DelayQueue也是一个BlockingQueue,内部存储基于PriorityQueue,并通过ReentrantLock保证线程安全操作。
它有如下特点:
  • 支持延迟获取元素(在创建的时候有一个过期时间,过期之后才可以获取元素)
  • PriorityQueue是无界的,因此DelayQueue也是无界的
  • 线程安全的
插入的元素类型必须实现Delay接口:
public interface Delayed extends Comparable<Delayed> { /** * 返回剩余的delay时间 * * @param unit 时间单元 * @return 剩余的时间; 为0或者负数,则表示已经过期 */ long getDelay(TimeUnit unit); }
关键在于出队操作:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { // 此处是关键,获取剩余时间,如已经没有剩余时间则出队 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
使用DelayQueue最常见的的场景有:
  • 缓存系统的设计
    • 使用DelayQueue保存带有效期的缓存元素,使用一个线程轮询DelayQueue,一旦获取到元素则表示缓存到期了;
  • 定时任务调度
    • 使用DelayQueue保存当天要执行的任务和时间,一旦获取到任务就开始执行,Timer中的TimingQueue就是基于DelayQueue实现的。

使用场景总结

队列一般用作基于生产-消费者模型的系统中,下面举一些比较常见的例子:
  • 线程池的设计
    • JDK中的线程池设计使用到了各种BlockingQueue的实现,用来作为线程的容器。
      比如固定线程数量和单个线程的ExecutorService,使用的是LinkedBlockingQueue;
      缓存线程个数的ExecutorService,使用的是SynchronousQueue;
  • 缓存系统的设计
    • 比如前面提到的DelayQueue,元素可以带缓存时间。
  • 定时调度的设计(如Timer)
    • 比如前面提到的DelayQueue
  • Android Framework中队列的使用
    • Android 的AsyncLayoutInflater中使用的ArrayBlockingQueue;
    • AnimationThread中使用了PriorityQueue;
Java反射Android Hook之拦截Activity的启动
姜康
姜康
一个软件工程师
公告
type
status
date
slug
summary
tags
category
icon
password
🎉博客网站重新制作了🎉
👏欢迎更新体验👏