一、项目概览
Dragonboat 是纯 Go 实现的(multi-group)Raft 库。
为应用屏蔽 Raft 复杂性,提供易于使用的 NodeHost 和状态机接口。该库(自称)有如下特点:
- 高吞吐、流水线化、批处理;
- 提供了内存/磁盘状态机多种实现;
- 提供了 ReadIndex、成员变更、Leader转移等管理端API;
- 默认使用 Pebble 作为 存储后端。
本次代码串讲以V3的稳定版本为基础,不包括GitHub上v4版本内容。
二、整体架构
三、LogDB 统一存储
LogDB 模块是 Dragonboat 的核心持久化存储层,虽然模块名字有Log,但是它囊括了所有和存储相关的API,负责管理 Raft 协议的所有持久化数据,包括:
Raft状态 (RaftState)
Raft内部状态变更的集合结构
包括但不限于:
- ClusterID/NodeID: 节点ID
- RaftState: Raft任期、投票情况、commit进度
- EntriesToSave:Raft提案日志数据
- Snapshot:快照元数据(包括快照文件路径,快照大小,快照对应的提案Index,快照对应的Raft任期等信息)
- Messages:发给其他节点的Raft消息
- ReadyToReads:ReadIndex就绪的请求
引导信息 (Bootstrap)
1type Bootstrap struct { 2 Addresses map[uint64]string // 初始集群成员 3 Join bool 4 Type StateMachineType 5} 6
ILogDB的API如下:
1type ILogDB interface { 2 3 4 BinaryFormat() uint32 // 返回支持的二进制格式版本号 5 6 7 ListNodeInfo() ([]NodeInfo, error) // 列出 LogDB 中所有可用的节点信息 8 9 10 // 存储集群节点的初始化配置信息,包括是否加入集群、状态机类型等 11 SaveBootstrapInfo(clusterID uint64, nodeID uint64, bootstrap pb.Bootstrap) error 12 13 14 // 获取保存的引导信息 15 GetBootstrapInfo(clusterID uint64, nodeID uint64) (pb.Bootstrap, error) 16 17 18 // 原子性保存 Raft 状态、日志条目和快照元数据 19 SaveRaftState(updates []pb.Update, shardID uint64) error 20 21 22 // 迭代读取指定范围内的连续日志条目 23 IterateEntries(ents []pb.Entry, size uint64, clusterID uint64, nodeID uint64, 24 low uint64, high uint64, maxSize uint64) ([]pb.Entry, uint64, error) 25 26 27 // 读取持久化的 Raft 状态 28 ReadRaftState(clusterID uint64, nodeID uint64, lastIndex uint64) (RaftState, error) 29 30 31 // 删除指定索引之前的所有条目, 日志压缩、快照后清理旧日志 32 RemoveEntriesTo(clusterID uint64, nodeID uint64, index uint64) error 33 34 35 // 回收指定索引之前条目占用的存储空间 36 CompactEntriesTo(clusterID uint64, nodeID uint64, index uint64) (<-chan struct{}, error) 37 38 39 // 保存所有快照元数据 40 SaveSnapshots([]pb.Update) error 41 42 43 // 删除指定的快照元数据 清理过时或无效的快照 44 DeleteSnapshot(clusterID uint64, nodeID uint64, index uint64) error 45 46 47 // 列出指定索引范围内的可用快照 48 ListSnapshots(clusterID uint64, nodeID uint64, index uint64) ([]pb.Snapshot, error) 49 50 51 // 删除节点的所有相关数据 52 RemoveNodeData(clusterID uint64, nodeID uint64) error 53 54 55 // 导入快照并创建所有必需的元数据 56 ImportSnapshot(snapshot pb.Snapshot, nodeID uint64) error 57} 58
3.1索引键
存储的底层本质是一个KVDB (pebble or rocksdb),由于业务的复杂性,要统一各类业务key的设计方法,而且要降低空间使用,所以有了如下的key设计方案。
龙舟中key分为3类:
其中,2字节的header用于区分各类不同业务的key空间。
1entryKeyHeader = [2]byte{0x1, 0x1} // 普通日志条目 2persistentStateKey = [2]byte{0x2, 0x2} // Raft状态 3maxIndexKey = [2]byte{0x3, 0x3} // 最大索引记录 4nodeInfoKey = [2]byte{0x4, 0x4} // 节点元数据 5bootstrapKey = [2]byte{0x5, 0x5} // 启动配置 6snapshotKey = [2]byte{0x6, 0x6} // 快照索引 7entryBatchKey = [2]byte{0x7, 0x7} // 批量日志 8
在key的生成中,采用了useAsXXXKey和SetXXXKey的方式,复用了data这个二进制变量,减少GC。
1type Key struct { 2 data []byte // 底层字节数组复用池 3 key []byte // 有效数据切片 4 pool *sync.Pool // 似乎并没有什么用 5} 6 7 8func (k *Key) useAsEntryKey() { 9 k.key = k.data 10} 11 12 13type IReusableKey interface { 14 SetEntryBatchKey(clusterID uint64, nodeID uint64, index uint64) 15 // SetEntryKey sets the key to be an entry key for the specified Raft node 16 // with the specified entry index. 17 SetEntryKey(clusterID uint64, nodeID uint64, index uint64) 18 // SetStateKey sets the key to be an persistent state key suitable 19 // for the specified Raft cluster node. 20 SetStateKey(clusterID uint64, nodeID uint64) 21 // SetMaxIndexKey sets the key to be the max possible index key for the 22 // specified Raft cluster node. 23 SetMaxIndexKey(clusterID uint64, nodeID uint64) 24 // Key returns the underlying byte slice of the key. 25 Key() []byte 26 // Release releases the key instance so it can be reused in the future. 27 Release() 28} 29 30 31func (k *Key) useAsEntryKey() { 32 k.key = k.data 33} 34 35 36// SetEntryKey sets the key value to the specified entry key. 37func (k *Key) SetEntryKey(clusterID uint64, nodeID uint64, index uint64) { 38 k.useAsEntryKey() 39 k.key[0] = entryKeyHeader[0] 40 k.key[1] = entryKeyHeader[1] 41 k.key[2] = 0 42 k.key[3] = 0 43 binary.BigEndian.PutUint64(k.key[4:], clusterID) 44 // the 8 bytes node ID is actually not required in the key. it is stored as 45 // an extra safenet - we don't know what we don't know, it is used as extra 46 // protection between different node instances when things get ugly. 47 // the wasted 8 bytes per entry is not a big deal - storing the index is 48 // wasteful as well. 49 binary.BigEndian.PutUint64(k.key[12:], nodeID) 50 binary.BigEndian.PutUint64(k.key[20:], index) 51} 52
3.2变量复用IContext
IContext的核心设计目的是实现并发安全的内存复用机制。在高并发场景下,频繁的内存分配和释放会造成较大的GC压力,通过IContext可以实现:
- 键对象复用:通过GetKey()获取可重用的IReusableKey
- 缓冲区复用:通过GetValueBuffer()获取可重用的字节缓冲区
- 批量操作对象复用:EntryBatch和WriteBatch的复用
1// IContext is the per thread context used in the logdb module. 2// IContext is expected to contain a list of reusable keys and byte 3// slices that are owned per thread so they can be safely reused by the 4// same thread when accessing ILogDB. 5type IContext interface { 6 // Destroy destroys the IContext instance. 7 Destroy() 8 // Reset resets the IContext instance, all previous returned keys and 9 // buffers will be put back to the IContext instance and be ready to 10 // be used for the next iteration. 11 Reset() 12 // GetKey returns a reusable key. 13 GetKey() IReusableKey // 这就是上文中的key接口 14 // GetValueBuffer returns a byte buffer with at least sz bytes in length. 15 GetValueBuffer(sz uint64) []byte 16 // GetWriteBatch returns a write batch or transaction instance. 17 GetWriteBatch() interface{} 18 // SetWriteBatch adds the write batch to the IContext instance. 19 SetWriteBatch(wb interface{}) 20 // GetEntryBatch returns an entry batch instance. 21 GetEntryBatch() pb.EntryBatch 22 // GetLastEntryBatch returns an entry batch instance. 23 GetLastEntryBatch() pb.EntryBatch 24} 25 26 27type context struct { 28 size uint64 29 maxSize uint64 30 eb pb.EntryBatch 31 lb pb.EntryBatch 32 key *Key 33 val []byte 34 wb kv.IWriteBatch 35} 36 37 38func (c *context) GetKey() IReusableKey { 39 return c.key 40} 41 42 43func (c *context) GetValueBuffer(sz uint64) []byte { 44 if sz <= c.size { 45 return c.val 46 } 47 val := make([]byte, sz) 48 if sz < c.maxSize { 49 c.size = sz 50 c.val = val 51 } 52 return val 53} 54 55 56func (c *context) GetEntryBatch() pb.EntryBatch { 57 return c.eb 58} 59 60 61func (c *context) GetLastEntryBatch() pb.EntryBatch { 62 return c.lb 63} 64 65 66func (c *context) GetWriteBatch() interface{} { 67 return c.wb 68} 69 70 71func (c *context) SetWriteBatch(wb interface{}) { 72 c.wb = wb.(kv.IWriteBatch) 73} 74
3.3存储引擎封装IKVStore
IKVStore 是 Dragonboat 日志存储系统的抽象接口,它定义了底层键值存储引擎需要实现的所有基本操作。这个接口让 Dragonboat 能够支持不同的存储后端(如 Pebble、RocksDB 等),实现了存储引擎的可插拔性。
1type IKVStore interface { 2 // Name is the IKVStore name. 3 Name() string 4 // Close closes the underlying Key-Value store. 5 Close() error 6 7 8 // 范围扫描 - 支持前缀遍历的迭代器 9 IterateValue(fk []byte, 10 lk []byte, inc bool, op func(key []byte, data []byte) (bool, error)) error 11 12 // 查询操作 - 基于回调的内存高效查询模式 13 GetValue(key []byte, op func([]byte) error) error 14 15 // 写入操作 - 单条记录的原子写入 16 SaveValue(key []byte, value []byte) error 17 18 19 // 删除操作 - 单条记录的精确删除 20 DeleteValue(key []byte) error 21 22 // 获取批量写入器 23 GetWriteBatch() IWriteBatch 24 25 // 原子提交批量操作 26 CommitWriteBatch(wb IWriteBatch) error 27 28 // 批量删除一个范围的键值对 29 BulkRemoveEntries(firstKey []byte, lastKey []byte) error 30 31 // 压缩指定范围的存储空间 32 CompactEntries(firstKey []byte, lastKey []byte) error 33 34 // 全量压缩整个数据库 35 FullCompaction() error 36} 37 38 39type IWriteBatch interface { 40 Destroy() // 清理资源,防止内存泄漏 41 Put(key, value []byte) // 添加写入操作 42 Delete(key []byte) // 添加删除操作 43 Clear() // 清空批处理中的所有操作 44 Count() int // 获取当前批处理中的操作数量 45} 46
openPebbleDB是Dragonboat 中 Pebble 存储引擎的初始化入口,负责根据配置创建一个完整可用的键值存储实例。
1// KV is a pebble based IKVStore type. 2type KV struct { 3 db *pebble.DB 4 dbSet chan struct{} 5 opts *pebble.Options 6 ro *pebble.IterOptions 7 wo *pebble.WriteOptions 8 event *eventListener 9 callback kv.LogDBCallback 10 config config.LogDBConfig 11} 12 13 14var _ kv.IKVStore = (*KV)(nil) 15 16 17// openPebbleDB 18// ============= 19// 将 Dragonboat 的 LogDBConfig → Pebble 引擎实例 20func openPebbleDB( 21 cfg config.LogDBConfig, 22 cb kv.LogDBCallback, // => busy通知:busy(true/false) 23 dir string, // 主数据目录 24 wal string, // WAL 独立目录(可空) 25 fs vfs.IFS, // 文件系统抽象(磁盘/memfs) 26) (kv.IKVStore, error) { 27 28 //-------------------------------------------------- 29 // 2️⃣ << 核心调优参数读入 30 //-------------------------------------------------- 31 blockSz := int(cfg.KVBlockSize) // 数据块(4K/8K…) 32 writeBufSz := int(cfg.KVWriteBufferSize) // 写缓冲 33 bufCnt := int(cfg.KVMaxWriteBufferNumber) // MemTable数量 34 l0Compact := int(cfg.KVLevel0FileNumCompactionTrigger) // L0 层文件数量触发压缩的阈值 35 l0StopWrites := int(cfg.KVLevel0StopWritesTrigger) 36 baseBytes := int64(cfg.KVMaxBytesForLevelBase) 37 fileBaseSz := int64(cfg.KVTargetFileSizeBase) 38 cacheSz := int64(cfg.KVLRUCacheSize) 39 levelMult := int64(cfg.KVTargetFileSizeMultiplier) // 每层文件大小倍数 40 numLevels := int64(cfg.KVNumOfLevels) 41 42 43 //-------------------------------------------------- 44 // 4️⃣ 构建 LSM-tree 层级选项 (每层无压缩) 45 //-------------------------------------------------- 46 levelOpts := []pebble.LevelOptions{} 47 sz := fileBaseSz 48 for lvl := 0; lvl < int(numLevels); lvl++ { 49 levelOpts = append(levelOpts, pebble.LevelOptions{ 50 Compression: pebble.NoCompression, // 写性能优先 51 BlockSize: blockSz, 52 TargetFileSize: sz, // L0 < L1 < … 呈指数增长 53 }) 54 sz *= levelMult 55 } 56 57 //-------------------------------------------------- 58 // 5️⃣ 初始化依赖:LRU Cache + 读写选项 59 //-------------------------------------------------- 60 cache := pebble.NewCache(cacheSz) // block缓存 61 ro := &pebble.IterOptions{} // 迭代器默认配置 62 wo := &pebble.WriteOptions{Sync: true} // ❗fsync强制刷盘 63 64 opts := &pebble.Options{ 65 Levels: levelOpts, 66 Cache: cache, 67 MemTableSize: writeBufSz, 68 MemTableStopWritesThreshold: bufCnt, 69 LBaseMaxBytes: baseBytes, 70 L0CompactionThreshold: l0Compact, 71 L0StopWritesThreshold: l0StopWrites, 72 Logger: PebbleLogger, 73 FS: vfs.NewPebbleFS(fs), 74 MaxManifestFileSize: 128 * 1024 * 1024, 75 // WAL 目录稍后条件注入 76 } 77 78 kv := &KV{ 79 dbSet: make(chan struct{}), // 关闭->初始化完成信号 80 callback: cb, // 上层 raft engine 回调 81 config: cfg, 82 opts: opts, 83 ro: ro, 84 wo: wo, 85 } 86 87 event := &eventListener{ 88 kv: kv, 89 stopper: syncutil.NewStopper(), 90 } 91 92 // => 关键事件触发 93 opts.EventListener = pebble.EventListener{ 94 WALCreated: event.onWALCreated, 95 FlushEnd: event.onFlushEnd, 96 CompactionEnd: event.onCompactionEnd, 97 } 98 99 //-------------------------------------------------- 100 // 7️⃣ 目录准备 101 //-------------------------------------------------- 102 if wal != "" { 103 fs.MkdirAll(wal) // 📁 为 WAL 单独磁盘预留 104 opts.WALDir = wal 105 } 106 fs.MkdirAll(dir) // 📁 主数据目录 107 108 //-------------------------------------------------- 109 // 8️⃣ 真正的数据库实例化 110 //-------------------------------------------------- 111 pdb, err := pebble.Open(dir, opts) 112 if err != nil { return nil, err } 113 114 //-------------------------------------------------- 115 // 9️⃣ 🧹 资源整理 & 启动事件 116 //-------------------------------------------------- 117 cache.Unref() // 去除多余引用,防止泄露 118 kv.db = pdb 119 120 // 🔔 手动触发一次 WALCreated 确保反压逻辑进入首次轮询 121 kv.setEventListener(event) // 内部 close(kv.dbSet) 122 123 return kv, nil 124} 125
其中eventListener是对pebble 内存繁忙的回调,繁忙判断的条件有两个:
- 内存表大小超过阈值(95%)
- L0 层文件数量超过阈值(L0写入最大文件数量-1)
1 2 3func (l *eventListener) notify() { 4 l.stopper.RunWorker(func() { 5 select { 6 case <-l.kv.dbSet: 7 if l.kv.callback != nil { 8 memSizeThreshold := l.kv.config.KVWriteBufferSize * 9 l.kv.config.KVMaxWriteBufferNumber * 19 / 20 10 l0FileNumThreshold := l.kv.config.KVLevel0StopWritesTrigger - 1 11 m := l.kv.db.Metrics() 12 busy := m.MemTable.Size >= memSizeThreshold || 13 uint64(m.Levels[0].NumFiles) >= l0FileNumThreshold 14 l.kv.callback(busy) 15 } 16 default: 17 } 18 }) 19} 20
3.4日志条目存储DB
db结构体是Dragonboat日志数据库的核心管理器,提供Raft日志、快照、状态等数据的持久化存储接口。是桥接了业务和pebble存储的中间层。
1// db is the struct used to manage log DB. 2type db struct { 3 cs *cache // 节点信息、Raft状态信息缓存 4 keys *keyPool // Raft日志索引键变量池 5 kvs kv.IKVStore // pebble的封装 6 entries entryManager // 日志条目读写封装 7} 8 9 10// 这里面的信息不会过期,叫寄存更合适 11type cache struct { 12 nodeInfo map[raftio.NodeInfo]struct{} 13 ps map[raftio.NodeInfo]pb.State 14 lastEntryBatch map[raftio.NodeInfo]pb.EntryBatch 15 maxIndex map[raftio.NodeInfo]uint64 16 mu sync.Mutex 17} 18
- 获取一个批量写容器
实现:
1func (r *db) getWriteBatch(ctx IContext) kv.IWriteBatch { 2 if ctx != nil { 3 wb := ctx.GetWriteBatch() 4 if wb == nil { 5 wb = r.kvs.GetWriteBatch() 6 ctx.SetWriteBatch(wb) 7 } 8 return wb.(kv.IWriteBatch) 9 } 10 return r.kvs.GetWriteBatch() 11} 12
降低GC压力
- 获取所有节点信息
实现:
1func (r *db) listNodeInfo() ([]raftio.NodeInfo, error) { 2 fk := newKey(bootstrapKeySize, nil) 3 lk := newKey(bootstrapKeySize, nil) 4 fk.setBootstrapKey(0, 0) 5 lk.setBootstrapKey(math.MaxUint64, math.MaxUint64) 6 ni := make([]raftio.NodeInfo, 0) 7 op := func(key []byte, data []byte) (bool, error) { 8 cid, nid := parseNodeInfoKey(key) 9 ni = append(ni, raftio.GetNodeInfo(cid, nid)) 10 return true, nil 11 } 12 if err := r.kvs.IterateValue(fk.Key(), lk.Key(), true, op); err != nil { 13 return []raftio.NodeInfo{}, err 14 } 15 return ni, nil 16} 17
- 保存集群状态
实现:
1type Update struct { 2 ClusterID uint64 // 集群ID,标识节点所属的Raft集群 3 NodeID uint64 // 节点ID,标识集群中的具体节点 4 5 6 State // 包含当前任期(Term)、投票节点(Vote)、提交索引(Commit)三个关键持久化状态 7 8 9 EntriesToSave []Entry // 需要持久化到稳定存储的日志条目 10 CommittedEntries []Entry // 已提交位apply的日志条目 11 MoreCommittedEntries bool // 指示是否还有更多已提交条目等待处理 12 13 14 Snapshot Snapshot // 快照元数据,当需要应用快照时设置 15 16 17 ReadyToReads []ReadyToRead // ReadIndex机制实现的线性一致读 18 19 20 Messages []Message // 需要发送给其他节点的Raft消息 21 22 23 UpdateCommit struct { 24 Processed uint64 // 已推送给RSM处理的最后索引 25 LastApplied uint64 // RSM确认已执行的最后索引 26 StableLogTo uint64 // 已稳定存储的日志到哪个索引 27 StableLogTerm uint64 // 已稳定存储的日志任期 28 StableSnapshotTo uint64 // 已稳定存储的快照到哪个索引 29 ReadyToRead uint64 // 已准备好读的ReadIndex请求索引 30 } 31} 32 33 34func (r *db) saveRaftState(updates []pb.Update, ctx IContext) error { 35 // 步骤1:获取写入批次对象,用于批量操作提高性能 36 // 优先从上下文中获取已存在的批次,避免重复创建 37 wb := r.getWriteBatch(ctx) 38 39 // 步骤2:遍历所有更新,处理每个节点的状态和快照 40 for _, ud := range updates { 41 // 保存 Raft 的硬状态(Term、Vote、Commit) 42 // 使用缓存机制避免重复保存相同状态 43 r.saveState(ud.ClusterID, ud.NodeID, ud.State, wb, ctx) 44 45 // 检查是否有快照需要保存 46 if !pb.IsEmptySnapshot(ud.Snapshot) { 47 // 快照索引一致性检查:确保快照索引不超过最后一个日志条目的索引 48 // 这是 Raft 协议的重要约束,防止状态不一致 49 if len(ud.EntriesToSave) > 0 { 50 lastIndex := ud.EntriesToSave[len(ud.EntriesToSave)-1].Index 51 if ud.Snapshot.Index > lastIndex { 52 plog.Panicf("max index not handled, %d, %d", 53 ud.Snapshot.Index, lastIndex) 54 } 55 } 56 57 // 保存快照元数据到数据库 58 r.saveSnapshot(wb, ud) 59 60 // 更新节点的最大日志索引为快照索引 61 r.setMaxIndex(wb, ud, ud.Snapshot.Index, ctx) 62 } 63 } 64 65 // 步骤3:批量保存所有日志条目 66 // 这里会调用 entryManager 接口的 record 方法,根据配置选择批量或单独存储策略 67 r.saveEntries(updates, wb, ctx) 68 69 // 步骤4:提交写入批次到磁盘 70 // 只有在批次中有实际操作时才提交,避免不必要的磁盘 I/O 71 if wb.Count() > 0 { 72 return r.kvs.CommitWriteBatch(wb) 73 } 74 return nil 75 } 76 77
- 保存引导信息
实现:
1func (r *db) saveBootstrapInfo(clusterID uint64, 2 nodeID uint64, bs pb.Bootstrap) error { 3 wb := r.getWriteBatch(nil) 4 r.saveBootstrap(wb, clusterID, nodeID, bs) 5 return r.kvs.CommitWriteBatch(wb) // 提交至Pebble 6} 7 8 9func (r *db) saveBootstrap(wb kv.IWriteBatch, 10 clusterID uint64, nodeID uint64, bs pb.Bootstrap) { 11 k := newKey(maxKeySize, nil) 12 k.setBootstrapKey(clusterID, nodeID) // 序列化集群节点信息 13 data, err := bs.Marshal() 14 if err != nil { 15 panic(err) 16 } 17 wb.Put(k.Key(), data) 18} 19
- 获取Raft状态
实现:
1func (r *db) getState(clusterID uint64, nodeID uint64) (pb.State, error) { 2 k := r.keys.get() 3 defer k.Release() 4 k.SetStateKey(clusterID, nodeID) 5 hs := pb.State{} 6 if err := r.kvs.GetValue(k.Key(), func(data []byte) error { 7 if len(data) == 0 { 8 return raftio.ErrNoSavedLog 9 } 10 if err := hs.Unmarshal(data); err != nil { 11 panic(err) 12 } 13 return nil 14 }); err != nil { 15 return pb.State{}, err 16 } 17 return hs, nil 18} 19
3.5对外存储API实现
龙舟对ILogDB提供了实现:ShardedDB,一个管理了多个pebble bucket的存储单元。
1var _ raftio.ILogDB = (*ShardedDB)(nil) 2// ShardedDB is a LogDB implementation using sharded pebble instances. 3type ShardedDB struct { 4 completedCompactions uint64 // 原子计数器:已完成压缩操作数 5 config config.LogDBConfig // 日志存储配置 6 ctxs []IContext // 分片上下文池,减少GC压力 7 shards []*db // 核心:Pebble实例数组 8 partitioner server.IPartitioner // 智能分片策略器 9 compactionCh chan struct{} // 压缩任务信号通道 10 compactions *compactions // 压缩任务管理器 11 stopper *syncutil.Stopper // 优雅关闭管理器 12} 13
- 初始化过程
实现:
1// 入口函数:创建并初始化分片日志数据库 2OpenShardedDB(config, cb, dirs, lldirs, batched, check, fs, kvf): 3 4 5 // ===阶段1:安全验证=== 6 if 配置为空 then panic 7 if check和batched同时为true then panic 8 9 10 // ===阶段2:预分配资源管理器=== 11 shards := 空数组 12 closeAll := func(all []*db) { //出错清理工具 13 for s in all { 14 s.close() 15 } 16 } 17 18 19 // ===阶段3:逐个创建分片=== 20 loop i := 0 → 分片总数: 21 datadir := pathJoin(dirs[i], "logdb-"+i) //数据目录 22 snapdir := "" //快照目录(可选) 23 if lldirs非空 { 24 snapdir = pathJoin(lldirs[i], "logdb-"+i) 25 } 26 27 28 shardCb := {shard:i, callback:cb} //监控回调 29 db, err := openRDB(...) //创建实际数据库实例 30 if err != nil { //创建失败 31 closeAll(shards) //清理已创建的 32 return nil, err 33 } 34 shards = append(shards, db) 35 36 37 // ===阶段5:核心组件初始化=== 38 partitioner := 新建分区器(execShards数量, logdbShards数量) 39 instance := &ShardedDB{ 40 shards: shards, 41 partitioner: partitioner, 42 compactions: 新建压缩管理器(), 43 compactionCh: 通道缓冲1, 44 ctxs: make([]IContext, 执行分片数), 45 stopper: 新建停止器() 46 } 47 48 49 // ===阶段6:预分配上下文&启动后台=== 50 for j := 0 → 执行分片数: 51 instance.ctxs[j] = 新建Context(saveBufferSize) 52 53 54 instance.stopper.RunWorker(func() { //后台压缩协程 55 instance.compactionWorkerMain() 56 }) 57 58 59 return instance, nil //构造完成 60 61
- 保存集群状态
实现:
1func (s *ShardedDB) SaveRaftState(updates []pb.Update, shardID uint64) error { 2 if shardID-1 >= uint64(len(s.ctxs)) { 3 plog.Panicf("invalid shardID %d, len(s.ctxs): %d", shardID, len(s.ctxs)) 4 } 5 ctx := s.ctxs[shardID-1] 6 ctx.Reset() 7 return s.SaveRaftStateCtx(updates, ctx) 8} 9 10 11func (s *ShardedDB) SaveRaftStateCtx(updates []pb.Update, ctx IContext) error { 12 if len(updates) == 0 { 13 return nil 14 } 15 pid := s.getParititionID(updates) 16 return s.shards[pid].saveRaftState(updates, ctx) 17} 18
以sylas为例子,我们每个分片都是单一cluster,所以logdb只使用了一个分片,龙舟设计初衷是为了解放多cluster的吞吐,我们暂时用不上,tindb可以考虑:
四、总结
LogDB是Dragonboat重要的存储层实现,作者将Pebble引擎包装为一组通用简洁的API,极大方便了上层应用与存储引擎的交互成本。
其中包含了很多Go语言的技巧,例如大量的内存变量复用设计,展示了这个库对高性能的极致追求,是一个十分值得学习的优秀工程案例。
往期回顾
1. 从数字到版面:得物数据产品里数字格式化的那些事
2. 一文解析得物自建 Redis 最新技术演进
3. Golang HTTP请求超时与重试:构建高可靠网络请求|得物技术
4. RN与hawk碰撞的火花之C++异常捕获|得物技术
5. 得物TiDB升级实践
文 /酒米
关注得物技术,每周更新技术干货
要是觉得文章对你有帮助的话,欢迎评论转发点赞~
未经得物技术许可严禁转载,否则依法追究法律责任。
《# 一、项目概览 Dragonboat 是纯 Go 实现的(multi-group)Raft 库。 为应用屏蔽 Raft 复杂性,提供易于使用的 NodeH》 是转载文章,点击查看原文。
