前言
定时任务的业务场景主要有如下俩种:
- 一段时间后执行。
- 约定某个时间点执行。
定时任务的主要实践方式:
1、Linux系统中的crontab
2、Spring Task(不支持集群)
3、Quartz(支持集群)
4、Elastic-Job(分布式定时任务)
5、延时队列
时间轮
时间轮顾名思义可以看作是钟表。比如秒针走一圈用时为60秒,走过的刻度数为60,每一个刻度表示1秒。那么时间精度就是1秒。
我们以秒针来看运行来看,当添加一个延时任务A,假如会延迟90秒后才会执行,秒针一圈用时是60秒,那么此时会根据时间轮长度和刻度得到一个圈数LoopNumber
和对应的指针位置index
,此时时间轮会记录该任务的LoopNumber
和index
信息(LoopNumber=1
,index=30
),也是就任务A会绕一圈指向6小时的刻度上。LoopNumber=0,index=30,秒针指向6小时的刻度上,任务A并不会执行,LoopNumber=0不满足要求。每一个刻度代表的是一个时间,比如例子内秒针在30秒和90秒都会指向6刻度上,但是30秒不满足条件是不会执行的。
netty实现
Netty构建延时队列主要用HashedWheelTimer, 构造方法就如下,源码比较简单,我就列举一个构造方法得了,有兴趣可以自行阅读,Netty的HashedWheelTimer实现有两个东西值得关注,分别是pendingTimeouts队列和cancelledTimeouts队列。这两个队列分别记录新添加的定时任务和要取消的定时任务,当workerThread每次循环运行时,它会先将pending-timeouts队列中一定数量的任务移动到它们对应的bucket,并取消掉cancelled-timeouts中所有的任务。由于添加和取消任务可以由任意线程发起,而相应的处理只会在workerThread里,所以为了进一步提高性能,这两个队列都是用了JCTools里面的MPSC(multiple-producer-single-consumer)队列。业务开发的时间轮也要考虑使用队列来提高性能,简化代码我们的时间轮实现可以使用ArrayBlockQuene
,要注意的是只有一个Worker线程去执行,好处是处理时间槽不需要考虑同步,缺点在于如果定时任务是超级耗时的,会拖慢时间的性能。
- ThreadFactory :
表示用于生成工作线程工厂
- tickDuration和unit:
每格的时间间隔,默认100ms
- ticksPerWheel
一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
if (logger.isWarnEnabled()) {
logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
tickDuration, MILLISECOND_NANOS);
}
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
Kafak实现
Kafka的时间轮的实现是TimingWhell
,环形队列数组实现,数组内部存储得是TimerTaskList
对象也是双向链表。列表中的entry对象内部timerTask
就是真正的延时任务,TimerTaskList使用expiration
来记录超时时间,entry对象使用expirationMs
来记录超时时间戳,timerTask内部的delayMs
来记录任务的延时时间。基本和Netty的一致。不一样的是Kafka是多层时间轮,参考钟表的时分秒。
假如当前有三层时间轮:层级滴答设定如下:第一层(0-20 tick=1)第二层(20-400 tick=20)第三层(400-8000 tick=400)
多层时间轮这里就涉及到了一个时间轮升级降级的过程。升级:当到期时间超过本层时间轮所能表达额最大值是就需要升级到上层时间轮,降级:当一个时间任务延时445ms,只能增加到最大那一层。执行了一个滴答吗,还有45ms无法执行了,就得逐层下方到能滴答的层里。来一起瞅瞅构造方法和添加方法。
/**
* 时间轮,采用定长数组记录放置时间格。
*/
private[timer] class TimingWheel(tickMs: Long // 当前时间轮中一格的时间跨度
, wheelSize: Int, // 时间轮的格数
startMs: Long, // 当前时间轮的创建时间
taskCounter: AtomicInteger, // 各层级时间轮共用的任务计数器,用于记录时间轮中总的任务数
queue: DelayQueue[TimerTaskList]) { // 各个层级时间轮共用一个任务队列
/**
* 当前时间轮的时间跨度,
* 只能处理时间范围在 [currentTime, currentTime + interval] 之间的延时任务,超过该范围则需要将任务添加到上层时间轮中
*/
private[this] val interval = tickMs * wheelSize
/**
* 每一项都对应时间轮中的一格
*/
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
/**
* 时间轮指针,将时间轮划分为到期部分和未到期部分
*/
private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs 修剪成 tickMs 的倍数,近似等于创建时间
// overflowWheel can potentially be updated and read by two concurrent threads through add().
// Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
/**
* 对于上层时间轮的引用
*/
@volatile private[this] var overflowWheel: TimingWheel = null
/**
* 添加并初始化上层时间轮,在 kafka 的分层时间轮算法设计中,上层时间轮是按需添加的,
* 只要在当前时间轮容纳不了给定的延时任务时,才会触发将该延时任务提交给上层时间轮管理,
* 此时如果上层时间轮还未定义,则会调用该方法初始化上层时间轮
*
*
* 这里需要注意的地方就是对应上层时间轮的字段赋值,由方法实现可以看出
* 上层时间轮中每一个时间格的时间跨度 tickMs 等于当前时间轮的总时间跨度 interval,
* 而时间格格数仍保持不变,对应的任务计数器 taskCounter 和任务队列 queue 都是全局共用的。
*/
private[this] def addOverflowWheel(): Unit = {
synchronized {
if (overflowWheel == null) {
/**
* 创建上层时间轮
*/
overflowWheel = new TimingWheel(
tickMs = interval, // tickMs 是当前时间轮的时间跨度 interval
wheelSize = wheelSize, // 时间轮的格数不变
startMs = currentTime, // 创建时间即当前时间
taskCounter = taskCounter, // 全局唯一的任务计数器
queue // 全局唯一的任务队列
)
}
}
}
/**
* 用于往时间轮中添加延时任务,该方法接收一个 TimerTaskEntry 类型对象,
* 即对延时任务 TimerTask 的封装
* @param timerTaskEntry
* @return
*/
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
/**
* 获取任务的到期时间戳
*/
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
// Cancelled
/**
* 任务已经被取消,则不应该被添加
*/
false
} else if (expiration < currentTime + tickMs) {
// Already expired
/**
* 任务已经到期,则不应该被添加
*/
false
} else if (expiration < currentTime + interval) {
// Put in its own bucket
/**
* 任务正好位于当前时间轮的时间跨度范围内,
* 依据任务的到期时间查找此任务所属的时间格,并将任务添加到对应的时间格中
*/
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// Set the bucket expiration time
/**
* 更新对应时间格的时间区间上界,如果是第一次往对应时间格中添加延时任务,则需要将时间格记录到全局任务队列中
*/
if (bucket.setExpiration(virtualId * tickMs)) {
// The bucket needs to be enqueued because it was an expired bucket
// We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
// and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
// will pass in the same value and hence return false, thus the bucket with the same expiration will not
// be enqueued multiple times.
queue.offer(bucket)
}
true
} else {
/**
* 已经超出了当前时间轮的时间跨度范围,将任务添加到上层时间轮中
*/
// Out of the interval. Put it into the parent timer
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
业务背景
最近项目中有个需求,需要对消息推送后N小时之后进行自动业务处理。使用定时任务定时扫描过期时间,浪费资源,且不实时。考虑使用延时队列处理。但是由于公司使用的是原生的Kafka。不支持延时消息,要依托公司的MQ实现,只能是反复消费消息。这样和Kafka的高性能设计相驳,最后考虑采取简版的Netty的时间论算法来实现。
设计分析
业务场景中,由于时间轮是内存的处理,应用重启时候内存的数据需要进行持久化,故而我们的的时间轮需要交互DB
。所以在需要使用钩子函数。当应用停机(非kill -9 PID
情况),需要把时间轮内部存储的未到期的任务持久化到DB
,在应用重启的需要交互DB
,获取DB
中的记录加载内存,然后更新DB
的记录,避免任务被重复处理,加载的数据需要判断是否已经满足完成条件,满足就出发自动处理,反之则不处理。
代码实现
代码设计关系整体如下:按照设计时候分析在时间轮中增加队列来提高时间轮的处理能力。增加任务取消的回调。
- 抽象时间轮主要功能
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 抽象时间轮
*/
public interface TimeWheel {
/**
* 添加任务到时间轮中
* @param task
* @param delay
* @param unit
* @return
*/
TimeWheelJob addJob(TimeWheelTask task, long delay, TimeUnit unit,TimeWheelCancelHandle cancelHandle);
/**
* 添加任务到时间轮中
* @param task
* @param delay
* @param unit
* @return
*/
TimeWheelJob addJob(TimeWheelTask task, long delay, TimeUnit unit);
/**
* 停止时间轮
* @return
*/
Set<TimeWheelJob> stop();
/**
* 获取时间轮中要执行的所有的任务
* @return
*/
Set<TimeWheelJob> triggerJobs();
}
- 抽象时间轮的执行任务
import java.util.Set;
import java.util.concurrent.TimeUnit;
package com.dingxing.common.utils;
/**
* 抽象时间轮的执行任务
*/
public interface TimeWheelJob {
/**
* 获取时间轮对象
* @return
*/
TimeWheel timer();
/**
* 获取当前的任务
* @return
*/
TimeWheelTask task();
/**
* 判断任务是否执行
* @return
*/
boolean isTrigger();
/**
* 判断任务是否是否取消
* @return
*/
boolean isCancelled();
boolean cancel();
}
- 任务执行
@FunctionalInterface
public interface TimeWheelTask {
/**
* 延时任务
* @throws Exception
*/
void run() throws Exception;
}
- 任务取消
@FunctionalInterface
public interface TimeWheelCancelHandle {
/**
* 任务取消的时候的处理
* @throws Exception
*/
void handle() throws Exception;
}
- 时间轮实现.字段展示
/**
* 时间轮的实例个数
*/
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
/**
* 时间轮的实例过多标识
*/
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
/**
* 时间轮的实例最大个数
*/
private static final int INSTANCE_COUNT_LIMIT = 64;
/**
* 最小时间轮格子
*/
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
/**
* 用于更新work工作状况的原子字段
*/
private static final AtomicIntegerFieldUpdater<CustomTimerWheel> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(CustomTimerWheel.class, "workerState");
/**
* work工作任务
*/
private final Worker worker = new Worker();
/**
* work工作任务的执行线程
*/
private Thread workerThread;
/**
* work工作状况-初始化
*/
public static final int WORKER_STATE_INIT = 0;
/**
* work工作状况-运行中
*/
public static final int WORKER_STATE_STARTED = 1;
/**
* work工作状况-终止
*/
public static final int WORKER_STATE_SHUTDOWN = 2;
// 0 - init, 1 - started, 2 - shut down
private volatile int workerState;
/**
* 每刻度的时间间隔
*/
private long tickDuration;
/**
* Hash取模的掩码
*/
private int mask;
/**
* 启动控制
*/
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
/**
* 待处理的任务数,原子
*/
private final AtomicLong pendingTimeJobs = new AtomicLong(0);
/**
* 最大的待处理的任务数
*/
private long maxpendingTimeJobs;
// 时间槽数组
private final HashWheelBucket[] wheel;
private volatile long startTime = 0;
/**
* 初始队列的大小
*/
private volatile int INIT_BLOCK_QUENE_SIZE = 128;
/**
* 要执行的延时任务的队列
*/
private final Queue<HashWheelTimeJob> onTriggerJobs = new ArrayBlockingQueue<HashWheelTimeJob>(INIT_BLOCK_QUENE_SIZE);
/**
* 要取消的延时任务的队列
*/
private final Queue<HashWheelTimeJob> cancelledJobs = new ArrayBlockingQueue<HashWheelTimeJob>(INIT_BLOCK_QUENE_SIZE);
- 构造函数
public CustomTimerWheel(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxpendingTimeJobs) {
Optional.ofNullable(threadFactory).orElseThrow((()->new NullPointerException("threadFactory")));
Optional.ofNullable(unit).orElseThrow((()->new NullPointerException("unit")));
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
if (log.isWarnEnabled()) {
log.warn("Configured tickDuration %d smaller then %d, using 1ms.",
tickDuration, MILLISECOND_NANOS);
}
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
workerThread = threadFactory.newThread(worker);
this.maxpendingTimeJobs = maxpendingTimeJobs;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
性能测试
测试代码如下
@Slf4j
public class HashedWheelTimerUtils {
public static void main(String args[])throws Exception{
CustomTimerWheel customTimerWheel=new CustomTimerWheel();
CountDownLatch countDownLatch=new CountDownLatch(3);
System.out.println("start "+System.currentTimeMillis());
TimeWheelJob result=customTimerWheel.addJob(()->{System.out.println("invoke1 "+System.currentTimeMillis());countDownLatch.countDown();},2,TimeUnit.SECONDS);
customTimerWheel.addJob(()->{System.out.println("invoke2 "+System.currentTimeMillis());countDownLatch.countDown();},10 ,TimeUnit.SECONDS);
customTimerWheel.addJob(()->{System.out.println("invoke3 "+System.currentTimeMillis());countDownLatch.countDown();},20,TimeUnit.SECONDS);
System.out.println(customTimerWheel.triggerJobs().size());
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
log.info("Shutting down the service... ");
customTimerWheel.stop();
log.info("The TimeWheel is shutdown!");
}
});
countDownLatch.await();
}
}
测试结果
start 1591632180380
3
00:03:00.398 [pool-1-thread-1] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
invoke1 1591632182383
invoke2 1591632190383
Disconnected from the target VM, address: '127.0.0.1:63083', transport: 'socket'
00:03:12.053 [Thread-0] INFO com.dingxing.common.utils.HashedWheelTimerUtils - Shutting down the service...
00:03:12.053 [Thread-0] INFO com.dingxing.common.utils.CustomTimerWheel - begin stop timeWheel...
[]
00:03:12.054 [Thread-0] INFO com.dingxing.common.utils.HashedWheelTimerUtils - The TimeWheel is shutdown!
Process finished with exit code 130