基于 Kafka 与时间轮实现高性能延迟消息

作者:master_hl日期:2025/11/19

1.整体链路

Kafka → RocksDB → SystemTimer → TimingWheel → Kafka

1public void sendDelay(long delayMs, String topic, String message) {
2    try {
3        DelayMessage delayMessage = new DelayMessage(delayMs, topic, message);
4        rocksDB.put(delayMessage.getKey(), delayMessage.toJson().getBytes());
5        systemTimer.add(delayMessage);
6    } catch (Exception e) {
7        log.error("sendDelayMessage error delayMs:{}, topic:{}, message:{}", delayMs, topic, message, e);
8        throw new RuntimeException(e);
9    }
10}
11
  • Kafka 生产者调用自定义延迟消息方法
  • 延迟消息入库(RocksDB)
  • 封装任务加入时间轮
  • 时间轮触发
  • run方法执行,将消息发送kafka,清除RocksDB

2.DelayMessage:延迟任务的最小数据模型

抽象类 TimerTask 的一种任务实现

1private final String message; // 消息体
2private final long expireTime; // 绝对延迟时间 currentTimeMillis + delayMs
3private final String topic; // 主题
4private final Long partitionKey; // 分区
5private final long delayMs; // 相对延迟时间
6

3.RocksDB:消息持久化组件

  • 服务恢复后重新加载延迟任务
  • 删除超出延迟上限的任务
    • 4.SystemTimer:延迟任务调度入口

1.systemTimer.add(delayMessage)

将 DelayMessage/TimerTask 封装为 TimerTaskEntry,

也就是环形数组中,每个桶下面的链表中的一个节点对象

2.advanceClock() 触发 bucket

定时任务线程池,每个500ms检查一次 delayQueue

如果队列中有链表到期(桶的左边界),则推进时间轮,并执行槽内任务

5.TimerTask / TimerTaskEntry:任务包装结构

  • TimerTask:最终执行逻辑(调用 KafkaTemplate 发送)
  • TimerTaskEntry:环形双向链表节点 + expirationMs(具体的延迟时间)

TimerTask 与 TimerTaskEntry 双向绑定

TimerTask 持有 TimerTaskEntry 的引用

TimerTaskEntry 持有 TimerTask 的引用

1.取消任务

TimerTask 取消任务时,会将持有的 TimerTaskEntry 任务条目从链表中移除,并将引用置为空,

判断是否取消任务时,会根据 TimerTask 持有的引用是否 == this 来判断

2.加入时间轮与时间轮降级

bucket.add(timerTaskEntry) 时,需要先移除当前任务条目在链表中的旧引用,防止重复执行

3.保证线程安全

setTimerTaskEntry 设置条目时,synchronized 加锁,保证一个任务只属于一个条目

6.TimerTaskList:槽位结构与过期时间机制

img

时间轮(TimingWheel)是一个 存储定时任务的环形队列,

底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)

TimerTaskList 是一个环形的双向链表,

链表中的每一项表示的都是定时任务条目(TimerTaskEntry),其中封装了真正的定时任务 TimerTask。

ReentrantReadWriteLock 读写锁

✅添加任务时,加读锁,禁止并发写(flush)干扰。

✅推进时间轮时,flush执行槽内任务时,加写锁,禁止并发读(add)写(flush)修改结构。

7.TimingWheel:时间轮核心实现

基本属性组成

1private Long tickMs; // 时间轮每一格的时间间隔(基本时间跨度)
2private Integer wheelSize; // 时间轮的格子数
3private Long interval; // 时间轮一圈的总体时间跨度
4private Long startMs; // 时间轮的起始时间
5private AtomicInteger taskCounter; // 全局待执行任务计数器
6private DelayQueue<TimerTaskList> queue; // 多层时间轮任务共享队列
7private Long currentTime; // 时间轮当前指针指向的时间,是 tickMs 的整数倍
8private volatile TimingWheel overflowWheel; // 上一轮(基本时间跨度是当前轮的总体时间跨度)
9private TimerTaskList[] buckets; // 环形数组
10

add 方法

expiration:任务的绝对到期时间 (delayMs + Time.getHiresClockMs())

1.expiration < currentTime + tickMs:任务到期,如果没有取消,交由线程池执行

2.expiration < currentTime + interval:任务落到主轮,未到期,加入时间轮,设置到期时间,加入队列

3.expiration >= currentTime + interval:任务超过主轮覆盖范围,加入上一轮(递归调用 add)

advanceClock 方法

尝试推进时间轮,当 timeMs >= currentTime + tickMs 时,

任务条目绝对到期时间 >= 当前时间 + 基本时间跨度,更新时间轮,

