Java队列


Queue

先来看一下Queue的定义:

public interface Queue<E> extends Collection<E> {

      //插入成功返回true,如果容量不足,抛出异常
    boolean add(E e);

      //插入成功返回true,否则返回false,对于容量有限的队列,建议使用add(E e)
    boolean offer(E e);

      //获取并移除队列头,如果队列为空,则抛出NoSuchElementException异常
    E remove();

      //获取并移除队列头,如果队列为空,则返回null
    E poll();

      //获取但不移除队列头,如果队列为空,则抛出NoSuchElementException异常
    E element();

      //获取但不移除队列头,如果队列为空,则返回null
    E peek();
}

这些方法的区别:

操作 抛出异常 返回具体值
插入 add(e) offer(e)
移除 remove() poll()
检查(只查看) element() peek()

BlockingQueue

BlockingQueue也是一个接口,继承自Queue。

public interface BlockingQueue<E> extends Queue<E> {

    boolean add(E e);

    boolean offer(E e);

      // 阻塞插入,等待队列有剩余容量才插入
    void put(E e) throws InterruptedException;

    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

      // 阻塞获取,一直等到数列不为空才出队
    E take() throws InterruptedException;

    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

      // 获取剩余容量,但是在多线程的情况下不准
    int remainingCapacity();

    boolean remove(Object o);

    public boolean contains(Object o);

      //移除队列中所有的元素,并添加到指定的集合中
    int drainTo(Collection<? super E> c);

    //从队列中最多移除maxElements个元素,并添加到指定集合中
    int drainTo(Collection<? super E> c, int maxElements);
}

如何理解这个“blocking”?

对于插入(入队)操作:如果是无边界的队列,则直接插入;如果是有边界的队列,则需要等待队列中又空余容量的时候才能插入。

对于出队操作:当队列不为空时,才会出队。

BlockingQueue还有如下特点:

  • 线程安全
  • 不允许null值

LinkedBlockingQueue

这是基于链表的BlockingQueue实现。

这里还有一个吞吐量的概念:队列每秒可以处理的消息数量,包括两个方面,一方面是发送的数量,一方面是接收的数量。

可以在构造方法传入一个容量,用来避免队列过度扩大,如果不指定容量,默认容量就位Integer.MAX_VALUE

通过ReentranceLockCondition来保证线程安全。

先来看一下链表节点:

static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

还有一些主要的fields:

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

由此知道LinkedBlockingQueue中包含了元素个数,容量边界,以及头尾节点等filed,还包含了用来保证线程安全性的takeLock,putLock,以及条件队列notEmpty和notFull。

插入

    public void put(E e) throws InterruptedException {
            // 不允许null值
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);

          // 线程安全操作
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                  //如果队列已满,则阻塞
                notFull.await();
            }

              // 入队操作
            enqueue(node);

              // 获取当前元素个数
            c = count.getAndIncrement();

              //如果队列还未满,则唤醒其他线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

        private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

可以看到,队列不允许null值,并且通过ReentrantLock和Condition保证线程安全。

移除

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

操作和插入操作差不多。

ArrayBlockingQueue

基于数组实现的BlockingQueue。

属于典型的有界缓冲区,有固定的容量,一旦创建,容量无法修改。

在构造方法中必须传入一个容量参数,还可以传入一个参数用来指示队列的公平性访问问题;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

        public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

可以看到,传入的fair参数实际上是用在ReentrantLock上的,用来表示用的是公平锁,还是非公平锁。

如果一个线程申请一把公平锁,那么当锁释放的时候,先申请的线程先得到锁,非常公平;

如果一个线程申请的是一把非公平锁,那么当锁释放的时候,后申请的线程可能会先得到锁,有随机性;

使用非公平锁的队列的吞吐量比使用公平锁的对垒的吞吐量要大,通常情况下都是优先使用非公平锁。

对于synchronized关键字来说,它也是一种非公平锁,而且没有任何方式可以将它变成公平锁。

插入

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
              // 队列满则阻塞
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

移除

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

SynchronousQueue

SynchronousQueue也是一种BlockingQueue,但是它比较特殊,它有如下特点:

  • 并不存储任何元素,即容量为0,数据直接在配对的生产者/消费者之间传递,不会输入到缓冲队列中;
  • 入队和出队线程必须一一匹配,要不然先到的线程会阻塞;
  • 支持公平,非公平策略,默认为非公平策略。公平策略,基于内部的TransferStack结构实现,非公平策略基于内部的TransferQueue结构实现;
  • 基于一种无锁算法实现

从构造方法中可以看出其公平策略不同导致实现的不同:

    public SynchronousQueue() {
        this(false);
    }

    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

