詳解全網(wǎng)最快 Go 泛型跳表【內(nèi)附源碼】
背景
最近一年我們用Go語言實現(xiàn)的業(yè)務(wù)系統(tǒng)至少70%,因此我們Review了大量的Go代碼,也看了很多相關(guān)的技術(shù)資料。Go語言有兩個不友好的點,一個是錯誤處理,另一個是泛型。我們 調(diào)研市面上是否有類似C++中STL的泛型庫,結(jié)果發(fā)現(xiàn)它們要么很薄弱,要么根本就不支持泛型。
于是本人寫了個基于泛型的容器和算法庫,名為stl4go(
文末有源碼鏈接
)。其中的有序 Map 沒有選擇紅黑樹,而是用了跳表。經(jīng)過優(yōu)化后,stl4go算得上是 GitHub 上能找到的、最快的 Go 實現(xiàn)。如果你感興趣,歡迎繼續(xù)往下閱讀。
跳表是什么
跳表(skiplist)是一種 隨機(jī)化的數(shù)據(jù) , 由 William Pugh 在論文Skip lists: a probabilistic alternative to balanced trees中提出。?跳表以有序的方式在層次化的鏈表中保存元素, 效率和平衡樹媲美——查找、刪除、添加等操作都可以在O(logN)期望時間下完成, 綜合能力相當(dāng)于平衡二叉樹。比起平衡樹來說, 跳躍表的實現(xiàn)要簡單直觀得多,核心功能在200行以內(nèi)即可實現(xiàn),遍歷的時間復(fù)雜度是O(N)。代碼簡單、空間節(jié)省,在挺多的場景得到應(yīng)用。
SkipList用于需要有序的場合。在不需要有序的場景下,go自帶的map容器依然是優(yōu)先選擇。

接口設(shè)計
1)創(chuàng)建函數(shù)
對于可以用 <、== 比較的類型,可以使用簡單的創(chuàng)建函數(shù)。對于不可以直接比較的,也可以通過提供自定義的比較函數(shù)來創(chuàng)建。
// 對于Key可以用 < 和 == 運(yùn)算符比較的類型,調(diào)這個函數(shù)來創(chuàng)建
func NewSkipList[K Ordered, V any]() *SkipList[K, V]
// 其他情況,需要自定義Key類型的比較函數(shù)
func NewSkipListFunc[K any, V any](keyCmp CompareFn[K]) *SkipList[K, V]
// 從一個map來構(gòu)建,僅為方便寫Literal,go沒法對自定義類型使用初始化列表。
func NewSkipListFromMap[K Ordered, V any](m map[K]V) *SkipList[K, V]
2)主要方法
IsEmpty() bool // 表是否為空
Len() int // 返回表中元素的個數(shù)
Clear() // 清空跳表
Has(K) bool // 檢查跳表中是否存在指定的key
Find(K) *V // Finds element with specific key.
Insert(K, V) // Inserts a key-value pair in to the container or replace existing value.
Remove(K) bool // Remove element with specific key.
還有迭代器和遍歷區(qū)間查找等功能與本主題關(guān)系不大,本文略去講述。上文, 可以看得出,它完全可以滿足有序Map容器的要求。
3)節(jié)點定義
雖然不少講跳表原理示意圖會把每層的索引節(jié)點單獨(dú)列出來:

但是一般的實現(xiàn)都會把索引節(jié)點實現(xiàn)為最底層節(jié)點的一個數(shù)組,這樣每個元素只需要一個節(jié)點,節(jié)省了單獨(dú)的索引節(jié)點的存儲開銷,也提高了性能。
因此節(jié)點定義如下:
type skipListNode[K any, V any] struct {
key K
value V
// 指向下一個節(jié)點的指針的數(shù)組,不同深度的節(jié)點長度不同,[0]表示最下層
next []*skipListNode[K, V]
}