如果有父轮,递归尝试推进父轮,父轮的一格,通常等于主轮的一圈。

8.多层时间轮:任务降级流程

假设有三层时间轮

主轮:tickMs(1s)wheelSize(20s)interval(20s)

父轮:tickMs(20s)wheelSize(20s)interval(400s)

爷轮:tickMs(400s)wheelSize(20s)interval(8000s)

📌第一次插入

  • expireTime = 450s
  • 主轮覆盖 0~20s → 放不下
  • 调父轮(20~400s)仍放不下
  • 调爷轮(400~8000s)命中
  • 爷轮 slot = 400 (覆盖 400~800)
  • 插入成功

📌到 400s 时触发 flush

  • 链表销毁重建,从爷轮回溯到父轮
  • 任务剩余:450 - 400 = 50s

📌重新 add → 回溯至父轮

  • currentTime = 400s,父轮覆盖 400~800
  • 剩余 50s 落入父轮
  • slot 覆盖范围 40~60s
  • 插入成功

📌到 440s 时触发 flush

  • 任务剩余:50 - 40 = 10s
  • 剩余 10s 落入主轮
  • Slot 覆盖范围 9~10s
  • 插入成功

📌主轮在 450s 触发 flush

  • 执行 TimerTask
  • 通过 IrcpKafkaTemplate 发送 Kafka

9.服务重启恢复机制:RocksDB → SystemTimer

  • RocksIterator 扫描所有 DelayMessage
  • 判定是否超出最大延迟
  • 加入时间轮
  • 删除过期 Key

10.代码实现

IrcpKafkaTemplate (自定义kafka工具类)

1@Service
2@Slf4j
3public class IrcpKafkaTemplate {
4
5    private static KafkaTemplate<String, Object> staticKafkaTemplate;
6    private static RocksDB rocksDB;
7    private final RocksDBManager rocksdbManager;
8    private final SystemTimer systemTimer;
9    private final ScheduledExecutorService executorService;
10    
11    @Value("${delay.mq.max.delay.time:86400000}")
12    private long delayMqMaxDelayTime; // 消息从rocksdb中恢复时,若过期时间超过了配置,则会丢弃
13    @Resource
14    private KafkaTemplate<String, Object> kafkaTemplate;
15    @Autowired
16    public IrcpKafkaTemplate(RocksDBManager rocksdbManager) {
17        this.rocksdbManager = rocksdbManager;
18        this.systemTimer = new SystemTimer("kafka-delay-timer");
19        this.executorService = Executors.newSingleThreadScheduledExecutor();
20    }
21
22    @PostConstruct
23    public void init() throws RocksDBException {
24        rocksDB = rocksdbManager.getRocksDB();
25        staticKafkaTemplate = kafkaTemplate;
26
27        // 服务启动时,从RocksDB恢复数据到时间轮
28        List<byte[]> keysToDelete = new ArrayList<>();
29        try (RocksIterator it = rocksDB.newIterator()) {
30            long l = System.currentTimeMillis();
31            for (it.seekToFirst(); it.isValid(); it.next()) {
32                String json = new String(it.value());
33                DelayMessage delayMessage = DelayMessage.fromJson(json);
34                if (delayMessage.getExpireTime() - l > delayMqMaxDelayTime) {
35                    log.warn("recover from rocksdb will discard. expireTime:{} - currentTimeMillis:{} > configMaxDelayTime:{}, delayMessage:{}", delayMessage.getExpireTime(), l, delayMqMaxDelayTime, delayMessage.toJson());
36                    keysToDelete.add(delayMessage.getKey());
37                    continue;
38                }
39                log.info("recover from rocksdb delayMessage:{}", delayMessage.toJson());
40                systemTimer.add(delayMessage);
41            }
42        }
43        keysToDelete.forEach(key -> {
44            try {
45                rocksDB.delete(key);
46            } catch (RocksDBException e) {
47                log.error("failed to delete key from RocksDB. key:{}", key, e);
48            }
49        });
50
51        executorService.scheduleAtFixedRate(() -> {
52            try {
53                systemTimer.advanceClock(200);
54            } catch (Exception e) {
55                log.error("advanceClock error", e);
56            }
57        }, 5000, 500, TimeUnit.MILLISECONDS);
58    }
59
60    public static KafkaTemplate<String, Object> getKafkaTemplate() {
61        return staticKafkaTemplate;
62    }
63
64    /**
65     * 消息延迟发送
66     *
67     * @param delayMs 延迟时间(毫秒), 延迟时间最低1000ms,建议一般 >=5s
68     * @param topic   需要发送的主题
69     * @param message 需要发送的消息
70     */
71    public void sendDelay(long delayMs, String topic, String message) {
72        try {
73            DelayMessage delayMessage = new DelayMessage(delayMs, topic, message);
74            rocksDB.put(delayMessage.getKey(), delayMessage.toJson().getBytes());
75            // 多态
76            systemTimer.add(delayMessage);
77        } catch (Exception e) {
78            log.error("sendDelayMessage error delayMs:{}, topic:{}, message:{}", delayMs, topic, message, e);
79            throw new RuntimeException(e);
80        }
81    }
82
83    /**
84     * 消息延迟发送 且 指定分区
85     *
86     * @param delayMs      延迟时间, 延迟时间最低1000ms,建议一般 >=5s
87     * @param topic        需要发送的主题
88     * @param partitionKey 根据key计算分区
89     * @param message      需要发送的消息
90     */
91    public void sendDelay(long delayMs, String topic, Long partitionKey, String message) {
92        try {
93            DelayMessage delayMessage = new DelayMessage(delayMs, topic, partitionKey, message);
94            rocksDB.put(delayMessage.getKey(), delayMessage.toJson().getBytes());
95            systemTimer.add(delayMessage);
96        } catch (Exception e) {
97            log.error("sendDelayMessage error delayMs:{}, topic:{}, partitionKey:{}, message:{}", delayMs, topic, partitionKey, message, e);
98            throw new RuntimeException(e);
99        }
100    }
101
102    /**
103     * 消息正常发送
104     *
105     * @param topic   主题
106     * @param message 消息
107     */
108    public void send(String topic, String message) {
109        kafkaTemplate.send(topic, message);
110    }
111
112    /**
113     * 消息正常发送 且 指定分区
114     *
115     * @param topic        主题
116     * @param partitionKey 根据key计算分区
117     * @param message      消息
118     */
119    public void send(String topic, Long partitionKey, String message) {
120        kafkaTemplate.send(topic, String.valueOf(partitionKey), message);
121    }
122
123    public static void removeFromRocksdb(DelayMessage delayMessage) throws RocksDBException {
124        rocksDB.delete(delayMessage.getKey());
125    }
126
127    @PreDestroy
128    public void stop() {
129        rocksdbManager.close();
130        executorService.shutdown();
131        try {
132            if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
133                executorService.shutdownNow();
134            }
135        } catch (InterruptedException e) {
136            log.error("stop executorService error", e);
137            executorService.shutdownNow();
138        }
139    }
140}
141

