1、迭代器
由于全内存索引的缘故,rosedb 的迭代器实现起来相对简单,因为在内存中已经维护了全量的 key 和索引信息,并且前面提到过 rosedb 中使用了一个 btree 数据结构,btree 本身是有序的,所以在迭代数据的时候只需要根据 key 的信息取出索引,然后从数据文件中读到对应的 Value 即可。
在迭代数据的时候,rosedb 提供了两种方式,分别是调用 Ascend、Descend 这种方法,可以直接传入一个自定义的方法处理 Key 和 Value,定义大致如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| // Ascend calls handleFn for each key/value pair in the db in ascending order.
func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.Ascend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
|
还有一种方式是使用更常见的 Iterator 迭代器,这种迭代器可以更加灵活的控制,比如可以进行 Seek、可以手动调用 Next 跳转到下一个 Value,大致使用方法如下:
1
2
3
4
5
6
7
8
9
| iter := db.NewIterator(iterOpts)
defer iter.Close()
for iter.Rewind(); iter.Valid(); iter.Next() {
item := iter.Item()
if item != nil {
fmt.Printf("key = %s, value = %s\n", string(item.Key), string(item.Value))
}
}
|
在创建 Iterator 的时候可以传入一个参数配置,目前有如下三个配置项:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // IteratorOptions defines configuration options for creating a new iterator.
type IteratorOptions struct {
// Prefix specifies a key prefix for filtering. If set, the iterator will only
// traverse keys that start with this prefix. Default is empty (no filtering).
Prefix []byte
// Reverse determines the traversal order. If true, the iterator will traverse
// in descending order. Default is false (ascending order).
Reverse bool
// ContinueOnError determines how the iterator handles errors during iteration.
// If true, the iterator will log errors and continue to the next entry.
// If false, the iterator will stop and become invalid when an error occurs.
ContinueOnError bool
}
|
- Prefix:指定扫描的 key 前缀,只有前缀匹配才会输出对应的数据
- Reverse:控制是否进行迭代反转,false 为正向迭代,true 为反向迭代
- ContinueOnError:在遇到某个 value 读取错误的时候,是否继续读取下一条数据
索引迭代器
面向外层调用者的迭代器接口,在 rosedb 内部,实际上是基于索引迭代器进行实现的,所以先来介绍下索引迭代器。
索引迭代器的方法名和外层的方法基本类似,大致的区别是索引返回的 Value 是 ChunkPosition,而外层 DB 的迭代器,需要根据 ChunkPosition 从 wal 当中读取 Value。
索引层面的迭代器也是分为了两种,一种是 Ascend 和 Descend 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // Ascend iterates over items in ascending order and invokes the handler function for each item.
// If the handler function returns false, iteration stops.
Ascend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))
// AscendRange iterates in ascending order within [startKey, endKey], invoking handleFn.
// Stops if handleFn returns false.
AscendRange(startKey, endKey []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))
// AscendGreaterOrEqual iterates in ascending order, starting from key >= given key,
// invoking handleFn. Stops if handleFn returns false.
AscendGreaterOrEqual(key []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))
// Descend iterates over items in descending order and invokes the handler function for each item.
// If the handler function returns false, iteration stops.
Descend(handleFn func(key []byte, pos *wal.ChunkPosition) (bool, error))
// DescendRange iterates in descending order within [startKey, endKey], invoking handleFn.
// Stops if handleFn returns false.
DescendRange(startKey, endKey []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))
// DescendLessOrEqual iterates in descending order, starting from key <= given key,
// invoking handleFn. Stops if handleFn returns false.
DescendLessOrEqual(key []byte, handleFn func(key []byte, position *wal.ChunkPosition) (bool, error))
|
这种方法,直接调用了 BTree 提供的方法进行遍历,以 Ascend 为例:
1
2
3
4
5
6
7
8
9
10
11
12
| func (mt *MemoryBTree) Ascend(handleFn func(key []byte, position *wal.ChunkPosition) (bool, error)) {
mt.lock.RLock()
defer mt.lock.RUnlock()
mt.tree.Ascend(func(i btree.Item) bool {
cont, err := handleFn(i.(*item).key, i.(*item).pos)
if err != nil {
return false
}
return cont
})
}
|
另一种是 Iterator 迭代器接口,定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| // IndexIterator represents a generic index iterator interface.
type IndexIterator interface {
// Rewind resets the iterator to its initial position.
Rewind()
// Seek positions the cursor to the element with the specified key.
Seek(key []byte)
// Next moves the cursor to the next element.
Next()
// Valid checks if the iterator is still valid for reading.
Valid() bool
// Key returns the key of the current element.
Key() []byte
// Value returns the value (chunk position) of the current element.
Value() *wal.ChunkPosition
// Close releases the resources associated with the iterator.
Close()
}
|
- Rewind:迭代器指针重新回到初始位置
- Seek:跳转到第一个大于等于(或小于等于)给定值的位置
- Next:跳转到下一个值
- Valid:是否有效,用于表示遍历是否已经结束
- Key:用户实际的 key
- Value:内存中的 Value,返回值是
ChunkPosition - Close:关闭迭代器,清理资源
这种 Iterator 迭代器实际上底层的 BTree 并没有直接提供类似的接口,我们进行了一层封装,利用 BTree 已有的方法去实现了迭代器中的接口,例如 Rewind 方法让迭代器指针重新回到初始位置,调用了 Btree 提供的 Min 或 Max 方法:
1
2
3
4
5
6
7
8
9
10
11
12
| func (it *memoryBTreeIterator) Rewind() {
if it.tree == nil || it.tree.Len() == 0 {
return
}
if it.reverse {
it.current = it.tree.Max().(*item)
} else {
it.current = it.tree.Min().(*item)
}
it.valid = true
}
|
例如 Next 方法,直接找到比当前值更大(或更小)的值即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| it.valid = false
if it.reverse {
it.tree.DescendLessOrEqual(it.current, func(i btree.Item) bool {
if !i.(*item).Less(it.current) {
return true
}
it.current = i.(*item)
it.valid = true
return false
})
} else {
it.tree.AscendGreaterOrEqual(it.current, func(i btree.Item) bool {
if !it.current.Less(i.(*item)) {
return true
}
it.current = i.(*item)
it.valid = true
return false
})
}
|
DB 迭代器接口
在索引迭代器的基础之上,DB 迭代器的接口就是面向实际用户的了,实现上也比较简单,直接对索引迭代器进行封装。
1
2
3
4
5
6
7
8
9
10
| // Iterator represents a database-level iterator that
// provides methods to traverse over the key/value pairs in the database.
// It wraps the index iterator and adds functionality to
// retrieve the actual values from the database.
type Iterator struct {
indexIter index.IndexIterator // index iterator for traversing keys
db *DB // database instance for retrieving values
options IteratorOptions // user-defined configuration options
lastError error // stores the last error encountered during iteration
}
|
提供的方法是类似的,唯一的区别是索引迭代器的返回值是内存索引 ChunkPosition,这里需要根据 ChunkPosition 从 wal 文件中读取实际的 Value。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| position := it.indexIter.Value()
if position == nil {
it.indexIter.Next()
continue
}
// read the record from data file
chunk, err := it.db.dataFiles.Read(position)
if err != nil {
it.lastError = err
if !it.options.ContinueOnError {
it.Close()
return nil
}
log.Printf("Error reading data file at key %q: %v", key, err)
it.indexIter.Next()
continue
}
|
2、过期时间
rosedb 支持过期时间,也就是说一个 key 可以存活一段时间,然后自动失效。
在实现上,主要是对 LogRecord 加上一个额外的字段 expire,表示其过期时间,然后在写入的时候将这个信息存储起来,并且在读取的时候,需要结合这个字段进行判断,如果 expire 时间已经超过了当前时间,那么就认为 key 是无效的。
1
2
3
4
5
6
7
8
9
10
| // LogRecord is the log record of the key/value pair.
// It contains the key, the value, the record type and the batch id
// It will be encoded to byte slice and written to the wal.
type LogRecord struct {
Key []byte
Value []byte
Type LogRecordType
BatchId uint64
Expire int64
}
|
PutWithTTL
PutWithTTL 和 Put 方法类似,但是可以额外指定一个 ttl,表示 key 的存活时间。
在填充 LogRecord 的时候,只需要结合 ttl 和当前时间就能够计算得到 expire 信息。
1
2
3
4
5
6
7
8
9
| // PutWithTTL adds a key-value pair with ttl to the batch for writing.
func (b *Batch) PutWithTTL(key []byte, value []byte, ttl time.Duration) error {
// 其他代码省略.....
record.Key, record.Value = key, value
record.Type, record.Expire = LogRecordNormal, time.Now().Add(ttl).UnixNano()
b.mu.Unlock()
return nil
}
|
在读取的时候,从 wal 中拿到的是 chunk 信息,也就是一个字节数组,然后我们需要解码得到 LogRecord 的信息。
这时候需要判断 expire 是否已经超过了当前时间,如果 key 已经过期了,那么直接将其从索引中删除,并且返回 Key 不存在的错误。
1
2
3
4
5
6
7
8
9
| // check if the record is deleted or expired
record = decodeLogRecord(chunk)
if record.Type == LogRecordDeleted {
panic("Deleted data cannot exist in the index")
}
if record.IsExpired(now) {
b.db.index.Delete(record.Key)
return nil, ErrKeyNotFound
}
|
Expire
Expire 方法主要是对一个已经存在的 key 设置过期时间。
如果针对的是已经存在于 pendingWrites 临时数组中的数据,那么可以直接更新其过期时间,并且在 Commit 阶段再写入到 wal 文件中。
1
2
3
4
5
6
7
8
9
| // if the key exists in pendingWrites, update the expiry time directly
if record != nil {
// return key not found if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(time.Now().UnixNano()) {
return ErrKeyNotFound
}
record.Expire = time.Now().Add(ttl).UnixNano()
return nil
}
|
否则的话,则需要从 wal 文件中读取数据,如果 key 不存在的话直接返回错误,否则更新数据的过期时间。
然后写入到 pendingWrites 中,待 Commit 时进行写入。
1
2
3
4
5
6
7
8
9
10
11
12
| now := time.Now()
record = decodeLogRecord(chunk)
// if the record is deleted or expired, we can assume that the key does not exist,
// and delete the key from the index
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
b.db.index.Delete(key)
return ErrKeyNotFound
}
// now we get the value from wal, update the expiry time
// and rewrite the record to pendingWrites
record.Expire = now.Add(ttl).UnixNano()
b.appendPendingWrites(key, record)
|
TTL
TTL 方法返回一个 key 的存活时间。
具体逻辑也比较简单,首先还是从 pendingWrites 中获取,如果能拿到数据的话,则直接根据 expire 信息得到 tll 值。
1
2
3
4
5
6
7
8
9
10
11
12
| var record = b.lookupPendingWrites(key)
if record != nil {
if record.Expire == 0 {
return -1, nil
}
// return key not found if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now.UnixNano()) {
return -1, ErrKeyNotFound
}
// now we get the valid expiry time, we can calculate the ttl
return time.Duration(record.Expire - now.UnixNano()), nil
}
|
否则需要从 wal 文件中获取数据,并且解码得到 LogRecord,并且判断其是否已经过期了,然后计算得到 ttl。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return-1, err
}
// return key not found if the record is deleted or expired
record = decodeLogRecord(chunk)
if record.Type == LogRecordDeleted {
return-1, ErrKeyNotFound
}
if record.IsExpired(now.UnixNano()) {
b.db.index.Delete(key)
return-1, ErrKeyNotFound
}
// now we get the valid expiry time, we can calculate the ttl
if record.Expire > 0 {
return time.Duration(record.Expire - now.UnixNano()), nil
}
|
Persist
Persist 方法主要用于移除一个 key 的过期时间,这样 key 就能够永久存活了。
首先还是从 pendingWrites 中获取,如果存在的话则直接将 expire 信息设置为 0。
1
2
3
4
5
6
7
8
9
| // if the key exists in pendingWrites, update the expiry time directly
var record = b.lookupPendingWrites(key)
if record != nil {
if record.Type == LogRecordDeleted && record.IsExpired(time.Now().UnixNano()) {
return ErrKeyNotFound
}
record.Expire = 0
return nil
}
|
否则从 wal 文件中读取数据,解码得到 LogRecord,然后将 expire 设置为 0,并且存到临时数组中,待 Commit 时写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
| chunk, err := b.db.dataFiles.Read(position)
if err != nil {
return err
}
record = decodeLogRecord(chunk)
now := time.Now().UnixNano()
// check if the record is deleted or expired
if record.Type == LogRecordDeleted || record.IsExpired(now) {
b.db.index.Delete(record.Key)
return ErrKeyNotFound
}
// if the expiration time is 0, it means that the key has no expiration time,
// so we can return directly
if record.Expire == 0 {
return nil
}
// set the expiration time to 0, and rewrite the record to wal
record.Expire = 0
b.appendPendingWrites(key, record)
|
DB 过期时间接口
前面提到的几个方法都是 Batch 中的,在 DB 实例中也有一个同样的方法,其实和上一节提到的关于 Put、Delete、Get 方法类似,都是对 Batch 接口做了一层简单的封装,类似只有单个操作的 Batch。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // PutWithTTL a key-value pair into the database, with a ttl.
// Actually, it will open a new batch and commit it.
// You can think the batch has only one PutWithTTL operation.
func (db *DB) PutWithTTL(key []byte, value []byte, ttl time.Duration) error {
batch := db.batchPool.Get().(*Batch)
deferfunc() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single put operation, we can set Sync to false.
// Because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db)
if err := batch.PutWithTTL(key, value, ttl); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}
|
过期 key 清理
如果过期的 key 没有及时清理的话,key 和索引都会残留在内存中,造成不必要的空间浪费。
在 rosedb 当中,有两种方式可以对过期的 key 进行清理。
一种是自动的,比如在调用 Get 或者 Exist 的时候,如果判断到 key 是存在的,但是已经过期了,那么会直接删除掉索引中对应的 key。
1
2
3
4
| if record.IsExpired(now) {
b.db.index.Delete(record.Key)
return nil, ErrKeyNotFound
}
|
第二种方式是调用者手动进行清理,rosedb 当中提供了方法 DeleteExpiredKeys ,这个方法主要是扫描索引中的数据,并且判断是否已经过期了,如果是的话则从索引中将其删除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| // delete from index if the key is expired.
for _, pos := range positions {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
innerErr = err
done <- struct{}{}
return
}
record := decodeLogRecord(chunk)
if record.IsExpired(now) {
db.index.Delete(record.Key)
}
db.expiredCursorKey = record.Key
}
|
调用者可以在外层封装一个定时任务,实现自动清理过期 key 的功能。