代碼優(yōu)化
代碼并非完全從頭開始寫的,我們是以[email protected]的gostl的實現(xiàn)為基礎(chǔ)。其 實現(xiàn)比較簡潔,只有200多行代碼。支持自定義數(shù)據(jù)類型比較,但是不支持泛型。我們在他的基礎(chǔ)上, 做了一系列的算法和內(nèi)存分配等方面的優(yōu)化,并增加了迭代器、區(qū)間查找等功能 。
liyue代碼地址:https://github.com/liyue201/gostl/blob/master/ds/skiplist/skiplist.go
1)算法優(yōu)化
-
生成隨機(jī)Level的優(yōu)化
每次跳表插入元素時,需要隨機(jī)生成一個本次的層數(shù),最樸素的實現(xiàn)方式是拋硬幣。也就是根據(jù)連續(xù)獲得正面的次數(shù)來決定層數(shù)。
func randomLevel() int {
level := 0
for math.Float64() < 0.5 {
level++
}
return level
}
Redis里的算法類似,只不過用的是1/4的級數(shù)差,索引少一半,可以節(jié)省一些內(nèi)存,源代碼如下:
https://github.com/redis/redis/blob/7.0/src/t_zset.c#L118-L128
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
static const int threshold = ZSKIPLIST_P*RAND_MAX;
int level = 1;
while (random() < threshold)
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
上述方法簡單直白,但是存在兩個問題:
第一,math.Float64() (以及任何全局隨機(jī)函數(shù))內(nèi)部為共享的隨機(jī)數(shù)生成器對象,每次調(diào)用都會加鎖解鎖,在競爭情況下性能下降很厲害。第二,多次生成隨機(jī)數(shù)??赐晗挛哪銜l(fā)現(xiàn),只用生成一次就可以。詳情參見源代碼如下:
https: //cs.opensource.google/go/go/+/refs/tags/go1.19:src/math/rand/rand.go
所以我們在gostl的實現(xiàn)中改用了生成一個某范圍內(nèi)的隨機(jī)數(shù)。根據(jù)其均勻分布的特點來計算level:
func (sl *Skiplist) randomLevel() int {
total := uint64(1)<<uint64(sl.maxLevel) - 1 // 2^n-1
k := sl.rander.Uint64() % total
levelN := uint64(1) << (uint64(sl.maxLevel) - 1)
level := 1
for total -= levelN; total > k; level++ {
levelN >>= 1
total -= levelN
}
return level
}
這段for循環(huán)有些拗口,改寫一下就更清晰了:
level := 0
for k < total {
levelN >>= 1
total -= levelN
level++
}
也就是生成的隨機(jī)數(shù)越小則level越高。比如maxLevel為10時,total=1023,那么:
512<k<1023之間的概率為1/2,level=1
256<k<511之間的概率為1/4,level=2
128<k<255之間的概率為1/8,level=3
...
當(dāng)level比較高時,循環(huán)次數(shù)就會增加 。不過,我們可以觀察到在生成的隨機(jī)二進(jìn)制中,數(shù)值增減一半正好等于改變一個bit位。因此我們改用直接調(diào)用math/bits里的Len64()函數(shù),來計算生成的隨機(jī)數(shù)的最小位數(shù)的方式來實現(xiàn):
func (sl *SkipList[K, V]) randomLevel() int {
total := uint64(1)<<uint64(skipListMaxLevel) - 1 // 2^n-1
k := sl.rander.Uint64() % total
return skipListMaxLevel - bits.Len64(k) + 1
}
而Len64函數(shù)是用查表實現(xiàn)的,相當(dāng)?shù)目欤?/span>
https://github.com/golang/go/blob/go1.19/src/math/bits/bits.go#L330-L345
// Len64 returns the minimum number of bits required to represent x;
// the result is 0 for x == 0.
func Len64(x uint64) (n int) {
// ...
return n + int(len8tab[x])
}
當(dāng)level>1時,時間開銷就從循環(huán)變成固定開銷,會快一點點。
-
自適應(yīng)Level
很多實現(xiàn)都把level硬編碼成全局或者實例級別的常量,比如在gostl中就是如此。 sl.maxLevel是一個實例級別的固定常量,跳表創(chuàng)建后便不再修改,因此有兩個問題:
首先,當(dāng)實際元素很少時,查找函數(shù)中循環(huán)的前幾次cur變量基本上都是空指針,白白浪費(fèi)時間查找,所以他的實現(xiàn)里 defaultMaxLevel設(shè)置的很小 。
其次,由于默認(rèn)的maxLevel很?。ㄖ挥?0),插入1024個元素后,最上層基本上就接近平衡二叉樹的情況了。如果再繼續(xù)插入大量的元素,每層索引節(jié)點數(shù)量都快速增加,性能急劇下降。如果在構(gòu)造時就根據(jù)預(yù)估容量設(shè)置一個足夠大的maxLevel,可避免這個問題。但是很多時候這個數(shù)不是那么好預(yù)估,而且用起來不方便,漏設(shè)置又可能會導(dǎo)致意料之外的性能惡化。
因此我們把level設(shè)計為根據(jù)元素的個數(shù)動態(tài)自適應(yīng)調(diào)整: 設(shè)置一個level成員記錄最高的level值; 當(dāng)插入元素時,如果出現(xiàn)了更高的層,再插入后就調(diào)大level; 當(dāng)刪除元素時,如果最頂層的索引變空了,就減少level。 通過這種方式,就解決了上述問題。
// Insert inserts a key-value pair into the skiplist.
// If the key is already in the skip list, it's value will be updated.
func (sl *SkipList[K, V]) Insert(key K, value V) {
// 處理key已存在的情況,略去
level := sl.randomLevel()
node = newSkipListNode(level, key, value)
// 插入鏈表,略去
if level > sl.level {
// Increase the level
for i := sl.level; i < level; i++ {
sl.head.next[i] = node
}
sl.level = level
}
sl.len++
}
為了防止在一開始元素個數(shù)很小時就生成了很大的隨機(jī)level,我們在 randomLevel 里做了一下限制,最大允許生成的level為log2(Len())+2(2是個拍腦袋決定的余量)。
-
插入刪除優(yōu)化
插入時如果key不存在或者刪除時節(jié)點存在,需要找到每層索引中的前一個節(jié)點,放入prevs數(shù)組返回,用于插入或者刪除節(jié)點后各層鏈表的重新組織。 gostl的實現(xiàn),是先在findPrevNodes函數(shù)里的循環(huán)中得到所有的prevs,然后再比較[0]層的值來判斷key是否相等決定更新或者返回。源碼如下:
https://github.com/liyue201/gostl/blob/e5590f19a43ac53f35893c7c679b37d967c4859c/ds/skiplist/skiplist.go#L186-L201
這個函數(shù)會從頂層遍歷到最底層:
func (sl *Skiplist) findPrevNodes(key interface{}) []*Node {
prevs := sl.prevNodesCache
prev := &sl.head
for i := sl.maxLevel - 1; i >= 0; i-- {
if sl.head.next[i] != nil {
for next := prev.next[i]; next != nil; next = next.next[i] {
if sl.keyCmp(next.key, key) >= 0 {
break
}
prev = &next.Node
}
}
prevs[i] = prev
}
return prevs
}
插入時,再取最底層的節(jié)點的下一個,進(jìn)一步比較是否相等:
// Insert inserts a key-value pair into the skiplist
func (sl *Skiplist) Insert(key, value interface{}) {
prevs := sl.findPrevNodes(key)
if prevs[0].next[0] != nil && sl.keyCmp(prevs[0].next[0].key, key) == 0 {
// 如果相等,其實prevs就沒用了,但是findPrevNodes里依然進(jìn)行了查詢
// same key, update value
prevs[0].next[0].value = value
return
}
...
}
值得注意得失,再插入key時如果已經(jīng)節(jié)點存在,或者刪除key時節(jié)點不存在,是不需要調(diào)整每層節(jié)點的,前面辛辛苦苦查找的prevs就沒用了。 我們在這里做了個優(yōu)化,在這種情況下提前返回,不再繼續(xù)找所有的prevs。 以插入為例:
// findInsertPoint returns (*node, nil) to the existed node if the key exists,
// or (nil, []*node) to the previous nodes if the key doesn't exist
func (sl *skipListOrdered[K, V]) findInsertPoint(key K) (*skipListNode[K, V], []*skipListNode[K, V]) {
prevs := sl.prevsCache[0:sl.level]
prev := &sl.head
for i := sl.level - 1; i >= 0; i-- {
for next := prev.next[i]; next != nil; next = next.next[i] {
if next.key == key {
// Key 已經(jīng)存在,停止搜索
return next, nil
}
if next.key > key {
// All other node in this level must be greater than the key,
// search the next level.
break
}
prev = next
}
prevs[i] = prev
}
return nil, prevs
}
node和prevs只會有一個不空:
// Insert inserts a key-value pair into the skiplist.
// If the key is already in the skip list, it's value will be updated.
func (sl *SkipList[K, V]) Insert(key K, value V) {
node, prevs := sl.impl.findInsertPoint(key)
if node != nil {
// Already exist, update the value
node.value = value
return
}
// 生成及插入新節(jié)點,略去
}
刪除操作的優(yōu)化方式類似,不再贅述。
-
數(shù)據(jù)類型特有的優(yōu)化
Ordered類型的跳表如果是升序的,可以直接用NewSkipList來創(chuàng)建。 對于用得較少的降序或者Key是不可比較的類型,就需要通過傳入的比較函數(shù)來比較Key 。
一開始的實現(xiàn)為了簡化。對于Ordered的SkipList,我們是通過調(diào)用SkipListFunc來實現(xiàn)的。這樣可以節(jié)省不少代碼,實現(xiàn)起來很簡單。 但是Benchmark時,跑不過一些較快地實現(xiàn)。主要原因在 比較函數(shù)的函數(shù)調(diào)用 上。以查找為例:
// Get returns the value associated with the passed key if the key is in the skiplist, otherwise returns nil
func (sl *Skiplist) Get(key interface{}) interface{} {
var pre = &sl.head
for i := sl.maxLevel - 1; i >= 0; i-- {
cur := pre.next[i]
for ; cur != nil; cur = cur.next[i] {
cmpRet := sl.keyCmp(cur.key, key)
if cmpRet == 0 {
return cur.value
}
if cmpRet > 0 {
break
}
pre = &cur.Node
}
}
return nil
}
在C++中,比較函數(shù)可以是無狀態(tài)的函數(shù)對象,其()運(yùn)算符是可以inline的。但是在Go中, 比較函數(shù)只能是函數(shù)指針 ,sl.keyCmp調(diào)用無法被inline。因此簡單的類型,其開銷占的比例很大。
我們一開始用的優(yōu)化手法,是在運(yùn)行期間根據(jù)硬編碼的key的類型,進(jìn)行類型轉(zhuǎn)換后調(diào)優(yōu)化的實現(xiàn)。這種方式雖然湊效但是代碼可讀性差。其中,用到了硬編碼的類型列表、運(yùn)行期類型switch等機(jī)制,甚至還需要代碼生成。
后來我們摸索出更優(yōu)的方法。 通過同一個接口,基于比較Key是作為Ordered還是通過Func的方式,來提供不同實現(xiàn)的方式 。這不需要任何強(qiáng)制類型轉(zhuǎn)換:
type skipListImpl[K any, V any] interface {
findNode(key K) *skipListNode[K, V]
lowerBound(key K) *skipListNode[K, V]
upperBound(key K) *skipListNode[K, V]
findInsertPoint(key K) (*skipListNode[K, V], []*skipListNode[K, V])
findRemovePoint(key K) (*skipListNode[K, V], []*skipListNode[K, V])
}
// NewSkipList creates a new SkipList for Ordered key type.
func NewSkipList[K Ordered, V any]() *SkipList[K, V] {
sl := skipListOrdered[K, V]{}
sl.init()
sl.impl = (skipListImpl[K, V])(&sl)
return &sl.SkipList
}
// NewSkipListFunc creates a new SkipList with specified compare function keyCmp.
func NewSkipListFunc[K any, V any](keyCmp CompareFn[K]) *SkipList[K, V] {
sl := skipListFunc[K, V]{}
sl.init()
sl.keyCmp = keyCmp
sl.impl = skipListImpl[K, V](&sl)
return &sl.SkipList
}
對于 Len()、IsEmpty()等,則不放進(jìn)接口里。這有利于編譯器inline優(yōu)化。
2)內(nèi)存分配優(yōu)化
無論是理論上還是實測,內(nèi)存分配對性能的影響還是較大。Go不像Java和C#的堆內(nèi)存分配那么簡單,因此 應(yīng)當(dāng)減少不必要的內(nèi)存分配 。
圖來源:Go 生態(tài)下的字節(jié)跳動大規(guī)模微服務(wù)性能優(yōu)化實踐
-
Cache Prevs節(jié)點
在插入時,如果節(jié)點先前不存在或者刪除時節(jié)點存在,那么就需要獲得所有層的指向該位置的節(jié)點數(shù)組。全網(wǎng)有好幾個實現(xiàn)案例都采用了在SkipList實例級別預(yù)先分配一個slice的辦法,經(jīng)測試比起每次都創(chuàng)建slice返回確實有相當(dāng)明顯的性能提升。
-
節(jié)點分配優(yōu)化
不同level的節(jié)點數(shù)據(jù)類型是相同的,但是其next指針數(shù)組的長度不同。一些簡單粗暴的實現(xiàn)是設(shè)置為固定的最大深度,由于跳表中絕大多數(shù)節(jié)點都只落在最低幾層,浪費(fèi)了較多的內(nèi)存。
另外一種做法是改為動態(tài)分配,那么就多一次內(nèi)存分配。 我們的做法是根據(jù)不同的深度定義不同的結(jié)構(gòu)體,額外包含一個相應(yīng)長度的nexts節(jié)點指針數(shù)組。在node的next切片指向這個數(shù)組,可以就減少一次內(nèi)存分配。 并且由于nexts數(shù)組和node的地址是在一起的,cache局部性也更好。
// newSkipListNode creates a new node initialized with specified key, value and next slice.
func newSkipListNode[K any, V any](level int, key K, value V) *skipListNode[K, V] {
switch level {
case 1:
n := struct {
head skipListNode[K, V]
nexts [1]*skipListNode[K, V]
}{head: skipListNode[K, V]{key, value, nil}}
n.head.next = n.nexts[:]
return &n.head
case 2:
n := struct {
head skipListNode[K, V]
nexts [2]*skipListNode[K, V]
}{head: skipListNode[K, V]{key, value, nil}}
n.head.next = n.nexts[:]
return &n.head
// 一直到 case 40 ...
}
}
這么多啰嗦的代碼
不適合手寫,可以
弄個bash腳本通過go generate生成
。
我們在調(diào)試這段代碼時發(fā)現(xiàn),go的switch case語句對簡單的全數(shù)值,也是通過二分法而非C++常用的跳轉(zhuǎn)表來實現(xiàn)的。也許是因為有更耗時的內(nèi)存分配,我們嘗試把case 1、2等單獨(dú)拿出來也沒有提升。因此,暫且估計這里對性能沒有影響。 如果case非常多的話,可以考慮對最常見的case單獨(dú)處理,或者用函數(shù)指針數(shù)組來優(yōu)化。