DelayMessage

1@Slf4j
2public class DelayMessage extends TimerTask {
3
4    public static final String EXPIRE_TIME = "expireTime";
5    public static final String PARTITION_KEY = "partitionKey";
6    public static final String TOPIC = "topic";
7    public static final String MESSAGE = "message";
8    public static final String DELAY_MS = "delayMs";
9
10    // 延迟任务信息
11    private final String message;
12    private final long expireTime;
13    private final String topic;
14    private final Long partitionKey;
15    private final long delayMs;
16
17    public DelayMessage(long delay, String topic, String message) {
18        this.delayMs = delay < 1000 ? 1000 : delay;
19        this.expireTime = System.currentTimeMillis() + delayMs;
20        this.topic = topic;
21        this.message = message;
22        this.partitionKey = null;
23    }
24
25    public DelayMessage(long delay, String topic, Long partitionKey, String message) {
26        this.delayMs = delay < 1000 ? 1000 : delay;
27        this.expireTime = System.currentTimeMillis() + delayMs;
28        this.topic = topic;
29        this.partitionKey = partitionKey;
30        this.message = message;
31    }
32
33    public DelayMessage(long delay, long expireTime, String topic, Long partitionKey, String message) {
34        this.delayMs = delay;
35        this.expireTime = expireTime;
36        this.topic = topic;
37        this.partitionKey = partitionKey;
38        this.message = message;
39    }
40
41    public String getTopic() {
42        return topic;
43    }
44
45    public String getMessage() {
46        return message;
47    }
48
49    public byte[] getKey() {
50        return MD5Util.getMd5Str(toJson()).getBytes(StandardCharsets.UTF_8);
51    }
52
53    public byte[] getValue() {
54        return message.getBytes(StandardCharsets.UTF_8);
55    }
56
57    public long getExpireTime() {
58        return expireTime;
59    }
60
61    @Override
62    public Long getDelayMs() {
63        return delayMs;
64    }
65
66    @Override
67    public void run() {
68        log.info("sendToKafka delayMessage: {}", toJson());
69        try {
70            if (partitionKey == null) {
71                IrcpKafkaTemplate.getKafkaTemplate().send(topic, message);
72            } else {
73                IrcpKafkaTemplate.getKafkaTemplate().send(topic, String.valueOf(partitionKey), message);
74            }
75
76            try {
77                IrcpKafkaTemplate.removeFromRocksdb(this);
78            } catch (Exception e) {
79                log.error("after send kafka, remove from rocksdb error message:{}", toJson(), e);
80            }
81
82        } catch (Exception e) {
83            log.error("sendToKafka error delayMessage: {}", toJson(), e);
84        }
85    }
86
87    public String toJson() {
88        Map<String, String> map = new HashMap<>();
89        map.put(MESSAGE, message);
90        map.put(TOPIC, topic);
91        if (partitionKey != null) {
92            map.put(PARTITION_KEY, String.valueOf(partitionKey));
93        }
94        map.put(EXPIRE_TIME, String.valueOf(expireTime));
95        map.put(DELAY_MS, String.valueOf(delayMs));
96        return JSONUtil.toJsonStr(map);
97    }
98
99    public static DelayMessage fromJson(String json) {
100        Map<String, String> map = JSONUtil.toBean(json, Map.class);
101        long delayMs = Long.parseLong(map.get(DELAY_MS));
102        long expireTime = Long.parseLong(map.get(EXPIRE_TIME));
103        String topic = map.get(TOPIC);
104        String message = map.get(MESSAGE);
105        Long partitionKey = map.get(PARTITION_KEY) == null ? null : Long.parseLong(map.get(PARTITION_KEY));
106        return new DelayMessage(delayMs, expireTime, topic, partitionKey, message);
107    }
108}
109

