介绍Java中的队列的用法
包括:DelayQueue,ConcurrentLinkedQueue,BlockingQueue。
DelayQueue
简介
DelayQueue:只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue就是基于PriorityQueue实现的,DelayQueue队列实际上就是将队列元素保存到内部的一个PriorityQueue实例中的(所以也不支持插入null值),DelayQueue只专注于实现队列元素的延时出队。
延迟队列DelayQueue是一个无界阻塞队列,它的队列元素只能在该元素的延迟已经结束(或者说过期)才能被出队。它怎么判断一个元素的延迟是否结束呢,原来DelayQueue队列元素必须是实现了Delayed接口的实例,该接口有一个getDelay方法需要实现,延迟队列就是通过实时的调用元素的该方法来判断当前元素是否延迟已经结束。
既然DelayQueue是基于优先级队列来实现的,那肯定元素也要实现Comparable接口。因为Delayed接口继承了Comparable接口,所以实现Delayed的队列元素也必须要实现Comparable的compareTo方法。延迟队列就是以时间作为比较基准的优先级队列,这个时间即延迟时间,这个时间大都在构造元素的时候就已经设置好,随着程序的运行时间的推移,队列元素的延迟时间逐步到期,DelayQueue就能够基于延迟时间运用优先级队列并配合getDelay方法达到延迟队列中的元素在延迟结束时精准出队。
实例
package org.example.a;
import com.oracle.jrockit.jfr.Producer;
import java.util.concurrent.*;
class DelayTask implements Delayed {
private long delay; //延迟多少纳秒开始执行
private TimeUnit unit;
public DelayTask(long delay, TimeUnit unit){
this.unit = unit;
this.delay = TimeUnit.NANOSECONDS.convert(delay, unit);//统一转换成纳秒计数
}
@Override
public long getDelay(TimeUnit unit) {//延迟剩余时间,单位unit指定
return unit.convert(delay - System.currentTimeMillis(), TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed o) {//基于getDelay实时延迟剩余时间进行比较
if(this.getDelay(TimeUnit.NANOSECONDS) < o.getDelay(TimeUnit.NANOSECONDS)) //都换算成纳秒计算
return -1;
else if(this.getDelay(TimeUnit.NANOSECONDS) > o.getDelay(TimeUnit.NANOSECONDS)) //都换算成纳秒计算
return 1;
else
return 0;
}
@Override
public String toString() {
return "DelayTask{" +
"delay=" + delay +
", unit=" + unit +
'}';
}
}
public class Demo {
public static void main(String[] args) {
DelayQueue<DelayTask> dq = new DelayQueue();
//入队四个元素,注意它们的延迟时间单位不一样。
dq.offer(new DelayTask(5, TimeUnit.SECONDS));
dq.offer(new DelayTask(2, TimeUnit.MINUTES));
dq.offer(new DelayTask(700, TimeUnit.MILLISECONDS));
dq.offer(new DelayTask(1000, TimeUnit.NANOSECONDS));
while(dq.size() > 0){
try {
System.out.println(dq.take());
}catch (Exception e){
e.printStackTrace();
}
}
}
}
执行结果
DelayTask{delay=1000, unit=NANOSECONDS}
DelayTask{delay=700000000, unit=MILLISECONDS}
DelayTask{delay=5000000000, unit=SECONDS}
DelayTask{delay=120000000000, unit=MINUTES}
ConcurrentLinkedQueue
ConcurrentLinkedQueue与BlockingQueue区别
ConcurrentLinkedQueue与BlockingQueue方法的区别
BlockingQueue
队列类型
BlockingQueue有这几种类型:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue、DelayedWorkQueue。
ArrayBlockingQueue
简介
ArrayBlockingQueue通过数组实现的FIFO有界阻塞队列,它的大小在实例被初始化的时候就被固定了,不能更改。
该类支持一个可选的公平策略,用于被阻塞等待的线程获取独占锁的排序,因为ArrayBlockingQueue内部的操作都需要获取一个ReentrantLock锁,该锁是支持公平策略的,所以ArrayBlockingQueue的公平策略就直接作用于ReentrantLock锁,决定线程是否有公平获取锁的权利。默认情况下是非公平的,公平模式下队列按照FIFO顺序授予线程访问权。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。
ArrayBlockingQueue的缺陷
通过源码可以看见,ArrayBlockingQueue内部的几乎每一个操作方法都需要先获取同一个ReentrantLock独占锁才能进行,这极大的降低了吞吐量,几乎每个操作都会阻塞其它操作,最主要是插入操作和取出操作相互之间互斥。所以ArrayBlockingQueue不适用于需要高吞吐量的高效率数据生成与消费场景。LinkedBlockingQueue就能弥补其低吞吐量的缺陷。
LinkedBlockingQueue
SynchronousQueue
SynchrousQueue是个一个无缓存的队列。因为:SynchrousQueue源码可以看到:isEmpty()始终为true;size()始终返回0。
PriorityBlockingQueue
简介
PriorityBlockingQueue是一个无限容量的阻塞队列。
容量是无限的,所以put等入队操作其实不存在阻塞,只要内存足够都能够立即入队成功,当然多个入队操作的线程之间还是存在竞争唯一锁的互斥访问。虽然PriorityBlockingQueue逻辑上是无界的,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError。
该队列也不允许放入null值,它使用与类java.util.PriorityQueue 相同的排序规则,也不允许放入不可比较的对象,这样做会导致ClassCastException。
值得注意的是,虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。注意peek方法永远只获取且不删除第一个元素,所以多次调用peek都是返回同样的值。
PriorityBlockingQueue其实是通过Comparator来排序的,要么入队的元素实现了Comparator接口(即所谓的自然排序),要么构造PriorityBlockingQueue实例的时候传入一个统一的Comparator实例,如果两者兼备那么以后者为准
PriorityBlockingQueue不保证具有相同优先级的元素顺序,但是你可以定义自定义类或比较器,通过辅助属性来决定优先级相同的元素的顺序
DelayedWorkQueue
简介
为什么不直接使用DelayQueue而要重新实现一个DelayedWorkQueue呢,可能是了方便在实现过程中加入一些扩展。
使用场景
实现重试机制文章来源:https://uudwc.com/A/LakRg
比如当调用接口失败后,把当前调用信息放入delay=10s的元素,然后把元素放入队列,那么这个队列就是一个重试队列。一个线程通过take方法获取需要重试的接口,take返回则接口进行重试,失败则再次放入队列,同时也可以在元素加上重试次数。文章来源地址https://uudwc.com/A/LakRg