C++實現(xiàn)
類似的代碼在C++中由于支持模板非類型參數(shù),可以簡單不少:
template <typename K, typename V>
struct Node {
K key;
V value;
size_t level;
Node* nexts[0];
SkipListNode(key, V value) : level(level), key(std::move(key)), value(std::move(value)) {}
};
template <typename K, typename V, int N> // 注意 N 可以作為模板參數(shù)
struct NodeN : public Node {
NodeN(K key, V value) : Node(N, key, value) {}
Node* nexts[N] = {};
};
Node* NewNode(int level, K key, V value) {
switch (level) {
case 1: return new NodeN<K, V, 1>(key, value);
case 2: return new NodeN<K, V, 2>(key, value);
case 3: return new NodeN<K, V, 3>(key, value);
...
}
}
用C(當(dāng)然在C++中也可以用)的flexible array代碼則會更簡單一些:
Node* NewNode(int level, K key, V value) {
auto p = malloc(sizeof(Node*) + level * sizeof(Node*));
return new(p) Node(std::move(key), std::move(value));
}
由于C和C++中的next數(shù)組,不需要通過切片(相當(dāng)于指針)來指向nexts數(shù)組,少了一次內(nèi)存尋址,所以理論上性能更好一些。 C++實現(xiàn)為Go代碼的手工轉(zhuǎn)譯,功能未做充分的驗證,僅供對比評測。