SystemTimer

1@Slf4j
2public class SystemTimer implements Timer, Function<TimerTaskEntry, Void> {
3
4    private ExecutorService taskExecutor; // 执行任务线程池
5    private Long tickMs; // 基本时间跨度
6    private Integer wheelSize; // 时间轮的格子数
7    private Long startMs; // 时间轮的起始时间
8    private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
9    private AtomicInteger taskCounter = new AtomicInteger(0); // 记录待执行任务的数量
10    private TimingWheel timingWheel; // 时间轮对象(管理所有任务的分配和延迟触发)
11
12    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
13    // 读锁用于添加任务
14    private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
15    // 写锁用于推进时间轮,flush执行任务
16    private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
17
18    public SystemTimer(String executeName) {
19        // 每格1s,总共60格
20        tickMs = 1000L;
21        wheelSize = 60;
22        startMs = Time.getHiresClockMs();
23        taskExecutor = new ThreadPoolExecutor(10, 20,
24                0L, TimeUnit.MILLISECONDS,
25                new LinkedBlockingQueue<>(Integer.MAX_VALUE), r -> new Thread(r, executeName));
26        timingWheel = new TimingWheel(tickMs, wheelSize, startMs, taskCounter, delayQueue);
27
28        // 每30s打印一次当前待执行任务数量
29        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
30            log.info("delay message for number of tasks pending execution: {}", size());
31        }, 0, 30, TimeUnit.SECONDS);
32    }
33
34    @Override
35    public void add(TimerTask timerTask) {
36        readLock.lock();
37        try {
38            // 将 timerTask 封装为 TimerTaskEntry,记录到期时间
39            addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.getDelayMs() + Time.getHiresClockMs()));
40        } finally {
41            readLock.unlock();
42        }
43    }
44
45    @Override
46    public boolean advanceClock(long timeoutMs) {
47        try {
48            TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
49            if (bucket != null) {
50                // 推进时间轮,不允许写
51                writeLock.lock();
52                try {
53                    while (bucket != null) {
54                        timingWheel.advanceClock(bucket.getExpiration());
55                        // 执行槽内任务
56                        bucket.flush(this);
57                        bucket = delayQueue.poll();
58                    }
59                } finally {
60                    writeLock.unlock();
61                }
62                return true;
63            } else {
64                return false;
65            }
66        } catch (InterruptedException e) {
67            log.error("advanceClock interrupted", e);
68        }
69        return false;
70    }
71
72    @Override
73    public int size() {
74        return taskCounter.get();
75    }
76
77    @Override
78    public void shutdown() {
79        taskExecutor.shutdown();
80    }
81
82    private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
83        if (!timingWheel.add(timerTaskEntry)) {
84            // 任务没被取消,直接提交线程池执行
85            if (!timerTaskEntry.cancelled()) {
86                taskExecutor.submit(timerTaskEntry.getTimerTask());
87            }
88        }
89    }
90
91    @Override
92    public Void apply(TimerTaskEntry timerTaskEntry) {
93        addTimerTaskEntry(timerTaskEntry);
94        return null;
95    }
96}
97

TimingWheel

