Go 实战项目 rosedb 源码剖析 7—启动流程

理解了前面的 rosedb 数据写入、读取、merge 流程之后,再来看看数据库启动的流程。

Bitcask 模型需要在启动的时候去执行全量数据扫描,并且加载索引,在 rosedb 当中,主要涉及到加载 merge 文件,并且如果有 HINT 文件的话,可以直接从 HINT 文件中加载索引,否则从数据文件中加载索引。

rosedb 在启动的时候通过 Open 方法,启动的时候可以传递参数配置,目前有如下配置:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Options specifies the options for opening a database.  
type Options struct {  
    // DirPath specifies the directory path where the WAL segment files will be stored.  
    DirPath string  

    // SegmentSize specifies the maximum size of each segment file in bytes.  
    SegmentSize int64  

    // Sync is whether to synchronize writes through os buffer cache and down onto the actual disk.  
    // Setting sync is required for durability of a single write operation, but also results in slower writes.  
    //  
    // If false, and the machine crashes, then some recent writes may be lost.  
    // Note that if it is just the process that crashes (machine does not) then no writes will be lost.  
    //  
    // In other words, Sync being false has the same semantics as a write  
    // system call. Sync being true means write followed by fsync.  
    Sync bool  

    // BytesPerSync specifies the number of bytes to write before calling fsync.  
    BytesPerSync uint32  

    // WatchQueueSize the cache length of the watch queue.  
    // if the size greater than 0, which means enable the watch.  
    WatchQueueSize uint64  

    // AutoMergeEnable enable the auto merge.  
    // auto merge will be triggered when cron expr is satisfied.  
    // cron expression follows the standard cron expression.  
    // e.g. "0 0 * * *" means merge at 00:00:00 every day.  
    // it also supports seconds optionally.  
    // when enable the second field, the cron expression will be like this: "0/10 * * * * *" (every 10 seconds).  
    // when auto merge is enabled, the db will be closed and reopened after merge done.  
    // do not set this shecule too frequently, it will affect the performance.  
    // refer to https://en.wikipedia.org/wiki/Cron  
    AutoMergeCronExpr string  
}
  • DirPath:表示文件的存放目录,不存在的话会自动创建
  • SegmentSize:表示一个 wal 数据文件的最大容量
  • Sync:是否每次写入都将操作系统缓存的数据刷到磁盘中
  • BytesPerSync:累计写入多少字节后调用 Sync
  • WatchQueueSize:这个参数主要是支持 watch 功能,即 key 对应的值有变化之后通知调用方
  • AutoMergeCronExpr:此参数启用自动 merge 功能,填写一个 cron 表达式,后台线程自动进行 merge

在 Open 方法的具体代码中,首先会检查参数配置是否有效,如果传递的目录不存在的话,则会创建这个目录。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
    // check options  
    if err := checkOptions(options); err != nil {  
        return nil, err  
    }  

    // create data directory if not exist  
    if _, err := os.Stat(options.DirPath); err != nil {  
        if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil {  
            return nil, err  
        }  
    }

然后会使用文件锁机制,确保一个目录中只能有一个进程,即一个目录之上只能运行一个 rosedb 实例,确保数据安全。

1
2
3
4
5
6
7
8
9
    // create file lock, prevent multiple processes from using the same database directory  
    fileLock := flock.New(filepath.Join(options.DirPath, fileLockName))  
    hold, err := fileLock.TryLock()  
    if err != nil {  
        return nil, err  
    }  
    if !hold {  
        return nil, ErrDatabaseIsUsing  
    }

然后是加载 merge 文件。

1
2
3
4
    // load merge files if exists  
    if err = loadMergeFiles(options.DirPath); err != nil {  
        return nil, err  
    }

在前面讲解 merge 的流程当中,提到过 merge 完成之后,如果没有立即清理文件的话,merge 之后的文件和旧的数据文件都存在,这时候需要进行替换,将旧的数据文件删除,将 merge 的文件作为新的数据文件。

在 loadMergeFiles 方法中,会加载上一次 merge 终止的 SegmentID。因为我们为了检查 merge 操作是否已经完成,会在 merge 结束之后写一个文件,这个文件是以 .MERGEFIN 后缀结尾的。

img

这个文件中记录了上一次 merge 操作完成的 SegmentID,例如上一次 merge 操作的文件是 0-5,那么这个文件中记录的就是 5。

