美文网首页Java 杂谈java多线程
Java 多线程(九):ArrayBlockingQueue 与

Java 多线程(九):ArrayBlockingQueue 与

作者: 聪明的奇瑞 | 来源:发表于2018-07-11 00:40 被阅读16次

什么是阻塞队列?

  • 阻塞队列与我们平常接触到的普通队列(ArrayList)的最大不同点在于阻塞队列的添加和删除方法都是阻塞的
    • 阻塞添加:当阻塞队列元素已满时,队列会阻塞加入元素的线程,直到队列元素不满时才重新唤醒线程执行元素加入操作
    • 阻塞删除:当队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作

BlockingQueue

  • BlockingQueue 能够解决多线程中如何高效安全的传输数据的问题,帮助我们快速搭建高质量的多线程程序
  • 通过队列,可以使得数据由队列的一端输入,从另一端输出
  • 适用场景:生产者线程在一端生成,消费者线程在另一端消费
BlockingQueue

BlockingQueue 主要方法

  • 插入方法:
    • add(E e):
      • 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
      • 成功返回 true,失败抛 IllegalStateException 异常
    • offer(E e,long timeout,TimeUnit unit):
      • 将元素插入到队尾(如果立即可行且不会超过该队列的容量)
      • 可设置超时时间,该方法可以中断
      • 成功返回 true,如果队列已满,返回 false
    • put(E e):
      • 将元素插入到队尾,如果队列已满则一直等待(阻塞)
  • 删除方法:
    • remove(Object o):
      • 移除指定元素,成功返回 true,失败返回 false
    • poll(long timeout, TimeUnit unit):
      • 获取并移除此队列的头元素,在指定等待的时间前一直等到获取元素
    • take():
      • 获取并移除队列头元素,若没有元素则一直阻塞
  • 检查方法:
    • element():
      • 获取但不移除队列的头元素,没有元素则抛异常
    • peek():
      • 获取但不移除队列的头,若队列为空则返回 null

BlockingQueue 基本使用

  • 该例子中使用 ArrayBlockingQueue,生产者(Producer)将字符串插入共享队列中,消费者将它们取出
public class BlockingQueueExample {
    public static void main(String[] args) throws Exception {
        BlockingQueue queue = new ArrayBlockingQueue(3);
        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(producer).start();
        new Thread(consumer).start();
        Thread.sleep(4000);
    }
}
  • 生产者通过 put() 方法将元素插入共享队列中
public class Producer implements Runnable {

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            queue.put("1");
            Thread.sleep(1000);
            queue.put("2");
            Thread.sleep(1000);
            queue.put("3");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 消费者通过 take() 方法从队列中取出元素,take() 方法会阻塞直到获取元素为止
public class Consumer implements Runnable {

    private BlockingQueue queue = null;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

BlockingQueue 接口实现类与源码分析

ArrayBlockingQueue

  • 分析:
    • 基于数组的阻塞队列实现,内部维护了一个数组用于缓存队列中的数据对象
    • 有两个 Integer 类型的索引,指向添加、获取下一个元素的位置的索引
    • 并通过一个重入锁 ReentrantLock 和两个 Condition 条件来实现阻塞
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 存储数据的数组 */
    final Object[] items;

    /**获取数据的索引,主要用于take,poll,peek,remove方法 */
    int takeIndex;

    /**添加数据的索引,主要用于 put, offer, or add 方法*/
    int putIndex;

    /** 队列元素的个数 */
    int count;


    /** 控制并非访问的锁 */
    final ReentrantLock lock;

    /**notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 */
    private final Condition notEmpty;

    /**notFull条件对象,用于通知put方法队列未满,可执行添加操作 */
    private final Condition notFull;

    /**
       迭代器
     */
    transient Itrs itrs = null;

}

image
  • 分析:
    • add 方法实际上是调用了 offer 方法
    • enqueue(E x) 方法内部通过 putIndex 索引直接将元素添加到数组 item 中,当 putIndex 索引大小等于数组长度时,需要将 putIndex 重新设置为 0,这是因为当前队列元素总是从队头获取,从队尾添加
//add方法实现,间接调用了offer(e)
public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

//offer方法
public boolean offer(E e) {
     checkNotNull(e);//检查元素是否为null
     final ReentrantLock lock = this.lock;
     lock.lock();//加锁
     try {
         if (count == items.length)//判断队列是否满
             return false;
         else {
             enqueue(e);//添加元素到队列
             return true;
         }
     } finally {
         lock.unlock();
     }
 }

//入队操作
private void enqueue(E x) {
    //获取当前数组
    final Object[] items = this.items;
    //通过putIndex索引对数组进行赋值
    items[putIndex] = x;
    //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;//队列中元素数量加1
    //唤醒调用take()方法的线程,执行元素获取操作。
    notEmpty.signal();
}

SJziX.md.png
  • 分析:
    • put 方法是一个阻塞方法,如果队列元素已满,那么当前线程将会被 notFull 条件对象挂起加入到等待队列中,直到有空挡才会唤醒执行添加操作
//put方法,阻塞时可中断
 public void put(E e) throws InterruptedException {
     checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();//该方法可中断
      try {
          //当队列元素个数与数组长度相等时,无法添加元素
          while (count == items.length)
              //将当前调用线程挂起,添加到notFull条件队列中等待唤醒
              notFull.await();
          enqueue(e);//如果队列没有满直接添加。。
      } finally {
          lock.unlock();
      }
  }

LinkedBlockingQueue

  • 基于链表的阻塞队列,内部维护着一个链表构成的缓冲队列,用于缓存队列中的数据对象
  • 在正常情况下链表阻塞队列的吞吐量要高于数组的阻塞队列(ArrayBlockingQueue),因为其内部实现添加和删除操作使用了两个 ReentrantLock 来控制并发执行(插入、获取各有一个锁),而 ArrayBlockingQueue 内部只使用一个 ReentrantLock 控制并发
  • 它与 ArrayBlockingQueue 的 API 几乎一致但内部实现原理不太相同
  • 当创建一个 LinkedBlockingQueue 时,默认阻塞队列中元素的数量大小为 Interger.MAX_VALUE

LinkedBlockingQueue 和 ArrayBlockingQueue 的区别

  • 队列大小有所不同:ArrayBlockingQueue 必须指定队列大小,而 LinkedBlockingQueue 默认为 Integer.MAX_VALUE(当元素添加速度大于移除速度时,需要注意一下,以免内存溢出)
  • 实现结构不同:ArrayBlockingQueue 采用数组实现、而 LinkedBlockingQueue 采用链表实现
  • 由于 ArrayBlockingQueue 采用数组存储队列元素,因此再插入、删除元素时不会产生或销毁任何额外的对象实例,而 LinkedBlockingQueue 每次插入都会生成一个新的结点(Node)对象,这会影响日后 GC 垃圾回收
  • ArrayBlockingQueue 中添加、删除操作只使用一个锁(ReentrantLock),而 LinkedBlockingQueue 添加、删除操作各使用一个锁,因此 LinkedBlockingQueue 的并发吞吐量大于 ArrayBlockingQueue

相关文章

网友评论

    本文标题:Java 多线程(九):ArrayBlockingQueue 与

    本文链接:https://www.haomeiwen.com/subject/nmdcpftx.html