1.整体链路
Kafka → RocksDB → SystemTimer → TimingWheel → Kafka
1public void sendDelay(long delayMs, String topic, String message) { 2 try { 3 DelayMessage delayMessage = new DelayMessage(delayMs, topic, message); 4 rocksDB.put(delayMessage.getKey(), delayMessage.toJson().getBytes()); 5 systemTimer.add(delayMessage); 6 } catch (Exception e) { 7 log.error("sendDelayMessage error delayMs:{}, topic:{}, message:{}", delayMs, topic, message, e); 8 throw new RuntimeException(e); 9 } 10} 11
- Kafka 生产者调用自定义延迟消息方法
- 延迟消息入库(RocksDB)
- 封装任务加入时间轮
- 时间轮触发
- run方法执行,将消息发送kafka,清除RocksDB
2.DelayMessage:延迟任务的最小数据模型
抽象类 TimerTask 的一种任务实现
1private final String message; // 消息体 2private final long expireTime; // 绝对延迟时间 currentTimeMillis + delayMs 3private final String topic; // 主题 4private final Long partitionKey; // 分区 5private final long delayMs; // 相对延迟时间 6
3.RocksDB:消息持久化组件
- 服务恢复后重新加载延迟任务
- 删除超出延迟上限的任务
- 4.SystemTimer:延迟任务调度入口
1.systemTimer.add(delayMessage)
将 DelayMessage/TimerTask 封装为 TimerTaskEntry,
也就是环形数组中,每个桶下面的链表中的一个节点对象
2.advanceClock() 触发 bucket
定时任务线程池,每个500ms检查一次 delayQueue
如果队列中有链表到期(桶的左边界),则推进时间轮,并执行槽内任务
5.TimerTask / TimerTaskEntry:任务包装结构
- TimerTask:最终执行逻辑(调用 KafkaTemplate 发送)
- TimerTaskEntry:环形双向链表节点 + expirationMs(具体的延迟时间)
TimerTask 与 TimerTaskEntry 双向绑定
TimerTask 持有 TimerTaskEntry 的引用
TimerTaskEntry 持有 TimerTask 的引用
1.取消任务
TimerTask 取消任务时,会将持有的 TimerTaskEntry 任务条目从链表中移除,并将引用置为空,
判断是否取消任务时,会根据 TimerTask 持有的引用是否 == this 来判断
2.加入时间轮与时间轮降级
bucket.add(timerTaskEntry) 时,需要先移除当前任务条目在链表中的旧引用,防止重复执行
3.保证线程安全
setTimerTaskEntry 设置条目时,synchronized 加锁,保证一个任务只属于一个条目
6.TimerTaskList:槽位结构与过期时间机制
时间轮(TimingWheel)是一个 存储定时任务的环形队列,
底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。
TimerTaskList 是一个环形的双向链表,
链表中的每一项表示的都是定时任务条目(TimerTaskEntry),其中封装了真正的定时任务 TimerTask。
ReentrantReadWriteLock 读写锁
✅添加任务时,加读锁,禁止并发写(flush)干扰。
✅推进时间轮时,flush执行槽内任务时,加写锁,禁止并发读(add)写(flush)修改结构。
7.TimingWheel:时间轮核心实现
基本属性组成
1private Long tickMs; // 时间轮每一格的时间间隔(基本时间跨度) 2private Integer wheelSize; // 时间轮的格子数 3private Long interval; // 时间轮一圈的总体时间跨度 4private Long startMs; // 时间轮的起始时间 5private AtomicInteger taskCounter; // 全局待执行任务计数器 6private DelayQueue<TimerTaskList> queue; // 多层时间轮任务共享队列 7private Long currentTime; // 时间轮当前指针指向的时间,是 tickMs 的整数倍 8private volatile TimingWheel overflowWheel; // 上一轮(基本时间跨度是当前轮的总体时间跨度) 9private TimerTaskList[] buckets; // 环形数组 10
add 方法
expiration:任务的绝对到期时间 (delayMs + Time.getHiresClockMs())
1.expiration < currentTime + tickMs:任务到期,如果没有取消,交由线程池执行
2.expiration < currentTime + interval:任务落到主轮,未到期,加入时间轮,设置到期时间,加入队列
3.expiration >= currentTime + interval:任务超过主轮覆盖范围,加入上一轮(递归调用 add)
advanceClock 方法
尝试推进时间轮,当 timeMs >= currentTime + tickMs 时,
任务条目绝对到期时间 >= 当前时间 + 基本时间跨度,更新时间轮,
如果有父轮,递归尝试推进父轮,父轮的一格,通常等于主轮的一圈。
8.多层时间轮:任务降级流程
假设有三层时间轮
主轮:tickMs(1s)wheelSize(20s)interval(20s)
父轮:tickMs(20s)wheelSize(20s)interval(400s)
爷轮:tickMs(400s)wheelSize(20s)interval(8000s)
📌第一次插入
- expireTime = 450s
- 主轮覆盖 0~20s → 放不下
- 调父轮(20~400s)仍放不下
- 调爷轮(400~8000s)命中
- 爷轮 slot = 400 (覆盖 400~800)
- 插入成功
📌到 400s 时触发 flush
- 链表销毁重建,从爷轮回溯到父轮
- 任务剩余:450 - 400 = 50s
📌重新 add → 回溯至父轮
- currentTime = 400s,父轮覆盖 400~800
- 剩余 50s 落入父轮
- slot 覆盖范围 40~60s
- 插入成功
📌到 440s 时触发 flush
- 任务剩余:50 - 40 = 10s
- 剩余 10s 落入主轮
- Slot 覆盖范围 9~10s
- 插入成功
📌主轮在 450s 触发 flush
- 执行 TimerTask
- 通过 IrcpKafkaTemplate 发送 Kafka
9.服务重启恢复机制:RocksDB → SystemTimer
- RocksIterator 扫描所有 DelayMessage
- 判定是否超出最大延迟
- 加入时间轮
- 删除过期 Key
10.代码实现
IrcpKafkaTemplate (自定义kafka工具类)
1@Service 2@Slf4j 3public class IrcpKafkaTemplate { 4 5 private static KafkaTemplate<String, Object> staticKafkaTemplate; 6 private static RocksDB rocksDB; 7 private final RocksDBManager rocksdbManager; 8 private final SystemTimer systemTimer; 9 private final ScheduledExecutorService executorService; 10 11 @Value("${delay.mq.max.delay.time:86400000}") 12 private long delayMqMaxDelayTime; // 消息从rocksdb中恢复时,若过期时间超过了配置,则会丢弃 13 @Resource 14 private KafkaTemplate<String, Object> kafkaTemplate; 15 @Autowired 16 public IrcpKafkaTemplate(RocksDBManager rocksdbManager) { 17 this.rocksdbManager = rocksdbManager; 18 this.systemTimer = new SystemTimer("kafka-delay-timer"); 19 this.executorService = Executors.newSingleThreadScheduledExecutor(); 20 } 21 22 @PostConstruct 23 public void init() throws RocksDBException { 24 rocksDB = rocksdbManager.getRocksDB(); 25 staticKafkaTemplate = kafkaTemplate; 26 27 // 服务启动时,从RocksDB恢复数据到时间轮 28 List<byte[]> keysToDelete = new ArrayList<>(); 29 try (RocksIterator it = rocksDB.newIterator()) { 30 long l = System.currentTimeMillis(); 31 for (it.seekToFirst(); it.isValid(); it.next()) { 32 String json = new String(it.value()); 33 DelayMessage delayMessage = DelayMessage.fromJson(json); 34 if (delayMessage.getExpireTime() - l > delayMqMaxDelayTime) { 35 log.warn("recover from rocksdb will discard. expireTime:{} - currentTimeMillis:{} > configMaxDelayTime:{}, delayMessage:{}", delayMessage.getExpireTime(), l, delayMqMaxDelayTime, delayMessage.toJson()); 36 keysToDelete.add(delayMessage.getKey()); 37 continue; 38 } 39 log.info("recover from rocksdb delayMessage:{}", delayMessage.toJson()); 40 systemTimer.add(delayMessage); 41 } 42 } 43 keysToDelete.forEach(key -> { 44 try { 45 rocksDB.delete(key); 46 } catch (RocksDBException e) { 47 log.error("failed to delete key from RocksDB. key:{}", key, e); 48 } 49 }); 50 51 executorService.scheduleAtFixedRate(() -> { 52 try { 53 systemTimer.advanceClock(200); 54 } catch (Exception e) { 55 log.error("advanceClock error", e); 56 } 57 }, 5000, 500, TimeUnit.MILLISECONDS); 58 } 59 60 public static KafkaTemplate<String, Object> getKafkaTemplate() { 61 return staticKafkaTemplate; 62 } 63 64 /** 65 * 消息延迟发送 66 * 67 * @param delayMs 延迟时间(毫秒), 延迟时间最低1000ms,建议一般 >=5s 68 * @param topic 需要发送的主题 69 * @param message 需要发送的消息 70 */ 71 public void sendDelay(long delayMs, String topic, String message) { 72 try { 73 DelayMessage delayMessage = new DelayMessage(delayMs, topic, message); 74 rocksDB.put(delayMessage.getKey(), delayMessage.toJson().getBytes()); 75 // 多态 76 systemTimer.add(delayMessage); 77 } catch (Exception e) { 78 log.error("sendDelayMessage error delayMs:{}, topic:{}, message:{}", delayMs, topic, message, e); 79 throw new RuntimeException(e); 80 } 81 } 82 83 /** 84 * 消息延迟发送 且 指定分区 85 * 86 * @param delayMs 延迟时间, 延迟时间最低1000ms,建议一般 >=5s 87 * @param topic 需要发送的主题 88 * @param partitionKey 根据key计算分区 89 * @param message 需要发送的消息 90 */ 91 public void sendDelay(long delayMs, String topic, Long partitionKey, String message) { 92 try { 93 DelayMessage delayMessage = new DelayMessage(delayMs, topic, partitionKey, message); 94 rocksDB.put(delayMessage.getKey(), delayMessage.toJson().getBytes()); 95 systemTimer.add(delayMessage); 96 } catch (Exception e) { 97 log.error("sendDelayMessage error delayMs:{}, topic:{}, partitionKey:{}, message:{}", delayMs, topic, partitionKey, message, e); 98 throw new RuntimeException(e); 99 } 100 } 101 102 /** 103 * 消息正常发送 104 * 105 * @param topic 主题 106 * @param message 消息 107 */ 108 public void send(String topic, String message) { 109 kafkaTemplate.send(topic, message); 110 } 111 112 /** 113 * 消息正常发送 且 指定分区 114 * 115 * @param topic 主题 116 * @param partitionKey 根据key计算分区 117 * @param message 消息 118 */ 119 public void send(String topic, Long partitionKey, String message) { 120 kafkaTemplate.send(topic, String.valueOf(partitionKey), message); 121 } 122 123 public static void removeFromRocksdb(DelayMessage delayMessage) throws RocksDBException { 124 rocksDB.delete(delayMessage.getKey()); 125 } 126 127 @PreDestroy 128 public void stop() { 129 rocksdbManager.close(); 130 executorService.shutdown(); 131 try { 132 if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { 133 executorService.shutdownNow(); 134 } 135 } catch (InterruptedException e) { 136 log.error("stop executorService error", e); 137 executorService.shutdownNow(); 138 } 139 } 140} 141
DelayMessage
1@Slf4j 2public class DelayMessage extends TimerTask { 3 4 public static final String EXPIRE_TIME = "expireTime"; 5 public static final String PARTITION_KEY = "partitionKey"; 6 public static final String TOPIC = "topic"; 7 public static final String MESSAGE = "message"; 8 public static final String DELAY_MS = "delayMs"; 9 10 // 延迟任务信息 11 private final String message; 12 private final long expireTime; 13 private final String topic; 14 private final Long partitionKey; 15 private final long delayMs; 16 17 public DelayMessage(long delay, String topic, String message) { 18 this.delayMs = delay < 1000 ? 1000 : delay; 19 this.expireTime = System.currentTimeMillis() + delayMs; 20 this.topic = topic; 21 this.message = message; 22 this.partitionKey = null; 23 } 24 25 public DelayMessage(long delay, String topic, Long partitionKey, String message) { 26 this.delayMs = delay < 1000 ? 1000 : delay; 27 this.expireTime = System.currentTimeMillis() + delayMs; 28 this.topic = topic; 29 this.partitionKey = partitionKey; 30 this.message = message; 31 } 32 33 public DelayMessage(long delay, long expireTime, String topic, Long partitionKey, String message) { 34 this.delayMs = delay; 35 this.expireTime = expireTime; 36 this.topic = topic; 37 this.partitionKey = partitionKey; 38 this.message = message; 39 } 40 41 public String getTopic() { 42 return topic; 43 } 44 45 public String getMessage() { 46 return message; 47 } 48 49 public byte[] getKey() { 50 return MD5Util.getMd5Str(toJson()).getBytes(StandardCharsets.UTF_8); 51 } 52 53 public byte[] getValue() { 54 return message.getBytes(StandardCharsets.UTF_8); 55 } 56 57 public long getExpireTime() { 58 return expireTime; 59 } 60 61 @Override 62 public Long getDelayMs() { 63 return delayMs; 64 } 65 66 @Override 67 public void run() { 68 log.info("sendToKafka delayMessage: {}", toJson()); 69 try { 70 if (partitionKey == null) { 71 IrcpKafkaTemplate.getKafkaTemplate().send(topic, message); 72 } else { 73 IrcpKafkaTemplate.getKafkaTemplate().send(topic, String.valueOf(partitionKey), message); 74 } 75 76 try { 77 IrcpKafkaTemplate.removeFromRocksdb(this); 78 } catch (Exception e) { 79 log.error("after send kafka, remove from rocksdb error message:{}", toJson(), e); 80 } 81 82 } catch (Exception e) { 83 log.error("sendToKafka error delayMessage: {}", toJson(), e); 84 } 85 } 86 87 public String toJson() { 88 Map<String, String> map = new HashMap<>(); 89 map.put(MESSAGE, message); 90 map.put(TOPIC, topic); 91 if (partitionKey != null) { 92 map.put(PARTITION_KEY, String.valueOf(partitionKey)); 93 } 94 map.put(EXPIRE_TIME, String.valueOf(expireTime)); 95 map.put(DELAY_MS, String.valueOf(delayMs)); 96 return JSONUtil.toJsonStr(map); 97 } 98 99 public static DelayMessage fromJson(String json) { 100 Map<String, String> map = JSONUtil.toBean(json, Map.class); 101 long delayMs = Long.parseLong(map.get(DELAY_MS)); 102 long expireTime = Long.parseLong(map.get(EXPIRE_TIME)); 103 String topic = map.get(TOPIC); 104 String message = map.get(MESSAGE); 105 Long partitionKey = map.get(PARTITION_KEY) == null ? null : Long.parseLong(map.get(PARTITION_KEY)); 106 return new DelayMessage(delayMs, expireTime, topic, partitionKey, message); 107 } 108} 109
SystemTimer
1@Slf4j 2public class SystemTimer implements Timer, Function<TimerTaskEntry, Void> { 3 4 private ExecutorService taskExecutor; // 执行任务线程池 5 private Long tickMs; // 基本时间跨度 6 private Integer wheelSize; // 时间轮的格子数 7 private Long startMs; // 时间轮的起始时间 8 private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>(); 9 private AtomicInteger taskCounter = new AtomicInteger(0); // 记录待执行任务的数量 10 private TimingWheel timingWheel; // 时间轮对象(管理所有任务的分配和延迟触发) 11 12 private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 13 // 读锁用于添加任务 14 private final ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock(); 15 // 写锁用于推进时间轮,flush执行任务 16 private final ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock(); 17 18 public SystemTimer(String executeName) { 19 // 每格1s,总共60格 20 tickMs = 1000L; 21 wheelSize = 60; 22 startMs = Time.getHiresClockMs(); 23 taskExecutor = new ThreadPoolExecutor(10, 20, 24 0L, TimeUnit.MILLISECONDS, 25 new LinkedBlockingQueue<>(Integer.MAX_VALUE), r -> new Thread(r, executeName)); 26 timingWheel = new TimingWheel(tickMs, wheelSize, startMs, taskCounter, delayQueue); 27 28 // 每30s打印一次当前待执行任务数量 29 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { 30 log.info("delay message for number of tasks pending execution: {}", size()); 31 }, 0, 30, TimeUnit.SECONDS); 32 } 33 34 @Override 35 public void add(TimerTask timerTask) { 36 readLock.lock(); 37 try { 38 // 将 timerTask 封装为 TimerTaskEntry,记录到期时间 39 addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.getDelayMs() + Time.getHiresClockMs())); 40 } finally { 41 readLock.unlock(); 42 } 43 } 44 45 @Override 46 public boolean advanceClock(long timeoutMs) { 47 try { 48 TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS); 49 if (bucket != null) { 50 // 推进时间轮,不允许写 51 writeLock.lock(); 52 try { 53 while (bucket != null) { 54 timingWheel.advanceClock(bucket.getExpiration()); 55 // 执行槽内任务 56 bucket.flush(this); 57 bucket = delayQueue.poll(); 58 } 59 } finally { 60 writeLock.unlock(); 61 } 62 return true; 63 } else { 64 return false; 65 } 66 } catch (InterruptedException e) { 67 log.error("advanceClock interrupted", e); 68 } 69 return false; 70 } 71 72 @Override 73 public int size() { 74 return taskCounter.get(); 75 } 76 77 @Override 78 public void shutdown() { 79 taskExecutor.shutdown(); 80 } 81 82 private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) { 83 if (!timingWheel.add(timerTaskEntry)) { 84 // 任务没被取消,直接提交线程池执行 85 if (!timerTaskEntry.cancelled()) { 86 taskExecutor.submit(timerTaskEntry.getTimerTask()); 87 } 88 } 89 } 90 91 @Override 92 public Void apply(TimerTaskEntry timerTaskEntry) { 93 addTimerTaskEntry(timerTaskEntry); 94 return null; 95 } 96} 97
TimingWheel
1public class TimingWheel { 2 private Long tickMs; // 时间轮每一格的时间间隔(基本时间跨度) 3 private Integer wheelSize; // 时间轮的格子数 4 private Long interval; // 时间轮一圈的总体时间跨度 5 private Long startMs; // 时间轮的起始时间 6 private AtomicInteger taskCounter; // 全局待执行任务计数器 7 private DelayQueue<TimerTaskList> queue; // 多层时间轮任务共享队列 8 private Long currentTime; // 时间轮当前指针指向的时间,是 tickMs 的整数倍 9 private volatile TimingWheel overflowWheel; // 上一轮(基本时间跨度是当前轮的总体时间跨度) 10 private TimerTaskList[] buckets; // 环形数组 11 12 public TimingWheel(Long tickMs, Integer wheelSize, Long startMs, AtomicInteger taskCounter, DelayQueue<TimerTaskList> queue) { 13 this.tickMs = tickMs; 14 this.wheelSize = wheelSize; 15 this.startMs = startMs; 16 this.taskCounter = taskCounter; 17 this.queue = queue; 18 this.interval = tickMs * wheelSize; 19 this.currentTime = startMs - (startMs % tickMs); 20 this.buckets = new TimerTaskList[wheelSize]; 21 for (int i = 0; i < buckets.length; i++) { 22 buckets[i] = new TimerTaskList(taskCounter); 23 } 24 } 25 26 /** 27 * 将一个任务加入时间轮 28 */ 29 public boolean add(TimerTaskEntry timerTaskEntry) { 30 long expiration = timerTaskEntry.getExpirationMs(); 31 32 // 如果任务已经取消,则不加入 33 if (timerTaskEntry.cancelled()) { 34 // Cancelled 35 return false; 36 } 37 // 任务到期(到期时间 < 当前轮+下一格时间) 38 else if (expiration < currentTime + tickMs) { 39 // Already expired 40 return false; 41 } else if (expiration < currentTime + interval) { 42 // 计算任务应该落入哪一个槽位 43 long virtualId = expiration / tickMs; 44 TimerTaskList bucket = buckets[(int) (virtualId % wheelSize)]; 45 bucket.add(timerTaskEntry); 46 47 // 设置槽位的到期时间 48 if (bucket.setExpiration(virtualId * tickMs)) { 49 queue.offer(bucket); 50 } 51 return true; 52 } 53 // 任务超出当前轮覆盖范围,放入父轮 54 else { 55 // Out of the interval. Put it into the parent timer 56 if (overflowWheel == null) { 57 // 创建父轮,一格时间 = 主轮一圈时间 58 addOverflowWheel(); 59 } 60 return overflowWheel.add(timerTaskEntry); 61 } 62 } 63 64 /** 65 * 尝试推进时间轮的时钟(更新时间轮当前的时钟) 66 */ 67 public void advanceClock(Long timeMs) { 68 if (timeMs >= currentTime + tickMs) { 69 // 更新为目标格子的左边界 70 currentTime = timeMs - (timeMs % tickMs); 71 72 if (overflowWheel != null) { 73 // 推进父轮 74 overflowWheel.advanceClock(currentTime); 75 } 76 } 77 } 78 79 private void addOverflowWheel() { 80 synchronized (this) { 81 if (overflowWheel == null) { 82 overflowWheel = new TimingWheel(interval, wheelSize, currentTime, taskCounter, queue); 83 } 84 } 85 } 86} 87
TimerTask
1public abstract class TimerTask implements Runnable { 2 3 protected Long delayMs = 3000L; 4 protected TimerTaskEntry timerTaskEntry; // 当前任务所在链表节点 5 6 public void cancel() { 7 synchronized (this) { 8 if (timerTaskEntry != null) { 9 timerTaskEntry.remove(); // 从时间轮桶移除 10 } 11 timerTaskEntry = null; 12 } 13 } 14 15 /** 16 * 双向绑定 17 * TimerTask 持有 TimerTaskEntry 的引用 18 * TimerTaskEntry 持有 TimerTask 的引用 19 * <p> 20 * TimerTaskEntry:封装了 TimerTask 和它的调度信息(到期时间 expirationMs、所在桶 list 等). 21 * TimerTask:实际要执行的任务,可能是 DelayMessage、DelayMessage2 等. 22 * </p> 23 * 作用: 24 * 1.取消任务 cancelled() ,通过引用指向判断是否被取消 25 * 2.重入时间轮 SystemTimer#apply(),需要根据 timerTask.remove() --> timerTask.getTimerTaskEntry() 找到旧条目,从原桶中移除,然后再把任务加入新的桶中 26 * 3.保证线程安全,set 方法锁 this,保证一个任务同时只属于一个条目 27 */ 28 public void setTimerTaskEntry(TimerTaskEntry entry) { 29 synchronized (this) { 30 if (timerTaskEntry != null && timerTaskEntry != entry) { 31 timerTaskEntry.remove(); 32 } 33 timerTaskEntry = entry; 34 } 35 } 36 37 @Override 38 public void run() {} 39 40 public TimerTaskEntry getTimerTaskEntry() { 41 return timerTaskEntry; 42 } 43 44 public Long getDelayMs() { 45 return delayMs; 46 } 47} 48
TimerTaskEntry
1public class TimerTaskEntry implements Comparable<TimerTaskEntry> { 2 3 private volatile TimerTaskList list; // 当前所在桶 4 public TimerTaskEntry next; // 前驱 5 public TimerTaskEntry prev; // 后继 6 private TimerTask timerTask; // 包含的任务 7 private Long expirationMs; // 到期时间,用于排序 8 9 public TimerTaskEntry() {} 10 11 public TimerTaskEntry(TimerTask timerTask, Long expirationMs) { 12 if (timerTask != null) { 13 timerTask.setTimerTaskEntry(this); 14 } 15 this.timerTask = timerTask; 16 this.expirationMs = expirationMs; 17 18 } 19 20 /** 21 * 每个 timerTask 内部会持有一个引用 timerTaskEntry,表示当前任务被哪个条目管理 22 * 如果当前任务被取消,则 timerTaskEntry 会被设置为 null,或者 timerTaskEntry 会被设置为其他 timerTaskEntry 23 * @return 24 */ 25 public boolean cancelled() { 26 return timerTask.getTimerTaskEntry() != this; 27 } 28 29 public void remove() { 30 TimerTaskList currentList = list; 31 while (currentList != null) { 32 currentList.remove(this); 33 currentList = list; 34 } 35 } 36 37 @Override 38 public int compareTo(TimerTaskEntry that) { 39 if (that == null) { 40 throw new NullPointerException("TimerTaskEntry is null"); 41 } 42 Long expirationMs1 = this.expirationMs; 43 Long expirationMs2 = that.expirationMs; 44 if (expirationMs1 < expirationMs2) { 45 return -1; 46 } 47 if (expirationMs1 > expirationMs2) { 48 return 1; 49 } 50 return 0; 51 } 52 53 54 public Long getExpirationMs() { 55 return expirationMs; 56 } 57 58 public TimerTask getTimerTask() { 59 return timerTask; 60 } 61 62 public TimerTaskList getList() { 63 return list; 64 } 65 66 public void setList(TimerTaskList list) { 67 this.list = list; 68 } 69} 70
TimerTaskList
1class TimerTaskList implements Delayed { 2 3 private AtomicInteger taskCounter; // 桶内任务数量计数器 4 private TimerTaskEntry root; // 哨兵节点,形成循环双向链表 5 private AtomicLong expiration; // 桶的过期时间(每个格子的左边界) 6 7 public TimerTaskList() {} 8 9 public TimerTaskList(AtomicInteger taskCounter) { 10 this.taskCounter = taskCounter; 11 this.root = new TimerTaskEntry(null, -1L); 12 this.root.next = root; 13 this.root.prev = root; 14 this.expiration = new AtomicLong(-1L); 15 } 16 17 public boolean setExpiration(Long expirationMs) { 18 return expiration.getAndSet(expirationMs) != expirationMs; 19 } 20 21 public Long getExpiration() { 22 return expiration.get(); 23 } 24 25 public synchronized void foreach(Function<TimerTask, Void> f) { 26 TimerTaskEntry entry = root.next; 27 // 遍历循环链表 28 while (entry != root) { 29 TimerTaskEntry nextEntry = entry.next; 30 // 任务未被取消 31 if (!entry.cancelled()) { 32 f.apply(entry.getTimerTask()); 33 } 34 35 entry = nextEntry; 36 } 37 } 38 39 public void add(TimerTaskEntry timerTaskEntry) { 40 boolean done = false; 41 // 循环直到成功加入链表(主要是为了处理并发情况下的重试) 42 while (!done) { 43 // 先移除任务在链表中旧的引用,防止重复执行 44 timerTaskEntry.remove(); 45 // 锁桶 46 synchronized (this) { 47 // 锁住任务条目本身,防止多线程操作统一条目 48 synchronized (timerTaskEntry) { 49 // 检查任务是否已经绑定到桶 50 if (timerTaskEntry.getList() == null) { 51 // 插入到链表尾部(root.prev 指向尾) 52 TimerTaskEntry tail = root.prev; 53 timerTaskEntry.next = root; 54 timerTaskEntry.prev = tail; 55 // 绑定任务和当前桶 56 timerTaskEntry.setList(this); 57 tail.next = timerTaskEntry; 58 root.prev = timerTaskEntry; 59 60 // 任务数 + 1 61 taskCounter.incrementAndGet(); 62 done = true; 63 } 64 } 65 } 66 } 67 } 68 69 public void remove(TimerTaskEntry timerTaskEntry) { 70 synchronized (this) { 71 synchronized (timerTaskEntry) { 72 if (timerTaskEntry.getList() == this) { 73 timerTaskEntry.next.prev = timerTaskEntry.prev; 74 timerTaskEntry.prev.next = timerTaskEntry.next; 75 timerTaskEntry.next = null; 76 timerTaskEntry.prev = null; 77 timerTaskEntry.setList(null); 78 taskCounter.decrementAndGet(); 79 } 80 } 81 } 82 } 83 84 /** 85 * slot【销毁-->重建】 86 * 循环链表判断每个节点是否可以执行 87 * 如果不能执行,降级到下一层时间轮,重新加入链表,否则提交线程池执行 88 */ 89 public void flush(Function<TimerTaskEntry, Void> f) { 90 synchronized (this) { 91 TimerTaskEntry head = root.next; 92 while (head != root) { 93 remove(head); 94 f.apply(head); 95 head = root.next; 96 } 97 // 重置桶过期时间 98 expiration.set(-1L); 99 } 100 } 101 102 @Override 103 public long getDelay(TimeUnit unit) { 104 return unit.convert(Long.max(getExpiration() - Time.getHiresClockMs(), 0), TimeUnit.MILLISECONDS); 105 } 106 107 /** 108 * 按桶的到期时间排序 109 */ 110 @Override 111 public int compareTo(Delayed d) { 112 TimerTaskList other; 113 if (d instanceof TimerTaskList) { 114 other = (TimerTaskList) d; 115 } else { 116 throw new ClassCastException("can not cast to TimerTaskList"); 117 } 118 119 if (getExpiration() < other.getExpiration()) { 120 return -1; 121 } else if (getExpiration() > other.getExpiration()) { 122 return 1; 123 } else { 124 return 0; 125 } 126 } 127} 128
RocksDBManager(管理 RocksDB 实例)
1@Component 2@Slf4j 3public class RocksDBManager { 4 5 private final RocksDB rocksDB; 6 7 public RocksDBManager(@Value("${rocksdb.path}") String rocksPath) throws RocksDBException { 8 RocksDB.loadLibrary(); 9 Options options = new Options(); 10 options.setCreateIfMissing(true); 11 String dbPath = rocksPath; 12 log.info("RocksDBManager get dbPath:{}", dbPath); 13 if (dbPath == null) { 14 dbPath = "rocksdb"; 15 } 16 File dbFile = new File(dbPath); 17 if (!dbFile.exists()) { 18 if (!dbFile.mkdirs()) { 19 throw new RuntimeException("Failed to create rocksdb.path directory: " + dbPath); 20 } 21 } 22 rocksDB = RocksDB.open(options, dbPath); 23 } 24 25 public RocksDB getRocksDB() { 26 return rocksDB; 27 } 28 29 public void close() { 30 try { 31 rocksDB.close(); 32 log.info("rocksDB closed."); 33 } catch (Exception e) { 34 log.error("close RocksDB error", e); 35 } 36 } 37} 38
Timer
1public interface Timer { 2 3 void add(TimerTask timerTask); 4 5 boolean advanceClock(long timeoutMs); 6 7 int size(); 8 9 void shutdown(); 10} 11
Time
1public class Time { 2 3 public static Long getHiresClockMs() { 4 return TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); 5 } 6} 7
《基于 Kafka 与时间轮实现高性能延迟消息》 是转载文章,点击查看原文。