Benchmark
[email protected]實現(xiàn)了一個以float64為key的跳表,并和其他實現(xiàn)做了個比較,證明自己的最快。相關(guān)地址如下:
https://github.com/sean-public/fast-skiplist
https://github.com/sean-public/skiplist-survey
我們在他的基礎(chǔ)上添加了一些其他的實現(xiàn)和我們的實現(xiàn),做了benchmark。優(yōu)化的數(shù)據(jù)類型優(yōu)化就是基于此評測結(jié)果做的。相關(guān)地址如下:
https://github.com/chen3feng/skiplist-survey
以下是部分評測結(jié)果,數(shù)值越小越好:
雖然有少量指標(biāo)不是最快的,但是總體上(大部分指標(biāo))超越了我們在github上找到的其他實現(xiàn)。并且大部分其他實現(xiàn)key只支持int64或者float64,使得無法用于string等類型。
另外,我們也對C++的實現(xiàn)測了一下性能:
我們發(fā)現(xiàn)Go的實現(xiàn)性能很多指標(biāo)基本接近C++,其中Delete反而更快一些。是因為C++在刪除時要析構(gòu)節(jié)點并釋放內(nèi)存,而Go采用GC的方式延后旁路處理。歡迎交流討論。
源碼:https://github.com/chen3feng/stl4go
推薦閱讀
我為大家整理了一份 從入門到進(jìn)階的Go學(xué)習(xí)資料禮包 ,包含學(xué)習(xí)建議:入門看什么,進(jìn)階看什么。 關(guān)注公眾號 「polarisxu」,回復(fù)? ebook ?獲??;還可以回復(fù)「進(jìn)群」,和數(shù)萬 Gopher 交流學(xué)習(xí)。
