TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

leveldb源码分析(8) 迭代器 iterator

项目中用到了 leveldb,打算最近读一读源码并整理成一个系列文章,这是第 8 篇:迭代器(iterator)。源码注释地址

代码

抽象基类(只含纯虚接口)的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Abstract iterator interface. Every positioning and accessor method is pure
// virtual; the concrete iterators derive from this class and implement them.
class LEVELDB_EXPORT Iterator {
public:
Iterator();
// Copying is disabled.
Iterator(const Iterator&) = delete;
Iterator& operator=(const Iterator&) = delete;
// Virtual, so a derived iterator can be destroyed through an Iterator*.
virtual ~Iterator();
// Presumably true when the iterator is positioned at an entry -- confirm
// against the implementations.
virtual bool Valid() const = 0;
// Positioning operations; each subclass defines the exact semantics.
virtual void SeekToFirst() = 0;
virtual void SeekToLast() = 0;
virtual void Seek(const Slice& target) = 0;
virtual void Next() = 0;
virtual void Prev() = 0;
// Accessors for the current entry and for the iterator's status.
virtual Slice key() const = 0;
virtual Slice value() const = 0;
virtual Status status() const = 0;
// Signature of a cleanup callback: a plain function pointer that receives
// two opaque arguments.
using CleanupFunction = void (*)(void* arg1, void* arg2);
// Registers `function` together with the two arguments to pass to it. The
// definition is not part of this excerpt; presumably the callback runs when
// the iterator is destroyed -- TODO confirm.
void RegisterCleanup(CleanupFunction function, void* arg1, void* arg2);

private:
// One registered cleanup callback plus its two arguments.
struct CleanupNode {
// True when no callback is stored in this node.
bool IsEmpty() const { return function == nullptr; }
// Invokes the stored callback with its arguments; the node must not be empty.
void Run() {
assert(function != nullptr);
(*function)(arg1, arg2);
}

CleanupFunction function;
void* arg1;
void* arg2;
// Pointer to another node; presumably chains further registered cleanups.
CleanupNode* next;
};
// Node stored inline in the iterator; presumably the first entry of the
// cleanup list -- the code that walks it is not shown here.
CleanupNode cleanup_head_;
};

涉及的子类:

1
2
3
4
5
6
7
8
9
class MemTableIterator : public Iterator
class DBIter : public Iterator
class ModelIter : public Iterator
class Version::LevelFileNumIterator : public Iterator
class Block::Iter : public Iterator
class EmptyIterator : public Iterator
class MergingIterator : public Iterator
class KeyConvertingIterator : public Iterator
class TwoLevelIterator : public Iterator

MemTableIterator

1
2
3
4
5
6
7
// A thin wrapper around the skip list's iterator.
class MemTableIterator : public Iterator {
// ... (member functions are elided in this excerpt)
private:
// The wrapped iterator of type MemTable::Table::Iterator.
MemTable::Table::Iterator iter_;
std::string tmp_; // For passing to EncodeKey
};

MemTableIterator 只是对 MemTable::Table::Iterator(也就是跳表 skiplist 的迭代器)做了一层封装,skiplist 的构造见https://riverferry.site/2021-10-13-leveldb%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90(4)%20memtable%20and%20log/

DBIter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// The DB representation (memtables and sstables) stores
// (userkey,seq,type) => uservalue entries. DBIter folds the multiple
// entries that share one userkey into a single entry, taking sequence
// numbers, deletion markers, overwrites, etc. into account.
class DBIter : public Iterator {
 public:
  // Direction in which the iterator is currently moving.
  //   kForward: the internal iterator sits on the exact entry that yields
  //             this->key(), this->value().
  //   kReverse: the internal iterator sits just before all entries whose
  //             user key == this->key().
  enum Direction { kForward, kReverse };

  DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
         uint32_t seed)
      : db_(db),
        user_comparator_(cmp),
        iter_(iter),
        sequence_(s),
        direction_(kForward),
        valid_(false),
        rnd_(seed),
        // Calls a member function that reads rnd_, so rnd_ must stay
        // declared (and therefore initialized) before this member.
        bytes_until_read_sampling_(RandomCompactionPeriod()) {}

  DBIter(const DBIter&) = delete;
  DBIter& operator=(const DBIter&) = delete;

  // The wrapped internal iterator is deleted together with this object.
  ~DBIter() override { delete iter_; }

  bool Valid() const override { return valid_; }

  // Current user key: taken from the internal iterator when moving forward,
  // from the saved copy when moving in reverse.
  Slice key() const override {
    assert(valid_);
    if (direction_ == kForward) {
      return ExtractUserKey(iter_->key());
    }
    return saved_key_;
  }

  // Current value: taken from the internal iterator when moving forward,
  // from the saved copy when moving in reverse.
  Slice value() const override {
    assert(valid_);
    if (direction_ == kForward) {
      return iter_->value();
    }
    return saved_value_;
  }

  // This iterator's own error wins; otherwise the internal iterator's
  // status is reported.
  Status status() const override {
    return status_.ok() ? iter_->status() : status_;
  }

  void Next() override;
  void Prev() override;
  void Seek(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;

 private:
  void FindNextUserEntry(bool skipping, std::string* skip);
  void FindPrevUserEntry();
  bool ParseKey(ParsedInternalKey* key);

  // Copies the bytes of `k` into `*dst`, replacing its previous contents.
  inline void SaveKey(const Slice& k, std::string* dst) {
    dst->assign(k.data(), k.size());
  }

  // Empties saved_value_. When its capacity exceeds 1 MiB the buffer is
  // swapped with a fresh string so the large allocation is released;
  // otherwise the buffer is cleared and kept for reuse.
  inline void ClearSavedValue() {
    if (saved_value_.capacity() <= 1048576) {
      saved_value_.clear();
      return;
    }
    std::string empty;
    swap(empty, saved_value_);
  }

  // Picks the number of bytes that can be read until a compaction is
  // scheduled.
  size_t RandomCompactionPeriod() {
    return rnd_.Uniform(2 * config::kReadBytesPeriod);
  }

  DBImpl* db_;
  const Comparator* const user_comparator_;
  Iterator* const iter_;
  SequenceNumber const sequence_;
  Status status_;
  std::string saved_key_;    // == current key when direction_==kReverse
  std::string saved_value_;  // == current raw value when direction_==kReverse
  Direction direction_;
  bool valid_;
  Random rnd_;
  size_t bytes_until_read_sampling_;
};

Version::LevelFileNumIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Internal iterator over the files of one version/level pair. For the
// current entry, key() is the largest key that occurs in the file and
// value() is a 16-byte buffer holding the file number followed by the file
// size, each encoded with EncodeFixed64.
class Version::LevelFileNumIterator : public Iterator {
 public:
  // Starts out invalid: index_ == flist->size().
  LevelFileNumIterator(const InternalKeyComparator& icmp,
                       const std::vector<FileMetaData*>* flist)
      : icmp_(icmp), flist_(flist), index_(flist->size()) {}

  // Any index inside the file list is a valid position.
  bool Valid() const override { return index_ < flist_->size(); }

  void Seek(const Slice& target) override {
    index_ = FindFile(icmp_, *flist_, target);
  }

  void SeekToFirst() override { index_ = 0; }

  void SeekToLast() override {
    if (flist_->empty()) {
      index_ = 0;
    } else {
      index_ = flist_->size() - 1;
    }
  }

  void Next() override {
    assert(Valid());
    ++index_;
  }

  void Prev() override {
    assert(Valid());
    if (index_ != 0) {
      --index_;
    } else {
      // Stepping back from the first file marks the iterator as invalid.
      index_ = flist_->size();
    }
  }

  Slice key() const override {
    assert(Valid());
    FileMetaData* const file = (*flist_)[index_];
    return file->largest.Encode();
  }

  Slice value() const override {
    assert(Valid());
    FileMetaData* const file = (*flist_)[index_];
    EncodeFixed64(value_buf_, file->number);
    EncodeFixed64(value_buf_ + 8, file->file_size);
    return Slice(value_buf_, sizeof(value_buf_));
  }

