自己動手實(shí)現(xiàn)一個loghub(或kafka)分片消費(fèi)負(fù)載均衡器
一般地,像kafka之類的消息中間件,作為一個可以保持歷史消息的組件,其消費(fèi)模型一般是主動拉取方式。這是為了給消費(fèi)者足夠的自由,回滾或者前進(jìn)。
然而,也正是由于將消費(fèi)消息的權(quán)力交給了消費(fèi)者,所以,消費(fèi)者往往需要承擔(dān)更多的責(zé)任。比如:需要自行保存消費(fèi)偏移量,以便后續(xù)可以知道從哪里繼續(xù)。而當(dāng)這一點(diǎn)處理不好時,則可能帶來一些麻煩。
不管怎么樣,解決方案也都是現(xiàn)成的,咱們也不用擔(dān)心。
今天我們要談?wù)摰氖且粋€場景:如何讓n個機(jī)器消費(fèi)m個分片數(shù)據(jù)?(帶狀態(tài)的,即不能任意機(jī)器消費(fèi)任意shard)
這在消息中間件的解決方案里,明白地寫著,使用消費(fèi)者群組就可以實(shí)現(xiàn)了。具體來說就是,每個分片至多會被一機(jī)器消費(fèi),每個機(jī)器則可以消費(fèi)多個分片數(shù)據(jù)。即機(jī)器數(shù)據(jù)小于分片數(shù)時,分片會被均衡地分配到消費(fèi)者中。當(dāng)機(jī)器數(shù)大于分片數(shù)時,多余的機(jī)器將不做任何事情。
好吧,既然官方已經(jīng)說明白了,那咱們應(yīng)該就不再需要自己搞一個輪子了吧。
但是,我還有個場景:如果我要求在機(jī)器做負(fù)載重平衡時,需要保證被抽取出去的機(jī)器分片,至少保留一段時間,不允許任何機(jī)器消費(fèi)該分片,因?yàn)榭赡苓€有數(shù)據(jù)需要備份。
針對這種場景,我想官方也許是有提供回調(diào)函數(shù)之類的解決方案的吧。不管了,反正我沒找到,只能自己先造個輪子了。
本文場景前提:
1. 使用loghub作為消息中間件(原理同kafka);
2. 整個數(shù)據(jù)有m個分片shard;
3. 整個消費(fèi)者集群有n臺機(jī)器;
4. 每個分片的數(shù)據(jù)需要集中到一機(jī)器上做有狀態(tài)處理;
5. 可以借助redis保存有狀態(tài)數(shù)據(jù),以便消費(fèi)者機(jī)器做優(yōu)雅停機(jī);
最簡單的方案是,使 n=m, 每臺機(jī)器消費(fèi)一個shard, 這樣狀態(tài)永遠(yuǎn)不會錯亂。
但是這樣明顯可擴(kuò)展能力太差了!
比如有時數(shù)據(jù)量小了,雖然分片還在,但是完全不用那么多機(jī)器的時候,如何縮減機(jī)器?
比如由于數(shù)據(jù)壓力大了,我想增加下分片數(shù),以提高發(fā)送者性能,但是消費(fèi)者我還不想理他,消費(fèi)慢點(diǎn)無所謂?
其實(shí),我們可以使用官方的消費(fèi)者群組方法,可以動態(tài)縮減機(jī)器。
但是這個有狀態(tài)就比較難做到了。
以上痛點(diǎn),總結(jié)下來就是,可擴(kuò)展性問題。
想象中的輪子是怎么樣的?
1. 需要有個注冊中心,管理機(jī)器的上下線監(jiān)控;
2. 需要有負(fù)載均衡器,負(fù)載將shard的負(fù)載均衡的分布到在線機(jī)器中;
3. 需要有每個機(jī)器自己消費(fèi)的分片記錄,以使機(jī)器自身有據(jù)可查;
4. 需要有每個分片的消費(fèi)情況,以判定出哪些分片已分配給哪些機(jī)器;
我們來細(xì)看下實(shí)現(xiàn):
【1】均衡協(xié)調(diào)器主框架:
import com.aliyun.openservices.log.Client;import com.aliyun.openservices.log.common.Shard;import com.aliyun.openservices.log.exception.LogException;import com.aliyun.openservices.log.response.ListShardResponse;import com.test.common.config.LogHubProperties;import com.test.utils.RedisPoolUtil;import com.google.common.collect.Lists;import org.apache.commons.lang3.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.net.InetAddress;import java.net.UnknownHostException;import java.util.ArrayList;import java.util.Comparator;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Set;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;import static com.test.dispatcher.work.RedisKeyConstants.MAX_CONSUMER_SHARD_LOAD;/*** loghub動態(tài)消費(fèi)者 shard分配shard 協(xié)調(diào)器**/public class LoghubConsumerShardCoWorker implements Runnable {private static final Logger logger = LoggerFactory.getLogger(LoghubConsumerShardCoWorker.class);private LogHubProperties logHubProperties;private RedisPoolUtil redisPoolUtil;private Client mClient;private ShardAssignMaster shardAssignMaster;private String HOST_NAME;public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties) {this(redisPoolUtil, logHubProperties, null);}public LoghubConsumerShardCoWorker(RedisPoolUtil redisPoolUtil, LogHubProperties logHubProperties, String hostName) {this.redisPoolUtil = redisPoolUtil;this.logHubProperties = logHubProperties;this.HOST_NAME = hostName;initSharedVars();initConsumerClient();initShardAssigner();getAllShardList();registerSelfConsumer();startHeartBeatThread();}/*** 開啟心跳線程,?;?/span>*/private void startHeartBeatThread() {ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();executorService.scheduleAtFixedRate(() -> {String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME;redisPoolUtil.expire(serverConsumeCacheKey, 30);shardAssignMaster.sendHeartbeat(HOST_NAME);}, 30, 25, TimeUnit.SECONDS);}/*** 初始化客戶端實(shí)例*/private void initConsumerClient() {this.mClient = new Client(logHubProperties.getEndpoint(),logHubProperties.getAccessKeyId(), logHubProperties.getAccessKey());}/*** 初始化分片分配控制器*/private void initShardAssigner() {shardAssignMaster = new ShardAssignMaster(redisPoolUtil);}/*** 初始化公共變量*/private void initSharedVars() {try {if(HOST_NAME != null) {return;}HOST_NAME = InetAddress.getLocalHost().getHostName();}catch (UnknownHostException e) {logger.error("init error : 獲取服務(wù)器主機(jī)名失敗", e);throw new RuntimeException("init error : 獲取服務(wù)器主機(jī)名失敗");}}/*** 將自己作為消費(fèi)者注冊到消費(fèi)者列表中,以判定后續(xù)可以進(jìn)行消費(fèi)*/private void registerSelfConsumer() {shardAssignMaster.registerConsumer(HOST_NAME);shardAssignMaster.sendHeartbeat(HOST_NAME);}@Overridepublic void run() {try {checkConsumerSharding();}catch (Exception e) {logger.error("動態(tài)分配shard 發(fā)生異常", e);}}/*** job 只做一件事,即檢查 shard 的消費(fèi)情況,不平衡則處理*/private void checkConsumerSharding() {try {if (tryCoWorkerLock()) {// step1. 檢查是否需要進(jìn)行shard分配// 集群消費(fèi)loghub數(shù)據(jù)動態(tài)伸縮策略// 1. 啟動時先去獲取部分片數(shù),備用;// 2. 應(yīng)用啟動后,把自己注冊到注冊中心或redis中;// 3. 根據(jù)注冊上來的機(jī)器列表,按平均分配策略分配shard(只能由一個機(jī)器來分配,其他機(jī)器處理分布式鎖競爭失敗,等待狀態(tài));// 4. 分配好后,釋放鎖,各機(jī)器開始消費(fèi),如機(jī)器A消費(fèi)shard 0/3,則機(jī)器1以輪詢的方式依次從shard 0/3 摘取數(shù)據(jù)消費(fèi);// 5. 分配好的數(shù)據(jù)結(jié)構(gòu)為:prefix+ip保存具體數(shù)據(jù),另外將自己的key添加到另一個zset中,標(biāo)識自己存活;自己的key有效期為30秒;使用另一維度 shard,保存每個shard被占用情況,使用hash保存,key為shard,value為當(dāng)有占用時為機(jī)器ip或主機(jī)名,當(dāng)無占用時為null或空串;// 6. 以上數(shù)據(jù)刷入,將在機(jī)器搶占到shard更新數(shù)據(jù);shard總數(shù)信息暫時不允許在運(yùn)行期間進(jìn)行變更;(即如果變理shard必須重啟服務(wù)器)// 7. 機(jī)器下線時,占用的key將自動過期;(考慮是否主動刪除)// 8. 各機(jī)器上啟動一個后臺掃描線程,每隔30秒掃描一次。掃描zset,取出所有值后查看是否存在相應(yīng)的key,如果不存在說明機(jī)器已下線,需要重新分配其占用的shard;// 9. 重新分配策略,使用一致性hash算法實(shí)現(xiàn);// 10. 機(jī)器上線時,使用一致性hash算法重新平衡shard;// 11. 使用分布式鎖保證分配進(jìn)程只有一個;CheckConsumerShardingResultContainer resultContainer = checkShardConsumerReBalanceStatus();if(resultContainer.getStatusResultType() != ReBalanceStatusResultEnum.OK) {reBalanceConsumerShard(resultContainer);}}}finally {releaseCoWorkerLock();}}/*** 確認(rèn)機(jī)器和shard是否需要再平衡** @return 結(jié)果狀態(tài)集*/private CheckConsumerShardingResultContainer checkShardConsumerReBalanceStatus() {// step1. 檢查自身是否存在shard, 不存在則立即進(jìn)行一次重分配(消費(fèi)者機(jī)器數(shù)大于分片數(shù)時,重平衡動作將是無效動作)// step2. 檢查所有shard列表,是否有未被分配的shard,如有,立即觸發(fā)一次重分配// step3. 檢查是否有負(fù)荷比較高的機(jī)器,如有觸發(fā)平衡(功能預(yù)留,此功能需要基于統(tǒng)計(jì)信息)CheckConsumerShardingResultContainer resultContainer = new CheckConsumerShardingResultContainer();final List<String> activeServersList = shardAssignMaster.getAllOnlineServerList();final List<String> allShardList = getAllShardList();// 計(jì)算空閑機(jī)器Map<String, Integer> hostConsumeLoadCountMap = new HashMap<>();List<String> idleServerList = filterIdleServerList(activeServersList, hostConsumeLoadCountMap);// 計(jì)算未被分配的shardList<String> unAssignedShardList = filterUnAssignedShardList(allShardList);// 根據(jù)資源信息,得出目前的負(fù)載狀態(tài)ReBalanceStatusResultEnum statusResult = computeReBalanceStatusOnResources(unAssignedShardList, idleServerList, hostConsumeLoadCountMap);resultContainer.setAllServerList(activeServersList);resultContainer.setAllShardList(allShardList);resultContainer.setIdleServerList(idleServerList);resultContainer.setUnAssignedShardList(unAssignedShardList);resultContainer.setServerConsumeShardLoad(hostConsumeLoadCountMap);resultContainer.setStatusResultType(statusResult);return resultContainer;}/*** 根據(jù)給定資源信息,計(jì)算出目前的負(fù)載狀態(tài)** @param unAssignedShardList 未分配的shard列表* @param idleServerList 空閑機(jī)器列表* @param hostConsumeLoadMap 機(jī)器消費(fèi)計(jì)數(shù)容器(負(fù)載情況)* @return 狀態(tài)值*/private ReBalanceStatusResultEnum computeReBalanceStatusOnResources(List<String> unAssignedShardList,List<String> idleServerList,Map<String, Integer> hostConsumeLoadMap) {// 沒有未分配的shard,檢測是否平衡即可// 0. 有空閑機(jī)器,則直接分配給空閑機(jī)器即可// 1. 最大消費(fèi)shard-最小消費(fèi)shard數(shù) >= 2, 則說明有機(jī)器消費(fèi)過多shard,需重分配// 2. 機(jī)器負(fù)載平衡,無須調(diào)整if(unAssignedShardList.isEmpty()) {int minConsume = MAX_CONSUMER_SHARD_LOAD;int maxConsume = 0;for (Map.Entry<String, Integer> entry : hostConsumeLoadMap.entrySet()) {int gotCount = entry.getValue();if(gotCount > maxConsume) {maxConsume = gotCount;}if(gotCount < minConsume) {minConsume = gotCount;}}// 因有未分配的機(jī)器,假如現(xiàn)有的機(jī)器消費(fèi)都是2,則需要重分配的大壓力的機(jī)器 shard 給空閑機(jī)器if(!idleServerList.isEmpty()) {if (maxConsume > 1) {return ReBalanceStatusResultEnum.HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED;}}// 有消費(fèi)相差2的機(jī)器,重新分配,從大數(shù)上借調(diào)到小數(shù)上if(maxConsume > minConsume + 1) {return ReBalanceStatusResultEnum.HEAVY_LOAD_BALANCE_NEEDED;}return ReBalanceStatusResultEnum.OK;}// 有可用shard// 3. 有空閑機(jī)器,直接讓空閑shard分配給這些空閑機(jī)器就ok了// 4. 沒有空閑機(jī)器,須將空閑shard 分配給負(fù)載小的機(jī)器if(idleServerList.isEmpty()) {return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS;}return ReBalanceStatusResultEnum.UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS;}/*** 過濾出空閑的機(jī)器列表** @param activeServersList 所有機(jī)器列表* @return 空閑機(jī)器集, 且將各自消費(fèi)數(shù)放入計(jì)數(shù)容器*/private List<String> filterIdleServerList(List<String> activeServersList, Map<String, Integer> hostConsumeCountMap) {List<String> idleServerList = new ArrayList<>();for (String hostname1 : activeServersList) {if(!shardAssignMaster.isConsumerServerAlive(hostname1)) {shardAssignMaster.invalidateOfflineServer(hostname1);continue;}int consumeCount;Set<String> consumeShardSet = shardAssignMaster.getServerDutyConsumeShardSet(hostname1);if(consumeShardSet == null || consumeShardSet.isEmpty()) {idleServerList.add(hostname1);consumeCount = 0;}else {consumeCount = consumeShardSet.size();}hostConsumeCountMap.put(hostname1, consumeCount);}return idleServerList;}/*** 過濾出未分配的shard列表** @param allShardList 所有shard* @return 未分配的shard*/private List<String> filterUnAssignedShardList(List<String> allShardList) {List<String> unAssignedShardList = new ArrayList<>();for (String shardId1 : allShardList) {String consumeHostname = shardAssignMaster.getShardAssignedServer(shardId1);// 如果不為空,則之前分配過,檢查機(jī)器是否下線// 如果為空,則是第一次分配if(!StringUtils.isBlank(consumeHostname)) {if(!shardAssignMaster.isConsumerServerAlive(consumeHostname)) {// 清除下線機(jī)器信息,將當(dāng)前shard置為空閑shardAssignMaster.invalidateOfflineServer(consumeHostname);shardAssignMaster.invalidateShardAssignInfo(shardId1);unAssignedShardList.add(shardId1);}}else {unAssignedShardList.add(shardId1);}}return unAssignedShardList;}/*** 嘗試獲取協(xié)調(diào)者協(xié)調(diào)鎖** 在集群環(huán)境中,只允許有一個協(xié)調(diào)器在運(yùn)行** @return true:成功, false:失敗,不得進(jìn)行協(xié)調(diào)分配工作*/private boolean tryCoWorkerLock() {return redisPoolUtil.getDistributedLock("distributedLock", HOST_NAME, 30);}/*** 釋放協(xié)調(diào)鎖,以便下次再競爭*/private void releaseCoWorkerLock() {redisPoolUtil.releaseDistributedLock("distributedLock", HOST_NAME);}/*** 重新平衡消費(fèi)者和shard的關(guān)系** @param resultContainer 待重平衡狀態(tài)*/private void reBalanceConsumerShard(CheckConsumerShardingResultContainer resultContainer) {// 集群消費(fèi)loghub數(shù)據(jù)動態(tài)伸縮策略,根據(jù)負(fù)載狀態(tài),調(diào)用相應(yīng)策略進(jìn)行重平衡StatusReBalanceStrategy strategy = StatusReBalanceStrategyFactory.createStatusReBalanceAlgorithm(resultContainer, shardAssignMaster);strategy.loadBalance();}/*** 獲取分片列表** @return 分片列表,如: 0,1,2,3*/private List<String> getAllShardList() {// 實(shí)時讀取列表List<String> shardList = Lists.newArrayList();try {ListShardResponse listShardResponse = mClient.ListShard(logHubProperties.getProjectName(),logHubProperties.getEventlogStore());ArrayList<Shard> getShards = listShardResponse.GetShards();for (Shard shard : getShards) {shardList.add(shard.GetShardId() + "");}}catch (LogException e) {logger.error("loghub 獲取shard列表 error :", e);}return shardList;}}
如上,就是協(xié)調(diào)均衡主框架。主要邏輯如下:
1. 啟動時初始化各種端,分配器,注冊自己到控制中心等等;
2. 以線程的形式,被外部以定時任務(wù)執(zhí)行的方式調(diào)用;
3. 檢查任務(wù)前,須獲得檢查鎖,否則直接返回;
4. 先獲得目前機(jī)器的所有消費(fèi)情況和shard的分配情況,得出資源負(fù)載數(shù)據(jù);
5. 根據(jù)得到的數(shù)據(jù)信息,推算出目前的平衡狀態(tài);
6. 根據(jù)平衡狀態(tài),調(diào)用相應(yīng)的平衡策略進(jìn)行重平衡;
7. 等待下一次調(diào)度;
檢查結(jié)果將作為后續(xù)選擇均衡策略的依據(jù),所以需要相應(yīng)的狀態(tài)容器保存。如下:
/*** 集群狀態(tài)預(yù)檢查 結(jié)果容器*/class CheckConsumerShardingResultContainer {/*** 所有shard列表*/private List<String> allShardList;/*** 未被分配的shard列表*/private List<String> unAssignedShardList;/*** 所有機(jī)器列表*/private List<String> allServerList;/*** 空閑的機(jī)器列表(未被分配shard)*/private List<String> idleServerList;/*** 機(jī)器消費(fèi)shard的負(fù)載計(jì)數(shù)容器*/private Map<String, Integer> serverConsumeShardLoad;/*** 狀態(tài)檢查結(jié)果類型*/private ReBalanceStatusResultEnum statusResultType;public Map<String, Integer> getServerConsumeShardLoad() {return serverConsumeShardLoad;}public void setServerConsumeShardLoad(Map<String, Integer> serverConsumeShardLoad) {this.serverConsumeShardLoad = serverConsumeShardLoad;}public List<String> getAllShardList() {return allShardList;}public void setAllShardList(List<String> allShardList) {this.allShardList = allShardList;}public List<String> getUnAssignedShardList() {return unAssignedShardList;}public void setUnAssignedShardList(List<String> unAssignedShardList) {this.unAssignedShardList = unAssignedShardList;}public List<String> getAllServerList() {return allServerList;}public void setAllServerList(List<String> allServerList) {this.allServerList = allServerList;}public List<String> getIdleServerList() {return idleServerList;}public void setIdleServerList(List<String> idleServerList) {this.idleServerList = idleServerList;}public ReBalanceStatusResultEnum getStatusResultType() {return statusResultType;}public void setStatusResultType(ReBalanceStatusResultEnum statusResultType) {this.statusResultType = statusResultType;}}
針對多個平衡策略算法,使用一個工廠類來生產(chǎn)各種策略實(shí)例。如下:
/*** 再平衡算法工廠類*/class StatusReBalanceStrategyFactory {/*** 無需做平衡的控制器*/private static final StatusReBalanceStrategy EMPTY_BALANCER = new EmptyReBalancer();/*** 根據(jù)當(dāng)前的負(fù)載狀態(tài),創(chuàng)建對應(yīng)的負(fù)載均衡算法** @param resultContainer 負(fù)載狀態(tài)集* @param shardAssignMaster 分片分配管理者實(shí)例* @return 算法實(shí)例*/public static StatusReBalanceStrategy createStatusReBalanceAlgorithm(CheckConsumerShardingResultContainer resultContainer, ShardAssignMaster shardAssignMaster) {ReBalanceStatusResultEnum balanceStatus = resultContainer.getStatusResultType();switch (balanceStatus) {case OK:return EMPTY_BALANCER;case UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS:return new UnAssignedShardWithConsumerIdleReBalancer(shardAssignMaster,resultContainer.getUnAssignedShardList(), resultContainer.getIdleServerList());case UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS:return new UnassignedShardWithoutConsumerIdleReBalancer(shardAssignMaster,resultContainer.getUnAssignedShardList(), resultContainer.getServerConsumeShardLoad());case HEAVY_LOAD_BALANCE_NEEDED:return new HeavyLoadReBalancer(shardAssignMaster, resultContainer.getServerConsumeShardLoad());case HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED:return new HeavyLoadWithConsumerIdleReBalancer(shardAssignMaster,resultContainer.getServerConsumeShardLoad(), resultContainer.getIdleServerList());default:break;}return EMPTY_BALANCER;}}/*** 負(fù)載均衡策略統(tǒng)一接口*/interface StatusReBalanceStrategy {/*** 執(zhí)行負(fù)載均衡方法*/public void loadBalance();}
針對各種場景的負(fù)載均衡,各自實(shí)現(xiàn)如下。其中,無需操作時,將返回一個空操作實(shí)例!
1. 空操作實(shí)例
/*** 無需做平衡的控制器** @see ReBalanceStatusResultEnum#OK 狀態(tài)枚舉*/class EmptyReBalancer implements StatusReBalanceStrategy {@Overridepublic void loadBalance() {// ignore ...}}
2. 分配剩余shard給空閑的機(jī)器控制器
/*** 為所有空閑的其他空閑機(jī)器分配可用 shard 的控制器** @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態(tài)枚舉*/class UnAssignedShardWithConsumerIdleReBalancer implements StatusReBalanceStrategy {/*** 未被分配的分片列表*/private List<String> unAssignedShardList;/*** 分片分配管理者實(shí)例*/private ShardAssignMaster shardAssignMaster;/*** 空閑的機(jī)器列表*/private List<String> idleServerList;public UnAssignedShardWithConsumerIdleReBalancer(ShardAssignMaster shardAssignMaster,List<String> unAssignedShardList,List<String> idleServerList) {this.shardAssignMaster = shardAssignMaster;this.unAssignedShardList = unAssignedShardList;this.idleServerList = idleServerList;}@Overridepublic void loadBalance() {// 1. 找出還未被消費(fèi)的shard// 2. 依次分配給各空閑機(jī)器,每個空閑機(jī)器只至多分配一個shardint serverIndex = 0;for (String shard1 : unAssignedShardList) {// 輪詢分配shard, 先只給一個機(jī)器分配一個shardif(serverIndex >= idleServerList.size()) {break;}String serverHostname = idleServerList.get(serverIndex++);shardAssignMaster.assignShardToServer(shard1, serverHostname);}}}
3. 分配剩余shard給負(fù)載低的機(jī)器的控制器
/*** 有空閑shard場景 的控制器 , 須找出負(fù)載最低的機(jī)器塞入shard到現(xiàn)有的機(jī)器中(可能是有機(jī)器下線導(dǎo)致)** @see ReBalanceStatusResultEnum#UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS 狀態(tài)枚舉*/class UnassignedShardWithoutConsumerIdleReBalancer implements StatusReBalanceStrategy {/*** 未被分配分片列表*/private List<String> unAssignedShardList;/*** 分片管理者實(shí)例*/private ShardAssignMaster shardAssignMaster;/*** 消費(fèi)者負(fù)載情況*/private Map<String, Integer> consumerLoadCount;public UnassignedShardWithoutConsumerIdleReBalancer(ShardAssignMaster shardAssignMaster,List<String> unAssignedShardList,Map<String, Integer> consumerLoadCount) {this.shardAssignMaster = shardAssignMaster;this.unAssignedShardList = unAssignedShardList;this.consumerLoadCount = consumerLoadCount;}@Overridepublic void loadBalance() {// 1. 找出負(fù)載最低的機(jī)器// 2. 依次分配shard給該機(jī)器// 3. 分配的后負(fù)載數(shù)+1, 循環(huán)分配// 先根據(jù)空閑數(shù),計(jì)算出一個可以接受新shard的機(jī)器的shard負(fù)載最低值,然后依次分配給這些機(jī)器for (String shard1 : unAssignedShardList) {// 按負(fù)載最小分配原則 分配shardMap.Entry<String, Integer> minLoadServer = getMinLoadServer(consumerLoadCount);String serverHostname = minLoadServer.getKey();// 分配shard給機(jī)器shardAssignMaster.assignShardToServer(shard1, serverHostname);// 負(fù)載數(shù) +1minLoadServer.setValue(minLoadServer.getValue() + 1);}}/*** 獲取負(fù)載最小的機(jī)器名備用** @param loadCount 負(fù)載數(shù)據(jù)* @return 最小負(fù)載機(jī)器*/private Map.Entry<String, Integer> getMinLoadServer(Map<String, Integer> loadCount) {int minCount = MAX_CONSUMER_SHARD_LOAD;Map.Entry<String, Integer> minLoadServer = null;for(Map.Entry<String, Integer> server1 : loadCount.entrySet()) {if(server1.getValue() < minCount) {minCount = server1.getValue();minLoadServer = server1;}}return minLoadServer;}}
4. 將現(xiàn)有機(jī)器消費(fèi)情況做重分配,從而使各自負(fù)載相近控制器
/*** 負(fù)載不均衡導(dǎo)致的 重新均衡控制器,將消費(fèi)shard多的機(jī)器的 shard 拆解部分到 消費(fèi)少的機(jī)器上 (須上鎖)** @see ReBalanceStatusResultEnum#HEAVY_LOAD_BALANCE_NEEDED 狀態(tài)枚舉*/class HeavyLoadReBalancer implements StatusReBalanceStrategy {/*** 分片分配管理者實(shí)例*/private ShardAssignMaster shardAssignMaster;/*** 機(jī)器消費(fèi)負(fù)載情況*/private Map<String, Integer> consumerLoadCount;public HeavyLoadReBalancer(ShardAssignMaster shardAssignMaster, Map<String, Integer> consumerLoadCount) {this.shardAssignMaster = shardAssignMaster;this.consumerLoadCount = consumerLoadCount;}@Overridepublic void loadBalance() {// 1. 找出所有機(jī)器的消費(fèi)數(shù)的平均線值// 2. 負(fù)載數(shù)大于均線1的,直接抽出多余的shard, 放到待分配容器中// 3. 從大到小排序負(fù)載機(jī)器// 4. 從大的負(fù)載上減少shard到最后的機(jī)器上,直到小的機(jī)器達(dá)到平均負(fù)載線最貼近的地方,或者小的機(jī)器到達(dá)平均負(fù)載線最貼近的地方// 5. ++大負(fù)載機(jī)器 或者 --小負(fù)載機(jī)器,下一次循環(huán)double avgLoadCount = computeAliveServersAvgLoadCount(consumerLoadCount);List<Map.Entry<String, Integer>> sortedLoadCountList = sortLoadCountByLoadWithSmallEndian(consumerLoadCount);int bigLoadIndex = 0;int smallLoadIndex = sortedLoadCountList.size() - 1;for (;;) {// 首先檢測是否已遍歷完成,完成后不再進(jìn)行分配if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) {break;}Map.Entry<String, Integer> bigLoadServerEntry = sortedLoadCountList.get(bigLoadIndex);double canTakeCountFromBigLoad = bigLoadServerEntry.getValue() - avgLoadCount;if(canTakeCountFromBigLoad < 1) {bigLoadIndex += 1;continue;}for (int reAssignShardIndex = 0;reAssignShardIndex < canTakeCountFromBigLoad; reAssignShardIndex++) {if(isRoundRobinComplete(bigLoadIndex, smallLoadIndex)) {break;}Map.Entry<String, Integer> smallLoadServerEntry = sortedLoadCountList.get(smallLoadIndex);double canPutIntoSmallLoad = avgLoadCount - smallLoadServerEntry.getValue();if(canPutIntoSmallLoad < 1) {smallLoadIndex -= 1;continue;}// 此處可以使用管道操作,更流暢, 或者更準(zhǔn)確的說,使用事務(wù)操作// 從 bigLoad 中移除shard 0// 將移除的 shard 上鎖,以防后續(xù)新機(jī)器立即消費(fèi),導(dǎo)致數(shù)據(jù)異常// 添加新shard到 smallLoad 中String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(bigLoadServerEntry.getKey());bigLoadServerEntry.setValue(bigLoadServerEntry.getValue() - 1);// 上鎖分片,禁用消費(fèi)shardAssignMaster.lockShardId(firstLoadSHardId);// 添加shard到 smallLoad 中shardAssignMaster.assignShardToServer(firstLoadSHardId, smallLoadServerEntry.getKey());smallLoadServerEntry.setValue(smallLoadServerEntry.getValue() + 1);}}}/*** 判定輪詢是否完成** @param startIndex 開始下標(biāo)* @param endIndex 結(jié)束下標(biāo)* @return true: 輪詢完成, false: 未完成*/private boolean isRoundRobinComplete(int startIndex, int endIndex) {return startIndex == endIndex;}/*** 從大到小排序 負(fù)載機(jī)器** @param consumerLoadCount 總負(fù)載情況* @return 排序后的機(jī)器列表*/private List<Map.Entry<String, Integer>> sortLoadCountByLoadWithSmallEndian(Map<String, Integer> consumerLoadCount) {List<Map.Entry<String, Integer>> sortedList = new ArrayList<>(consumerLoadCount.entrySet());sortedList.sort(new Comparator<Map.Entry<String, Integer>>() {@Overridepublic int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {return o2.getValue() - o1.getValue();}});return sortedList;}/*** 計(jì)算平均每臺機(jī)器的消費(fèi)shard負(fù)載** @param loadCount 總負(fù)載指標(biāo)容器* @return 負(fù)載均線*/private double computeAliveServersAvgLoadCount(Map<String, Integer> loadCount) {int totalServerCount = loadCount.size();int totalShardCount = 0;for(Integer consumeShardCount : loadCount.values()) {totalShardCount += consumeShardCount;}return (double) totalShardCount / totalServerCount;}}
5. 從負(fù)載重的機(jī)器上剝奪shard,分配給空閑的機(jī)器 控制器
/*** 負(fù)載不均衡,且存在空閑的機(jī)器, 此時應(yīng)是 均值與最大值之間相差較小值,但是至少有一個 消費(fèi)2 的機(jī)器,可以剝奪其1個shard給空閑機(jī)器 的控制器** @see ReBalanceStatusResultEnum#HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED 狀態(tài)枚舉*/class HeavyLoadWithConsumerIdleReBalancer implements StatusReBalanceStrategy {/*** 分片分配管理者實(shí)例*/private ShardAssignMaster shardAssignMaster;/*** 空閑的機(jī)器列表*/private List<String> idleServerList;/*** 機(jī)器消費(fèi)負(fù)載情況*/private Map<String, Integer> consumerLoadCount;public HeavyLoadWithConsumerIdleReBalancer(ShardAssignMaster shardAssignMaster,Map<String, Integer> consumerLoadCount,List<String> idleServerList) {this.shardAssignMaster = shardAssignMaster;this.consumerLoadCount = consumerLoadCount;this.idleServerList = idleServerList;}@Overridepublic void loadBalance() {// 1. 找出還未被消費(fèi)的shard// 2. 分配一個給自己// 3. 如果還有其他機(jī)器也未分配,則同樣進(jìn)行分配for (String idleHostname1 : idleServerList) {Map.Entry<String, Integer> maxLoadEntry = getMaxLoadConsumerEntry(consumerLoadCount);// 本身只有一個則不再分配負(fù)擔(dān)了if(maxLoadEntry.getValue() <= 1) {break;}String maxLoadServerHostname = maxLoadEntry.getKey();// 此處可以使用管道操作,更流暢, 或者更準(zhǔn)確的說,使用事務(wù)操作// 從 bigLoad 中移除shard 0// 將移除的 shard 上鎖,以防后續(xù)新機(jī)器立即消費(fèi),導(dǎo)致數(shù)據(jù)異常// 添加新shard到 smallLoad 中String firstLoadSHardId = shardAssignMaster.popServerFirstConsumeShardId(maxLoadServerHostname);maxLoadEntry.setValue(maxLoadEntry.getValue() - 1);// 上鎖卸載下來的shard,鎖定50sshardAssignMaster.lockShardId(firstLoadSHardId);// 添加shard到 smallLoad 中shardAssignMaster.assignShardToServer(firstLoadSHardId, idleHostname1);consumerLoadCount.put(idleHostname1, 1);}}/*** 獲取負(fù)載最大的機(jī)器實(shí)例作** @param consumerLoadCount 所有機(jī)器的負(fù)載情況* @return 最大負(fù)載機(jī)器實(shí)例*/private Map.Entry<String, Integer> getMaxLoadConsumerEntry(Map<String, Integer> consumerLoadCount) {Integer maxConsumeCount = 0;Map.Entry<String, Integer> maxEntry = null;for (Map.Entry<String, Integer> server1 : consumerLoadCount.entrySet()) {if(server1.getValue() > maxConsumeCount) {maxConsumeCount = server1.getValue();maxEntry = server1;}}return maxEntry;}}
如上,各個平衡策略,實(shí)現(xiàn)各自的功能,就能掌控整個集群的消費(fèi)控制了!
除了上面的主料,還有一些附帶的東西!
【2】均衡狀態(tài)枚舉值如下:
/*** 再平衡檢測結(jié)果類型枚舉**/public enum ReBalanceStatusResultEnum {/*** 一切正常,無須操作*/OK("一切正常,無須操作"),/*** 有新下線機(jī)器,可以將其分片分配給其他機(jī)器*/UNASSIGNED_SHARD_WITHOUT_CONSUMER_IDLE_EXISTS("有未分配的分片,可以分配給其他機(jī)器"),/*** 有未分配的分片,且有空閑機(jī)器,直接將空閑shard分配給空閑機(jī)器即可(最好只分配1個,以便其他機(jī)器啟動后可用)*/UNASSIGNED_SHARD_WITH_CONSUMER_IDLE_EXISTS("有未分配的分片,且有空閑機(jī)器"),/*** 負(fù)載不均衡,須生平衡*/HEAVY_LOAD_BALANCE_NEEDED("負(fù)載不均衡,須生平衡"),/*** 負(fù)載不均衡,且存在空閑的機(jī)器, 此時應(yīng)是 均值與最大值之間相差較小值,但是至少有一個 消費(fèi)2 的機(jī)器,可以剝奪其1個shard給空閑機(jī)器*/HEAVY_LOAD_WITH_CONSUMER_IDLE_BALANCE_NEEDED("負(fù)載不均衡,且存在空閑的機(jī)器"),;private ReBalanceStatusResultEnum(String remark) {// ignore}}
【3】RedisKeyConstants 常量定義
/*** redis 相關(guān)常量定義*/public class RedisKeyConstants {/*** 在線機(jī)器緩存key.與心跳同時作用** @see #SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX*/public static final String ALL_ONLINE_SERVER_CACHE_KEY = "prefix:active.servers";/*** 機(jī)器消費(fèi)shard情況 緩存key前綴*/public static final String SERVER_CONSUME_CACHE_KEY_PREFIX = "prefix:log.consumer:server:";/*** 分片被分配情況 緩存key前綴*/public static final String SHARD_ASSIGNED_CACHE_KEY_PREFIX = "prefix:shard.assigned:id:";/*** 分片鎖 緩存key前綴, 當(dāng)上鎖時,任何機(jī)器不得再消費(fèi)*/public static final String SHARD_LOCK_CONSUME_CACHE_PREFIX = "prefix:consume.lock.shard:id:";/*** 存活機(jī)器心跳,與上面的機(jī)器形成呼應(yīng)** @see #ALL_ONLINE_SERVER_CACHE_KEY*/public static final String SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX = "prefix:log.consumer:server.heartbeat:";/*** 單個消費(fèi)者最大消費(fèi)負(fù)載數(shù) (一個不可能達(dá)到的值)*/public static final Integer MAX_CONSUMER_SHARD_LOAD = 9999;}
【4】shard分配控制器負(fù)責(zé)所有shard分配
import com.test.utils.RedisPoolUtil;import java.util.ArrayList;import java.util.List;import java.util.Set;/*** shard分配管理者 (盡量使用接口表達(dá))**/public class ShardAssignMaster {private RedisPoolUtil redisPoolUtil;public ShardAssignMaster(RedisPoolUtil redisPoolUtil) {this.redisPoolUtil = redisPoolUtil;}/*** 注冊消費(fèi)者到 控制中心(注冊中心)*/public void registerConsumer(String serverHostname) {// 注冊server到 redis zset 中,如有條件,可以使用 zk 進(jìn)行操作,也許更好redisPoolUtil.zadd(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, (double)System.currentTimeMillis(), serverHostname);}/*** 心跳發(fā)送數(shù)據(jù)*/public void sendHeartbeat(String serverHostname) {String heartbeatCacheKey = RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + serverHostname;redisPoolUtil.set(heartbeatCacheKey, "1", 30);}/*** 檢測指定消費(fèi)者服務(wù)器還存活與否** @param consumeHostname 機(jī)器名* @return true: 存活, false: 宕機(jī)*/public boolean isConsumerServerAlive(String consumeHostname) {String aliveValue = redisPoolUtil.get(RedisKeyConstants.SERVER_ALIVE_HEARTBEAT_CACHE_PREFIX + consumeHostname);return aliveValue != null&& "1".equals(aliveValue);}/*** 獲取并刪除指定server的所屬消費(fèi)的第一個 shardId** @param serverHostname 機(jī)器名* @return 第一個shardId*/public String popServerFirstConsumeShardId(String serverHostname) {String bigLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname;Set<String> firstLoadShardSet = redisPoolUtil.zrange(bigLoadConsumerServerCacheKey, 0, 0);String firstLoadSHardId = firstLoadShardSet.iterator().next();redisPoolUtil.zrem(bigLoadConsumerServerCacheKey, firstLoadSHardId);redisPoolUtil.expire(bigLoadConsumerServerCacheKey, 60);return firstLoadSHardId;}/*** 對shard進(jìn)行上鎖,禁止所有消費(fèi)行為** @param shardId 分片id*/public void lockShardId(String shardId) {String shardLockCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId;redisPoolUtil.set(shardLockCacheKey, "1", 50);}/*** 分配shard分片數(shù)據(jù)給 指定server** @param shardId 分片id* @param serverHostname 分配給的消費(fèi)者機(jī)器名*/public void assignShardToServer(String shardId, String serverHostname) {String smallLoadConsumerServerCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname;redisPoolUtil.zadd(smallLoadConsumerServerCacheKey, (double)System.currentTimeMillis(), shardId);redisPoolUtil.expire(smallLoadConsumerServerCacheKey, 60);// 更新新的shard消費(fèi)者標(biāo)識String shardIdAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId;redisPoolUtil.set(shardIdAssignCacheKey, serverHostname);}/*** 獲取被分配了shardId的server信息** @param shardId 要檢查的分片id* @return 被分配了shardId 的機(jī)器名*/public String getShardAssignedServer(String shardId) {String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId;return redisPoolUtil.get(shardAssignCacheKey);}/*** 刪除shard的分配信息,使無效化** @param shardId 要刪除的分片id*/public void invalidateShardAssignInfo(String shardId) {String shardAssignCacheKey = RedisKeyConstants.SHARD_ASSIGNED_CACHE_KEY_PREFIX + shardId;redisPoolUtil.del(shardAssignCacheKey);}/*** 清理下線機(jī)器** @param hostname 下線機(jī)器名*/public void invalidateOfflineServer(String hostname) {redisPoolUtil.zrem(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, hostname);}/*** 獲取機(jī)器消費(fèi)的shard列表** @param serverHostname 機(jī)器主機(jī)名* @return shard列表 或者 null*/public Set<String> getServerDutyConsumeShardSet(String serverHostname) {String serverDutyConsumeShardCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + serverHostname;return redisPoolUtil.zrange(serverDutyConsumeShardCacheKey, 0, -1);}/*** 獲取所有在線機(jī)器列表** @return 在線機(jī)器列表*/public List<String> getAllOnlineServerList() {Set<String> hostnameSet = redisPoolUtil.zrange(RedisKeyConstants.ALL_ONLINE_SERVER_CACHE_KEY, 0, -1);return new ArrayList<>(hostnameSet);}}
以上是協(xié)同負(fù)載均衡器代碼實(shí)現(xiàn)。
【5】當(dāng)然你還需要一個消費(fèi)者
接下來我們還要看下消費(fèi)者如何實(shí)現(xiàn)消費(fèi)。
import com.test.utils.RedisPoolUtil;import org.apache.commons.lang3.StringUtils;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.net.InetAddress;import java.net.UnknownHostException;import java.time.LocalDateTime;import java.util.Map;import java.util.Set;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;/*** 消費(fèi)者業(yè)務(wù)線程**/public class LoghubConsumeWorker implements Runnable {private static final Logger logger = LoggerFactory.getLogger(LoghubConsumeWorker.class);private RedisPoolUtil redisPoolUtil;private String HOST_NAME;/*** 因消費(fèi)者數(shù)目不一定,所以使用 CachedThreadPool*/private ExecutorService consumeExecutorService = Executors.newCachedThreadPool();public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil) {this(redisPoolUtil, null);}public LoghubConsumeWorker(RedisPoolUtil redisPoolUtil, String hostName) {this.redisPoolUtil = redisPoolUtil;// 為測試需要添加的 hostNameHOST_NAME = hostName;initSharedVars();}/*** 初始化公共變量*/private void initSharedVars() {try {if(HOST_NAME != null) {return;}HOST_NAME = InetAddress.getLocalHost().getHostName();}catch (UnknownHostException e) {throw new RuntimeException("init error : 獲取服務(wù)器主機(jī)名失敗");}}@Overridepublic void run() {while (!Thread.interrupted()) {// 先獲取所有分配給的shard列表,為空則進(jìn)入下一次循環(huán)(注意此時阻塞鎖不能起作用)Set<String> shardsSet = blockingTakeAvailableConsumeShardList();try {// 消費(fèi)所有給定shard數(shù)據(jù)consumeLogHubShards(shardsSet);} catch (Exception e) {logger.error("消費(fèi)loghub, error", e);}}}/*** 獲取可用的分片列表(沒有則阻塞等待)** @return 分片列表*/private Set<String> blockingTakeAvailableConsumeShardList() {while (!Thread.interrupted()) {String serverConsumeCacheKey = RedisKeyConstants.SERVER_CONSUME_CACHE_KEY_PREFIX + HOST_NAME;Set<String> shardsSet = redisPoolUtil.zrange(serverConsumeCacheKey, 0, -1);if (shardsSet != null && !shardsSet.isEmpty()) {return shardsSet;}logger.warn(" =========== 當(dāng)前主機(jī)[hostname:{}]未查詢到任何shard =========", HOST_NAME);try {Thread.sleep(1000);} catch (InterruptedException e) {logger.error("LogHubClientWork run 未獲取到該主機(jī)的shard時,每隔1秒鐘獲取 ,error : {}", e);}}return null;}/*** 消費(fèi)loghub 分片數(shù)據(jù)** @param shardsSet 被分配的分片列表*/public void consumeLogHubShards(Set<String> shardsSet) throws InterruptedException {if(shardsSet == null || shardsSet.isEmpty()) {return;}// 此處使用 CountdownLatch, 保證至少有一個任務(wù)完成時,才開始下一次任務(wù)的調(diào)入// Semaphore semaphoreLock = new Semaphore(shardsSet.size());CountDownLatch openDoorLatch = new CountDownLatch(1);boolean startNewJobAtLeastOnce = false;for (String shard : shardsSet) {// 檢測當(dāng)前shard是否處于鎖定狀態(tài),如果鎖定則不能消費(fèi), 注意鎖情況if(isShardLocked(shard)) {logger.info("=============== shard:{} is locked, continue... ======", shard);continue;}int shardId = Integer.parseInt(shard);LoghubConsumerTaskExecutor consumer = getConsumerExecutor(shardId);// consumer 應(yīng)保證有所消費(fèi),如果沒有消費(fèi),則自行等待一個長周期,外部應(yīng)只管調(diào)入請求// consumer 應(yīng)保證所有消費(fèi),在上一個任務(wù)未完成時,不得再開啟下一輪提交消費(fèi)boolean startNewJob = consumer.startNewConsumeJob(openDoorLatch);if(startNewJob) {// start failed, prev job is running maybe// ignore job, no blockingstartNewJobAtLeastOnce = true;}}// 任意一個任務(wù)完成,都將打開新的分配周期,且后續(xù) countDown 將無效,此處可能導(dǎo)致死鎖if(startNewJobAtLeastOnce) {openDoorLatch.await();}else {// 當(dāng)本次分配調(diào)度一個任務(wù)都未提交時,則睡眠等待// (一般此情況為 消費(fèi)者被分配了上了鎖的shard時,即搶占另的機(jī)器的shard, 需要給別的機(jī)器備份數(shù)據(jù)時間鎖)Thread.sleep(200);}}/*** 檢測分片是否被鎖定消費(fèi)了** @param shardId 分片id* @return true:鎖定, false:未鎖定可用*/private boolean isShardLocked(String shardId) {String shardCacheKey = RedisKeyConstants.SHARD_LOCK_CONSUME_CACHE_PREFIX + shardId;String lockValue = redisPoolUtil.get(shardCacheKey);return !StringUtils.isBlank(lockValue)&& "1".equals(lockValue);}/*** 獲取消費(fèi)者實(shí)例,針對一個shard, 只創(chuàng)建一個實(shí)例*/private Map<Integer, LoghubConsumerTaskExecutor> mShardConsumerMap = new ConcurrentHashMap<>();private LoghubConsumerTaskExecutor getConsumerExecutor(final int shardId) {LoghubConsumerTaskExecutor consumer = mShardConsumerMap.get(shardId);if (consumer != null) {return consumer;}consumer = new LoghubConsumerTaskExecutor(new SingleShardConsumerJob(shardId));mShardConsumerMap.put(shardId, consumer);logger.info(" ======================= create new consumer executor shard:{}", shardId);return consumer;}/*** 消費(fèi)者調(diào)度器** 統(tǒng)一控制消費(fèi)者的運(yùn)行狀態(tài)管控*/class LoghubConsumerTaskExecutor {private Future<?> future;private ConsumerJob consumerJob;public LoghubConsumerTaskExecutor(ConsumerJob consumerJob) {this.consumerJob = consumerJob;}/*** 啟動一個新消費(fèi)任務(wù)** @return true: 啟動成功, false: 啟動失敗有未完成任務(wù)在前*/public boolean startNewConsumeJob(CountDownLatch latch) {if(future == null|| future.isCancelled() || future.isDone()) {//沒有任務(wù)或者任務(wù)已取消或已完成 提交任務(wù)future = consumeExecutorService.submit(new Runnable() {@Overridepublic void run() {try {consumerJob.consumeShardData();}finally {latch.countDown();}}});return true;}return false;}}}/*** 消費(fèi)者任務(wù)接口定義*/interface ConsumerJob {/*** 消費(fèi)數(shù)據(jù)具體邏輯實(shí)現(xiàn)*/public void consumeShardData();}/*** 單個shard消費(fèi)的任務(wù)實(shí)現(xiàn)*/class SingleShardConsumerJob implements ConsumerJob {/*** 當(dāng)前任務(wù)的消費(fèi) shardId*/private int shardId;public SingleShardConsumerJob(int shardId) {this.shardId = shardId;}@Overridepublic void consumeShardData() {System.out.println(LocalDateTime.now() + " - host -> consume shard: " + shardId);try {// do complex biz// 此處如果發(fā)現(xiàn)shard 不存在異常,則應(yīng)回調(diào)協(xié)調(diào)器,進(jìn)行shard的移除Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}}
【6】當(dāng)然你還需要一個demo
看不到效果,我就是不信!
所以來看個 demo 吧!
我們使用單機(jī)開多個 單元測試用例,直接測試就好!
測試代碼:.
import com.test.common.config.LogHubProperties;import com.test.utils.RedisPoolUtil;import org.junit.Test;import java.io.IOException;import java.util.Random;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;/*** 臨時測試 負(fù)載均衡**/public class ShardConsumerLoadBalanceTest {public static void main(String[] args) throws IOException {startAConsumer();System.in.read();}// 啟動一個單元測試,就相當(dāng)于啟動一個消費(fèi)者應(yīng)用@Testpublic void mainMock() throws IOException {startAConsumer();System.in.read();}// 啟動一個單元測試,就相當(dāng)于啟動一個消費(fèi)者應(yīng)用@Testpublic void startNewConsumer() throws IOException {startAConsumer();System.in.read();}// 啟動一個單元測試,就相當(dāng)于啟動一個消費(fèi)者應(yīng)用@Testpublic void startNewConsumer2() throws IOException {startAConsumer();System.in.read();}private static void startAConsumer() {RedisPoolUtil redisPoolUtil = new RedisPoolUtil();redisPoolUtil.setIp("127.0.0.1");redisPoolUtil.setMaxActive(111);redisPoolUtil.setMaxIdle(1000);redisPoolUtil.setPort(6379);redisPoolUtil.setMaxWait(100000);redisPoolUtil.setTimeout(100000);redisPoolUtil.setPassWord("123");redisPoolUtil.setDatabase(0);redisPoolUtil.initPool();LogHubProperties logHubProperties = new LogHubProperties();logHubProperties.setProjectName("test");logHubProperties.setEndpoint("cn-shanghai-finance-1.log.aliyuncs.com");logHubProperties.setAccessKey("xxxx");logHubProperties.setAccessKeyId("11111");// 使用隨機(jī) hostname 模擬多臺機(jī)器調(diào)用Random random = new Random();String myHostname = "my-host-" + random.nextInt(10);// 啟動管理線程LoghubConsumerShardCoWorker shardCoWorker = new LoghubConsumerShardCoWorker(redisPoolUtil, logHubProperties, myHostname);ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);scheduledExecutorService.scheduleAtFixedRate(shardCoWorker, 5, 30, TimeUnit.SECONDS);// 啟動業(yè)務(wù)線程ExecutorService executorService = Executors.newFixedThreadPool(2);LoghubConsumeWorker worker = new LoghubConsumeWorker(redisPoolUtil, myHostname);executorService.submit(worker);}}
如上,就可以實(shí)現(xiàn)自己的負(fù)載均衡消費(fèi)了。
比如: 總分片數(shù)為4。
1. 最開始啟動1個機(jī)器時,將會被分配 0,1,2,3。
2. 啟動兩個后,將分為 0,1; 2,3;
3. 啟動3個后,將分為 0; 1; 2,3;
4. 反之,關(guān)閉一個機(jī)器后,將把壓力分擔(dān)到原機(jī)器上。
當(dāng)做負(fù)載重分配時,將有50秒的鎖定時間備份。
【7】待完善的點(diǎn)
本文是基于loghub實(shí)現(xiàn)的分片拉取,其實(shí)在這方面loghub與kafka是如出一轍的,只是loghub更商業(yè)產(chǎn)品化。
當(dāng)shard縮減時,應(yīng)能夠自動發(fā)現(xiàn),從而去除原有的機(jī)器消費(fèi)分配。而不是讓消費(fèi)者報錯。
注意進(jìn)行再均衡時,消費(fèi)者偏移量問題,尤其是你為了性能使用了jvm本地變量保存偏移時,注意刷新該變量偏移。本文沒有實(shí)現(xiàn)類似zookeeper強(qiáng)大的watch監(jiān)聽功能,但是有一個上鎖等待的過程,你可以基于這個鎖做一些力所能及的事!
老話:可以適當(dāng)造輪子!

騰訊、阿里、滴滴后臺面試題匯總總結(jié) — (含答案)
面試:史上最全多線程面試題 !
最新阿里內(nèi)推Java后端面試題
JVM難學(xué)?那是因?yàn)槟銢]認(rèn)真看完這篇文章

關(guān)注作者微信公眾號 —《JAVA爛豬皮》
了解更多java后端架構(gòu)知識以及最新面試寶典


看完本文記得給作者點(diǎn)贊+在看哦~~~大家的支持,是作者源源不斷出文的動力
作者:等你歸去來
出處:https://www.cnblogs.com/yougewe/p/11114348.html