1public class TimingWheel {
2    private Long tickMs; // 时间轮每一格的时间间隔(基本时间跨度)
3    private Integer wheelSize; // 时间轮的格子数
4    private Long interval; // 时间轮一圈的总体时间跨度
5    private Long startMs; // 时间轮的起始时间
6    private AtomicInteger taskCounter; // 全局待执行任务计数器
7    private DelayQueue<TimerTaskList> queue; // 多层时间轮任务共享队列
8    private Long currentTime; // 时间轮当前指针指向的时间,是 tickMs 的整数倍
9    private volatile TimingWheel overflowWheel; // 上一轮(基本时间跨度是当前轮的总体时间跨度)
10    private TimerTaskList[] buckets; // 环形数组
11
12    public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) {
13        this.tickMs = tickMs;
14        this.wheelSize = wheelSize;
15        this.startMs = startMs;
16        this.taskCounter = taskCounter;
17        this.queue = queue;
18        this.interval = tickMs * wheelSize;
19        this.currentTime = startMs - (startMs % tickMs);
20        this.buckets = new TimerTaskList[wheelSize];
21        for (int i = 0; i < buckets.length; i++) {
22            buckets[i] = new TimerTaskList(taskCounter);
23        }
24    }
25
26    /**
27     * 将一个任务加入时间轮
28     */
29    public boolean add(TimerTaskEntry timerTaskEntry) {
30        long expiration = timerTaskEntry.getExpirationMs();
31
32        // 如果任务已经取消,则不加入
33        if (timerTaskEntry.cancelled()) {
34            // Cancelled
35            return false;
36        }
37        // 任务到期(到期时间 < 当前轮+下一格时间)
38        else if (expiration < currentTime + tickMs) {
39            // Already expired
40            return false;
41        } else if (expiration < currentTime + interval) {
42            // 计算任务应该落入哪一个槽位
43            long virtualId = expiration / tickMs;
44            TimerTaskList bucket = buckets[(int) (virtualId % wheelSize)];
45            bucket.add(timerTaskEntry);
46
47            // 设置槽位的到期时间
48            if (bucket.setExpiration(virtualId * tickMs)) {
49                queue.offer(bucket);
50            }
51            return true;
52        }
53        // 任务超出当前轮覆盖范围,放入父轮
54        else {
55            // Out of the interval. Put it into the parent timer
56            if (overflowWheel == null) {
57                // 创建父轮,一格时间 = 主轮一圈时间
58                addOverflowWheel();
59            }
60            return overflowWheel.add(timerTaskEntry);
61        }
62    }
63
64    /**
65     * 尝试推进时间轮的时钟(更新时间轮当前的时钟)
66     */
67    public void advanceClock(Long timeMs) {
68        if (timeMs >= currentTime + tickMs) {
69            // 更新为目标格子的左边界
70            currentTime = timeMs - (timeMs % tickMs);
71
72            if (overflowWheel != null) {
73                // 推进父轮
74                overflowWheel.advanceClock(currentTime);
75            }
76        }
77    }
78
79    private void addOverflowWheel() {
80        synchronized (this) {
81            if (overflowWheel == null) {
82                overflowWheel = new TimingWheel(interval, wheelSize, currentTime, taskCounter, queue);
83            }
84        }
85    }
86}
87

TimerTask

1public abstract class TimerTask implements Runnable {
2
3    protected Long delayMs = 3000L;
4    protected TimerTaskEntry timerTaskEntry; // 当前任务所在链表节点
5
6    public void cancel() {
7        synchronized (this) {
8            if (timerTaskEntry != null) {
9                timerTaskEntry.remove(); // 从时间轮桶移除
10            }
11            timerTaskEntry = null;
12        }
13    }
14
15    /**
16     * 双向绑定
17     * TimerTask 持有 TimerTaskEntry 的引用
18     * TimerTaskEntry 持有 TimerTask 的引用
19     * <p>
20     *     TimerTaskEntry:封装了 TimerTask 和它的调度信息(到期时间 expirationMs、所在桶 list 等).
21     *     TimerTask:实际要执行的任务,可能是 DelayMessage、DelayMessage2 等.
22     * </p>
23     * 作用:
24     * 1.取消任务 cancelled() ,通过引用指向判断是否被取消
25     * 2.重入时间轮 SystemTimer#apply(),需要根据 timerTask.remove() --> timerTask.getTimerTaskEntry() 找到旧条目,从原桶中移除,然后再把任务加入新的桶中
26     * 3.保证线程安全,set 方法锁 this,保证一个任务同时只属于一个条目
27     */
28    public void setTimerTaskEntry(TimerTaskEntry entry) {
29        synchronized (this) {
30            if (timerTaskEntry != null && timerTaskEntry != entry) {
31                timerTaskEntry.remove();
32            }
33            timerTaskEntry = entry;
34        }
35    }
36
37    @Override
38    public void run() {}
39
40    public TimerTaskEntry getTimerTaskEntry() {
41        return timerTaskEntry;
42    }
43
44    public Long getDelayMs() {
45        return delayMs;
46    }
47}
48