1
2
3
4
5
    // get the merge finished segment id  
    mergeFinSegmentId, err := getMergeFinSegmentId(mergeDirPath)  
    if err != nil {  
        return err  
    }

如果这个文件并不存在,或者读取的时候失败了,那么说明 merge 操作并没有成功完成,只需要删除掉 merge 目录即可。

然后读取到了这个 mergeFinSegmentId,只需要将这些文件移动到原数据目录当中,这样原来的文件就被删除,merge 目录中的文件就成了新的数据文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    // now we get the merge finished segment id, so all the segment id less than the merge finished segment id  
    // should be moved to the original data directory, and the original data files should be deleted.  
    for fileId := uint32(1); fileId <= mergeFinSegmentId; fileId++ {  
        destFile := wal.SegmentFileName(dirPath, dataFileNameSuffix, fileId)  

        // remove the original data file  
        if _, err = os.Stat(destFile); err == nil {  
            if err = os.Remove(destFile); err != nil {  
                return err  
            }  
        }  
        // move the merge data file to the original data directory  
        copyFile(dataFileNameSuffix, fileId, false)  
    }

Merge 文件加载完成之后,数据文件的状态已经全部准备好了,这时候通过 wal 打开这些文件。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
func (db *DB) openWalFiles() (*wal.WAL, error) {  
    // open data files from WAL  
    walFiles, err := wal.Open(wal.Options{  
        DirPath:        db.options.DirPath,  
        SegmentSize:    db.options.SegmentSize,  
        SegmentFileExt: dataFileNameSuffix,  
        Sync:           db.options.Sync,  
        BytesPerSync:   db.options.BytesPerSync,  
    })  
    if err != nil {  
        return nil, err  
    }  
    return walFiles, nil  
}

打开数据文件之后,就能够加载索引了,主要的逻辑在 loadIndex 方法中,加载索引主要是分为了两部分,第一是如果存在 HINT 文件的话,则直接从 HINT 文件中加载,然后再从数据文件中加载。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func (db *DB) loadIndex() error {  
    // load index frm hint file  
    if err := db.loadIndexFromHintFile(); err != nil {  
        return err  
    }  
    // load index from data files  
    if err := db.loadIndexFromWAL(); err != nil {  
        return err  
    }  
    return nil  
}

HINT 文件是在 merge 的过程中生成的,存储的是 key+position 信息,position 就是 value 在 wal 文件中的位置。需要注意 HINT 文件中的位置索引信息肯定是最新的,因为 merge 的时候是直接写入的最新的 value,所以得到的索引信息肯定也是最新的状态,这时候直接插入到索引当中即可。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    for {  
        chunk, _, err := reader.Next()  
        if err != nil {  
            if err == io.EOF {  
                break  
            }  
            return err  
        }  

        key, position := decodeHintRecord(chunk)  
        // All the hint records are valid because it is generated by the merge operation.  
        // So just put them into the index without checking.  
        db.index.Put(key, position)  
    }

HINT 文件其实也是通过 wal 组件打开的,所以可以直接调用 wal 的遍历方法,加载其中的数据,这时候加载的并不是之前数据文件中提到的 LogRecord,而是只包含 key 和索引信息的 hint 文件的 record 信息,这里并没有使用一个专门的结构体来表示,而是直接对这两个信息进行编码,读出来之后按照同样的方式进行解码。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func encodeHintRecord(key []byte, pos *wal.ChunkPosition) []byte {  
    // SegmentId BlockNumber ChunkOffset ChunkSize  
    //    5          5           10          5      =    25  
    // see binary.MaxVarintLen64 and binary.MaxVarintLen32  
    buf := make([]byte, 25)  
    var idx = 0  

    // SegmentId  
    idx += binary.PutUvarint(buf[idx:], uint64(pos.SegmentId))  
    // BlockNumber  
    idx += binary.PutUvarint(buf[idx:], uint64(pos.BlockNumber))  
    // ChunkOffset  
    idx += binary.PutUvarint(buf[idx:], uint64(pos.ChunkOffset))  
    // ChunkSize  
    idx += binary.PutUvarint(buf[idx:], uint64(pos.ChunkSize))  

    // key  
    result := make([]byte, idx+len(key))  
    copy(result, buf[:idx])  
    copy(result[idx:], key)  
    return result  
}

