<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          自己動手實(shí)現(xiàn)一個loghub(或kafka)分片消費(fèi)負(fù)載均衡器

          共 43836字,需瀏覽 88分鐘

           ·

          2021-04-09 15:04

          走過路過不要錯過

          點(diǎn)擊藍(lán)字關(guān)注我們


          一般地,像kafka之類的消息中間件,作為一個可以保持歷史消息的組件,其消費(fèi)模型一般是主動拉取方式。這是為了給消費(fèi)者足夠的自由,回滾或者前進(jìn)。

          然而,也正是由于將消費(fèi)消息的權(quán)力交給了消費(fèi)者,所以,消費(fèi)者往往需要承擔(dān)更多的責(zé)任。比如:需要自行保存消費(fèi)偏移量,以便后續(xù)可以知道從哪里繼續(xù)。而當(dāng)這一點(diǎn)處理不好時,則可能帶來一些麻煩。

          不管怎么樣,解決方案也都是現(xiàn)成的,咱們也不用擔(dān)心。

          今天我們要談?wù)摰氖且粋€場景:如何讓n個機(jī)器消費(fèi)m個分片數(shù)據(jù)?(帶狀態(tài)的,即不能任意機(jī)器消費(fèi)任意shard)

          這在消息中間件的解決方案里,明白地寫著,使用消費(fèi)者群組就可以實(shí)現(xiàn)了。具體來說就是,每個分片至多會被一機(jī)器消費(fèi),每個機(jī)器則可以消費(fèi)多個分片數(shù)據(jù)。即機(jī)器數(shù)據(jù)小于分片數(shù)時,分片會被均衡地分配到消費(fèi)者中。當(dāng)機(jī)器數(shù)大于分片數(shù)時,多余的機(jī)器將不做任何事情。

          好吧,既然官方已經(jīng)說明白了,那咱們應(yīng)該就不再需要自己搞一個輪子了吧。

          但是,我還有個場景:如果我要求在機(jī)器做負(fù)載重平衡時,需要保證被抽取出去的機(jī)器分片,至少保留一段時間,不允許任何機(jī)器消費(fèi)該分片,因?yàn)榭赡苓€有數(shù)據(jù)需要備份。

          針對這種場景,我想官方也許是有提供回調(diào)函數(shù)之類的解決方案的吧。不管了,反正我沒找到,只能自己先造個輪子了。

          本文場景前提:

          1. 使用loghub作為消息中間件(原理同kafka);
          2. 整個數(shù)據(jù)有m個分片shard;
          3. 整個消費(fèi)者集群有n臺機(jī)器;
          4. 每個分片的數(shù)據(jù)需要集中到一機(jī)器上做有狀態(tài)處理;
          5. 可以借助redis保存有狀態(tài)數(shù)據(jù),以便消費(fèi)者機(jī)器做優(yōu)雅停機(jī);

          最簡單的方案是,使 n=m, 每臺機(jī)器消費(fèi)一個shard, 這樣狀態(tài)永遠(yuǎn)不會錯亂。

          但是這樣明顯可擴(kuò)展能力太差了!

          比如有時數(shù)據(jù)量小了,雖然分片還在,但是完全不用那么多機(jī)器的時候,如何縮減機(jī)器?
          比如由于數(shù)據(jù)壓力大了,我想增加下分片數(shù),以提高發(fā)送者性能,但是消費(fèi)者我還不想理他,消費(fèi)慢點(diǎn)無所謂?

          其實(shí),我們可以使用官方的消費(fèi)者群組方法,可以動態(tài)縮減機(jī)器。

          但是這個有狀態(tài)就比較難做到了。

          以上痛點(diǎn),總結(jié)下來就是,可擴(kuò)展性問題。

          想象中的輪子是怎么樣的?

          1. 需要有個注冊中心,管理機(jī)器的上下線監(jiān)控;
          2. 需要有負(fù)載均衡器,負(fù)載將shard的負(fù)載均衡的分布到在線機(jī)器中;
          3. 需要有每個機(jī)器自己消費(fèi)的分片記錄,以使機(jī)器自身有據(jù)可查;
          4. 需要有每個分片的消費(fèi)情況,以判定出哪些分片已分配給哪些機(jī)器;

          我們來細(xì)看下實(shí)現(xiàn):

          【1】均衡協(xié)調(diào)器主框架:

          import com.aliyun.openservices.log.Client;import com.aliyun.openservices.log.common.Shard;import com.aliyun.openservices.log.exception.LogException;import com.aliyun.openservices.log.response.ListShardResponse;import com.test.common.config.LogHubProperties;import com.test.utils.RedisPoolUtil;import com.google.common.collect.Lists;import org.apache.commons.lang3.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
          import java.net.InetAddress;import java.net.UnknownHostException;import java.util.ArrayList;import java.util.Comparator;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;
          import static com.test.dispatcher.work.RedisKeyConstants.MAX_CONSUMER_SHARD_LOAD;

          /** * loghub動態(tài)消費(fèi)者 shard分配shard 協(xié)調(diào)器 * */public class LoghubConsumerShardCoWorker implements Runnable {
          private static final Logger logger = LoggerFactory.getLogger(LoghubConsumerShardCoWorker.class);
          private LogHubProperties logHubProperties;
          private RedisPoolUtil redisPoolUtil;
          private Client mClient;
          private ShardAssignMaster shardAssignMaster;
          private String HOST_NAME;
          public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties) { this(redisPoolUtil, logHubProperties, null); }
          public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties, String hostName) { this.redisPoolUtil = redisPoolUtil; this.logHubProperties = logHubProperties; this.HOST_NAME = hostName;
          initSharedVars(); initConsumerClient(); initShardAssigner(); getAllShardList(); registerSelfConsumer(); startHeartBeatThread(); }
          /** * 開啟心跳線程,?;?/span> */ private void startHeartBeatThread() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); executorService.scheduleAtFixedRate(() -> { String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME; redisPoolUtil.expire(serverConsumeCacheKey, 30); shardAssignMaster.sendHeartbeat(HOST_NAME); }, 30, 25, TimeUnit.SECONDS); }
          /** * 初始化客戶端實(shí)例 */ private void initConsumerClient() { this.mClient = new Client(logHubProperties.getEndpoint(), logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey()); }
          /** * 初始化分片分配控制器 */ private void initShardAssigner() { shardAssignMaster = new ShardAssignMaster(redisPoolUtil); }
          /** * 初始化公共變量 */ private void initSharedVars() { try { if(HOST_NAME != null) { return; } HOST_NAME = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { logger.error("init error : 獲取服務(wù)器主機(jī)名失敗", e); throw new RuntimeException("init error : 獲取服務(wù)器主機(jī)名失敗"); } }
          /** * 將自己作為消費(fèi)者注冊到消費(fèi)者列表中,以判定后續(xù)可以進(jìn)行消費(fèi) */ private void registerSelfConsumer() { shardAssignMaster.registerConsumer(HOST_NAME); shardAssignMaster.sendHeartbeat(HOST_NAME); }
          @Override public void run() { try { checkConsumerSharding(); } catch (Exception e) { logger.error("動態(tài)分配shard 發(fā)生異常", e); } }
          /** * job 只做一件事,即檢查 shard 的消費(fèi)情況,不平衡則處理 */ private void checkConsumerSharding() { try { if (tryCoWorkerLock()) { // step1. 檢查是否需要進(jìn)行shard分配 // 集群消費(fèi)loghub數(shù)據(jù)動態(tài)伸縮策略 // 1. 啟動時先去獲取部分片數(shù),備用; // 2. 應(yīng)用啟動后,把自己注冊到注冊中心或redis中; // 3. 根據(jù)注冊上來的機(jī)器列表,按平均分配策略分配shard(只能由一個機(jī)器來分配,其他機(jī)器處理分布式鎖競爭失敗,等待狀態(tài)); // 4. 分配好后,釋放鎖,各機(jī)器開始消費(fèi),如機(jī)器A消費(fèi)shard 0/3,則機(jī)器1以輪詢的方式依次從shard 0/3 摘取數(shù)據(jù)消費(fèi); // 5. 分配好的數(shù)據(jù)結(jié)構(gòu)為:prefix+ip保存具體數(shù)據(jù),另外將自己的key添加到另一個zset中,標(biāo)識自己存活;自己的key有效期為30秒;使用另一維度 shard,保存每個shard被占用情況,使用hash保存,key為shard,value為當(dāng)有占用時為機(jī)器ip或主機(jī)名,當(dāng)無占用時為null或空串; // 6. 以上數(shù)據(jù)刷入,將在機(jī)器搶占到shard更新數(shù)據(jù);shard總數(shù)信息暫時不允許在運(yùn)行期間進(jìn)行變更;(即如果變理shard必須重啟服務(wù)器) // 7. 機(jī)器下線時,占用的key將自動過期;(考慮是否主動刪除) // 8. 各機(jī)器上啟動一個后臺掃描線程,每隔30秒掃描一次。掃描zset,取出所有值后查看是否存在相應(yīng)的key,如果不存在說明機(jī)器已下線,需要重新分配其占用的shard; // 9. 重新分配策略,使用一致性hash算法實(shí)現(xiàn); // 10. 機(jī)器上線時,使用一致性hash算法重新平衡shard; // 11. 使用分布式鎖保證分配進(jìn)程只有一個; CheckConsumerShardingResultContainer resultContainer = checkShardConsumerReBalanceStatus(); if(resultContainer.getStatusResultType() != ReBalanceStatusResultEnum.OK) { reBalanceConsumerShard(resultContainer); } } } finally { releaseCoWorkerLock(); } }
          /** * 確認(rèn)機(jī)器和shard是否需要再平衡 * * @return 結(jié)果狀態(tài)集 */ private CheckConsumerShardingResultContainer checkShardConsumerReBalanceStatus() { // step1. 檢查自身是否存在shard, 不存在則立即進(jìn)行一次重分配(消費(fèi)者機(jī)器數(shù)大于分片數(shù)時,重平衡動作將是無效動作) // step2. 檢查所有shard列表,是否有未被分配的shard,如有,立即觸發(fā)一次重分配 // step3. 檢查是否有負(fù)荷比較高的機(jī)器,如有觸發(fā)平衡(功能預(yù)留,此功能需要基于統(tǒng)計(jì)信息) CheckConsumerShardingResultContainer resultContainer = new CheckConsumerShardingResultContainer();
          final List<String> activeServersList = shardAssignMaster.getAllOnlineServerList(); final List<String> allShardList = getAllShardList();
          // 計(jì)算空閑機(jī)器 Map<String, Integer> hostConsumeLoadCountMap = new HashMap<>(); List<String> idleServerList = filterIdleServerList(activeServersList, hostConsumeLoadCountMap);
          // 計(jì)算未被分配的shard List<String> unAssignedShardList = filterUnAssignedShardList(allShardList);
          // 根據(jù)資源信息,得出目前的負(fù)載狀態(tài) ReBalanceStatusResultEnum statusResult = computeReBalanceStatusOnResources( unAssignedShardList, idleServerList, hostConsumeLoadCountMap);
          resultContainer.setAllServerList(activeServersList); resultContainer.setAllShardList(allShardList); resultContainer.setIdleServerList(idleServerList); resultContainer.setUnAssignedShardList(unAssignedShardList); resultContainer.setServerConsumeShardLoad(hostConsumeLoadCountMap); resultContainer.setStatusResultType(statusResult); return resultContainer; }
          /** * 根據(jù)給定資源信息,計(jì)算出目前的負(fù)載狀態(tài) * * @param unAssignedShardList 未分配的shard列表 * @param idleServerList 空閑機(jī)器列表 * @param hostConsumeLoadMap 機(jī)器消費(fèi)計(jì)數(shù)容器(負(fù)載情況) * @return 狀態(tài)值 */ private ReBalanceStatusResultEnum computeReBalanceStatusOnResources( List<String> unAssignedShardList, List<String> idleServerList, Map<String, Integer> hostConsumeLoadMap) { // 沒有未分配的shard,檢測是否平衡即可 // 0. 有空閑機(jī)器,則直接分配給空閑機(jī)器即可 // 1. 最大消費(fèi)shard-最小消費(fèi)shard數(shù) >= 2, 則說明有機(jī)器消費(fèi)過多shard,需重分配 // 2. 機(jī)器負(fù)載平衡,無須調(diào)整 if(unAssignedShardList.isEmpty()) { int minConsume = MAX_CONSUMER_SHARD_LOAD; int maxConsume = 0; for (Map.Entry<String, Integer> entry : hostConsumeLoadMap.entrySet()) { int gotCount = entry.getValue(); if(gotCount > maxConsume) { maxConsume = gotCount; } if(gotCount < minConsume) { minConsume = gotCount; } }
          // 因有未分配的機(jī)器,假如現(xiàn)有的機(jī)器消費(fèi)都是2,則需要重分配的大壓力的機(jī)器 shard 給空閑機(jī)器 if(!idleServerList.isEmpty()) { if (maxConsume > 1) { return ReBalanceStatusResultEnum.HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED; } }
          // 有消費(fèi)相差2的機(jī)器,重新分配,從大數(shù)上借調(diào)到小數(shù)上 if(maxConsume > minConsume + 1) { return ReBalanceStatusResultEnum.HEAVY_LOAD_BALANCE_NEEDED; } return ReBalanceStatusResultEnum.OK; }
          // 有可用shard // 3. 有空閑機(jī)器,直接讓空閑shard分配給這些空閑機(jī)器就ok了 // 4. 沒有空閑機(jī)器,須將空閑shard 分配給負(fù)載小的機(jī)器 if(idleServerList.isEmpty()) { return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS; } return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS; }
          /** * 過濾出空閑的機(jī)器列表 * * @param activeServersList 所有機(jī)器列表 * @return 空閑機(jī)器集, 且將各自消費(fèi)數(shù)放入計(jì)數(shù)容器 */ private List<String> filterIdleServerList(List<String> activeServersList, Map<String, Integer> hostConsumeCountMap) { List<String> idleServerList = new ArrayList<>(); for (String hostname1 : activeServersList) { if(!shardAssignMaster.isConsumerServerAlive(hostname1)) { shardAssignMaster.invalidateOfflineServer(hostname1); continue; } int consumeCount; Set<String> consumeShardSet = shardAssignMaster.getServerDutyConsumeShardSet(hostname1); if(consumeShardSet == null || consumeShardSet.isEmpty()) { idleServerList.add(hostname1); consumeCount = 0; } else { consumeCount = consumeShardSet.size(); } hostConsumeCountMap.put(hostname1, consumeCount); } return idleServerList; }

          /** * 過濾出未分配的shard列表 * * @param allShardList 所有shard * @return 未分配的shard */ private List<String> filterUnAssignedShardList(List<String> allShardList) { List<String> unAssignedShardList = new ArrayList<>(); for (String shardId1 : allShardList) { String consumeHostname = shardAssignMaster.getShardAssignedServer(shardId1); // 如果不為空,則之前分配過,檢查機(jī)器是否下線 // 如果為空,則是第一次分配 if(!StringUtils.isBlank(consumeHostname)) { if(!shardAssignMaster.isConsumerServerAlive(consumeHostname)) { // 清除下線機(jī)器信息,將當(dāng)前shard置為空閑 shardAssignMaster.invalidateOfflineServer(consumeHostname); shardAssignMaster.invalidateShardAssignInfo(shardId1); unAssignedShardList.add(shardId1); } } else { unAssignedShardList.add(shardId1); } } return unAssignedShardList; }
          /** * 嘗試獲取協(xié)調(diào)者協(xié)調(diào)鎖 * * 在集群環(huán)境中,只允許有一個協(xié)調(diào)器在運(yùn)行 * * @return true:成功, false:失敗,不得進(jìn)行協(xié)調(diào)分配工作 */ private boolean tryCoWorkerLock() { return redisPoolUtil.getDistributedLock("distributedLock", HOST_NAME, 30); }
          /** * 釋放協(xié)調(diào)鎖,以便下次再競爭 */ private void releaseCoWorkerLock() { redisPoolUtil.releaseDistributedLock("distributedLock", HOST_NAME); }
          /** * 重新平衡消費(fèi)者和shard的關(guān)系 * * @param resultContainer 待重平衡狀態(tài) */ private void reBalanceConsumerShard(CheckConsumerShardingResultContainer resultContainer) {
          // 集群消費(fèi)loghub數(shù)據(jù)動態(tài)伸縮策略,根據(jù)負(fù)載狀態(tài),調(diào)用相應(yīng)策略進(jìn)行重平衡 StatusReBalanceStrategy strategy = StatusReBalanceStrategyFactory.createStatusReBalanceAlgorithm(resultContainer, shardAssignMaster); strategy.loadBalance(); }

          /** * 獲取分片列表 * * @return 分片列表,如: 0,1,2,3 */ private List<String> getAllShardList() { // 實(shí)時讀取列表 List<String> shardList = Lists.newArrayList(); try { ListShardResponse listShardResponse = mClient.ListShard(logHubProperties.getProjectName(), logHubProperties.getEventlogStore()); ArrayList<Shard> getShards = listShardResponse.GetShards(); for (Shard shard : getShards) { shardList.add(shard.GetShardId() + ""); } } catch (LogException e) { logger.error("loghub 獲取shard列表 error :", e); } return shardList; }
          }

          如上,就是協(xié)調(diào)均衡主框架。主要邏輯如下:

              1. 啟動時初始化各種端,分配器,注冊自己到控制中心等等;
              2. 以線程的形式,被外部以定時任務(wù)執(zhí)行的方式調(diào)用;
              3. 檢查任務(wù)前,須獲得檢查鎖,否則直接返回;
              4. 先獲得目前機(jī)器的所有消費(fèi)情況和shard的分配情況,得出資源負(fù)載數(shù)據(jù);
              5. 根據(jù)得到的數(shù)據(jù)信息,推算出目前的平衡狀態(tài);
              6. 根據(jù)平衡狀態(tài),調(diào)用相應(yīng)的平衡策略進(jìn)行重平衡;
              7. 等待下一次調(diào)度;

          檢查結(jié)果將作為后續(xù)選擇均衡策略的依據(jù),所以需要相應(yīng)的狀態(tài)容器保存。如下:

          /** * 集群狀態(tài)預(yù)檢查 結(jié)果容器 */class CheckConsumerShardingResultContainer {
          /** * 所有shard列表 */ private List<String> allShardList;
          /** * 未被分配的shard列表 */ private List<String> unAssignedShardList;
          /** * 所有機(jī)器列表 */ private List<String> allServerList;
          /** * 空閑的機(jī)器列表(未被分配shard) */ private List<String> idleServerList;
          /** * 機(jī)器消費(fèi)shard的負(fù)載計(jì)數(shù)容器 */ private Map<String, Integer> serverConsumeShardLoad;
          /** * 狀態(tài)檢查結(jié)果類型 */ private ReBalanceStatusResultEnum statusResultType;
          public Map<String, Integer> getServerConsumeShardLoad() { return serverConsumeShardLoad; }
          public void setServerConsumeShardLoad(Map<String, Integer> serverConsumeShardLoad) { this.serverConsumeShardLoad = serverConsumeShardLoad; }
          public List<String> getAllShardList() { return allShardList; }
          public void setAllShardList(List<String> allShardList) { this.allShardList = allShardList; }
          public List<String> getUnAssignedShardList() { return unAssignedShardList; }
          public void setUnAssignedShardList(List<String> unAssignedShardList) { this.unAssignedShardList = unAssignedShardList; }
          public List<String> getAllServerList() { return allServerList; }
          public void setAllServerList(List<String> allServerList) { this.allServerList = allServerList; }
          public List<String> getIdleServerList() { return idleServerList; }
          public void setIdleServerList(List<String> idleServerList) { this.idleServerList = idleServerList; }
          public ReBalanceStatusResultEnum getStatusResultType() { return statusResultType; }
          public void setStatusResultType(ReBalanceStatusResultEnum statusResultType) { this.statusResultType = statusResultType; }}

          針對多個平衡策略算法,使用一個工廠類來生產(chǎn)各種策略實(shí)例。如下:

          /** * 再平衡算法工廠類 */class StatusReBalanceStrategyFactory {
          /** * 無需做平衡的控制器 */ private static final StatusReBalanceStrategy EMPTY_BALANCER = new EmptyReBalancer();
          /** * 根據(jù)當(dāng)前的負(fù)載狀態(tài),創(chuàng)建對應(yīng)的負(fù)載均衡算法 * * @param resultContainer 負(fù)載狀態(tài)集 * @param shardAssignMaster 分片分配管理者實(shí)例 * @return 算法實(shí)例 */ public static StatusReBalanceStrategy createStatusReBalanceAlgorithm(CheckConsumerShardingResultContainer resultContainer, ShardAssignMaster shardAssignMaster) { ReBalanceStatusResultEnum balanceStatus = resultContainer.getStatusResultType(); switch (balanceStatus) { case OK: return EMPTY_BALANCER; case UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS: return new UnAssignedShardWithConsumerIdleReBalancer(shardAssignMaster, resultContainer.getUnAssignedShardList(), resultContainer.getIdleServerList()); case UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS: return new UnassignedShardWithoutConsumerIdleReBalancer(shardAssignMaster, resultContainer.getUnAssignedShardList(), resultContainer.getServerConsumeShardLoad()); case HEAVY_LOAD_BALANCE_NEEDED: return new HeavyLoadReBalancer(shardAssignMaster, resultContainer.getServerConsumeShardLoad()); case HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED: return new HeavyLoadWithConsumerIdleReBalancer(shardAssignMaster, resultContainer.getServerConsumeShardLoad(), resultContainer.getIdleServerList()); default: break; } return EMPTY_BALANCER; }}
          /** * 負(fù)載均衡策略統(tǒng)一接口 */interface StatusReBalanceStrategy {
          /** * 執(zhí)行負(fù)載均衡方法 */ public void loadBalance();}

          針對各種場景的負(fù)載均衡,各自實(shí)現(xiàn)如下。其中,無需操作時,將返回一個空操作實(shí)例!

          1. 空操作實(shí)例

          /** * 無需做平衡的控制器 * * @see ReBalanceStatusResultEnum#OK 狀態(tài)枚舉 */class EmptyReBalancer implements StatusReBalanceStrategy {    @Override    public void loadBalance() {        // ignore ...    }}

          2. 分配剩余shard給空閑的機(jī)器控制器

          /** * 為所有空閑的其他空閑機(jī)器分配可用 shard 的控制器 * * @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態(tài)枚舉 */class UnAssignedShardWithConsumerIdleReBalancer implements StatusReBalanceStrategy {
          /** * 未被分配的分片列表 */ private List<String> unAssignedShardList;
          /** * 分片分配管理者實(shí)例 */ private ShardAssignMaster shardAssignMaster;
          /** * 空閑的機(jī)器列表 */ private List<String> idleServerList;
          public UnAssignedShardWithConsumerIdleReBalancer( ShardAssignMaster shardAssignMaster, List<String> unAssignedShardList, List<String> idleServerList) { this.shardAssignMaster = shardAssignMaster; this.unAssignedShardList = unAssignedShardList; this.idleServerList = idleServerList; }
          @Override public void loadBalance() { // 1. 找出還未被消費(fèi)的shard // 2. 依次分配給各空閑機(jī)器,每個空閑機(jī)器只至多分配一個shard int serverIndex = 0; for (String shard1 : unAssignedShardList) { // 輪詢分配shard, 先只給一個機(jī)器分配一個shard if(serverIndex >= idleServerList.size()) { break; } String serverHostname = idleServerList.get(serverIndex++); shardAssignMaster.assignShardToServer(shard1, serverHostname); } }}

          3. 分配剩余shard給負(fù)載低的機(jī)器的控制器

          /** * 有空閑shard場景 的控制器 , 須找出負(fù)載最低的機(jī)器塞入shard到現(xiàn)有的機(jī)器中(可能是有機(jī)器下線導(dǎo)致) * * @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態(tài)枚舉 */class UnassignedShardWithoutConsumerIdleReBalancer implements StatusReBalanceStrategy {
          /** * 未被分配分片列表 */ private List<String> unAssignedShardList;
          /** * 分片管理者實(shí)例 */ private ShardAssignMaster shardAssignMaster;
          /** * 消費(fèi)者負(fù)載情況 */ private Map<String, Integer> consumerLoadCount;
          public UnassignedShardWithoutConsumerIdleReBalancer( ShardAssignMaster shardAssignMaster, List<String> unAssignedShardList, Map<String, Integer> consumerLoadCount) { this.shardAssignMaster = shardAssignMaster; this.unAssignedShardList = unAssignedShardList; this.consumerLoadCount = consumerLoadCount; }
          @Override public void loadBalance() { // 1. 找出負(fù)載最低的機(jī)器 // 2. 依次分配shard給該機(jī)器 // 3. 分配的后負(fù)載數(shù)+1, 循環(huán)分配 // 先根據(jù)空閑數(shù),計(jì)算出一個可以接受新shard的機(jī)器的shard負(fù)載最低值,然后依次分配給這些機(jī)器 for (String shard1 : unAssignedShardList) { // 按負(fù)載最小分配原則 分配shard Map.Entry<String, Integer> minLoadServer = getMinLoadServer(consumerLoadCount); String serverHostname = minLoadServer.getKey();
          // 分配shard給機(jī)器 shardAssignMaster.assignShardToServer(shard1, serverHostname);
          // 負(fù)載數(shù) +1 minLoadServer.setValue(minLoadServer.getValue() + 1); } }
          /** * 獲取負(fù)載最小的機(jī)器名備用 * * @param loadCount 負(fù)載數(shù)據(jù) * @return 最小負(fù)載機(jī)器 */ private Map.Entry<String, Integer> getMinLoadServer(Map<String, Integer> loadCount) { int minCount = MAX_CONSUMER_SHARD_LOAD; Map.Entry<String, Integer> minLoadServer = null; for(Map.Entry<String, Integer> server1 : loadCount.entrySet()) { if(server1.getValue() < minCount) { minCount = server1.getValue(); minLoadServer = server1; } } return minLoadServer; }}

          4. 將現(xiàn)有機(jī)器消費(fèi)情況做重分配,從而使各自負(fù)載相近控制器

          /** * 負(fù)載不均衡導(dǎo)致的 重新均衡控制器,將消費(fèi)shard多的機(jī)器的 shard 拆解部分到 消費(fèi)少的機(jī)器上 (須上鎖) * * @see ReBalanceStatusResultEnum#HEAVY_LOAD_BALANCE_NEEDED 狀態(tài)枚舉 */class HeavyLoadReBalancer implements StatusReBalanceStrategy {
          /** * 分片分配管理者實(shí)例 */ private ShardAssignMaster shardAssignMaster;
          /** * 機(jī)器消費(fèi)負(fù)載情況 */ private Map<String, Integer> consumerLoadCount;
          public HeavyLoadReBalancer(ShardAssignMaster shardAssignMaster, Map<String, Integer> consumerLoadCount) { this.shardAssignMaster = shardAssignMaster; this.consumerLoadCount = consumerLoadCount; }
          @Override public void loadBalance() { // 1. 找出所有機(jī)器的消費(fèi)數(shù)的平均線值 // 2. 負(fù)載數(shù)大于均線1的,直接抽出多余的shard, 放到待分配容器中 // 3. 從大到小排序負(fù)載機(jī)器 // 4. 從大的負(fù)載上減少shard到最后的機(jī)器上,直到小的機(jī)器達(dá)到平均負(fù)載線最貼近的地方,或者小的機(jī)器到達(dá)平均負(fù)載線最貼近的地方 // 5. ++大負(fù)載機(jī)器 或者 --小負(fù)載機(jī)器,下一次循環(huán) double avgLoadCount = computeAliveServersAvgLoadCount(consumerLoadCount); List<Map.Entry<String, Integer>> sortedLoadCountList = sortLoadCountByLoadWithSmallEndian(consumerLoadCount); int bigLoadIndex = 0; int smallLoadIndex = sortedLoadCountList.size() - 1; for (;;) { // 首先檢測是否已遍歷完成,完成后不再進(jìn)行分配 if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) { break; } Map.Entry<String, Integer> bigLoadServerEntry = sortedLoadCountList.get(bigLoadIndex); double canTakeCountFromBigLoad = bigLoadServerEntry.getValue() - avgLoadCount; if(canTakeCountFromBigLoad < 1) { bigLoadIndex += 1; continue; } for (int reAssignShardIndex = 0; reAssignShardIndex < canTakeCountFromBigLoad; reAssignShardIndex++) { if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) { break; } Map.Entry<String, Integer> smallLoadServerEntry = sortedLoadCountList.get(smallLoadIndex); double canPutIntoSmallLoad = avgLoadCount - smallLoadServerEntry.getValue(); if(canPutIntoSmallLoad < 1) { smallLoadIndex -= 1; continue; } // 此處可以使用管道操作,更流暢, 或者更準(zhǔn)確的說,使用事務(wù)操作 // 從 bigLoad 中移除shard 0 // 將移除的 shard 上鎖,以防后續(xù)新機(jī)器立即消費(fèi),導(dǎo)致數(shù)據(jù)異常 // 添加新shard到 smallLoad 中 String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(bigLoadServerEntry.getKey()); bigLoadServerEntry.setValue(bigLoadServerEntry.getValue() - 1);
          // 上鎖分片,禁用消費(fèi) shardAssignMaster.lockShardId(firstLoadSHardId);
          // 添加shard到 smallLoad 中 shardAssignMaster.assignShardToServer(firstLoadSHardId, smallLoadServerEntry.getKey()); smallLoadServerEntry.setValue(smallLoadServerEntry.getValue() + 1); } } }
          /** * 判定輪詢是否完成 * * @param startIndex 開始下標(biāo) * @param endIndex 結(jié)束下標(biāo) * @return true: 輪詢完成, false: 未完成 */ private boolean isRoundRobinComplete(int startIndex, int endIndex) { return startIndex == endIndex; }
          /** * 從大到小排序 負(fù)載機(jī)器 * * @param consumerLoadCount 總負(fù)載情況 * @return 排序后的機(jī)器列表 */ private List<Map.Entry<String, Integer>> sortLoadCountByLoadWithSmallEndian(Map<String, Integer> consumerLoadCount) { List<Map.Entry<String, Integer>> sortedList = new ArrayList<>(consumerLoadCount.entrySet()); sortedList.sort(new Comparator<Map.Entry<String, Integer>>() { @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return o2.getValue() - o1.getValue(); } }); return sortedList; }
          /** * 計(jì)算平均每臺機(jī)器的消費(fèi)shard負(fù)載 * * @param loadCount 總負(fù)載指標(biāo)容器 * @return 負(fù)載均線 */ private double computeAliveServersAvgLoadCount(Map<String, Integer> loadCount) { int totalServerCount = loadCount.size(); int totalShardCount = 0; for(Integer consumeShardCount : loadCount.values()) { totalShardCount += consumeShardCount; } return (double) totalShardCount / totalServerCount; }}

          5. 從負(fù)載重的機(jī)器上剝奪shard,分配給空閑的機(jī)器 控制器

          /** *  負(fù)載不均衡,且存在空閑的機(jī)器, 此時應(yīng)是 均值與最大值之間相差較小值,但是至少有一個 消費(fèi)2 的機(jī)器,可以剝奪其1個shard給空閑機(jī)器 的控制器 * * @see ReBalanceStatusResultEnum#HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED 狀態(tài)枚舉 */class HeavyLoadWithConsumerIdleReBalancer implements StatusReBalanceStrategy {
          /** * 分片分配管理者實(shí)例 */ private ShardAssignMaster shardAssignMaster;
          /** * 空閑的機(jī)器列表 */ private List<String> idleServerList;
          /** * 機(jī)器消費(fèi)負(fù)載情況 */ private Map<String, Integer> consumerLoadCount;
          public HeavyLoadWithConsumerIdleReBalancer( ShardAssignMaster shardAssignMaster, Map<String, Integer> consumerLoadCount, List<String> idleServerList) { this.shardAssignMaster = shardAssignMaster; this.consumerLoadCount = consumerLoadCount; this.idleServerList = idleServerList; }
          @Override public void loadBalance() { // 1. 找出還未被消費(fèi)的shard // 2. 分配一個給自己 // 3. 如果還有其他機(jī)器也未分配,則同樣進(jìn)行分配 for (String idleHostname1 : idleServerList) { Map.Entry<String, Integer> maxLoadEntry = getMaxLoadConsumerEntry(consumerLoadCount); // 本身只有一個則不再分配負(fù)擔(dān)了 if(maxLoadEntry.getValue() <= 1) { break; } String maxLoadServerHostname = maxLoadEntry.getKey();
          // 此處可以使用管道操作,更流暢, 或者更準(zhǔn)確的說,使用事務(wù)操作 // 從 bigLoad 中移除shard 0 // 將移除的 shard 上鎖,以防后續(xù)新機(jī)器立即消費(fèi),導(dǎo)致數(shù)據(jù)異常 // 添加新shard到 smallLoad 中 String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(maxLoadServerHostname); maxLoadEntry.setValue(maxLoadEntry.getValue() - 1);
          // 上鎖卸載下來的shard,鎖定50s shardAssignMaster.lockShardId(firstLoadSHardId);
          // 添加shard到 smallLoad 中 shardAssignMaster.assignShardToServer(firstLoadSHardId, idleHostname1); consumerLoadCount.put(idleHostname1, 1); } }
          /** * 獲取負(fù)載最大的機(jī)器實(shí)例作 * * @param consumerLoadCount 所有機(jī)器的負(fù)載情況 * @return 最大負(fù)載機(jī)器實(shí)例 */ private Map.Entry<String, Integer> getMaxLoadConsumerEntry(Map<String, Integer> consumerLoadCount) { Integer maxConsumeCount = 0; Map.Entry<String, Integer> maxEntry = null; for (Map.Entry<String, Integer> server1 : consumerLoadCount.entrySet()) { if(server1.getValue() > maxConsumeCount) { maxConsumeCount = server1.getValue(); maxEntry = server1; } } return maxEntry; }}

          如上,各個平衡策略,實(shí)現(xiàn)各自的功能,就能掌控整個集群的消費(fèi)控制了!

          除了上面的主料,還有一些附帶的東西!

          【2】均衡狀態(tài)枚舉值如下:

          /** * 再平衡檢測結(jié)果類型枚舉 * */public enum ReBalanceStatusResultEnum {
          /** * 一切正常,無須操作 */ OK("一切正常,無須操作"),
          /** * 有新下線機(jī)器,可以將其分片分配給其他機(jī)器 */ UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS("有未分配的分片,可以分配給其他機(jī)器"),
          /** * 有未分配的分片,且有空閑機(jī)器,直接將空閑shard分配給空閑機(jī)器即可(最好只分配1個,以便其他機(jī)器啟動后可用) */ UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS("有未分配的分片,且有空閑機(jī)器"),
          /** * 負(fù)載不均衡,須生平衡 */ HEAVY_LOAD_BALANCE_NEEDED("負(fù)載不均衡,須生平衡"),
          /** * 負(fù)載不均衡,且存在空閑的機(jī)器, 此時應(yīng)是 均值與最大值之間相差較小值,但是至少有一個 消費(fèi)2 的機(jī)器,可以剝奪其1個shard給空閑機(jī)器 */ HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED("負(fù)載不均衡,且存在空閑的機(jī)器"),
          ;
          private ReBalanceStatusResultEnum(String remark) { // ignore }}

          【3】RedisKeyConstants 常量定義

          /** * redis 相關(guān)常量定義 */public class RedisKeyConstants {
          /** * 在線機(jī)器緩存key.與心跳同時作用 * * @see #SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX */ public static final String ALL_ONLINE_SERVER_CACHE_KEY = "prefix:active.servers";
          /** * 機(jī)器消費(fèi)shard情況 緩存key前綴 */ public static final String SERVER_CONSUME_CACHE_KEY_PREFIX = "prefix:log.consumer:server:";
          /** * 分片被分配情況 緩存key前綴 */ public static final String SHARD_ASSIGNED_CACHE_KEY_PREFIX = "prefix:shard.assigned:id:";
          /** * 分片鎖 緩存key前綴, 當(dāng)上鎖時,任何機(jī)器不得再消費(fèi) */ public static final String SHARD_LOCK_CONSUME_CACHE_PREFIX = "prefix:consume.lock.shard:id:";
          /** * 存活機(jī)器心跳,與上面的機(jī)器形成呼應(yīng) * * @see #ALL_ONLINE_SERVER_CACHE_KEY */ public static final String SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX = "prefix:log.consumer:server.heartbeat:";
          /** * 單個消費(fèi)者最大消費(fèi)負(fù)載數(shù) (一個不可能達(dá)到的值) */ public static final Integer MAX_CONSUMER_SHARD_LOAD = 9999;}

          【4】shard分配控制器負(fù)責(zé)所有shard分配

          import com.test.utils.RedisPoolUtil;
          import java.util.ArrayList;import java.util.List;import java.util.Set;
          /** * shard分配管理者 (盡量使用接口表達(dá)) * */public class ShardAssignMaster {
          private RedisPoolUtil redisPoolUtil;
          public ShardAssignMaster(RedisPoolUtil redisPoolUtil) { this.redisPoolUtil = redisPoolUtil; }
          /** * 注冊消費(fèi)者到 控制中心(注冊中心) */ public void registerConsumer(String serverHostname) { // 注冊server到 redis zset 中,如有條件,可以使用 zk 進(jìn)行操作,也許更好 redisPoolUtil.zadd(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, (double)System.currentTimeMillis(), serverHostname); }
          /** * 心跳發(fā)送數(shù)據(jù) */ public void sendHeartbeat(String serverHostname) { String heartbeatCacheKey = RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + serverHostname; redisPoolUtil.set(heartbeatCacheKey, "1", 30); }
          /** * 檢測指定消費(fèi)者服務(wù)器還存活與否 * * @param consumeHostname 機(jī)器名 * @return true: 存活, false: 宕機(jī) */ public boolean isConsumerServerAlive(String consumeHostname) { String aliveValue = redisPoolUtil.get(RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + consumeHostname); return aliveValue != null && "1".equals(aliveValue); } /** * 獲取并刪除指定server的所屬消費(fèi)的第一個 shardId * * @param serverHostname 機(jī)器名 * @return 第一個shardId */ public String popServerFirstConsumeShardId(String serverHostname) { String bigLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname; Set<String> firstLoadShardSet = redisPoolUtil.zrange(bigLoadConsumerServerCacheKey, 0, 0); String firstLoadSHardId = firstLoadShardSet.iterator().next(); redisPoolUtil.zrem(bigLoadConsumerServerCacheKey, firstLoadSHardId); redisPoolUtil.expire(bigLoadConsumerServerCacheKey, 60); return firstLoadSHardId; }
          /** * 對shard進(jìn)行上鎖,禁止所有消費(fèi)行為 * * @param shardId 分片id */ public void lockShardId(String shardId) { String shardLockCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId; redisPoolUtil.set(shardLockCacheKey, "1", 50); }
          /** * 分配shard分片數(shù)據(jù)給 指定server * * @param shardId 分片id * @param serverHostname 分配給的消費(fèi)者機(jī)器名 */ public void assignShardToServer(String shardId, String serverHostname) { String smallLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname; redisPoolUtil.zadd(smallLoadConsumerServerCacheKey, (double)System.currentTimeMillis(), shardId); redisPoolUtil.expire(smallLoadConsumerServerCacheKey, 60);
          // 更新新的shard消費(fèi)者標(biāo)識 String shardIdAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId; redisPoolUtil.set(shardIdAssignCacheKey, serverHostname); }
          /** * 獲取被分配了shardId的server信息 * * @param shardId 要檢查的分片id * @return 被分配了shardId 的機(jī)器名 */ public String getShardAssignedServer(String shardId) { String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId; return redisPoolUtil.get(shardAssignCacheKey); }
          /** * 刪除shard的分配信息,使無效化 * * @param shardId 要刪除的分片id */ public void invalidateShardAssignInfo(String shardId) { String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId; redisPoolUtil.del(shardAssignCacheKey); }
          /** * 清理下線機(jī)器 * * @param hostname 下線機(jī)器名 */ public void invalidateOfflineServer(String hostname) { redisPoolUtil.zrem(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, hostname); }
          /** * 獲取機(jī)器消費(fèi)的shard列表 * * @param serverHostname 機(jī)器主機(jī)名 * @return shard列表 或者 null */ public Set<String> getServerDutyConsumeShardSet(String serverHostname) { String serverDutyConsumeShardCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname; return redisPoolUtil.zrange(serverDutyConsumeShardCacheKey, 0, -1); }
          /** * 獲取所有在線機(jī)器列表 * * @return 在線機(jī)器列表 */ public List<String> getAllOnlineServerList() { Set<String> hostnameSet = redisPoolUtil.zrange(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, 0, -1); return new ArrayList<>(hostnameSet); }
          }

          以上是協(xié)同負(fù)載均衡器代碼實(shí)現(xiàn)。

          【5】當(dāng)然你還需要一個消費(fèi)者

          接下來我們還要看下消費(fèi)者如何實(shí)現(xiàn)消費(fèi)。

          import com.test.utils.RedisPoolUtil;import org.apache.commons.lang3.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;
          import java.net.InetAddress;import java.net.UnknownHostException;import java.time.LocalDateTime;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;
          /** * 消費(fèi)者業(yè)務(wù)線程 * */public class LoghubConsumeWorker implements Runnable {
          private static final Logger logger = LoggerFactory.getLogger(LoghubConsumeWorker.class);
          private RedisPoolUtil redisPoolUtil;
          private String HOST_NAME;
          /** * 因消費(fèi)者數(shù)目不一定,所以使用 CachedThreadPool */ private ExecutorService consumeExecutorService = Executors.newCachedThreadPool();
          public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil) { this(redisPoolUtil, null); }
          public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil, String hostName) { this.redisPoolUtil = redisPoolUtil; // 為測試需要添加的 hostName HOST_NAME = hostName; initSharedVars(); }
          /** * 初始化公共變量 */ private void initSharedVars() { try { if(HOST_NAME != null) { return; } HOST_NAME = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { throw new RuntimeException("init error : 獲取服務(wù)器主機(jī)名失敗"); } }

          @Override public void run() { while (!Thread.interrupted()) { // 先獲取所有分配給的shard列表,為空則進(jìn)入下一次循環(huán)(注意此時阻塞鎖不能起作用) Set<String> shardsSet = blockingTakeAvailableConsumeShardList(); try { // 消費(fèi)所有給定shard數(shù)據(jù) consumeLogHubShards(shardsSet); } catch (Exception e) { logger.error("消費(fèi)loghub, error", e); }
          }
          }
          /** * 獲取可用的分片列表(沒有則阻塞等待) * * @return 分片列表 */ private Set<String> blockingTakeAvailableConsumeShardList() { while (!Thread.interrupted()) { String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME; Set<String> shardsSet = redisPoolUtil.zrange(serverConsumeCacheKey, 0, -1); if (shardsSet != null && !shardsSet.isEmpty()) { return shardsSet; } logger.warn(" =========== 當(dāng)前主機(jī)[hostname:{}]未查詢到任何shard =========", HOST_NAME); try { Thread.sleep(1000); } catch (InterruptedException e) { logger.error("LogHubClientWork run 未獲取到該主機(jī)的shard時,每隔1秒鐘獲取 ,error : {}", e); } } return null; }
          /** * 消費(fèi)loghub 分片數(shù)據(jù) * * @param shardsSet 被分配的分片列表 */ public void consumeLogHubShards(Set<String> shardsSet) throws InterruptedException { if(shardsSet == null || shardsSet.isEmpty()) { return; } // 此處使用 CountdownLatch, 保證至少有一個任務(wù)完成時,才開始下一次任務(wù)的調(diào)入// Semaphore semaphoreLock = new Semaphore(shardsSet.size()); CountDownLatch openDoorLatch = new CountDownLatch(1); boolean startNewJobAtLeastOnce = false; for (String shard : shardsSet) { // 檢測當(dāng)前shard是否處于鎖定狀態(tài),如果鎖定則不能消費(fèi), 注意鎖情況 if(isShardLocked(shard)) { logger.info("=============== shard:{} is locked, continue... ======", shard); continue; } int shardId = Integer.parseInt(shard); LoghubConsumerTaskExecutor consumer = getConsumerExecutor(shardId); // consumer 應(yīng)保證有所消費(fèi),如果沒有消費(fèi),則自行等待一個長周期,外部應(yīng)只管調(diào)入請求 // consumer 應(yīng)保證所有消費(fèi),在上一個任務(wù)未完成時,不得再開啟下一輪提交消費(fèi) boolean startNewJob = consumer.startNewConsumeJob(openDoorLatch); if(startNewJob) { // start failed, prev job is running maybe // ignore job, no blocking startNewJobAtLeastOnce = true; } } // 任意一個任務(wù)完成,都將打開新的分配周期,且后續(xù) countDown 將無效,此處可能導(dǎo)致死鎖 if(startNewJobAtLeastOnce) { openDoorLatch.await(); } else { // 當(dāng)本次分配調(diào)度一個任務(wù)都未提交時,則睡眠等待 // (一般此情況為 消費(fèi)者被分配了上了鎖的shard時,即搶占另的機(jī)器的shard, 需要給別的機(jī)器備份數(shù)據(jù)時間鎖) Thread.sleep(200); } }
          /** * 檢測分片是否被鎖定消費(fèi)了 * * @param shardId 分片id * @return true:鎖定, false:未鎖定可用 */ private boolean isShardLocked(String shardId) { String shardCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId; String lockValue = redisPoolUtil.get(shardCacheKey); return !StringUtils.isBlank(lockValue) && "1".equals(lockValue); }
          /** * 獲取消費(fèi)者實(shí)例,針對一個shard, 只創(chuàng)建一個實(shí)例 */ private Map<Integer, LoghubConsumerTaskExecutor> mShardConsumerMap = new ConcurrentHashMap<>(); private LoghubConsumerTaskExecutor getConsumerExecutor(final int shardId) { LoghubConsumerTaskExecutor consumer = mShardConsumerMap.get(shardId); if (consumer != null) { return consumer; } consumer = new LoghubConsumerTaskExecutor(new SingleShardConsumerJob(shardId)); mShardConsumerMap.put(shardId, consumer); logger.info(" ======================= create new consumer executor shard:{}", shardId); return consumer; }
          /** * 消費(fèi)者調(diào)度器 * * 統(tǒng)一控制消費(fèi)者的運(yùn)行狀態(tài)管控 */ class LoghubConsumerTaskExecutor {
          private Future<?> future;
          private ConsumerJob consumerJob;
          public LoghubConsumerTaskExecutor(ConsumerJob consumerJob) { this.consumerJob = consumerJob; }
          /** * 啟動一個新消費(fèi)任務(wù) * * @return true: 啟動成功, false: 啟動失敗有未完成任務(wù)在前 */ public boolean startNewConsumeJob(CountDownLatch latch) { if(future == null || future.isCancelled() || future.isDone()) { //沒有任務(wù)或者任務(wù)已取消或已完成 提交任務(wù) future = consumeExecutorService.submit(new Runnable() { @Override public void run() { try { consumerJob.consumeShardData(); } finally { latch.countDown(); } } }); return true; } return false; }
          }
          }
          /** * 消費(fèi)者任務(wù)接口定義 */interface ConsumerJob {
          /** * 消費(fèi)數(shù)據(jù)具體邏輯實(shí)現(xiàn) */ public void consumeShardData();}
          /** * 單個shard消費(fèi)的任務(wù)實(shí)現(xiàn) */class SingleShardConsumerJob implements ConsumerJob {
          /** * 當(dāng)前任務(wù)的消費(fèi) shardId */ private int shardId;
          public SingleShardConsumerJob(int shardId) { this.shardId = shardId; }
          @Override public void consumeShardData() { System.out.println(LocalDateTime.now() + " - host -> consume shard: " + shardId); try { // do complex biz // 此處如果發(fā)現(xiàn)shard 不存在異常,則應(yīng)回調(diào)協(xié)調(diào)器,進(jìn)行shard的移除 Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } }}

          【6】當(dāng)然你還需要一個demo

          看不到效果,我就是不信!
          所以來看個 demo 吧!
          我們使用單機(jī)開多個 單元測試用例,直接測試就好!

          測試代碼:.

          import com.test.common.config.LogHubProperties;import com.test.utils.RedisPoolUtil;import org.junit.Test;
          import java.io.IOException;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;
          /** * 臨時測試 負(fù)載均衡 * */public class ShardConsumerLoadBalanceTest {
          public static void main(String[] args) throws IOException { startAConsumer(); System.in.read(); }
          // 啟動一個單元測試,就相當(dāng)于啟動一個消費(fèi)者應(yīng)用 @Test public void mainMock() throws IOException { startAConsumer(); System.in.read(); }
          // 啟動一個單元測試,就相當(dāng)于啟動一個消費(fèi)者應(yīng)用 @Test public void startNewConsumer() throws IOException { startAConsumer(); System.in.read(); }
          // 啟動一個單元測試,就相當(dāng)于啟動一個消費(fèi)者應(yīng)用 @Test public void startNewConsumer2() throws IOException { startAConsumer(); System.in.read(); }

          private static void startAConsumer() { RedisPoolUtil redisPoolUtil = new RedisPoolUtil(); redisPoolUtil.setIp("127.0.0.1"); redisPoolUtil.setMaxActive(111); redisPoolUtil.setMaxIdle(1000); redisPoolUtil.setPort(6379); redisPoolUtil.setMaxWait(100000); redisPoolUtil.setTimeout(100000); redisPoolUtil.setPassWord("123"); redisPoolUtil.setDatabase(0); redisPoolUtil.initPool();
          LogHubProperties logHubProperties = new LogHubProperties(); logHubProperties.setProjectName("test"); logHubProperties.setEndpoint("cn-shanghai-finance-1.log.aliyuncs.com"); logHubProperties.setAccessKey("xxxx"); logHubProperties.setAccessKeyId("11111");
          // 使用隨機(jī) hostname 模擬多臺機(jī)器調(diào)用 Random random = new Random(); String myHostname = "my-host-" + random.nextInt(10);
          // 啟動管理線程 LoghubConsumerShardCoWorker shardCoWorker = new LoghubConsumerShardCoWorker(redisPoolUtil, logHubProperties, myHostname); ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); scheduledExecutorService.scheduleAtFixedRate(shardCoWorker, 5, 30, TimeUnit.SECONDS);
          // 啟動業(yè)務(wù)線程 ExecutorService executorService = Executors.newFixedThreadPool(2); LoghubConsumeWorker worker = new LoghubConsumeWorker(redisPoolUtil, myHostname);
          executorService.submit(worker);
          }}

          如上,就可以實(shí)現(xiàn)自己的負(fù)載均衡消費(fèi)了。

          比如: 總分片數(shù)為4。

          1. 最開始啟動1個機(jī)器時,將會被分配 0,1,2,3。
          2. 啟動兩個后,將分為 0,1; 2,3;
          3. 啟動3個后,將分為 0; 1; 2,3;
          4. 反之,關(guān)閉一個機(jī)器后,將把壓力分擔(dān)到原機(jī)器上。
          當(dāng)做負(fù)載重分配時,將有50秒的鎖定時間備份。

          【7】待完善的點(diǎn)

          本文是基于loghub實(shí)現(xiàn)的分片拉取,其實(shí)在這方面loghub與kafka是如出一轍的,只是loghub更商業(yè)產(chǎn)品化。

          當(dāng)shard縮減時,應(yīng)能夠自動發(fā)現(xiàn),從而去除原有的機(jī)器消費(fèi)分配。而不是讓消費(fèi)者報錯。

          注意進(jìn)行再均衡時,消費(fèi)者偏移量問題,尤其是你為了性能使用了jvm本地變量保存偏移時,注意刷新該變量偏移。本文沒有實(shí)現(xiàn)類似zookeeper強(qiáng)大的watch監(jiān)聽功能,但是有一個上鎖等待的過程,你可以基于這個鎖做一些力所能及的事!

          老話:可以適當(dāng)造輪子!





          往期精彩推薦



          騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)

          面試:史上最全多線程面試題 !

          最新阿里內(nèi)推Java后端面試題

          JVM難學(xué)?那是因?yàn)槟銢]認(rèn)真看完這篇文章


          END


          關(guān)注作者微信公眾號 —《JAVA爛豬皮》


          了解更多java后端架構(gòu)知識以及最新面試寶典


          你點(diǎn)的每個好看,我都認(rèn)真當(dāng)成了


          看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動力


          作者:等你歸去來

          出處:https://www.cnblogs.com/yougewe/p/11114348.html

          瀏覽 26
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點(diǎn)贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  精品久久123 | 久草福利网站首页 | 欧美性猛交ⅩXXX无码视频 | 国产乱伦大杂烩 | 操操操无码 |