TheRiver | blog


leveldb source code analysis (10): LRU cache

leveldb implements a simple LRU cache internally; this post walks through it.

It is built from an internally implemented hash table plus an LRU doubly linked list. On top of that, a sharded LRUCache is provided, split into 16 shards by default to reduce lock contention, much like sharded map implementations in Go.

LRUHandle

The linked-list node is defined as:

struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash;
  LRUHandle* next;
  LRUHandle* prev;
  size_t charge;      // TODO(opt): Only allow uint32_t?
  size_t key_length;
  bool in_cache;      // Whether entry is in the cache.
  uint32_t refs;      // References, including cache reference, if present.
  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
  char key_data[1];   // Beginning of key

  Slice key() const {
    // next_ is only equal to this if the LRU handle is the list head of an
    // empty list. List heads never have meaningful keys.
    assert(next != this);

    return Slice(key_data, key_length);
  }
};
  • deleter lets the caller supply a custom destruction callback for the value
  • next is the next node in the LRU list
  • prev is the previous node in the LRU list
  • next_hash is the next node in the hash table's bucket chain
  • refs is the reference count: when it is 1 the entry sits on the lru_ list, when it is greater than 1 it sits on the in_use_ list, and at 0 its resources are reclaimed. While an entry is on in_use_ it cannot be evicted, even if the cache is over capacity
  • charge is the amount of capacity this entry consumes

HandleTable

class HandleTable {
 public:
  // ...
 private:
  // The table consists of an array of buckets where each bucket is
  // a linked list of cache entries that hash into the bucket.
  uint32_t length_;   // number of buckets
  uint32_t elems_;    // number of entries
  LRUHandle** list_;  // array of bucket head pointers, of size length_
};
  • length_ is the number of buckets, i.e. the size of the list_ array; it starts at 4 and doubles on each resize
  • elems_ is the number of entries; once elems_ > length_, the Resize() function grows the table
// If there is no collision, Insert returns nullptr.
// On a collision the conflicting node is replaced (hash table keys are unique).
LRUHandle* Insert(LRUHandle* h) {
  LRUHandle** ptr = FindPointer(h->key(), h->hash);
  LRUHandle* old = *ptr;
  h->next_hash = (old == nullptr ? nullptr : old->next_hash);
  *ptr = h;
  if (old == nullptr) {
    ++elems_;
    if (elems_ > length_) {
      // Since each cache entry is fairly large, we aim for a small
      // average linked list length (<= 1).
      // Note the growth trigger: there is no explicit load factor; the table
      // resizes as soon as the element count exceeds the bucket count, so
      // chains stay short.
      Resize();
    }
  }
  return old;
}

For a new key, the node is appended at the tail of the bucket's chain.

For an existing key, the new node replaces the old value; after the insert, the caller runs FinishErase(table_.Insert(e)) to release the old entry.

// For an existing node, point the slot at its next_hash and return the old
// node (FinishErase then releases it).
LRUHandle* Remove(const Slice& key, uint32_t hash) {
  LRUHandle** ptr = FindPointer(key, hash);
  LRUHandle* result = *ptr;
  if (result != nullptr) {
    *ptr = result->next_hash;
    --elems_;
  }
  return result;
}

LRUCache

// A single shard of sharded cache.
class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  // Separate from constructor so caller can easily make an array of LRUCache
  void SetCapacity(size_t capacity) { capacity_ = capacity; }

  // Like Cache methods, but with an extra "hash" parameter.
  Cache::Handle* Insert(const Slice& key, uint32_t hash, void* value,
                        size_t charge,
                        void (*deleter)(const Slice& key, void* value));
  Cache::Handle* Lookup(const Slice& key, uint32_t hash);
  void Release(Cache::Handle* handle);
  void Erase(const Slice& key, uint32_t hash);
  void Prune();
  size_t TotalCharge() const {
    MutexLock l(&mutex_);
    return usage_;
  }

 private:
  void LRU_Remove(LRUHandle* e);
  void LRU_Append(LRUHandle* list, LRUHandle* e);
  void Ref(LRUHandle* e);
  void Unref(LRUHandle* e);
  bool FinishErase(LRUHandle* e) EXCLUSIVE_LOCKS_REQUIRED(mutex_);

  // Initialized before use.
  // capacity of this shard
  size_t capacity_;

  // mutex_ protects the following state.
  mutable port::Mutex mutex_;
  // current usage
  size_t usage_ GUARDED_BY(mutex_);

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  // Entries have refs==1 and in_cache==true.
  // The head is a dummy and stores no user data.
  // refs==1 means on the lru_ list, >1 on in_use_; the 1 is the cache's own
  // reference, and each client use adds 1.
  LRUHandle lru_ GUARDED_BY(mutex_);

  // Dummy head of in-use list.
  // Entries are in use by clients, and have refs >= 2 and in_cache==true.
  // Entries held by clients live on in_use_, the rest on lru_.
  LRUHandle in_use_ GUARDED_BY(mutex_);

  HandleTable table_ GUARDED_BY(mutex_);
};

Entries with a reference count above 1 sit on the in_use_ list, entries at exactly 1 on lru_, and at 0 the entry's resources are released.

void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
  // Make "e" newest entry by inserting just before *list
  e->next = list;
  e->prev = list->prev;
  e->prev->next = e;
  e->next->prev = e;
}

void LRUCache::LRU_Remove(LRUHandle* e) {
  e->next->prev = e->prev;
  e->prev->next = e->next;
}

lru_ and in_use_ are dummy head nodes; on initialization their next and prev both point to themselves. Append inserts an entry immediately to the left of the dummy (the newest position), and eviction removes the first element to its right (the oldest). Because the list is circular, the dummy serves as both head and tail.

Cache::Handle* LRUCache::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* e = table_.Lookup(key, hash);
  if (e != nullptr) {
    Ref(e);
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

The hash table makes lookups O(1). Every lookup increments the reference count, and if the entry was sitting on lru_, it is moved back onto in_use_.

ShardedLRUCache

static const int kNumShardBits = 4;
// 16 shards
static const int kNumShards = 1 << kNumShardBits;

class ShardedLRUCache : public Cache {
 private:
  LRUCache shard_[kNumShards];
  port::Mutex id_mutex_;
  uint64_t last_id_;

  // ...
};

It simply holds an array of LRUCache shards on top of the basic implementation; the code is self-explanatory.

----------- ending -----------