
Scroll Down



  • 一段时间后执行。
  • 约定某个时间点执行。



2、Spring Task(不支持集群)








Netty构建延时队列主要用HashedWheelTimer, 构造方法就如下,源码比较简单,我就列举一个构造方法得了,有兴趣可以自行阅读,Netty的HashedWheelTimer实现有两个东西值得关注,分别是pendingTimeouts队列和cancelledTimeouts队列。这两个队列分别记录新添加的定时任务和要取消的定时任务,当workerThread每次循环运行时,它会先将pending-timeouts队列中一定数量的任务移动到它们对应的bucket,并取消掉cancelled-timeouts中所有的任务。由于添加和取消任务可以由任意线程发起,而相应的处理只会在workerThread里,所以为了进一步提高性能,这两个队列都是用了JCTools里面的MPSC(multiple-producer-single-consumer)队列。业务开发的时间轮也要考虑使用队列来提高性能,简化代码我们的时间轮实现可以使用ArrayBlockQuene,要注意的是只有一个Worker线程去执行,好处是处理时间槽不需要考虑同步,缺点在于如果定时任务是超级耗时的,会拖慢时间的性能。

  • ThreadFactory :


  • tickDuration和unit:


  • ticksPerWheel


 public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        if (unit == null) {
            throw new NullPointerException("unit");
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        // Normalize ticksPerWheel to power of two and initialize the wheel.
        wheel = createWheel(ticksPerWheel);
        mask = wheel.length - 1;
        // Convert tickDuration to nanos.
        long duration = unit.toNanos(tickDuration);
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));

        if (duration < MILLISECOND_NANOS) {
            if (logger.isWarnEnabled()) {
                logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
                            tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;

        workerThread = threadFactory.newThread(worker);

        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {



假如当前有三层时间轮:层级滴答设定如下:第一层(0-20 tick=1)第二层(20-400 tick=20)第三层(400-8000 tick=400)

  * 时间轮,采用定长数组记录放置时间格。
private[timer] class TimingWheel(tickMs: Long // 当前时间轮中一格的时间跨度
                                 , wheelSize: Int, // 时间轮的格数
                                 startMs: Long, // 当前时间轮的创建时间
                                 taskCounter: AtomicInteger, // 各层级时间轮共用的任务计数器,用于记录时间轮中总的任务数
                                 queue: DelayQueue[TimerTaskList]) { // 各个层级时间轮共用一个任务队列
    * 当前时间轮的时间跨度,
    * 只能处理时间范围在 [currentTime, currentTime + interval] 之间的延时任务,超过该范围则需要将任务添加到上层时间轮中
  private[this] val interval = tickMs * wheelSize
    * 每一项都对应时间轮中的一格
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
    * 时间轮指针,将时间轮划分为到期部分和未到期部分
  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs 修剪成 tickMs 的倍数,近似等于创建时间

  // overflowWheel can potentially be updated and read by two concurrent threads through add().
  // Therefore, it needs to be volatile due to the issue of Double-Checked Locking pattern with JVM
    * 对于上层时间轮的引用
  @volatile private[this] var overflowWheel: TimingWheel = null

    * 添加并初始化上层时间轮,在 kafka 的分层时间轮算法设计中,上层时间轮是按需添加的,
    * 只要在当前时间轮容纳不了给定的延时任务时,才会触发将该延时任务提交给上层时间轮管理,
    * 此时如果上层时间轮还未定义,则会调用该方法初始化上层时间轮
    * 这里需要注意的地方就是对应上层时间轮的字段赋值,由方法实现可以看出
    * 上层时间轮中每一个时间格的时间跨度 tickMs 等于当前时间轮的总时间跨度 interval,
    * 而时间格格数仍保持不变,对应的任务计数器 taskCounter 和任务队列 queue 都是全局共用的。
  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
          * 创建上层时间轮
        overflowWheel = new TimingWheel(
          tickMs = interval, // tickMs 是当前时间轮的时间跨度 interval
          wheelSize = wheelSize, // 时间轮的格数不变
          startMs = currentTime, // 创建时间即当前时间
          taskCounter = taskCounter, // 全局唯一的任务计数器
          queue // 全局唯一的任务队列
    * 用于往时间轮中添加延时任务,该方法接收一个 TimerTaskEntry 类型对象,
    * 即对延时任务 TimerTask 的封装
    * @param timerTaskEntry
    * @return
  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
      * 获取任务的到期时间戳
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      // Cancelled
        * 任务已经被取消,则不应该被添加
    } else if (expiration < currentTime + tickMs) {
      // Already expired
        * 任务已经到期,则不应该被添加
    } else if (expiration < currentTime + interval) {
      // Put in its own bucket
        * 任务正好位于当前时间轮的时间跨度范围内,
        * 依据任务的到期时间查找此任务所属的时间格,并将任务添加到对应的时间格中
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)

      // Set the bucket expiration time
        * 更新对应时间格的时间区间上界,如果是第一次往对应时间格中添加延时任务,则需要将时间格记录到全局任务队列中
      if (bucket.setExpiration(virtualId * tickMs)) {
        // The bucket needs to be enqueued because it was an expired bucket
        // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced
        // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle
        // will pass in the same value and hence return false, thus the bucket with the same expiration will not
        // be enqueued multiple times.
    } else {
        * 已经超出了当前时间轮的时间跨度范围,将任务添加到上层时间轮中
      // Out of the interval. Put it into the parent timer
      if (overflowWheel == null) addOverflowWheel()




业务场景中,由于时间轮是内存的处理,应用重启时候内存的数据需要进行持久化,故而我们的的时间轮需要交互DB。所以在需要使用钩子函数。当应用停机(非kill -9 PID情况),需要把时间轮内部存储的未到期的任务持久化到DB,在应用重启的需要交互DB,获取DB中的记录加载内存,然后更新DB的记录,避免任务被重复处理,加载的数据需要判断是否已经满足完成条件,满足就出发自动处理,反之则不处理。



  • 抽象时间轮主要功能
import java.util.Set;
import java.util.concurrent.TimeUnit;

 * 抽象时间轮
public interface TimeWheel {
     * 添加任务到时间轮中
     * @param task
     * @param delay
     * @param unit
     * @return
    TimeWheelJob addJob(TimeWheelTask task, long delay, TimeUnit unit,TimeWheelCancelHandle cancelHandle);
     * 添加任务到时间轮中
     * @param task
     * @param delay
     * @param unit
     * @return
    TimeWheelJob addJob(TimeWheelTask task, long delay, TimeUnit unit);
     * 停止时间轮
     * @return
    Set<TimeWheelJob> stop();
     * 获取时间轮中要执行的所有的任务
     * @return
    Set<TimeWheelJob> triggerJobs();

  • 抽象时间轮的执行任务
import java.util.Set;
import java.util.concurrent.TimeUnit;

package com.dingxing.common.utils;

 * 抽象时间轮的执行任务
public interface TimeWheelJob {
     * 获取时间轮对象
     * @return
    TimeWheel timer();
     * 获取当前的任务
     * @return
    TimeWheelTask task();

     * 判断任务是否执行
     * @return
    boolean isTrigger();
     * 判断任务是否是否取消
     * @return
    boolean isCancelled();

    boolean cancel();

  • 任务执行
public interface TimeWheelTask {
     * 延时任务
     * @throws Exception
    void run() throws Exception;


  • 任务取消
public interface TimeWheelCancelHandle {
     * 任务取消的时候的处理
     * @throws Exception
    void handle() throws Exception;

  • 时间轮实现.字段展示
     * 时间轮的实例个数
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
     * 时间轮的实例过多标识
    private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
     * 时间轮的实例最大个数
    private static final int INSTANCE_COUNT_LIMIT = 64;
     * 最小时间轮格子
    private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
     * 用于更新work工作状况的原子字段
    private static final AtomicIntegerFieldUpdater<CustomTimerWheel> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(CustomTimerWheel.class, "workerState");
     * work工作任务
    private final Worker worker = new Worker();
     * work工作任务的执行线程
    private Thread workerThread;

     * work工作状况-初始化
    public static final int WORKER_STATE_INIT = 0;
     * work工作状况-运行中
    public static final int WORKER_STATE_STARTED = 1;
     * work工作状况-终止
    public static final int WORKER_STATE_SHUTDOWN = 2;
    // 0 - init, 1 - started, 2 - shut down
    private volatile int workerState;
     * 每刻度的时间间隔
    private long tickDuration;
     * Hash取模的掩码
    private int mask;
     * 启动控制
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
     * 待处理的任务数,原子
    private final AtomicLong pendingTimeJobs = new AtomicLong(0);
     * 最大的待处理的任务数
    private long maxpendingTimeJobs;
    // 时间槽数组
    private final HashWheelBucket[] wheel;

    private volatile long startTime = 0;
     * 初始队列的大小
    private volatile int INIT_BLOCK_QUENE_SIZE = 128;
     * 要执行的延时任务的队列
    private final Queue<HashWheelTimeJob> onTriggerJobs = new ArrayBlockingQueue<HashWheelTimeJob>(INIT_BLOCK_QUENE_SIZE);
     * 要取消的延时任务的队列
    private final Queue<HashWheelTimeJob> cancelledJobs = new ArrayBlockingQueue<HashWheelTimeJob>(INIT_BLOCK_QUENE_SIZE);
  • 构造函数
    public CustomTimerWheel(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxpendingTimeJobs) {

        Optional.ofNullable(threadFactory).orElseThrow((()->new NullPointerException("threadFactory")));
        Optional.ofNullable(unit).orElseThrow((()->new NullPointerException("unit")));
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);

        wheel = createWheel(ticksPerWheel);
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        long duration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));

        if (duration < MILLISECOND_NANOS) {
            if (log.isWarnEnabled()) {
                log.warn("Configured tickDuration %d smaller then %d, using 1ms.",
                        tickDuration, MILLISECOND_NANOS);
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;

        workerThread = threadFactory.newThread(worker);

        this.maxpendingTimeJobs = maxpendingTimeJobs;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {




public class HashedWheelTimerUtils {
    public static void main(String args[])throws Exception{
        CustomTimerWheel customTimerWheel=new CustomTimerWheel();
        CountDownLatch countDownLatch=new CountDownLatch(3);
        System.out.println("start   "+System.currentTimeMillis());
        TimeWheelJob result=customTimerWheel.addJob(()->{System.out.println("invoke1   "+System.currentTimeMillis());countDownLatch.countDown();},2,TimeUnit.SECONDS);
        customTimerWheel.addJob(()->{System.out.println("invoke2   "+System.currentTimeMillis());countDownLatch.countDown();},10 ,TimeUnit.SECONDS);
        customTimerWheel.addJob(()->{System.out.println("invoke3   "+System.currentTimeMillis());countDownLatch.countDown();},20,TimeUnit.SECONDS);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                log.info("Shutting down the service... ");
                log.info("The TimeWheel is shutdown!");



start   1591632180380
00:03:00.398 [pool-1-thread-1] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
invoke1   1591632182383
invoke2   1591632190383
Disconnected from the target VM, address: '', transport: 'socket'
00:03:12.053 [Thread-0] INFO com.dingxing.common.utils.HashedWheelTimerUtils - Shutting down the service... 
00:03:12.053 [Thread-0] INFO com.dingxing.common.utils.CustomTimerWheel - begin stop timeWheel...
00:03:12.054 [Thread-0] INFO com.dingxing.common.utils.HashedWheelTimerUtils - The TimeWheel is shutdown!

Process finished with exit code 130