圖解Janusgraph系列-分布式id生成策略分析
Python實戰(zhàn)社群
Java實戰(zhàn)社群
長按識別下方二維碼,按需求添加
掃碼關(guān)注添加客服
進Python社群▲
掃碼關(guān)注添加客服
進Java社群▲
作者丨匠心Java
來源丨匠心Java
JanusGraph圖解系列文章~
文章為作者跟蹤源碼和查看官方文檔整理,如有任何問題,請聯(lián)系我或在評論區(qū)指出,感激不盡!
圖數(shù)據(jù)庫網(wǎng)上資源太少,評論區(qū)評論 or 私信我,邀你加入“圖庫交流微信群”,一起交流學習!
源碼分析相關(guān)可查看github(求star~~):https://github.com/YYDreamer/janusgraph
下述流程高清大圖地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629
版本:JanusGraph-0.5.2 、
正文
在介紹JanusGraph的分布式ID生成策略之前,我們來簡單分析一下分布式ID應(yīng)該滿足哪些特征?
全局唯一:必須保證ID是分布式環(huán)境中全局性唯一的,這是基本要求
高性能:高可用低延時,ID生成響應(yīng)快;否則可能會成為業(yè)務(wù)瓶頸 高可用:提供分布式id的生成的服務(wù)要保證高可用,不能隨隨便便就掛掉了,會對業(yè)務(wù)產(chǎn)生影響 趨勢遞增:主要看業(yè)務(wù)場景,類似于圖存儲中節(jié)點的唯一id就盡量保持趨勢遞增;但是如果類似于電商訂單就盡量不要趨勢遞增,因為趨勢遞增會被惡意估算出當天的訂單量和成交量,泄漏公司信息 接入方便:要秉著拿來即用的設(shè)計原則,在系統(tǒng)設(shè)計和實現(xiàn)上要盡可能的簡單
一:常用分布式id生成策略
當前常用的分布式id的生成策略主要分為以下四種:
UUID 數(shù)據(jù)庫+號段模式(優(yōu)化:數(shù)據(jù)庫+號段+雙buffer) 基于Redis實現(xiàn) 雪花算法(SnowFlake)
還有一些其他的比如:基于數(shù)據(jù)庫自增id、數(shù)據(jù)庫多主模式等,這些在小并發(fā)的情況下可以使用,大并發(fā)的情況下就不太ok了
市面上有一些生成分布式id的開源組件,包括滴滴基于數(shù)據(jù)庫+號段實現(xiàn)的TinyID 、百度基于SnowFlake的Uidgenerator、美團支持號段和SnowFlake的Leaf等
那么,在JanusGraph中分布式id的生成是采用的什么方式呢?
二:JanusGraph的分布式id策略
在JanusGraph中,分布式id的生成采用的是數(shù)據(jù)庫+號段+雙buffer優(yōu)化的模式;下面我們來具體分析一下:
分布式id生成使用的數(shù)據(jù)庫就是JanusGraph當前使用的第三方存儲后端,這里我們以使用的存儲后端Hbase為例;
JanusGraph分布式id生成所需元數(shù)據(jù)存儲位置:
在Hbase中有column family 列族的概念;JanusGraph在初始化Hbase表時默認創(chuàng)建了9大列族,用于存儲不同的數(shù)據(jù), 具體看《圖解圖庫JanusGraph系列-一文知曉圖數(shù)據(jù)底層存儲結(jié)構(gòu)》;
其中有一個列族janusgraph_ids簡寫為i這個列族,主要存儲的就是JanusGraph分布式id生成所需要的元數(shù)據(jù)!
JanusGraph的分布式id的組成結(jié)構(gòu):
??//?源碼中有一句話體現(xiàn)
?????/*??---?JanusGraphElement?id?bit?format?---
??????*??[?0?|?count?|?partition?|?ID?padding?(if?any)?]
?????*/
主要分為4部分:0、count、partition、ID padding(每個類型是固定值);
其實這4部分的順序在序列化為二進制數(shù)據(jù)時,順序會有所改變;這里只是標明了id的組成部分!
上述部分的partition + count來保證分布式節(jié)點的唯一性;
partition id:分區(qū)id值,JanusGraph默認分了32個邏輯分區(qū);節(jié)點分到哪個分區(qū)采用的是 隨機分配;count:每個partition都有對應(yīng)的一個count范圍:0-2的55次冪;JanusGraph每次拉取一部分的范圍作為節(jié)點的count取值;JanusGraph保證了針對相同的partition,不會重復(fù)獲取同一個count值!
保證count在partition維度保持全局唯一性,就保證了生成的最終id的全局唯一性!!
則分布式id的唯一性保證,就在于count基于partition維度的唯一性!下面我們的分析也是著重在count的獲??!
JanusGraph分布式id生成的主要邏輯流程如下圖所示:(推薦結(jié)合源碼分析觀看?。?/strong>
分析過程中有一個概念為
id block:指當前獲取的號段范圍

JanusGraph主要使用``PartitionIDPool 類來存儲不同類型的StandardIDPool;在StandardIDPool`中主要包含兩個id Block:
current block:當前生成id使用的block next block:double buffer中的另一個已經(jīng)準備好的block
為什么要有兩個block呢?
主要是如果只有一個block的話,當我們在使用完當前的block時,需要阻塞等待區(qū)獲取下一個block,這樣便會導(dǎo)致分布式id生成較長時間的阻塞等待block的獲??;
怎么優(yōu)化上述問題呢?double buffer;
除了當前使用的block,我們再存儲一個next block;當正在使用的block假設(shè)已經(jīng)使用了50%,觸發(fā)next block的異步獲取,如上圖的藍色部分所示;
這樣當current block使用完成后可以直接無延遲的切換到next block如上圖中綠色部分所示;
在執(zhí)行過程中可能會因為一些異常導(dǎo)致節(jié)點id獲取失敗,則會進行重試;重試次數(shù)默認為1000次;
private?static?final?int?MAX_PARTITION_RENEW_ATTEMPTS?=?1000;
for?(int?attempt?=?0;?attempt????//?獲取id的過程
}
ps:上述所說的IDPool和block是基于當前
圖實例維度共用的!
三:源碼分析
在JanusGraph的源碼中,主要包含兩大部分和其他的一些組件:
Graph相關(guān)類:用于對節(jié)點、屬性、邊的操作 Transaction相關(guān)類:用于在對數(shù)據(jù)或者Schema進行CURD時,進行事務(wù)處理 其他一些:分布式節(jié)點id生成類;序列化類;第三方索引操作類等等
Graph和Transaction相關(guān)類的類圖如下所示:

分布式id涉及到id生成的類圖如下所示:

初始數(shù)據(jù):
????@Test
????public?void?addVertexTest(){
????????List在諸神之圖中添加一個name為lyy節(jié)點;看下執(zhí)行流程,注意,此處主要分析的節(jié)點的分布式id生成代碼!
1、調(diào)用JanusGraphBlueprintsGraph類的AddVertex方法
????@Override
????public?JanusGraphVertex?addVertex(Object...?keyValues)?{
????????//?添加節(jié)點
????????return?getAutoStartTx().addVertex(keyValues);
????}
2、調(diào)用JanusGraphBlueprintsTransaction的addVertex方法
???public?JanusGraphVertex?addVertex(Object...?keyValues)?{
????????//?。。。省略了其他的處理
????????//?該處生成節(jié)點對象,包含節(jié)點的唯一id生成邏輯
????????final?JanusGraphVertex?vertex?=?addVertex(id,?label);?
????????//?。。。省略了其他的處理
????????return?vertex;
????}
3、調(diào)用StandardJanusGraphTx的addVertex方法
????@Override
????public?JanusGraphVertex?addVertex(Long?vertexId,?VertexLabel?label)?{
????????//?。。。省略了其他的處理
????????if?(vertexId?!=?null)?{
????????????vertex.setId(vertexId);
????????}?else?if?(config.hasAssignIDsImmediately()?||?label.isPartitioned())?{
????????????graph.assignID(vertex,label);??//?為節(jié)點分配正式的節(jié)點id!
????????}
?????????//?。。。省略了其他的處理
????????return?vertex;
????}
4、調(diào)用VertexIDAssigner的assignID(InternalElement element, IDManager.VertexIDType vertexIDType)方法
????private?void?assignID(InternalElement?element,?IDManager.VertexIDType?vertexIDType)?{
????????//?開始獲取節(jié)點分布式唯一id
????????//?因為一些異常導(dǎo)致獲取節(jié)點id失敗,進行重試,重試此為默認為1000次
????????for?(int?attempt?=?0;?attempt?????????????//?初始化一個partiiton?id
????????????long?partitionID?=?-1;
????????????//?獲取一個partition?id
????????????//?不同類型的數(shù)據(jù),partition?id的獲取方式也有所不同
????????????if?(element?instanceof?JanusGraphSchemaVertex)?{
????????????????//?為partition?id賦值
????????????}
????????????try?{
????????????????//?正式分配節(jié)點id,?依據(jù)partition?id?和?節(jié)點類型
????????????????assignID(element,?partitionID,?vertexIDType);
????????????}?catch?(IDPoolExhaustedException?e)?{
????????????????continue;?//try?again?on?a?different?partition
????????????}
????????????assert?element.hasId();
????????????//?。。。省略了其他代碼
????????}
????}
5、調(diào)用了VertexIDAssigner的assignID(final InternalElement element, final long partitionIDl, final IDManager.VertexIDType userVertexIDType)方法
????private?void?assignID(final?InternalElement?element,?final?long?partitionIDl,?final?IDManager.VertexIDType?userVertexIDType)?{
??????
????????final?int?partitionID?=?(int)?partitionIDl;
????????//?count為分布式id組成中的一部分,占55個字節(jié)
????????//?分布式id的唯一性保證,就在于`count`基于`partition`維度的唯一性
????????long?count;
????????if?(element?instanceof?JanusGraphSchemaVertex)?{?//?schema節(jié)點處理
????????????Preconditions.checkArgument(partitionID==IDManager.SCHEMA_PARTITION);
????????????count?=?schemaIdPool.nextID();
????????}?else?if?(userVertexIDType==IDManager.VertexIDType.PartitionedVertex)?{?//?配置的熱點節(jié)點,類似于`makeVertexLabel('product').partition()`的處理
????????????count?=?partitionVertexIdPool.nextID();
????????}?else?{?//?普通節(jié)點和邊類型的處理
????????????//?首先獲取當前partition敵營的idPool
????????????PartitionIDPool?partitionPool?=?idPools.get(partitionID);
????????????//?如果當前分區(qū)對應(yīng)的IDPool為空,則創(chuàng)建一個默認的IDPool,默認size?=?0
????????????if?(partitionPool?==?null)?{
????????????????//?在PartitionIDPool中包含多種類型對應(yīng)的StandardIDPool類型
????????????????//?StandardIDPool中包含對應(yīng)的block信息和count信息
????????????????partitionPool?=?new?PartitionIDPool(partitionID,?idAuthority,?idManager,?renewTimeoutMS,?renewBufferPercentage);
????????????????//?緩存下來
????????????????idPools.putIfAbsent(partitionID,partitionPool);
????????????????//?從緩存中再重新拿出
????????????????partitionPool?=?idPools.get(partitionID);
????????????}
????????????//?確保partitionPool不為空
????????????Preconditions.checkNotNull(partitionPool);
????????????//?判斷當前分區(qū)的IDPool是否枯竭;已經(jīng)被用完
????????????if?(partitionPool.isExhausted())?{
????????????????//?如果被用完,則將該分區(qū)id放到對應(yīng)的緩存中,避免之后獲取分區(qū)id再獲取到該分區(qū)id
????????????????placementStrategy.exhaustedPartition(partitionID);
????????????????//?拋出IDPool異常,?最外層捕獲,然后進行重試獲取節(jié)點id
????????????????throw?new?IDPoolExhaustedException("Exhausted?id?pool?for?partition:?"?+?partitionID);
????????????}
????????????//?存儲當前類型對應(yīng)的IDPool,因為partitionPool中保存好幾個類型的IDPool
????????????IDPool?idPool;
????????????if?(element?instanceof?JanusGraphRelation)?{
????????????????idPool?=?partitionPool.getPool(PoolType.RELATION);
????????????}?else?{
????????????????Preconditions.checkArgument(userVertexIDType!=null);
????????????????idPool?=?partitionPool.getPool(PoolType.getPoolTypeFor(userVertexIDType));
????????????}
????????????try?{
????????????????//?重要?。。?!?依據(jù)給定的IDPool獲取count值?。。?!
????????????????//?在此語句中設(shè)計 block的初始化?和 double buffer block的處理!
????????????????count?=?idPool.nextID();
????????????????partitionPool.accessed();
????????????}?catch?(IDPoolExhaustedException?e)?{?//?如果該IDPool被用完,拋出IDPool異常,?最外層捕獲,然后進行重試獲取節(jié)點id
????????????????log.debug("Pool?exhausted?for?partition?id?{}",?partitionID);
????????????????placementStrategy.exhaustedPartition(partitionID);
????????????????partitionPool.exhaustedIdPool();
????????????????throw?e;
????????????}
????????}
????????//?組裝最終的分布式id:[count + partition id + ID padding]
????????long?elementId;
????????if?(element?instanceof?InternalRelation)?{
????????????elementId?=?idManager.getRelationID(count,?partitionID);
????????}?else?if?(element?instanceof?PropertyKey)?{
????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.UserPropertyKey,count);
????????}?else?if?(element?instanceof?EdgeLabel)?{
????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.UserEdgeLabel,?count);
????????}?else?if?(element?instanceof?VertexLabel)?{
????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.VertexLabel,?count);
????????}?else?if?(element?instanceof?JanusGraphSchemaVertex)?{
????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.GenericSchemaType,count);
????????}?else?{
????????????elementId?=?idManager.getVertexID(count,?partitionID,?userVertexIDType);
????????}
????????Preconditions.checkArgument(elementId?>=?0);
????????//?對節(jié)點對象賦值其分布式唯一id
????????element.setId(elementId);
????}
上述代碼,我們拿到了對應(yīng)的IdPool,有兩種情況:
第一次獲取分布式id時,分區(qū)對應(yīng)的IDPool初始化為默認的size = 0的IDPool 分區(qū)對應(yīng)的IDPool不是初次獲取
這兩種情況的處理,都在代碼count = idPool.nextID()的StandardIDPool類中的nextID()方法中被處理!
在分析該代碼之前,我們需要知道 PartitionIDPool和StandardIDPool的關(guān)系:
每個partition都有一個對應(yīng)的PartitionIDPool extends EnumMap 是一個枚舉map類型;
每一個PartitionIDPool都有對應(yīng)的不同類型的StandardIDPool:
NORMAL_VERTEX:用于vertex id的分配 UNMODIFIABLE_VERTEX:用于schema label id的分配 RELATION:用于edge id的分配
在StandardIDPool中包含多個字段,分別代表不同的含義,抽取幾個重要的字段進行介紹:
????private?static?final?int?RENEW_ID_COUNT?=?100;?
????private?final?long?idUpperBound;?//?Block的最大值,默認為2的55次冪
????private?final?int?partition;?//?當前pool對應(yīng)的分區(qū)
????private?final?int?idNamespace;?//?標識pool為那種類型的pool,上述的三種類型NORMAL_VERTEX、UNMODIFIABLE_VERTEX、RELATION;值為當前枚舉值在枚舉中的位置
????private?final?Duration?renewTimeout;//?重新獲取block的超時時間
????private?final?double?renewBufferPercentage;//?雙buffer中,當?shù)谝粋€buffer?block使用的百分比,到達配置的百分比則觸發(fā)other?buffer?block的獲取
????private?IDBlock?currentBlock;?//?當前的block
????private?long?currentIndex;?//?標識當前block使用到那一個位置
????private?long?renewBlockIndex;?//?依據(jù)currentBlock.numIds()*renewBufferPercentage來獲取這個值,主要用于在當前的block在消費到某個index的時候觸發(fā)獲取下一個buffer?block
????private?volatile?IDBlock?nextBlock;//?雙buffer中的另外一個block
????private?final?ThreadPoolExecutor?exec;//?異步獲取雙buffer的線程池
6、調(diào)用了StandardIDPool類中的nextID方法
經(jīng)過上述分析,我們知道,分布式唯一id的唯一性是由在partition維度下的count的值的唯一性來保證的;
上述代碼通過調(diào)用IDPool的nextId來獲取count值;
下述代碼就是獲取count的邏輯;
????@Override
????public?synchronized?long?nextID()?{
????????//?currentIndex標識當前的index小于current?block的最大值
????????assert?currentIndex?<=?currentBlock.numIds();
????????//?此處涉及兩種情況:
????????// 1、分區(qū)對應(yīng)的IDPool是第一次被初始化;則currentIndex =?0;currentBlock.numIds()?=?0;
????????//?2、分區(qū)對應(yīng)的該IDPool不是第一次,但是此次的index正好使用到了current?block的最后一個count
????????if?(currentIndex?==?currentBlock.numIds())?{
????????????try?{
????????????????//?將current?block賦值為next?block
????????????????//?next?block置空?并計算renewBlockIndex
????????????????nextBlock();
????????????}?catch?(InterruptedException?e)?{
????????????????throw?new?JanusGraphException("Could?not?renew?id?block?due?to?interruption",?e);
????????????}
????????}
????????
????????//?在使用current block的過程中,當current index ?==? renewBlockIndex時,觸發(fā)double buffer next block的異步獲取?。。?!
????????if?(currentIndex?==?renewBlockIndex)?{
????????????//?異步獲取next?block
????????????startIDBlockGetter();
????????}
????????
????????//?生成最終的count
????????long?returnId?=?currentBlock.getId(currentIndex);
????????//?current?index?+?1
????????currentIndex++;
????????if?(returnId?>=?idUpperBound)?throw?new?IDPoolExhaustedException("Reached?id?upper?bound?of?"?+?idUpperBound);
????????log.trace("partition({})-namespace({})?Returned?id:?{}",?partition,?idNamespace,?returnId);
????????//?返回最終獲取的分區(qū)維度的全局唯一count
????????return?returnId;
????}
上述代碼中進行了兩次判斷:
currentIndex == currentBlock.numIds(): 第一次生成分布式id:此處判斷即為 0==0;然后生成新的block 非第一次生成分布式id:等于情況下標識當前的block已經(jīng)使用完了,需要切換為next block currentIndex == renewBlockIndex renew index:標識index使用多少后開始獲取下一個double buffer 的next block;有一個默認值100,主要為了兼容第一次分布式id的生成;相等則會觸發(fā)異步獲取下一個next block
下面我們分別對nextBlock();邏輯和startIDBlockGetter();進行分析;
7、調(diào)用了StandardIDPool類中的nextBlock方法
????private?synchronized?void?nextBlock()?throws?InterruptedException?{
????????//?在分區(qū)對應(yīng)的IDPool第一次使用時,double?buffer的nextBlock為空
????????if?(null?==?nextBlock?&&?null?==?idBlockFuture)?{
????????????//?異步啟動?獲取id?block
????????????startIDBlockGetter();
????????}
????????//?也是在分區(qū)對應(yīng)的IDPool第一次使用時,因為上述為異步獲取,所以在執(zhí)行到這一步時nextBlock可能還沒拿到
????????//?所以需要阻塞等待block的獲取
????????if?(null?==?nextBlock)?{
????????????waitForIDBlockGetter();
????????}
????????//?將當前使用block指向next?block
????????currentBlock?=?nextBlock;
????????//?index清零
????????currentIndex?=?0;
????????//?nextBlock置空
????????nextBlock?=?null;
????????//?renewBlockIndex用于雙buffer中,當?shù)谝粋€buffer?block使用的百分比,到達配置的百分比則觸發(fā)other?buffer?block的獲取
????????//?值current?block?對應(yīng)的count數(shù)量?-?(值current?block?對應(yīng)的count數(shù)量?*?為renewBufferPercentage配置的剩余空間百分比)
????????//?在使用current block的時候,當current index ?==? renewBlockIndex時,觸發(fā)double buffer next block的異步獲?。。。?!
????????renewBlockIndex?=?Math.max(0,currentBlock.numIds()-Math.max(RENEW_ID_COUNT,?Math.round(currentBlock.numIds()*renewBufferPercentage)));
????}
主要是做了三件事:
1、block是否為空,為空的話則異步獲取一個block 2、nextBlock不為空的情況下:next賦值到current、next置空、index置零 3、計算獲取下一個nextBlock的觸發(fā)index renewBlockIndex值
8、調(diào)用了StandardIDPool類中的startIDBlockGetter方法
????private?synchronized?void?startIDBlockGetter()?{
????????Preconditions.checkArgument(idBlockFuture?==?null,?idBlockFuture);
????????if?(closed)?return;?//Don't?renew?anymore?if?closed
????????//Renew?buffer
????????log.debug("Starting?id?block?renewal?thread?upon?{}",?currentIndex);
????????//?創(chuàng)建一個線程對象,包含給定的權(quán)限控制類、分區(qū)、命名空間、超時時間
????????idBlockGetter?=?new?IDBlockGetter(idAuthority,?partition,?idNamespace,?renewTimeout);
????????//?提交獲取double?buffer的線程任務(wù),異步執(zhí)行
????????idBlockFuture?=?exec.submit(idBlockGetter);
????}
其中創(chuàng)建一個線程任務(wù),提交到線程池exec進行異步執(zhí)行;
下面看下,線程類的call方法主要是調(diào)用了idAuthority.getIDBlock方法,這個方法主要是基于Hbase來獲取還未使用的block;
????/**
?????*?獲取double?buffer?block的線程類
?????*/
????private?static?class?IDBlockGetter?implements?Callable<IDBlock>?{
????????//?省略部分代碼
????????@Override
????????public?IDBlock?call()?{
????????????Stopwatch?running?=?Stopwatch.createStarted();
????????????try?{
????????????????//?此處調(diào)用idAuthority?調(diào)用HBase進行占用獲取Block
????????????????IDBlock?idBlock?=?idAuthority.getIDBlock(partition,?idNamespace,?renewTimeout);
????????????????return?idBlock;
????????????}?catch?(BackendException?e)?{}
????????}
????}
9、調(diào)用ConsistentKeyIDAuthority類的getIDBlock方法
????@Override
????public?synchronized?IDBlock?getIDBlock(final?int?partition,?final?int?idNamespace,?Duration?timeout)?throws?BackendException?{
??????
????????//?開始時間
????????final?Timer?methodTime?=?times.getTimer().start();
????????//?獲取當前命名空間配置的blockSize,默認值10000;可自定義配置
????????final?long?blockSize?=?getBlockSize(idNamespace);
????????//?獲取當前命名空間配置的最大id值idUpperBound;值為:2的55次冪大小
????????final?long?idUpperBound?=?getIdUpperBound(idNamespace);
????????// uniqueIdBitWidth標識uniqueId占用的位數(shù);uniqueId為了兼容“關(guān)閉分布式id唯一性保障”的開關(guān)情況,uniqueIdBitWidth默認值=4
????????//?值:64-1(默認0)-5(分區(qū)占用位數(shù))-3(ID Padding占用位數(shù))-4(uniqueIdBitWidth)?= 51;標識block中的上限為2的51次冪大小
????????final?int?maxAvailableBits?=?(VariableLong.unsignedBitLength(idUpperBound)-1)-uniqueIdBitWidth;
????????//?標識block中的上限為2的51次冪大小
????????final?long?idBlockUpperBound?=?(1L?<
????????// UniquePID用盡的UniquePID集合,默認情況下,randomUniqueIDLimit =?0;
????????final?List?exhaustedUniquePIDs?=?new?ArrayList<>(randomUniqueIDLimit);
????????//?默認0.3秒??用于處理TemporaryBackendException異常情況(后端存儲出現(xiàn)問題)下:阻塞一斷時間,然后進行重試
????????Duration?backoffMS?=?idApplicationWaitMS;
????????//?從開始獲取IDBlock開始,持續(xù)超時時間(默認2分鐘)內(nèi)重試獲取IDBlock
????????while?(methodTime.elapsed().compareTo(timeout)?0)?{
????????????final?int?uniquePID?=?getUniquePartitionID();?//?獲取uniquePID,默認情況下“開啟分布式id唯一性控制”,值?=?0;?當“關(guān)閉分布式id唯一性控制”時為一個隨機值
????????????final?StaticBuffer?partitionKey?=?getPartitionKey(partition,idNamespace,uniquePID);?//?依據(jù)partition?+?idNamespace?+?uniquePID組裝一個RowKey
????????????try?{
????????????????long?nextStart?=?getCurrentID(partitionKey);?//?從Hbase中獲取當前partition對應(yīng)的IDPool中被分配的最大值,用來作為當前申請新的block的開始值
????????????????if?(idBlockUpperBound?-?blockSize?<=?nextStart)?{?//?確保還未被分配的id池中的id個數(shù),大于等于blockSize
????????????????????//?相應(yīng)處理
????????????????}
????????????????long?nextEnd?=?nextStart?+?blockSize;?//?獲取當前想要獲取block的最大值
????????????????StaticBuffer?target?=?null;
????????????????//?attempt?to?write?our?claim?on?the?next?id?block
????????????????boolean?success?=?false;
????????????????try?{
????????????????????Timer?writeTimer?=?times.getTimer().start();?//?===開始:開始進行插入自身的block需求到Hbase
????????????????????target?=?getBlockApplication(nextEnd,?writeTimer.getStartTime());?//?組裝對應(yīng)的Column:?-nextEnd?+??當前時間戳?+?uid(唯一標識當前圖實例)
????????????????????final?StaticBuffer?finalTarget?=?target;?//?copy?for?the?inner?class
????????????????????BackendOperation.execute(txh?->?{?//?異步插入當前生成的RowKey?和?Column
????????????????????????idStore.mutate(partitionKey,?Collections.singletonList(StaticArrayEntry.of(finalTarget)),?KeyColumnValueStore.NO_DELETIONS,?txh);
????????????????????????return?true;
????????????????????},this,times);
????????????????????writeTimer.stop();?//?===結(jié)束:插入完成
????????????????????final?boolean?distributed?=?manager.getFeatures().isDistributed();
????????????????????Duration?writeElapsed?=?writeTimer.elapsed();?//?===獲取方才插入的時間耗時
????????????????????if?(idApplicationWaitMS.compareTo(writeElapsed)?0?&&?distributed)?{?//?判斷是否超過配置的超時時間,超過則報錯TemporaryBackendException,然后等待一斷時間進行重試
????????????????????????throw?new?TemporaryBackendException("Wrote?claim?for?id?block?["?+?nextStart?+?",?"?+?nextEnd?+?")?in?"?+?(writeElapsed)?+?"?=>?too?slow,?threshold?is:?"?+?idApplicationWaitMS);
????????????????????}?else?{
????????????????????????assert?0?!=?target.length();
????????????????????????final?StaticBuffer[]?slice?=?getBlockSlice(nextEnd);?//?組裝下述基于上述Rowkey的Column的查找范圍:(-nextEnd +?0?:?0nextEnd +?最大值)??????
????????????????????????final?List?blocks?=?BackendOperation.execute(?//?異步獲取指定Rowkey和指定Column區(qū)間的值
????????????????????????????(BackendOperation.Transactional>)?txh?->?idStore.getSlice(new?KeySliceQuery(partitionKey,?slice[0],?slice[1]),?txh),this,times);
????????????????????????if?(blocks?==?null)?throw?new?TemporaryBackendException("Could?not?read?from?storage");
????????????????????????if?(blocks.isEmpty())
????????????????????????????throw?new?PermanentBackendException("It?seems?there?is?a?race-condition?in?the?block?application.?"?+
????????????????????????????????????"If?you?have?multiple?JanusGraph?instances?running?on?one?physical?machine,?ensure?that?they?have?unique?machine?idAuthorities");
????????????????????????if?(target.equals(blocks.get(0).getColumnAs(StaticBuffer.STATIC_FACTORY)))?{?//?如果獲取的集合中,當前的圖實例插入的數(shù)據(jù)是第一條,則表示獲取block;?如果不是第一條,則獲取Block失敗
????????????????????????????//?組裝IDBlock對象
????????????????????????????ConsistentKeyIDBlock?idBlock?=?new?ConsistentKeyIDBlock(nextStart,blockSize,uniqueIdBitWidth,uniquePID);
????????????????????????????if?(log.isDebugEnabled())?{
????????????????????????????????????idBlock,?partition,?idNamespace,?uid);
????????????????????????????}
????????????????????????????success?=?true;
????????????????????????????return?idBlock;?//?返回
????????????????????????}?else?{?}
????????????????????}
????????????????}?finally?{
????????????????????if?(!success?&&?null?!=?target)?{?//?在獲取Block失敗后,刪除當前的插入;?如果沒有失敗,則保留當前的插入,在hbase中標識該Block已經(jīng)被占用
????????????????????????//Delete?claim?to?not?pollute?id?space
????????????????????????for?(int?attempt?=?0;?attempt?//?回滾:刪除當前插入,嘗試次數(shù)5次
????????????????????????}
????????????????????}
????????????????}
????????????}?catch?(UniqueIDExhaustedException?e)?{
????????????????//?No?need?to?increment?the?backoff?wait?time?or?to?sleep
????????????????log.warn(e.getMessage());
????????????}?catch?(TemporaryBackendException?e)?{
????????????????backoffMS?=?Durations.min(backoffMS.multipliedBy(2),?idApplicationWaitMS.multipliedBy(32));
????????????????sleepAndConvertInterrupts(backoffMS);?\
????????????}
????????}
????????throw?new?TemporaryLockingException();
????}
主要的邏輯就是:
組裝Rowkey:partition + idNameSpace+unquePId組裝Column:-nextEnd+now time+uid將 RowKey+Column插入Hbase獲取的上述組裝的RowKey 基于(-nextEnd + 0 : -nextEnd + max)范圍的所有Column集合 判斷集合的第一個Column是不是當前插入的Column,是的話則占用block成功,不是的話則占用失敗,刪除剛才占用并進行重試
最終:異步獲取到了唯一占用的Block,然后生成對應(yīng)的唯一count,組裝最后的唯一id
整體的調(diào)用流程如下:

四:其他類型的id生成
上述我們主要依據(jù)生成節(jié)點id(vertex id)的過程來進行分析
在JanusGraph中還包含edge id、property id、schema label id等幾種的分布式id生成
所有類型的分布式id的生成主要思想和邏輯都幾乎相同,只是一些具體的邏輯可能有所不同,我們理解了vertex id的分布式id生成流程,其他的也可以理解了。
1、property id的生成
在JanusGraph中的property的分布式唯一id的生成,整體邏輯和vertex id的生成邏輯大體相同;
property id的 生成和 vertex id有兩點不同:
ID的組成部分:?在 vertex id中組成部分包含count+partition+ID Padding;而在property id中沒有ID Padding部分,其組成為count + partition
????????long?id?=?(count<????????if?(type!=null)?id?=?type.addPadding(id);?//?此時,type?=?null
????????return?id;
partition id的獲取方式:在生成 vertex id時,partition id是隨機獲取的;而在生成property id時,partition id是獲取的當前節(jié)點對應(yīng)的partition id,如果節(jié)點獲取不到分區(qū)id,則隨機生成一個;
????????????if?(element?instanceof?InternalRelation)?{?//?屬性?+?邊
????????????????InternalRelation?relation?=?(InternalRelation)element;
????????????????if?(attempt?????????????????????InternalVertex?incident?=?relation.getVertex(attempt);
????????????????????Preconditions.checkArgument(incident.hasId());
????????????????????if?(!IDManager.VertexIDType.PartitionedVertex.is(incident.longId())?||?relation.isProperty())?{?//?獲取對應(yīng)節(jié)點已有的partition?id
????????????????????????partitionID?=?getPartitionID(incident);
????????????????????}?else?{
????????????????????????continue;
????????????????????}
????????????????}?else?{?//?如果對應(yīng)的節(jié)點都沒有,則隨機獲取一個partition?id
????????????????????partitionID?=?placementStrategy.getPartition(element);
????????????????}
2、Edge id的生成
在JanusGraph中的edge的分布式唯一id的生成,整體邏輯和vertex id的生成邏輯大體相同;
edge id的 生成和 vertex id有兩點不同:
ID的組成部分:?在 vertex id中組成部分包含count+partition+ID Padding;而在edge id中沒有ID Padding部分,其組成為count + partition,代碼同property id的生成代碼partition id的獲取方式:在生成 vertex id時,partition id是隨機獲取的;而在生成edge id時,partition id是獲取的當前source vertex或者target vertex對應(yīng)的partition id,如果節(jié)點獲取不到分區(qū)id,則隨機生成一個,代碼同property id的生成代碼;
3、Schema相關(guān)id的生成
在JanusGraph中的schema相關(guān)id的分布式唯一id的生成,整體邏輯和vertex id的生成邏輯大體相同;
schema相關(guān)id的生成分為四種:PropertyKey、EdgeLabel、VertexLabel、JanusGraphSchemaVertex
ID的組成部分:?在 vertex id中組成部分包含count+partition+ID Padding;在schema對應(yīng)的id生成,這四種產(chǎn)生的id對應(yīng)的結(jié)構(gòu)都是一樣的:count + 對應(yīng)類型的固定后綴
return?(count?<partition id的獲取方式:在生成 vertex id時,partition id是隨機獲取的;而在生成schema id時,partition id是默認的partition id = 0;
public?static?final?int?SCHEMA_PARTITION?=?0;
if?(element?instanceof?JanusGraphSchemaVertex)?{
????????????????partitionID?=?IDManager.SCHEMA_PARTITION;?//?默認分區(qū)
}

近期精彩內(nèi)容推薦:??