TimerTaskEntry

1public class TimerTaskEntry implements Comparable<TimerTaskEntry> {
2
3    private volatile TimerTaskList list; // 当前所在桶
4    public TimerTaskEntry next; // 前驱
5    public TimerTaskEntry prev; // 后继
6    private TimerTask timerTask; // 包含的任务
7    private Long expirationMs; // 到期时间,用于排序
8
9    public TimerTaskEntry() {}
10
11    public TimerTaskEntry(TimerTask timerTask, Long expirationMs) {
12        if (timerTask != null) {
13            timerTask.setTimerTaskEntry(this);
14        }
15        this.timerTask = timerTask;
16        this.expirationMs = expirationMs;
17
18    }
19
20    /**
21     * 每个 timerTask 内部会持有一个引用 timerTaskEntry,表示当前任务被哪个条目管理
22     * 如果当前任务被取消,则 timerTaskEntry 会被设置为 null,或者 timerTaskEntry 会被设置为其他 timerTaskEntry
23     * @return
24     */
25    public boolean cancelled() {
26        return timerTask.getTimerTaskEntry() != this;
27    }
28
29    public void remove() {
30        TimerTaskList currentList = list;
31        while (currentList != null) {
32            currentList.remove(this);
33            currentList = list;
34        }
35    }
36
37    @Override
38    public int compareTo(TimerTaskEntry that) {
39        if (that == null) {
40            throw new NullPointerException("TimerTaskEntry is null");
41        }
42        Long expirationMs1 = this.expirationMs;
43        Long expirationMs2 = that.expirationMs;
44        if (expirationMs1 < expirationMs2) {
45            return -1;
46        }
47        if (expirationMs1 > expirationMs2) {
48            return 1;
49        }
50        return 0;
51    }
52
53
54    public Long getExpirationMs() {
55        return expirationMs;
56    }
57
58    public TimerTask getTimerTask() {
59        return timerTask;
60    }
61
62    public TimerTaskList getList() {
63        return list;
64    }
65
66    public void setList(TimerTaskList list) {
67        this.list = list;
68    }
69}
70

TimerTaskList

1class TimerTaskList implements Delayed {
2    
3    private AtomicInteger taskCounter; // 桶内任务数量计数器
4    private TimerTaskEntry root; // 哨兵节点,形成循环双向链表
5    private AtomicLong expiration; // 桶的过期时间(每个格子的左边界)
6    
7    public TimerTaskList() {}
8
9    public TimerTaskList(AtomicInteger taskCounter) {
10        this.taskCounter = taskCounter;
11        this.root = new TimerTaskEntry(null, -1L);
12        this.root.next = root;
13        this.root.prev = root;
14        this.expiration = new AtomicLong(-1L);
15    }
16
17    public boolean setExpiration(Long expirationMs) {
18        return expiration.getAndSet(expirationMs) != expirationMs;
19    }
20
21    public Long getExpiration() {
22        return expiration.get();
23    }
24
25    public synchronized void foreach(Function<TimerTask, Void> f) {
26        TimerTaskEntry entry = root.next;
27        // 遍历循环链表
28        while (entry != root) {
29            TimerTaskEntry nextEntry = entry.next;
30            // 任务未被取消
31            if (!entry.cancelled()) {
32                f.apply(entry.getTimerTask());
33            }
34
35            entry = nextEntry;
36        }
37    }
38
39    public void add(TimerTaskEntry timerTaskEntry) {
40        boolean done = false;
41        // 循环直到成功加入链表(主要是为了处理并发情况下的重试)
42        while (!done) {
43            // 先移除任务在链表中旧的引用,防止重复执行
44            timerTaskEntry.remove();
45            // 锁桶
46            synchronized (this) {
47                // 锁住任务条目本身,防止多线程操作统一条目
48                synchronized (timerTaskEntry) {
49                    // 检查任务是否已经绑定到桶
50                    if (timerTaskEntry.getList() == null) {
51                        // 插入到链表尾部(root.prev 指向尾)
52                        TimerTaskEntry tail = root.prev;
53                        timerTaskEntry.next = root;
54                        timerTaskEntry.prev = tail;
55                        // 绑定任务和当前桶
56                        timerTaskEntry.setList(this);
57                        tail.next = timerTaskEntry;
58                        root.prev = timerTaskEntry;
59
60                        // 任务数 + 1
61                        taskCounter.incrementAndGet();
62                        done = true;
63                    }
64                }
65            }
66        }
67    }
68
69    public void remove(TimerTaskEntry timerTaskEntry) {
70        synchronized (this) {
71            synchronized (timerTaskEntry) {
72                if (timerTaskEntry.getList() == this) {
73                    timerTaskEntry.next.prev = timerTaskEntry.prev;
74                    timerTaskEntry.prev.next = timerTaskEntry.next;
75                    timerTaskEntry.next = null;
76                    timerTaskEntry.prev = null;
77                    timerTaskEntry.setList(null);
78                    taskCounter.decrementAndGet();
79                }
80            }
81        }
82    }
83
84    /**
85     * slot【销毁-->重建】
86     * 循环链表判断每个节点是否可以执行
87     * 如果不能执行,降级到下一层时间轮,重新加入链表,否则提交线程池执行
88     */
89    public void flush(Function<TimerTaskEntry, Void> f) {
90        synchronized (this) {
91            TimerTaskEntry head = root.next;
92            while (head != root) {
93                remove(head);
94                f.apply(head);
95                head = root.next;
96            }
97            // 重置桶过期时间
98            expiration.set(-1L);
99        }
100    }
101
102    @Override
103    public long getDelay(TimeUnit unit) {
104        return unit.convert(Long.max(getExpiration() - Time.getHiresClockMs(), 0), TimeUnit.MILLISECONDS);
105    }
106
107    /**
108     * 按桶的到期时间排序
109     */
110    @Override
111    public int compareTo(Delayed d) {
112        TimerTaskList other;
113        if (d instanceof TimerTaskList) {
114            other = (TimerTaskList) d;
115        } else {
116            throw new ClassCastException("can not cast to TimerTaskList");
117        }
118
119        if (getExpiration() < other.getExpiration()) {
120            return -1;
121        } else if (getExpiration() > other.getExpiration()) {
122            return 1;
123        } else {
124            return 0;
125        }
126    }
127}
128