  // This iterator never reports an error.
  Status status() const override { return Status::OK(); }

 private:
  const InternalKeyComparator icmp_;
  const std::vector<FileMetaData*>* const flist_;
  uint32_t index_;

  // Backing store for value(): file number and file size. Mutable because
  // value() is const yet rewrites the buffer on every call.
  mutable char value_buf_[16];
};

flist_ 指向保存这一层所有 sst 文件元信息(FileMetaData*)的 vector;index_ 是迭代器当前指向的下标,等于 flist_->size() 时表示迭代器无效。value() 返回 16 字节的缓冲区,依次是用 EncodeFixed64 编码的 sst 文件编号和文件大小;key() 返回该 sst 文件的最大内部 key(largest)。


TwoLevelIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Iterator built from two wrapped iterators: index_iter_ and data_iter_.
// key(), value() and Valid() are answered by data_iter_.
class TwoLevelIterator : public Iterator {
 public:
  TwoLevelIterator(Iterator* index_iter, BlockFunction block_function,
                   void* arg, const ReadOptions& options);

  ~TwoLevelIterator() override;

  void Seek(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() override;
  void Prev() override;

  bool Valid() const override { return data_iter_.Valid(); }

  Slice key() const override {
    assert(Valid());
    return data_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return data_iter_.value();
  }

  // Reports the first error found, checking in this order: index iterator,
  // data iterator (only when one is set), then the locally saved status.
  Status status() const override {
    // It'd be nice if status() returned a const Status& instead of a Status
    if (!index_iter_.status().ok()) {
      return index_iter_.status();
    }
    if (data_iter_.iter() != nullptr && !data_iter_.status().ok()) {
      return data_iter_.status();
    }
    return status_;
  }

 private:
  // Remembers `s` only when it is an error and no error was saved before.
  void SaveError(const Status& s) {
    if (!status_.ok() || s.ok()) {
      return;
    }
    status_ = s;
  }

  void SkipEmptyDataBlocksForward();
  void SkipEmptyDataBlocksBackward();
  void SetDataIterator(Iterator* data_iter);
  void InitDataBlock();

  BlockFunction block_function_;
  void* arg_;
  const ReadOptions options_;
  Status status_;
  IteratorWrapper index_iter_;
  IteratorWrapper data_iter_;  // May be nullptr
  // If data_iter_ is non-null, then "data_block_handle_" holds the
  // "index_value" passed to block_function_ to create the data_iter_.
  std::string data_block_handle_;
};

关键是data_iter_和index_iter_,后面补充


Block::Iter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Holds a byte buffer (data_/size_) and creates iterators for it through
// NewIterator(). Copying is disabled.
class Block {
public:
// Initialize the block with the specified contents.
explicit Block(const BlockContents& contents);

Block(const Block&) = delete;
Block& operator=(const Block&) = delete;

~Block();

// Returns the stored size_ value.
size_t size() const { return size_; }
// Returns a new Iterator that uses `comparator`; the definition is not part
// of this excerpt. Presumably the caller owns the result -- TODO confirm.
Iterator* NewIterator(const Comparator* comparator);

private:
// Concrete iterator type, defined outside this class body.
class Iter;

uint32_t NumRestarts() const;

const char* data_;
size_t size_;
uint32_t restart_offset_; // Offset in data_ of restart array
bool owned_; // Block owns data_[]
};

其中的Iter:

1
2
3
// Iterator type nested in Block (declared there as `class Iter;`). The body
// is elided in this excerpt.
class Block::Iter : public Iterator {
  // ...
};

EmptyIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// An iterator that is never valid; it only carries the Status it was
// constructed with.
class EmptyIterator : public Iterator {
 public:
  EmptyIterator(const Status& s) : status_(s) {}
  ~EmptyIterator() override = default;

  // Never positioned on an entry.
  bool Valid() const override { return false; }

  // Returns the status supplied at construction.
  Status status() const override { return status_; }

