如何編碼檢查依賴關(guān)系是否有循環(huán)依賴
什么是永不過時的技能:算法思維。
之前做數(shù)據(jù)倉庫的運維,上線部署時需要處理很多任務(wù)的依賴關(guān)系,所謂任務(wù),就是一個一個 shell 腳本或者存儲過程等批處理任務(wù),他們之間是有依賴關(guān)系的,由于數(shù)據(jù)倉庫的任務(wù)超級多,約 3000 多個任務(wù),這么多的任務(wù)是無法使用一張有向無環(huán)圖來表示,因此依賴關(guān)系除了使用直觀的有向連線來配置,還使用了隱藏式的配置,就是依賴關(guān)系無法使用有向線條來直觀的看到。
既然看不到,就有可能出現(xiàn)循環(huán)依賴而不自知,只要有可能,就一定會有人犯錯,不是你就是他,不是今天就是未來某一天,這就是墨菲定律。這不,我就經(jīng)歷過。
調(diào)度平臺用的是先進數(shù)通的 MoiaControl V5,這是我用過的最好的調(diào)度平臺了,之前用過 ETlplus,Airflow。但 MoiaControl 中出現(xiàn)循環(huán)依賴并不提示,會導(dǎo)致第二天的任務(wù)不會跑批,影響數(shù)據(jù)的時效性。假如你準(zhǔn)備面試先進數(shù)通這家公司,說你可以為該產(chǎn)品增加一項檢查否有循環(huán)依賴的功能,我想這一定是個加分項。
那問題來了,如何編碼檢查任務(wù)依賴關(guān)系是否有循環(huán)依賴?
答案很簡單,就是構(gòu)造一個有向圖,進行拓?fù)渑判颍绻負(fù)渑判蚝鬀]有未訪問的點,那就沒有環(huán),否則就有環(huán)。
下面,我用 Python 來演示這一解決過程,帶你徹底掌握拓?fù)渑判颉?/p>
首先,我們需要借助一種數(shù)據(jù)結(jié)構(gòu)來表示有向圖,使用方便即可,這里,我使用字典來表示,比如表達 a->b, a->c, c->d 這樣的依賴關(guān)系,我們可以構(gòu)造字典 edges = { 'a':{'b','c'},'c':{'d'} } 來表示。字典的鍵表示前驅(qū)任務(wù),字典的值是一個集合,表示依賴前驅(qū)的任務(wù)集合。這樣的字典可以借助于標(biāo)準(zhǔn)庫的 collections 來快速初始化:
edges?=?collections.defaultdict(set)
僅保存邊是不夠的,我們還需要保存頂點,這可以借助一個集合,它可以自動去重,后面看是否所有的任務(wù)節(jié)點都參與了拓?fù)渑判颍涂克恕?/p>
self.edges?=?collections.defaultdict(set)
vertex?=?set()
接下來就是拓?fù)渑判虻拇a實現(xiàn)了。
拓?fù)渑判蛞话銇碚f有兩種思路,一種是廣度優(yōu)先遍歷,借助于先進先出的隊列,一種是深度優(yōu)先遍歷,借助于后進先出的棧。無論哪一種思路,都與入度和出度有關(guān)。下面分別進行分析。
廣度優(yōu)先遍歷比較符合人的習(xí)慣思維,從前到后逐層推進。它首先找出不被任何任務(wù)依賴的任務(wù)進入隊列,哪一種任務(wù)不被任何任務(wù)依賴呢?比如 a->b->c ,a 就是不被任何任務(wù)依賴的任務(wù),這樣的任務(wù)有個特點,就是入度為 0,沒有箭頭指向的任務(wù)的入度就是 0。
首先,我們計算所有節(jié)點的入度,把所有入度為 0 的任務(wù)依次放入隊列,然后開始循環(huán)遍歷隊列,取出第一個任務(wù),記為 a,標(biāo)記為已訪問,同時將依賴于 a 的任務(wù)的入度都減少 1,如果減少 1 后入度為 0 的任務(wù)放入隊列。繼續(xù)循環(huán),直到所有的節(jié)點都被訪問。如果循環(huán)結(jié)束,仍有節(jié)點未被遍歷,說明存在循環(huán)依賴,無論如何他們的入度也不可能為 0。
以上的思路,翻譯成代碼,如下所示:
import?collections
class?CheckCycle(object):
????def?__init__(self):
????????self.vertex?=?set()??#?頂點集合
????????self.edges?=?collections.defaultdict(
????????????set
????????)??#?使用字典表示有向邊?如 a ->?{b,c,e}?表示 b,c,e 均依賴 a
????????self.indegree?=?collections.defaultdict(int)??#?計算每個頂點的入度
????def?add_edge(self,?from_job:?str,?to_job:?str)?->?bool:
????????"""
????????添加一條邊
????????"""
????????if?from_job?==?to_job:
????????????return?False
????????if?from_job:
????????????self.vertex.add(from_job)
????????????if?not?from_job?in?self.indegree:
????????????????self.indegree[from_job]?=?0??#?初始化入度為?0
????????if?to_job:
????????????self.vertex.add(to_job)
????????????if?not?to_job?in?self.indegree:??#?初始化入度為0
????????????????self.indegree[to_job]?=?0
????????if?from_job?and?to_job:
????????????if?to_job?not?in?self.edges[from_job]:??#?防止充分添加相同的邊
????????????????self.indegree[to_job]?+=?1??#?入度加 1
????????????????self.edges[from_job].add(to_job)??#?防止充分添加相同的邊
????????return?True
????def?can_finish(self)?->?bool:
????????"""
????????Returns:
????????????True:?表示沒有環(huán),任務(wù)可以完成
????????????False:?表示有環(huán),任務(wù)不可以完成
????????"""
????????q?=?collections.deque([u?for?u?in?self.indegree?if?self.indegree[u]?==?0])
????????visited?=?0
????????possible_sequence?=?[]
????????while?q:
????????????visited?+=?1
????????????u?=?q.popleft()
????????????possible_sequence.append(u)
????????????for?v?in?self.edges[u]:
????????????????self.indegree[v]?-=?1
????????????????if?self.indegree[v]?==?0:
????????????????????q.append(v)
????????print(f'possible?sequence:?{"->".join(possible_sequence)}')
????????return?visited?==?len(self.vertex)
if?__name__?==?"__main__":
????"""
????a->b->c
???????b->d
???????True
????a->b->c
???????b->d->a
???????False
????"""
????check_cycle?=?CheckCycle()
????check_cycle.add_edge(from_job="a",?to_job="b")
????check_cycle.add_edge(from_job="b",?to_job="c")
????check_cycle.add_edge(from_job="b",?to_job="d")
????print(check_cycle.can_finish())
????check_cycle?=?CheckCycle()
????check_cycle.add_edge(from_job="a",?to_job="b")
????check_cycle.add_edge(from_job="b",?to_job="c")
????check_cycle.add_edge(from_job="b",?to_job="d")
????check_cycle.add_edge(from_job="d",?to_job="a")
????print(check_cycle.can_finish())
時間復(fù)雜度和空間復(fù)雜度的分析同廣度優(yōu)先遍歷算法,都是 O(m+n), m 是頂點數(shù),n 是邊數(shù),不在贅述。
另一種方法就是深度優(yōu)先遍歷, 深度優(yōu)先遍歷則是一種逆向思維,為了簡單的理解,先考慮沒有環(huán)的情況,如 a->b->c-d :從任意任務(wù)節(jié)點出發(fā),假如是 b,一條路走到黑,遍歷到最后一個節(jié)點 d ,將其入棧,同時將已經(jīng)訪問過的節(jié)點標(biāo)記為已訪問(b,c 已訪問),將已入棧的節(jié)點標(biāo)記為已完成(d 已完成),還沒有訪問過的節(jié)點標(biāo)記為未訪問 (a 未訪問)。也就是說任何一個節(jié)點,只會有以下三種狀態(tài):
「未訪問」:我們還沒有訪問到這個節(jié)點,使用 0 來表示。
「已訪問」:我們訪問過這個節(jié)點,但還沒有回溯到該節(jié)點,即該節(jié)點還沒有入棧,還有相鄰的節(jié)點沒有完成,使用 1 來表示。
「已完成」:我們訪問過且回溯過這個節(jié)點,即該節(jié)點已經(jīng)入棧,并且所有該節(jié)點的后續(xù)節(jié)點都出現(xiàn)在棧的更底部的位置,滿足拓?fù)渑判虻囊螅褂?2 來表示。
現(xiàn)在回溯到 c,發(fā)現(xiàn) c 已訪問,且 c 的后續(xù)節(jié)點 d 已經(jīng)完成,因此將 c 入棧,標(biāo)記為已完成,依次類推,現(xiàn)在,棧底到棧頂依次為 d,c,b。然后從剩余節(jié)點 a 出發(fā),執(zhí)行同樣的邏輯,a 也入棧,標(biāo)記為完成,最終從棧底到棧頂為 d,c,b,a,將這些節(jié)點依次出棧,即為拓?fù)渑判颉?/p>
現(xiàn)在考慮有環(huán)的情況 a->b->c->d->b,訪問到 d 時,繼續(xù)訪問 b 發(fā)現(xiàn) b 已經(jīng)被訪問,說明有環(huán),退出即可。根據(jù)上面的分析,不難寫出以下深度遍歷的代碼:
def?can_finish2(self)?->?bool:
????"""
????深度優(yōu)先遍歷
????Returns:
????????True:?表示沒有環(huán),任務(wù)可以完成
????????False:?表示有環(huán),任務(wù)不可以完成
????"""
????visited?=?collections.defaultdict(int)??#?保存每個頂點是否被訪問過
????for?job?in?self.vertex:
????????visited[job]?=?0??#?初始化,均未被訪問過
????result?=?list()??#?模擬棧
????valid?=?True
????def?dfs(from_job:?str):
????????nonlocal?valid
????????visited[from_job]?=?1
????????for?to_job?in?self.edges[from_job]:
????????????if?visited[to_job]?==?0:
????????????????dfs(to_job)
????????????????if?not?valid:
????????????????????return
????????????elif?visited[to_job]?==?1:
????????????????valid?=?False
????????????????return
????????visited[from_job]?=?2
????????result.append(from_job)
????for?job?in?self.vertex:
????????if?valid?and?not?visited[job]:
????????????dfs(job)
????#?print(result)
????return?valid
時間復(fù)雜度即為深度優(yōu)先遍歷或廣度優(yōu)先遍歷的時間復(fù)雜度,都為 O(m+n) ,其中 m 是頂點數(shù),n 是邊數(shù),對應(yīng)著任務(wù)數(shù)和任務(wù)的依賴數(shù)。
其實即使寫不出深度優(yōu)先或廣度優(yōu)先的代碼關(guān)系也不大,只有會靈活使用就行,網(wǎng)上都是現(xiàn)成的代碼,最重要的是要理解這些代碼,為我所用。
想使用代碼時不必辛苦的復(fù)制,回復(fù)「拓?fù)渑判颉公@取可執(zhí)行代碼。
感謝你的點贊支持。