RocksDBManager(管理 RocksDB 实例)

1@Component
2@Slf4j
3public class RocksDBManager {
4
5    private final RocksDB rocksDB;
6
7    public RocksDBManager(@Value("${rocksdb.path}") String rocksPath) throws RocksDBException {
8        RocksDB.loadLibrary();
9        Options options = new Options();
10        options.setCreateIfMissing(true);
11        String dbPath = rocksPath;
12        log.info("RocksDBManager get dbPath:{}", dbPath);
13        if (dbPath == null) {
14            dbPath = "rocksdb";
15        }
16        File dbFile = new File(dbPath);
17        if (!dbFile.exists()) {
18            if (!dbFile.mkdirs()) {
19                throw new RuntimeException("Failed to create rocksdb.path directory: " + dbPath);
20            }
21        }
22        rocksDB = RocksDB.open(options, dbPath);
23    }
24
25    public RocksDB getRocksDB() {
26        return rocksDB;
27    }
28
29    public void close() {
30        try {
31            rocksDB.close();
32            log.info("rocksDB closed.");
33        } catch (Exception e) {
34            log.error("close RocksDB error", e);
35        }
36    }
37}
38

Timer

1public interface Timer {
2
3    void add(TimerTask timerTask);
4    
5    boolean advanceClock(long timeoutMs);
6
7    int size();
8
9    void shutdown();
10}
11

Time

1public class Time {
2
3    public static Long getHiresClockMs() {
4        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
5    }
6}
7

基于 Kafka 与时间轮实现高性能延迟消息》 是转载文章,点击查看原文


相关推荐


Python 的内置函数 sum
IMPYLH2025/11/17

Python 内建函数列表 > Python 的内置函数 sum Python 的内置函数 sum() 是一个用于计算可迭代对象中所有元素之和的高效工具。这个函数可以接受一个可迭代对象(如列表、元组、集合等)作为参数,并返回其中所有元素的总和。 基本用法 numbers = [1, 2, 3, 4, 5] total = sum(numbers) # 返回 15 可选参数 sum() 函数还接受一个可选的第二个参数 start,用于指定求和的初始值。默认情况下 start 为 0。


ios包体积管理方案
denggun123452025/11/16

iOS 包体积优化是一个系统性的工程,需要从代码、资源、第三方库、构建配置等多个维度进行综合管理。下面我将梳理一个全面的 iOS 包体积管理方案。 一、包体积分析 在进行任何优化之前,必须先了解 App 体积到底由什么构成。 使用 Xcode 的 App Thinning Size Report 操作:Archive -> Distribute App -> App Store Connect -> 选择 Ad Hoc 或 App Store -> Next -> 在 "App T


Bash 入门
hubenchang05152025/11/15