从 HINT 文件中加载索引之后,还需要从数据文件中加载索引。

试想这样一种情况,文件 0-5 是参与了 merge 过程的文件,但是在 merge 的过程当中,有可能有新的数据写入,所以可能生成了数据文件 6,甚至 7,这时候这些新的没有参与 merge 的文件是没有对应的 HINT 索引文件信息的,只能从数据文件本身去加载索引,这部分的逻辑在 loadIndexFromWAL 中。

在遍历数据文件加载索引的过程中,需要判断如果已经是从 HINT 文件中加载过索引了,则直接跳过。

1
2
3
4
5
6
7
// if the current segment id is less than the mergeFinSegmentId,  
// we can skip this segment because it has been merged,  
// and we can load index from the hint file directly.  
if reader.CurrentSegmentId() <= mergeFinSegmentId {  
    reader.SkipCurrentSegment()  
    continue  
}

然后是从数据文件中加载 LogRecord 信息,需要判断如果类型是 LogRecordDeleted,则表示从索引中删除对应的数据。

并且如果一个 Batch 中的数据并没有对应的特殊的标识 Batch 结束的数据的话,则说明这个 Batch 是无效的,直接丢弃这些数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// if we get the end of a batch,  
// all records in this batch are ready to be indexed.  
if record.Type == LogRecordBatchFinished {  
    batchId, err := snowflake.ParseBytes(record.Key)  
    if err != nil {  
        return err  
    }  
    for _, idxRecord := range indexRecords[uint64(batchId)] {  
        if idxRecord.recordType == LogRecordNormal {  
            db.index.Put(idxRecord.key, idxRecord.position)  
        }  
        if idxRecord.recordType == LogRecordDeleted {  
            db.index.Delete(idxRecord.key)  
        }  
    }  
    // delete indexRecords according to batchId after indexing  
    delete(indexRecords, uint64(batchId))  
} elseif record.Type == LogRecordNormal && record.BatchId == mergeFinishedBatchID {  
    // if the record is a normal record and the batch id is 0,  
    // it means that the record is involved in the merge operation.  
    // so put the record into index directly.  
    db.index.Put(record.Key, position)  
} else {  
    // expired records should not be indexed  
    if record.IsExpired(now) {  
        db.index.Delete(record.Key)  
        continue  
    }  
    // put the record into the temporary indexRecords  
    indexRecords[record.BatchId] = append(indexRecords[record.BatchId],  
        &IndexRecord{  
            key:        record.Key,  
            recordType: record.Type,  
            position:   position,  
        })  
}

索引加载完成之后,DB 启动的流程就基本完成了。

随后有一些额外的处理,第一个是如果开启了 watch 功能的话,会启动一个后台 goroutine 去处理 key 变化通知的事件信息。

1
2
3
4
5
6
7
    // enable watch  
    if options.WatchQueueSize > 0 {  
        db.watchCh = make(chan *Event, 100)  
        db.watcher = NewWatcher(options.WatchQueueSize)  
        // run a goroutine to synchronize event information  
        go db.watcher.sendEvent(db.watchCh)  
    }

第二个是如果传递的参数 AutoMergeCronExpr 是有效的话,则也启动一个后台 goroutine 去定时周期性的去调用 merge 进行数据文件的清理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
    // enable auto merge task  
    iflen(options.AutoMergeCronExpr) > 0 {  
        db.cronScheduler = cron.New(  
            cron.WithParser(  
                cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour |  
                    cron.Dom | cron.Month | cron.Dow | cron.Descriptor),  
            ),  
        )  
        _, err = db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() {  
            // maybe we should deal with different errors with different logic, but a background task can't omit its error.  
            // after auto merge, we should close and reopen the db.  
            _ = db.Merge(true)  
        })  
        if err != nil {  
            return nil, err  
        }  
        db.cronScheduler.Start()  
    }

广告区:

《从零实现 KV 存储》,使用 Rust 和 Go 两种语言实现,手把手教学,只需要基础的语法知识,即可学会一个硬核实战项目!

《从零实现 SQL 数据库》,使用 Rust 手写一个数据库系统,超级硬核,Rust 实战项目首选!

课程详情:https://roseduan.cn/course/

使用 Hugo 构建
主题 StackJimmy 设计