这两个数据结构均是内部抽象类Transfer的实现:

    abstract static class Transferer<E> {
        /**
         * 执行put和take操作
         *
         * @param e 如果不为空,则交由消费者处理;
         *          如果为空,请求返回生产者添加的一个item.
         * @param timed 是否支持超时
         * @param nanos 超时时间,纳秒
         * @return 如果不为空,表示值被提供或者被接收了; 如果为空,
         *         则表示操作由于超时失败,或者线程调用Thread.interrupted方法
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }

再来看一下它的size方法:

    public int size() {
        return 0;
    }

不会存储任何元素,容量为0.

插入

    // 入队e,如果没有另一个线程接收(出队),则阻塞
        public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        if (transferer.transfer(e, false, 0) == null) {
            Thread.interrupted();
            throw new InterruptedException();
        }
    }

移除

    // 出队,如果没有另一个线程入队,则阻塞
    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

PriorityQueue

基于数组(平衡二叉堆,优先堆)实现,元素是有序的,你需要实现Comparable,以方便元素进行比较,如果不实现,则按照自然顺序,如果队列不为空,则第一个元素为最小的元素。

这是一个无界的队列,默认容量大小为11,添加元素的时候会进行扩容。

总结一下特点吧:

  • 基于数组实现,因而可以自由扩容,属于无界队列
  • 有序(根据优先级,而优先级是通过Comparable实现进行比较),小的元素再前面
  • 线程不安全
  • 不支持null值

插入

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }

可以看到其中有扩容操作,还有对堆的操作:

扩容方式

    private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // 如果之前容量小于64,则容量翻倍; 否则增长50%
        int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity);
    }

移除

    public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }

依然是对数组的处理,关键点在处理堆的过程上。

PriorityBlockingQueue

PriorityBlockingQueue是一个阻塞队列,既然是阻塞队列,那么肯定有阻塞操作put和take。同时也是线程安全的。

它也是基于数组实现的,和PriorityQueue的存储结构无异,区别有以下几点:

  • PriorityBlockingQueue是线程安全的(ReentrantLock保证);
  • 支持阻塞操作put和take

DelayQueue

DelayQueue也是一个BlockingQueue,内部存储基于PriorityQueue,并通过ReentrantLock保证线程安全操作。

它有如下特点:

  • 支持延迟获取元素(在创建的时候有一个过期时间,过期之后才可以获取元素)
  • PriorityQueue是无界的,因此DelayQueue也是无界的
  • 线程安全的

插入的元素类型必须实现Delay接口:

public interface Delayed extends Comparable<Delayed> {

    /**
     * 返回剩余的delay时间
     *
     * @param unit 时间单元
     * @return 剩余的时间; 为0或者负数,则表示已经过期
     */
    long getDelay(TimeUnit unit);
}

关键在于出队操作:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                      // 此处是关键,获取剩余时间,如已经没有剩余时间则出队
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

使用DelayQueue最常见的的场景有:

  • 缓存系统的设计

    使用DelayQueue保存带有效期的缓存元素,使用一个线程轮询DelayQueue,一旦获取到元素则表示缓存到期了;

  • 定时任务调度

    使用DelayQueue保存当天要执行的任务和时间,一旦获取到任务就开始执行,Timer中的TimingQueue就是基于DelayQueue实现的。

使用场景总结

队列一般用作基于生产-消费者模型的系统中,下面举一些比较常见的例子:

  • 线程池的设计

    JDK中的线程池设计使用到了各种BlockingQueue的实现,用来作为线程的容器。

    比如固定线程数量和单个线程的ExecutorService,使用的是LinkedBlockingQueue;

    缓存线程个数的ExecutorService,使用的是SynchronousQueue;

  • 缓存系统的设计

    比如前面提到的DelayQueue,元素可以带缓存时间。

  • 定时调度的设计(如Timer)

    比如前面提到的DelayQueue

  • Android Framework中队列的使用

    • Android 的AsyncLayoutInflater中使用的ArrayBlockingQueue;
    • AnimationThread中使用了PriorityQueue;

文章作者: 姜康
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 姜康 !
评论
 上一篇
Java中的List Java中的List
List ArrayList通过动态数组存储数据的,数组默认长度为10,实际使用过程中可以通过trimToSize()方法剪裁到实际的list大小。 ArrayList是线程不安全的; ArrayList由于通过数组索引定位,所以查找效率比
2020-05-15
下一篇 
Gradle读取配置文件 Gradle读取配置文件
gradle中使用了gradle.properties和local.properties文件. build.gradle 可以直接获取同级目录和上层目录的properties文件. 如果properties文件在其他目录,或者不是.prop
2020-05-15
  目录