#Bash 入门 #Hello World Bash 的内置命令 echo 可以打印文本。例如: $ echo Hello World Hello World echo 命令的 -e 选项激活转义字符的解释。例如: $ echo -e "Hello \n World" Hello World #命令格式 Bash 命令基本遵循以下格式: 命令 参数1 参数2 参数3 ... 例如在 echo Hello World 中,echo 是命令,Hello 是参数1,World 是参数2。 而


Python 的内置函数 issubclass
IMPYLH2025/11/14

Python 内建函数列表 > Python 的内置函数 issubclass Python 的内置函数 issubclass 用于检查一个类是否是另一个类的子类(直接或间接继承)。它是 Python 面向对象编程中类型检查的重要工具。 语法 issubclass(class, classinfo) 参数说明 class:需要检查的类(必须是类对象,不能是实例)classinfo:可以是一个类对象,或者由类对象组成的元组 返回值 返回布尔值: True:如果 class 是 c


Flutter 3.38 发布,快来看看有什么更新吧
恋猫de小郭2025/11/13

在 11 月 13 日的 FlutterFlightPlans 直播中,Flutter 3.38 如期而至,本次版本主要涉及 Dot shorthands、Web 支持增强、性能改进、问题修复和控件预览等方面。 Dot shorthands 在 Dart 3.10 + Flutter 3.38 中开始默认支持 Dot shorthands ,通过 Dot shorthands 可以使用简写方式省略类型前缀,例如使用 .start 而不是 MainAxisAlignment.start : /


HTML 的 <canvas> 标签
hubenchang05152025/11/11

#HTML 的 <canvas> 标签 请查看 HTML 元素帮助手册 了解更多 HTML 元素。 <canvas> 元素可被用来通过 JavaScript(Canvas API 或 WebGL API)绘制图形及图形动画。 #属性 请查看 HTML 元素的全局属性 了解 HTML 元素的全局属性。 height: 该元素占用空间的高度,以 CSS 像素(px)表示,默认为 150。 moz-opaque(废弃): 通过设置这个属性,来控制 canvas 元素是否半透明。如果你不想 c


CCState:为大型 Web 应用设计的状态管理库
温宇飞2025/11/9

CCState 是一个基于 Signal 的状态管理库。它通过三种语义化的信号类型(State、Computed、Command)实现读写能力隔离,并原生支持 async/await 的异步计算,让状态管理变得简单直观。CCState 与框架无关,可与 React、Vue、Solid.js 等任何 UI 框架无缝集成。它在 秒多 等项目中得到验证,为大规模应用而设计。 快速上手 Signal Signal 是一个轻量级的描述对象,它本身不存储值,只是一个"引用"或"标识符"。所有 Signal


为什么你的JavaScript代码总是出bug?这5个隐藏陷阱太坑了!
良山有风来2025/11/7

你是不是经常遇到这样的情况:明明代码看起来没问题,一运行就各种报错?或者测试时好好的,上线后用户反馈bug不断?更气人的是,有时候改了一个小问题,结果引出了三个新问题…… 别担心,这绝对不是你的能力问题。经过多年的观察,我发现大多数JavaScript开发者都会掉进同样的陷阱里。今天我就来帮你揪出这些隐藏的bug制造机,让你的代码质量瞬间提升一个档次! 变量声明那些事儿 很多bug其实从变量声明的那一刻就开始埋下了隐患。看看这段代码,是不是很眼熟? // 反面教材:变量声明混乱 function


【基础算法】DFS中的剪枝与优化
让我们一起加油好吗2025/11/2

文章目录 上文链接一、剪枝与优化1. 排除等效冗余2. 可行性剪枝3. 最优性剪枝4. 优化搜索顺序5. 记忆化搜索 二、OJ 练习1. 数的划分(1) 解题思路(2) 代码实现 2. 小猫爬山(1) 解题思路(2) 代码实现 上文链接 【基础算法】DFS 一、剪枝与优化 剪枝,形象地看,就是剪掉搜索树的分支,从而减小搜索树的规模,排除掉搜索树中没有必要的分支,优化时间复杂度。 在深度优先遍历中,有几种常见的剪枝方法: 1. 排除等效冗余 如


Python 的内置函数 exec
IMPYLH2025/10/30

Python 内建函数列表 > Python 的内置函数 exec Python 的内置函数 exec 是一个强大的动态执行工具,它允许程序在运行时执行以字符串形式提供的 Python 代码。 def eval(source:str|codeobject, /, globals:dict=None, locals:mapping=None): ''' 执行表达式并返回结果 :param source: Python 表达式 :param globals :

首页编辑器站点地图

Copyright © 2025 聚合阅读

License: CC BY-SA 4.0