<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          圖解Janusgraph系列-分布式id生成策略分析

          共 2516字,需瀏覽 6分鐘

           ·

          2020-12-26 07:45

          Python實戰(zhàn)社群

          Java實戰(zhàn)社群

          長按識別下方二維碼,按需求添加

          掃碼關(guān)注添加客服

          進Python社群▲

          掃碼關(guān)注添加客服

          進Java社群


          作者丨匠心Java
          來源丨匠心Java


          JanusGraph圖解系列文章~

          文章為作者跟蹤源碼和查看官方文檔整理,如有任何問題,請聯(lián)系我或在評論區(qū)指出,感激不盡!

          圖數(shù)據(jù)庫網(wǎng)上資源太少,評論區(qū)評論 or 私信我,邀你加入“圖庫交流微信群”,一起交流學習!

          源碼分析相關(guān)可查看github(求star~~):https://github.com/YYDreamer/janusgraph

          下述流程高清大圖地址:https://www.processon.com/view/link/5f471b2e7d9c086b9903b629

          版本:JanusGraph-0.5.2 、

          正文

          在介紹JanusGraph的分布式ID生成策略之前,我們來簡單分析一下分布式ID應(yīng)該滿足哪些特征?

          • 全局唯一:必須保證ID是分布式環(huán)境中全局性唯一的,這是基本要求

            • 高性能:高可用低延時,ID生成響應(yīng)快;否則可能會成為業(yè)務(wù)瓶頸
            • 高可用:提供分布式id的生成的服務(wù)要保證高可用,不能隨隨便便就掛掉了,會對業(yè)務(wù)產(chǎn)生影響
            • 趨勢遞增:主要看業(yè)務(wù)場景,類似于圖存儲中節(jié)點的唯一id就盡量保持趨勢遞增;但是如果類似于電商訂單就盡量不要趨勢遞增,因為趨勢遞增會被惡意估算出當天的訂單量和成交量,泄漏公司信息
            • 接入方便:要秉著拿來即用的設(shè)計原則,在系統(tǒng)設(shè)計和實現(xiàn)上要盡可能的簡單

          一:常用分布式id生成策略

          當前常用的分布式id的生成策略主要分為以下四種:

          • UUID
          • 數(shù)據(jù)庫+號段模式(優(yōu)化:數(shù)據(jù)庫+號段+雙buffer)
          • 基于Redis實現(xiàn)
          • 雪花算法(SnowFlake)

          還有一些其他的比如:基于數(shù)據(jù)庫自增id、數(shù)據(jù)庫多主模式等,這些在小并發(fā)的情況下可以使用,大并發(fā)的情況下就不太ok了

          市面上有一些生成分布式id的開源組件,包括滴滴基于數(shù)據(jù)庫+號段實現(xiàn)的TinyID 、百度基于SnowFlakeUidgenerator、美團支持號段SnowFlakeLeaf

          那么,在JanusGraph中分布式id的生成是采用的什么方式呢?

          二:JanusGraph的分布式id策略

          在JanusGraph中,分布式id的生成采用的是數(shù)據(jù)庫+號段+雙buffer優(yōu)化的模式;下面我們來具體分析一下:

          分布式id生成使用的數(shù)據(jù)庫就是JanusGraph當前使用的第三方存儲后端,這里我們以使用的存儲后端Hbase為例;

          JanusGraph分布式id生成所需元數(shù)據(jù)存儲位置:

          在Hbase中有column family 列族的概念;JanusGraph在初始化Hbase表時默認創(chuàng)建了9大列族,用于存儲不同的數(shù)據(jù), 具體看《圖解圖庫JanusGraph系列-一文知曉圖數(shù)據(jù)底層存儲結(jié)構(gòu)》;

          其中有一個列族janusgraph_ids簡寫為i這個列族,主要存儲的就是JanusGraph分布式id生成所需要的元數(shù)據(jù)!

          JanusGraph的分布式id的組成結(jié)構(gòu):

          ??//?源碼中有一句話體現(xiàn)
          ?????/*??---?JanusGraphElement?id?bit?format?---
          ??????*??[?0?|?count?|?partition?|?ID?padding?(if?any)?]
          ?????*/

          主要分為4部分:0、count、partition、ID padding(每個類型是固定值);

          其實這4部分的順序在序列化為二進制數(shù)據(jù)時,順序會有所改變;這里只是標明了id的組成部分!

          上述部分的partition + count來保證分布式節(jié)點的唯一性;

          • partition id:分區(qū)id值,JanusGraph默認分了32個邏輯分區(qū);節(jié)點分到哪個分區(qū)采用的是隨機分配;
          • count:每個partition都有對應(yīng)的一個count范圍:0-2的55次冪;JanusGraph每次拉取一部分的范圍作為節(jié)點的count取值;JanusGraph保證了針對相同的partition,不會重復(fù)獲取同一個count值!

          保證count在partition維度保持全局唯一性,就保證了生成的最終id的全局唯一性!!

          則分布式id的唯一性保證,就在于count基于partition維度的唯一性!下面我們的分析也是著重在count的獲??!

          JanusGraph分布式id生成的主要邏輯流程如下圖所示:(推薦結(jié)合源碼分析觀看?。?/strong>

          分析過程中有一個概念為id block:指當前獲取的號段范圍

          流程圖

          JanusGraph主要使用``PartitionIDPool 類來存儲不同類型的StandardIDPool;在StandardIDPool`中主要包含兩個id Block:

          • current block:當前生成id使用的block
          • next block:double buffer中的另一個已經(jīng)準備好的block

          為什么要有兩個block呢?

          主要是如果只有一個block的話,當我們在使用完當前的block時,需要阻塞等待區(qū)獲取下一個block,這樣便會導(dǎo)致分布式id生成較長時間的阻塞等待block的獲??;

          怎么優(yōu)化上述問題呢?double buffer

          除了當前使用的block,我們再存儲一個next block;當正在使用的block假設(shè)已經(jīng)使用了50%,觸發(fā)next block的異步獲取,如上圖的藍色部分所示;

          這樣當current block使用完成后可以直接無延遲的切換到next block如上圖中綠色部分所示;

          在執(zhí)行過程中可能會因為一些異常導(dǎo)致節(jié)點id獲取失敗,則會進行重試;重試次數(shù)默認為1000次;

          private?static?final?int?MAX_PARTITION_RENEW_ATTEMPTS?=?1000;
          for?(int?attempt?=?0;?attempt????//?獲取id的過程
          }

          ps:上述所說的IDPool和block是基于當前圖實例維度共用的!

          三:源碼分析

          在JanusGraph的源碼中,主要包含兩大部分和其他的一些組件:

          • Graph相關(guān)類:用于對節(jié)點、屬性、邊的操作
          • Transaction相關(guān)類:用于在對數(shù)據(jù)或者Schema進行CURD時,進行事務(wù)處理
          • 其他一些:分布式節(jié)點id生成類;序列化類;第三方索引操作類等等

          Graph和Transaction相關(guān)類的類圖如下所示:

          在這里插入圖片描述

          分布式id涉及到id生成的類圖如下所示:

          在這里插入圖片描述

          初始數(shù)據(jù):

          ????@Test
          ????public?void?addVertexTest(){
          ????????List?godProperties?=?new?ArrayList<>();
          ????????godProperties.add(T.label);
          ????????godProperties.add("god");

          ????????godProperties.add("name");
          ????????godProperties.add("lyy");

          ????????godProperties.add("age");
          ????????godProperties.add(18);

          ????????JanusGraphVertex?godVertex?=?graph.addVertex(godProperties.toArray());

          ????????assertNotNull(godVertex);
          ????}

          諸神之圖中添加一個name為lyy節(jié)點;看下執(zhí)行流程,注意,此處主要分析的節(jié)點的分布式id生成代碼!

          1、調(diào)用JanusGraphBlueprintsGraph類的AddVertex方法

          ????@Override
          ????public?JanusGraphVertex?addVertex(Object...?keyValues)?{
          ????????//?添加節(jié)點
          ????????return?getAutoStartTx().addVertex(keyValues);
          ????}

          2、調(diào)用JanusGraphBlueprintsTransactionaddVertex方法

          ???public?JanusGraphVertex?addVertex(Object...?keyValues)?{
          ????????//?。。。省略了其他的處理
          ????????//?該處生成節(jié)點對象,包含節(jié)點的唯一id生成邏輯
          ????????final?JanusGraphVertex?vertex?=?addVertex(id,?label);?
          ????????//?。。。省略了其他的處理
          ????????return?vertex;
          ????}

          3、調(diào)用StandardJanusGraphTxaddVertex方法

          ????@Override
          ????public?JanusGraphVertex?addVertex(Long?vertexId,?VertexLabel?label)?{
          ????????//?。。。省略了其他的處理
          ????????if?(vertexId?!=?null)?{
          ????????????vertex.setId(vertexId);
          ????????}?else?if?(config.hasAssignIDsImmediately()?||?label.isPartitioned())?{
          ????????????graph.assignID(vertex,label);??//?為節(jié)點分配正式的節(jié)點id!
          ????????}
          ?????????//?。。。省略了其他的處理
          ????????return?vertex;
          ????}

          4、調(diào)用VertexIDAssignerassignID(InternalElement element, IDManager.VertexIDType vertexIDType)方法

          ????private?void?assignID(InternalElement?element,?IDManager.VertexIDType?vertexIDType)?{
          ????????//?開始獲取節(jié)點分布式唯一id
          ????????//?因為一些異常導(dǎo)致獲取節(jié)點id失敗,進行重試,重試此為默認為1000次
          ????????for?(int?attempt?=?0;?attempt?????????????//?初始化一個partiiton?id
          ????????????long?partitionID?=?-1;
          ????????????//?獲取一個partition?id
          ????????????//?不同類型的數(shù)據(jù),partition?id的獲取方式也有所不同
          ????????????if?(element?instanceof?JanusGraphSchemaVertex)?{
          ????????????????//?為partition?id賦值
          ????????????}
          ????????????try?{
          ????????????????//?正式分配節(jié)點id,?依據(jù)partition?id?和?節(jié)點類型
          ????????????????assignID(element,?partitionID,?vertexIDType);
          ????????????}?catch?(IDPoolExhaustedException?e)?{
          ????????????????continue;?//try?again?on?a?different?partition
          ????????????}
          ????????????assert?element.hasId();
          ????????????//?。。。省略了其他代碼
          ????????}
          ????}

          5、調(diào)用了VertexIDAssignerassignID(final InternalElement element, final long partitionIDl, final IDManager.VertexIDType userVertexIDType)方法

          ????private?void?assignID(final?InternalElement?element,?final?long?partitionIDl,?final?IDManager.VertexIDType?userVertexIDType)?{
          ??????
          ????????final?int?partitionID?=?(int)?partitionIDl;

          ????????//?count為分布式id組成中的一部分,占55個字節(jié)
          ????????//?分布式id的唯一性保證,就在于`count`基于`partition`維度的唯一性
          ????????long?count;
          ????????if?(element?instanceof?JanusGraphSchemaVertex)?{?//?schema節(jié)點處理
          ????????????Preconditions.checkArgument(partitionID==IDManager.SCHEMA_PARTITION);
          ????????????count?=?schemaIdPool.nextID();
          ????????}?else?if?(userVertexIDType==IDManager.VertexIDType.PartitionedVertex)?{?//?配置的熱點節(jié)點,類似于`makeVertexLabel('product').partition()`的處理
          ????????????count?=?partitionVertexIdPool.nextID();
          ????????}?else?{?//?普通節(jié)點和邊類型的處理
          ????????????//?首先獲取當前partition敵營的idPool
          ????????????PartitionIDPool?partitionPool?=?idPools.get(partitionID);
          ????????????//?如果當前分區(qū)對應(yīng)的IDPool為空,則創(chuàng)建一個默認的IDPool,默認size?=?0
          ????????????if?(partitionPool?==?null)?{
          ????????????????//?在PartitionIDPool中包含多種類型對應(yīng)的StandardIDPool類型
          ????????????????//?StandardIDPool中包含對應(yīng)的block信息和count信息
          ????????????????partitionPool?=?new?PartitionIDPool(partitionID,?idAuthority,?idManager,?renewTimeoutMS,?renewBufferPercentage);
          ????????????????//?緩存下來
          ????????????????idPools.putIfAbsent(partitionID,partitionPool);
          ????????????????//?從緩存中再重新拿出
          ????????????????partitionPool?=?idPools.get(partitionID);
          ????????????}
          ????????????//?確保partitionPool不為空
          ????????????Preconditions.checkNotNull(partitionPool);
          ????????????//?判斷當前分區(qū)的IDPool是否枯竭;已經(jīng)被用完
          ????????????if?(partitionPool.isExhausted())?{
          ????????????????//?如果被用完,則將該分區(qū)id放到對應(yīng)的緩存中,避免之后獲取分區(qū)id再獲取到該分區(qū)id
          ????????????????placementStrategy.exhaustedPartition(partitionID);
          ????????????????//?拋出IDPool異常,?最外層捕獲,然后進行重試獲取節(jié)點id
          ????????????????throw?new?IDPoolExhaustedException("Exhausted?id?pool?for?partition:?"?+?partitionID);
          ????????????}
          ????????????//?存儲當前類型對應(yīng)的IDPool,因為partitionPool中保存好幾個類型的IDPool
          ????????????IDPool?idPool;
          ????????????if?(element?instanceof?JanusGraphRelation)?{
          ????????????????idPool?=?partitionPool.getPool(PoolType.RELATION);
          ????????????}?else?{
          ????????????????Preconditions.checkArgument(userVertexIDType!=null);
          ????????????????idPool?=?partitionPool.getPool(PoolType.getPoolTypeFor(userVertexIDType));
          ????????????}
          ????????????try?{
          ????????????????//?重要?。。?!?依據(jù)給定的IDPool獲取count值?。。?!
          ????????????????//?在此語句中設(shè)計 block的初始化?和 double buffer block的處理!
          ????????????????count?=?idPool.nextID();
          ????????????????partitionPool.accessed();
          ????????????}?catch?(IDPoolExhaustedException?e)?{?//?如果該IDPool被用完,拋出IDPool異常,?最外層捕獲,然后進行重試獲取節(jié)點id
          ????????????????log.debug("Pool?exhausted?for?partition?id?{}",?partitionID);
          ????????????????placementStrategy.exhaustedPartition(partitionID);
          ????????????????partitionPool.exhaustedIdPool();
          ????????????????throw?e;
          ????????????}
          ????????}

          ????????//?組裝最終的分布式id:[count + partition id + ID padding]
          ????????long?elementId;
          ????????if?(element?instanceof?InternalRelation)?{
          ????????????elementId?=?idManager.getRelationID(count,?partitionID);
          ????????}?else?if?(element?instanceof?PropertyKey)?{
          ????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.UserPropertyKey,count);
          ????????}?else?if?(element?instanceof?EdgeLabel)?{
          ????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.UserEdgeLabel,?count);
          ????????}?else?if?(element?instanceof?VertexLabel)?{
          ????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.VertexLabel,?count);
          ????????}?else?if?(element?instanceof?JanusGraphSchemaVertex)?{
          ????????????elementId?=?IDManager.getSchemaId(IDManager.VertexIDType.GenericSchemaType,count);
          ????????}?else?{
          ????????????elementId?=?idManager.getVertexID(count,?partitionID,?userVertexIDType);
          ????????}

          ????????Preconditions.checkArgument(elementId?>=?0);
          ????????//?對節(jié)點對象賦值其分布式唯一id
          ????????element.setId(elementId);
          ????}

          上述代碼,我們拿到了對應(yīng)的IdPool,有兩種情況:

          • 第一次獲取分布式id時,分區(qū)對應(yīng)的IDPool初始化為默認的size = 0的IDPool
          • 分區(qū)對應(yīng)的IDPool不是初次獲取

          這兩種情況的處理,都在代碼count = idPool.nextID()StandardIDPool類中的nextID()方法中被處理!

          在分析該代碼之前,我們需要知道 PartitionIDPoolStandardIDPool的關(guān)系:

          每個partition都有一個對應(yīng)的PartitionIDPool extends EnumMap 是一個枚舉map類型;

          每一個PartitionIDPool都有對應(yīng)的不同類型的StandardIDPool

          • NORMAL_VERTEX:用于vertex id的分配
          • UNMODIFIABLE_VERTEX:用于schema label id的分配
          • RELATION:用于edge id的分配

          StandardIDPool中包含多個字段,分別代表不同的含義,抽取幾個重要的字段進行介紹:

          ????private?static?final?int?RENEW_ID_COUNT?=?100;?
          ????private?final?long?idUpperBound;?//?Block的最大值,默認為2的55次冪
          ????private?final?int?partition;?//?當前pool對應(yīng)的分區(qū)
          ????private?final?int?idNamespace;?//?標識pool為那種類型的pool,上述的三種類型NORMAL_VERTEX、UNMODIFIABLE_VERTEX、RELATION;值為當前枚舉值在枚舉中的位置

          ????private?final?Duration?renewTimeout;//?重新獲取block的超時時間
          ????private?final?double?renewBufferPercentage;//?雙buffer中,當?shù)谝粋€buffer?block使用的百分比,到達配置的百分比則觸發(fā)other?buffer?block的獲取

          ????private?IDBlock?currentBlock;?//?當前的block
          ????private?long?currentIndex;?//?標識當前block使用到那一個位置
          ????private?long?renewBlockIndex;?//?依據(jù)currentBlock.numIds()*renewBufferPercentage來獲取這個值,主要用于在當前的block在消費到某個index的時候觸發(fā)獲取下一個buffer?block

          ????private?volatile?IDBlock?nextBlock;//?雙buffer中的另外一個block

          ????private?final?ThreadPoolExecutor?exec;//?異步獲取雙buffer的線程池

          6、調(diào)用了StandardIDPool類中的nextID方法

          經(jīng)過上述分析,我們知道,分布式唯一id的唯一性是由在partition維度下的count的值的唯一性來保證的;

          上述代碼通過調(diào)用IDPool的nextId來獲取count值;

          下述代碼就是獲取count的邏輯;

          ????@Override
          ????public?synchronized?long?nextID()?{
          ????????//?currentIndex標識當前的index小于current?block的最大值
          ????????assert?currentIndex?<=?currentBlock.numIds();

          ????????//?此處涉及兩種情況:
          ????????// 1、分區(qū)對應(yīng)的IDPool是第一次被初始化;則currentIndex =?0;currentBlock.numIds()?=?0;
          ????????//?2、分區(qū)對應(yīng)的該IDPool不是第一次,但是此次的index正好使用到了current?block的最后一個count
          ????????if?(currentIndex?==?currentBlock.numIds())?{
          ????????????try?{
          ????????????????//?將current?block賦值為next?block
          ????????????????//?next?block置空?并計算renewBlockIndex
          ????????????????nextBlock();
          ????????????}?catch?(InterruptedException?e)?{
          ????????????????throw?new?JanusGraphException("Could?not?renew?id?block?due?to?interruption",?e);
          ????????????}
          ????????}
          ????????
          ????????//?在使用current block的過程中,當current index ?==? renewBlockIndex時,觸發(fā)double buffer next block的異步獲取?。。?!
          ????????if?(currentIndex?==?renewBlockIndex)?{
          ????????????//?異步獲取next?block
          ????????????startIDBlockGetter();
          ????????}
          ????????
          ????????//?生成最終的count
          ????????long?returnId?=?currentBlock.getId(currentIndex);
          ????????//?current?index?+?1
          ????????currentIndex++;
          ????????if?(returnId?>=?idUpperBound)?throw?new?IDPoolExhaustedException("Reached?id?upper?bound?of?"?+?idUpperBound);
          ????????log.trace("partition({})-namespace({})?Returned?id:?{}",?partition,?idNamespace,?returnId);
          ????????//?返回最終獲取的分區(qū)維度的全局唯一count
          ????????return?returnId;
          ????}

          上述代碼中進行了兩次判斷:

          • currentIndex == currentBlock.numIds():
            • 第一次生成分布式id:此處判斷即為 0==0;然后生成新的block
            • 非第一次生成分布式id:等于情況下標識當前的block已經(jīng)使用完了,需要切換為next block
          • currentIndex == renewBlockIndex
            • renew index:標識index使用多少后開始獲取下一個double buffer 的next block;有一個默認值100,主要為了兼容第一次分布式id的生成;相等則會觸發(fā)異步獲取下一個next block

          下面我們分別對nextBlock();邏輯和startIDBlockGetter();進行分析;

          7、調(diào)用了StandardIDPool類中的nextBlock方法

          ????private?synchronized?void?nextBlock()?throws?InterruptedException?{
          ????????//?在分區(qū)對應(yīng)的IDPool第一次使用時,double?buffer的nextBlock為空
          ????????if?(null?==?nextBlock?&&?null?==?idBlockFuture)?{
          ????????????//?異步啟動?獲取id?block
          ????????????startIDBlockGetter();
          ????????}

          ????????//?也是在分區(qū)對應(yīng)的IDPool第一次使用時,因為上述為異步獲取,所以在執(zhí)行到這一步時nextBlock可能還沒拿到
          ????????//?所以需要阻塞等待block的獲取
          ????????if?(null?==?nextBlock)?{
          ????????????waitForIDBlockGetter();
          ????????}

          ????????//?將當前使用block指向next?block
          ????????currentBlock?=?nextBlock;
          ????????//?index清零
          ????????currentIndex?=?0;
          ????????//?nextBlock置空
          ????????nextBlock?=?null;

          ????????//?renewBlockIndex用于雙buffer中,當?shù)谝粋€buffer?block使用的百分比,到達配置的百分比則觸發(fā)other?buffer?block的獲取
          ????????//?值current?block?對應(yīng)的count數(shù)量?-?(值current?block?對應(yīng)的count數(shù)量?*?為renewBufferPercentage配置的剩余空間百分比)
          ????????//?在使用current block的時候,當current index ?==? renewBlockIndex時,觸發(fā)double buffer next block的異步獲?。。。?!
          ????????renewBlockIndex?=?Math.max(0,currentBlock.numIds()-Math.max(RENEW_ID_COUNT,?Math.round(currentBlock.numIds()*renewBufferPercentage)));
          ????}

          主要是做了三件事:

          • 1、block是否為空,為空的話則異步獲取一個block
          • 2、nextBlock不為空的情況下:next賦值到current、next置空、index置零
          • 3、計算獲取下一個nextBlock的觸發(fā)index renewBlockIndex值

          8、調(diào)用了StandardIDPool類中的startIDBlockGetter方法

          ????private?synchronized?void?startIDBlockGetter()?{
          ????????Preconditions.checkArgument(idBlockFuture?==?null,?idBlockFuture);
          ????????if?(closed)?return;?//Don't?renew?anymore?if?closed
          ????????//Renew?buffer
          ????????log.debug("Starting?id?block?renewal?thread?upon?{}",?currentIndex);
          ????????//?創(chuàng)建一個線程對象,包含給定的權(quán)限控制類、分區(qū)、命名空間、超時時間
          ????????idBlockGetter?=?new?IDBlockGetter(idAuthority,?partition,?idNamespace,?renewTimeout);
          ????????//?提交獲取double?buffer的線程任務(wù),異步執(zhí)行
          ????????idBlockFuture?=?exec.submit(idBlockGetter);
          ????}

          其中創(chuàng)建一個線程任務(wù),提交到線程池exec進行異步執(zhí)行;

          下面看下,線程類的call方法主要是調(diào)用了idAuthority.getIDBlock方法,這個方法主要是基于Hbase來獲取還未使用的block;

          ????/**
          ?????*?獲取double?buffer?block的線程類
          ?????*/

          ????private?static?class?IDBlockGetter?implements?Callable<IDBlock>?{

          ????????//?省略部分代碼
          ????????@Override
          ????????public?IDBlock?call()?{
          ????????????Stopwatch?running?=?Stopwatch.createStarted();
          ????????????try?{
          ????????????????//?此處調(diào)用idAuthority?調(diào)用HBase進行占用獲取Block
          ????????????????IDBlock?idBlock?=?idAuthority.getIDBlock(partition,?idNamespace,?renewTimeout);
          ????????????????return?idBlock;
          ????????????}?catch?(BackendException?e)?{}
          ????????}
          ????}

          9、調(diào)用ConsistentKeyIDAuthority類的getIDBlock方法

          ????@Override
          ????public?synchronized?IDBlock?getIDBlock(final?int?partition,?final?int?idNamespace,?Duration?timeout)?throws?BackendException?{
          ??????
          ????????//?開始時間
          ????????final?Timer?methodTime?=?times.getTimer().start();

          ????????//?獲取當前命名空間配置的blockSize,默認值10000;可自定義配置
          ????????final?long?blockSize?=?getBlockSize(idNamespace);
          ????????//?獲取當前命名空間配置的最大id值idUpperBound;值為:2的55次冪大小
          ????????final?long?idUpperBound?=?getIdUpperBound(idNamespace);
          ????????// uniqueIdBitWidth標識uniqueId占用的位數(shù);uniqueId為了兼容“關(guān)閉分布式id唯一性保障”的開關(guān)情況,uniqueIdBitWidth默認值=4
          ????????//?值:64-1(默認0)-5(分區(qū)占用位數(shù))-3(ID Padding占用位數(shù))-4(uniqueIdBitWidth)?= 51;標識block中的上限為2的51次冪大小
          ????????final?int?maxAvailableBits?=?(VariableLong.unsignedBitLength(idUpperBound)-1)-uniqueIdBitWidth;

          ????????//?標識block中的上限為2的51次冪大小
          ????????final?long?idBlockUpperBound?=?(1L?<
          ????????// UniquePID用盡的UniquePID集合,默認情況下,randomUniqueIDLimit =?0;
          ????????final?List?exhaustedUniquePIDs?=?new?ArrayList<>(randomUniqueIDLimit);

          ????????//?默認0.3秒??用于處理TemporaryBackendException異常情況(后端存儲出現(xiàn)問題)下:阻塞一斷時間,然后進行重試
          ????????Duration?backoffMS?=?idApplicationWaitMS;

          ????????//?從開始獲取IDBlock開始,持續(xù)超時時間(默認2分鐘)內(nèi)重試獲取IDBlock
          ????????while?(methodTime.elapsed().compareTo(timeout)?0)?{
          ????????????final?int?uniquePID?=?getUniquePartitionID();?//?獲取uniquePID,默認情況下“開啟分布式id唯一性控制”,值?=?0;?當“關(guān)閉分布式id唯一性控制”時為一個隨機值
          ????????????final?StaticBuffer?partitionKey?=?getPartitionKey(partition,idNamespace,uniquePID);?//?依據(jù)partition?+?idNamespace?+?uniquePID組裝一個RowKey
          ????????????try?{
          ????????????????long?nextStart?=?getCurrentID(partitionKey);?//?從Hbase中獲取當前partition對應(yīng)的IDPool中被分配的最大值,用來作為當前申請新的block的開始值
          ????????????????if?(idBlockUpperBound?-?blockSize?<=?nextStart)?{?//?確保還未被分配的id池中的id個數(shù),大于等于blockSize
          ????????????????????//?相應(yīng)處理
          ????????????????}

          ????????????????long?nextEnd?=?nextStart?+?blockSize;?//?獲取當前想要獲取block的最大值
          ????????????????StaticBuffer?target?=?null;

          ????????????????//?attempt?to?write?our?claim?on?the?next?id?block
          ????????????????boolean?success?=?false;
          ????????????????try?{
          ????????????????????Timer?writeTimer?=?times.getTimer().start();?//?===開始:開始進行插入自身的block需求到Hbase
          ????????????????????target?=?getBlockApplication(nextEnd,?writeTimer.getStartTime());?//?組裝對應(yīng)的Column:?-nextEnd?+??當前時間戳?+?uid(唯一標識當前圖實例)
          ????????????????????final?StaticBuffer?finalTarget?=?target;?//?copy?for?the?inner?class
          ????????????????????BackendOperation.execute(txh?->?{?//?異步插入當前生成的RowKey?和?Column
          ????????????????????????idStore.mutate(partitionKey,?Collections.singletonList(StaticArrayEntry.of(finalTarget)),?KeyColumnValueStore.NO_DELETIONS,?txh);
          ????????????????????????return?true;
          ????????????????????},this,times);
          ????????????????????writeTimer.stop();?//?===結(jié)束:插入完成

          ????????????????????final?boolean?distributed?=?manager.getFeatures().isDistributed();
          ????????????????????Duration?writeElapsed?=?writeTimer.elapsed();?//?===獲取方才插入的時間耗時
          ????????????????????if?(idApplicationWaitMS.compareTo(writeElapsed)?0?&&?distributed)?{?//?判斷是否超過配置的超時時間,超過則報錯TemporaryBackendException,然后等待一斷時間進行重試
          ????????????????????????throw?new?TemporaryBackendException("Wrote?claim?for?id?block?["?+?nextStart?+?",?"?+?nextEnd?+?")?in?"?+?(writeElapsed)?+?"?=>?too?slow,?threshold?is:?"?+?idApplicationWaitMS);
          ????????????????????}?else?{

          ????????????????????????assert?0?!=?target.length();
          ????????????????????????final?StaticBuffer[]?slice?=?getBlockSlice(nextEnd);?//?組裝下述基于上述Rowkey的Column的查找范圍:(-nextEnd +?0?:?0nextEnd +?最大值)??????

          ????????????????????????final?List?blocks?=?BackendOperation.execute(?//?異步獲取指定Rowkey和指定Column區(qū)間的值
          ????????????????????????????(BackendOperation.Transactional>)?txh?->?idStore.getSlice(new?KeySliceQuery(partitionKey,?slice[0],?slice[1]),?txh),this,times);
          ????????????????????????if?(blocks?==?null)?throw?new?TemporaryBackendException("Could?not?read?from?storage");
          ????????????????????????if?(blocks.isEmpty())
          ????????????????????????????throw?new?PermanentBackendException("It?seems?there?is?a?race-condition?in?the?block?application.?"?+
          ????????????????????????????????????"If?you?have?multiple?JanusGraph?instances?running?on?one?physical?machine,?ensure?that?they?have?unique?machine?idAuthorities");

          ????????????????????????if?(target.equals(blocks.get(0).getColumnAs(StaticBuffer.STATIC_FACTORY)))?{?//?如果獲取的集合中,當前的圖實例插入的數(shù)據(jù)是第一條,則表示獲取block;?如果不是第一條,則獲取Block失敗
          ????????????????????????????//?組裝IDBlock對象
          ????????????????????????????ConsistentKeyIDBlock?idBlock?=?new?ConsistentKeyIDBlock(nextStart,blockSize,uniqueIdBitWidth,uniquePID);

          ????????????????????????????if?(log.isDebugEnabled())?{
          ????????????????????????????????????idBlock,?partition,?idNamespace,?uid);
          ????????????????????????????}

          ????????????????????????????success?=?true;
          ????????????????????????????return?idBlock;?//?返回
          ????????????????????????}?else?{?}
          ????????????????????}
          ????????????????}?finally?{
          ????????????????????if?(!success?&&?null?!=?target)?{?//?在獲取Block失敗后,刪除當前的插入;?如果沒有失敗,則保留當前的插入,在hbase中標識該Block已經(jīng)被占用
          ????????????????????????//Delete?claim?to?not?pollute?id?space
          ????????????????????????for?(int?attempt?=?0;?attempt?//?回滾:刪除當前插入,嘗試次數(shù)5次
          ????????????????????????}
          ????????????????????}
          ????????????????}
          ????????????}?catch?(UniqueIDExhaustedException?e)?{
          ????????????????//?No?need?to?increment?the?backoff?wait?time?or?to?sleep
          ????????????????log.warn(e.getMessage());
          ????????????}?catch?(TemporaryBackendException?e)?{
          ????????????????backoffMS?=?Durations.min(backoffMS.multipliedBy(2),?idApplicationWaitMS.multipliedBy(32));
          ????????????????sleepAndConvertInterrupts(backoffMS);?\
          ????????????}
          ????????}

          ????????throw?new?TemporaryLockingException();
          ????}

          主要的邏輯就是:

          • 組裝Rowkey:partition + idNameSpace+unquePId
          • 組裝Column:-nextEnd+now time+uid
          • RowKey+Column插入Hbase
          • 獲取的上述組裝的RowKey 基于(-nextEnd + 0 : -nextEnd + max)范圍的所有Column集合
          • 判斷集合的第一個Column是不是當前插入的Column,是的話則占用block成功,不是的話則占用失敗,刪除剛才占用并進行重試

          最終:異步獲取到了唯一占用的Block,然后生成對應(yīng)的唯一count,組裝最后的唯一id

          整體的調(diào)用流程如下:

          在這里插入圖片描述

          四:其他類型的id生成

          上述我們主要依據(jù)生成節(jié)點id(vertex id)的過程來進行分析

          JanusGraph中還包含edge id、property idschema label id等幾種的分布式id生成

          所有類型的分布式id的生成主要思想和邏輯都幾乎相同,只是一些具體的邏輯可能有所不同,我們理解了vertex id的分布式id生成流程,其他的也可以理解了。

          1、property id的生成

          在JanusGraph中的property的分布式唯一id的生成,整體邏輯和vertex id的生成邏輯大體相同;

          property id的 生成和 vertex id有兩點不同:

          • ID的組成部分:?在vertex id中組成部分包含count+partition+ID Padding;而在property id中沒有ID Padding部分,其組成為count + partition
          ????????long?id?=?(count<????????if?(type!=null)?id?=?type.addPadding(id);?//?此時,type?=?null
          ????????return?id;
          • partition id的獲取方式:在生成vertex id時,partition id是隨機獲取的;而在生成property id時,partition id是獲取的當前節(jié)點對應(yīng)的partition id,如果節(jié)點獲取不到分區(qū)id,則隨機生成一個;
          ????????????if?(element?instanceof?InternalRelation)?{?//?屬性?+?邊
          ????????????????InternalRelation?relation?=?(InternalRelation)element;
          ????????????????if?(attempt?????????????????????InternalVertex?incident?=?relation.getVertex(attempt);
          ????????????????????Preconditions.checkArgument(incident.hasId());
          ????????????????????if?(!IDManager.VertexIDType.PartitionedVertex.is(incident.longId())?||?relation.isProperty())?{?//?獲取對應(yīng)節(jié)點已有的partition?id
          ????????????????????????partitionID?=?getPartitionID(incident);
          ????????????????????}?else?{
          ????????????????????????continue;
          ????????????????????}
          ????????????????}?else?{?//?如果對應(yīng)的節(jié)點都沒有,則隨機獲取一個partition?id
          ????????????????????partitionID?=?placementStrategy.getPartition(element);
          ????????????????}

          2、Edge id的生成

          在JanusGraph中的edge的分布式唯一id的生成,整體邏輯和vertex id的生成邏輯大體相同;

          edge id的 生成和 vertex id有兩點不同:

          • ID的組成部分:?在vertex id中組成部分包含count+partition+ID Padding;而在edge id中沒有ID Padding部分,其組成為count + partition,代碼同property id的生成代碼
          • partition id的獲取方式:在生成vertex id時,partition id是隨機獲取的;而在生成edge id時,partition id是獲取的當前source vertex 或者 target vertex對應(yīng)的partition id,如果節(jié)點獲取不到分區(qū)id,則隨機生成一個,代碼同property id的生成代碼;

          3、Schema相關(guān)id的生成

          在JanusGraph中的schema相關(guān)id的分布式唯一id的生成,整體邏輯和vertex id的生成邏輯大體相同;

          schema相關(guān)id的生成分為四種:PropertyKey、EdgeLabel、VertexLabel、JanusGraphSchemaVertex

          • ID的組成部分:?在vertex id中組成部分包含count+partition+ID Padding;在schema對應(yīng)的id生成,這四種產(chǎn)生的id對應(yīng)的結(jié)構(gòu)都是一樣的:count + 對應(yīng)類型的固定后綴
          return?(count?<
          • partition id的獲取方式:在生成vertex id時,partition id是隨機獲取的;而在生成schema id時,partition id是默認的partition id = 0
          public?static?final?int?SCHEMA_PARTITION?=?0;
          if?(element?instanceof?JanusGraphSchemaVertex)?{
          ????????????????partitionID?=?IDManager.SCHEMA_PARTITION;?//?默認分區(qū)
          }
          程序員專欄
          ?掃碼關(guān)注填加客服?
          長按識別下方二維碼進群

          近期精彩內(nèi)容推薦:??

          ?又一個程序員,被抓捕?。ㄕ鎸嵤录?/a>

          ?程序員有個可愛女朋友是種什么體驗?

          ?“12306”的架構(gòu)到底有多牛逼?

          ?csv文件讀寫亂碼問題的一個簡單解決方法




          在看點這里好文分享給更多人↓↓

          瀏覽 14
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                    <th id="afajh"><progress id="afajh"></progress></th>
                    国产三级网站免费观看 | 欧美性开放网站 | 久久精品10 | 成人欧美一区二区三区男男 | 老鸭窝毛片 |