又超時了!Etcd分布式鎖你用明白了嗎?

現象
線上程序報錯,錯誤信息:lock failed: context deadline exceeded, retry
問題排查
很明顯的是獲取鎖超時了,由于用的 etcd 的分布式鎖,可能是 etcd 出問題了,此時看到大量 etcd 日志,rejected connection from "ip:port" (error "tls: first record does not look like a TLS handshake", ServerName ""),懷疑是不是這個問題導致的,經過查詢報錯的 IP,均為線上容器 IP,登陸容器內看發(fā)現都是管理員平臺的代碼,沒報錯的集群也有相同的日志,這不是導致超時的原因。在排除各種可能之后,最后去 etcd 查看鎖對應的 key 的情況,發(fā)現有兩個 key:
/notifier/locker/{leaseid}
/notifier/locker/rwl/{leaseid}
其中第一個 key 是 notifier 自己添加的,第二個 key 在代碼中搜不到,但是看起來像是 redis whitelist 的簡寫,先把第一個 key 刪了,然后看 notifier 日志,仍然獲取不到鎖,所以懷疑是第二個 key 已經獲得了鎖,雖然 key 不一樣。于是刪除了第二個 key,再看 notifier 日志,終于獲得了鎖,開始正常工作,于是得出猜想,etcd 的分布式鎖,在子目錄下加了鎖之后,父目錄會加鎖失敗。然后用 etcdctl lock 來驗證了下,確實如此,/a/b 下加了鎖,/a 再加鎖就會失敗,但是/a 下加了鎖,/a/b 再加鎖會成功。基本上可以驗證上面的猜想,剩下的就是從 etcd 源碼中找到對應處理的代碼了。
etcd 源碼部分
在查詢源碼之前,第一反應就是這肯定是在服務端實現的,于是開始了從 etcd 服務端找相關源碼的過程,從 etcdctl 命令開始追溯到所涉及的服務端,一直沒有發(fā)現問題。又在網上搜了相關 etcd 服務端源碼實現的文章,結合本地代碼均沒有想找的代碼,于是反過來從 client 找起。
首先從 etcdctl lock 命令開始,選擇主要函數展示
// 代碼位置go.etcd.io/etcd/etcdctl/ctlv3/command/lock_command.go
func lockUntilSignal(c *clientv3.Client, lockname string, cmdArgs []string) error {
...
if err := m.Lock(ctx); err != nil {
return err
}
...
}
接下來進入到 Lock 函數,這是個關鍵函數,etcd 的分布式鎖就是在這里實現的
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
// 這里的pfx就是prefix,就是傳進來的前綴,后面的s.Lease()會返回一個租約,是一個int64的整數,和session有關
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
// 這里比較上面prefix/lease的createrevision是否為0,為0表示目前不存在該key,需要執(zhí)行Put操作,下面可以看到
// 不為0表示已經有對應的key了,只需要執(zhí)行Get就行
// createrevision是自增的
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// 獲取所得持有者
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
// 比較如果當前沒有人獲得鎖或者鎖的owner的createrevision等于當前的kv的revision,則表示已獲得鎖,就可以退出了
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// 為了驗證自己加的打印信息
//fmt.Printf("ownerKey: %s\n", ownerKey)
// 走到這里代表沒有獲得鎖,需要等待之前的鎖被釋放,即revision小于當前revision的kv被刪除
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
// waitDeletes 等待所有當前比當前key的revision小的key被刪除后,鎖釋放后才返回
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
// 為了調試自己加的這句
fmt.Printf("wait for %s to delete\n", lastKey)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()
var wr v3.WatchResponse
// wch是個channel,key被刪除后會往這個chan發(fā)數據
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}
看完上面的代碼基本知道了 etcd 分布式鎖的實現機制了,但是還沒看到哪里和前綴 Prefix 相關了。其實答案就藏在 getOwner 里,看上述代碼,不管是執(zhí)行 Put 還是 Get,最終都有個 getOwner 的過程,看一下這個 getOwner,options 模式里有個 v3.WithFirstCreate 函數調用,看下這個函數
// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }
// withTop gets the first key over the get's prefix given a sort order
func withTop(target SortTarget, order SortOrder) []OpOption {
return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
}
// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
func WithPrefix() OpOption {
return func(op *Op) {
if len(op.key) == 0 {
op.key, op.end = []byte{0}, []byte{0}
return
}
op.end = getPrefix(op.key)
}
}
看到上面的是三個函數后,大致就找到了對應的源碼的感覺,因為看到了 WithPrefix 函數,和上面的猜測正好匹配。所以 getOwner 的具體執(zhí)行效果是會把所有以 lockkey 開頭的 kv 都拿到,且按照 createrevision 升序排列,取第一個值,這個意思就很明白了,就是要拿到當前以 lockkey 為 prefix 的且 createrevision 最小的那個 key,就是目前已經拿到鎖的 key。
看了上面的源碼就可以明白為什么/a/b 加了鎖之后,/a 加鎖會超時了,因為在 getOwner 時,拿到了/a/b,且 createrevision 小于/a 的 revision,于是/a 就會等待/a/b 被刪除后,watch chanel 有數據后才能獲得鎖。
看到這里還有個需要確認的問題,那就是如果/ab 加鎖了,那么再對/a 加鎖會怎么樣?/a 肯定是/ab 的 prefix 啊,是不是也會加鎖失敗呢?
結論是會加鎖成功,看下源碼
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}
可以看到在 NewMutex 時并不是直接拿傳進來的 pfx 作為 prefix 的,而且在后面加了個"/",所以/ab 加了鎖,/a 加鎖還是可以成功的。一般查找 prefix 或 suffix 時都會加上固定的分隔符,要不然就會出現誤判。
總結
通過分析問題,看源碼,可以了解到 etcd 鎖的實現原理,以及可能存在的小坑。Etcd 把鎖的實現放在了 client 端,這樣的話,可以直接修改 client 端代碼來修改其鎖的實現,或者使用不同版本的 etcd client 時就可能出現雖然共用一個服務端,但是 etcd 鎖行為卻不一致的問題,不過又有誰會閑著沒事這么玩呢。這里也是提醒大家注意一下此類問題,萬一以后碰到了可以快速定位解決。


你可能還喜歡
點擊下方圖片即可閱讀

云原生是一種信仰 ??
關注公眾號
后臺回復?k8s?獲取史上最方便快捷的 Kubernetes 高可用部署工具,只需一條命令,連 ssh 都不需要!


點擊 "閱讀原文" 獲取更好的閱讀體驗!
發(fā)現朋友圈變“安靜”了嗎?