  // Positioning requests are no-ops.
  void Seek(const Slice& target) override {}
  void SeekToFirst() override {}
  void SeekToLast() override {}

  // Valid() is always false, so reaching any of these trips the assert.
  void Next() override { assert(false); }
  void Prev() override { assert(false); }

  Slice key() const override {
    assert(false);
    return Slice();
  }

  Slice value() const override {
    assert(false);
    return Slice();
  }

 private:
  Status status_;
};

暂时不知道这个有啥用,后面见到了再补充


MergingIterator

1
2
3
// Iterator subclass whose body is elided in this excerpt.
class MergingIterator : public Iterator {
  // ...
};

后面补充


KeyConvertingIterator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// A helper class that converts internal format keys into user keys
class KeyConvertingIterator : public Iterator {
public:
explicit KeyConvertingIterator(Iterator* iter) : iter_(iter) {}

KeyConvertingIterator(const KeyConvertingIterator&) = delete;
KeyConvertingIterator& operator=(const KeyConvertingIterator&) = delete;

~KeyConvertingIterator() override { delete iter_; }

bool Valid() const override { return iter_->Valid(); }
void Seek(const Slice& target) override {
ParsedInternalKey ikey(target, kMaxSequenceNumber, kTypeValue);
std::string encoded;
AppendInternalKey(&encoded, ikey);
iter_->Seek(encoded);
}
void SeekToFirst() override { iter_->SeekToFirst(); }
void SeekToLast() override { iter_->SeekToLast(); }
void Next() override { iter_->Next(); }
void Prev() override { iter_->Prev(); }

Slice key() const override {
assert(Valid());
ParsedInternalKey key;
if (!ParseInternalKey(iter_->key(), &key)) {
status_ = Status::Corruption("malformed internal key");
return Slice("corrupted key");
}
return key.user_key;
}

Slice value() const override { return iter_->value(); }
Status status() const override {
return status_.ok() ? iter_->status() : status_;
}

private:
mutable Status status_;
Iterator* iter_;
};

ModelIter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// DB implementation; only its nested iterator is shown in this excerpt.
class ModelDB : public DB {
  // ...

  // Iterator over a KVMap. When `owned` is true the map is deleted together
  // with the iterator.
  class ModelIter : public Iterator {
   public:
    // Starts out invalid (positioned at map_->end()).
    ModelIter(const KVMap* map, bool owned)
        : map_(map), owned_(owned), iter_(map_->end()) {}
    ~ModelIter() override {
      if (owned_) delete map_;
    }
    bool Valid() const override { return iter_ != map_->end(); }
    void SeekToFirst() override { iter_ = map_->begin(); }
    // Positions at the last element by looking up the key obtained through
    // rbegin(); an empty map leaves the iterator invalid.
    void SeekToLast() override {
      if (map_->empty()) {
        iter_ = map_->end();
      } else {
        iter_ = map_->find(map_->rbegin()->first);
      }
    }
    // Positions at the first element whose key is not less than `k`.
    void Seek(const Slice& k) override {
      iter_ = map_->lower_bound(k.ToString());
    }
    void Next() override { ++iter_; }
    void Prev() override { --iter_; }
    Slice key() const override { return iter_->first; }
    Slice value() const override { return iter_->second; }
    // This iterator never reports an error.
    Status status() const override { return Status::OK(); }

   private:
    const KVMap* const map_;
    const bool owned_;  // Do we own map_
    KVMap::const_iterator iter_;
  };
};

reference

https://web.archive.org/web/20120131105110/http://rdc.taobao.com/blog/cs/wp-content/plugins/leveldb%E5%AE%9E%E7%8E%B0%E8%A7%A3%E6%9E%90.pdf

http://catkang.github.io/2017/01/17/leveldb-data.html

https://www.bookstack.cn/read/Leveldb-handbook/spilt.2.b308da3c3d01f3cd.md

https://izualzhy.cn/leveldb-block

https://hardcore.feishu.cn/docs/doccnWaz1fjIxeyRosRTojk4irs

----------- ending -----------