elastic-job的原理簡(jiǎn)介和使用
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
elastic-job是當(dāng)當(dāng)開(kāi)源的一款非常好用的作業(yè)框架,在這之前,我們開(kāi)發(fā)定時(shí)任務(wù)一般都是使用quartz或者spring-task(ScheduledExecutorService),無(wú)論是使用quartz還是spring-task,我們都會(huì)至少遇到兩個(gè)痛點(diǎn):
1.不敢輕易跟著應(yīng)用服務(wù)多節(jié)點(diǎn)部署,可能會(huì)重復(fù)多次執(zhí)行而引發(fā)系統(tǒng)邏輯的錯(cuò)誤。
2.quartz的集群僅僅只是用來(lái)HA,節(jié)點(diǎn)數(shù)量的增加并不能給我們的每次執(zhí)行效率帶來(lái)提升,即不能實(shí)現(xiàn)水平擴(kuò)展。
本篇博文將會(huì)自頂向下地介紹elastic-job,讓大家認(rèn)識(shí)了解并且快速搭建起環(huán)境。
elastic-job產(chǎn)品線說(shuō)明
elastic-job在2.x之后,出了兩個(gè)產(chǎn)品線:Elastic-Job-Lite和Elastic-Job-Cloud。我們一般使用Elastic-Job-Lite就能夠滿(mǎn)足需求,本文也是以Elastic-Job-Lite為主。1.x系列對(duì)應(yīng)的就只有Elastic-Job-Lite,并且在2.x里修改了一些核心類(lèi)名,差別雖大,原理類(lèi)似,建議使用2.x系列。寫(xiě)此博文,最新release版本為2.0.5。

elastic-job-lite原理
舉個(gè)典型的job場(chǎng)景,比如余額寶里的昨日收益,系統(tǒng)需要job在每天某個(gè)時(shí)間點(diǎn)開(kāi)始,給所有余額寶用戶(hù)計(jì)算收益。如果用戶(hù)數(shù)量不多,我們可以輕易使用quartz來(lái)完成,我們讓計(jì)息job在某個(gè)時(shí)間點(diǎn)開(kāi)始執(zhí)行,循環(huán)遍歷所有用戶(hù)計(jì)算利息,這沒(méi)問(wèn)題。可是,如果用戶(hù)體量特別大,我們可能會(huì)面臨著在第二天之前處理不完這么多用戶(hù)。另外,我們部署job的時(shí)候也得注意,我們可能會(huì)把job直接放在我們的webapp里,webapp通常是多節(jié)點(diǎn)部署的,這樣,我們的job也就是多節(jié)點(diǎn),多個(gè)job同時(shí)執(zhí)行,很容易造成重復(fù)執(zhí)行,比如用戶(hù)重復(fù)計(jì)息,為了避免這種情況,我們可能會(huì)對(duì)job的執(zhí)行加鎖,保證始終只有一個(gè)節(jié)點(diǎn)能執(zhí)行,或者干脆讓job從webapp里剝離出來(lái),獨(dú)自部署一個(gè)節(jié)點(diǎn)。
elastic-job就可以幫助我們解決上面的問(wèn)題,elastic底層的任務(wù)調(diào)度還是使用的quartz,通過(guò)zookeeper來(lái)動(dòng)態(tài)給job節(jié)點(diǎn)分片。
我們來(lái)看:
很大體量的用戶(hù)需要在特定的時(shí)間段內(nèi)計(jì)息完成
我們肯定是希望我們的任務(wù)可以通過(guò)集群達(dá)到水平擴(kuò)展,集群里的每個(gè)節(jié)點(diǎn)都處理部分用戶(hù),不管用戶(hù)數(shù)量有多龐大,我們只要增加機(jī)器就可以了,比如單臺(tái)機(jī)器特定時(shí)間能處理n個(gè)用戶(hù),2臺(tái)機(jī)器處理2n個(gè)用戶(hù),3臺(tái)3n,4臺(tái)4n...,再多的用戶(hù)也不怕了。
使用elastic-job開(kāi)發(fā)的作業(yè)都是zookeeper的客戶(hù)端,比如我希望3臺(tái)機(jī)器跑job,我們將任務(wù)分成3片,框架通過(guò)zk的協(xié)調(diào),最終會(huì)讓3臺(tái)機(jī)器分別分配到0,1,2的任務(wù)片,比如server0-->0,server1-->1,server2-->2,當(dāng)server0執(zhí)行時(shí),可以只查詢(xún)id%3==0的用戶(hù),server1執(zhí)行時(shí),只查詢(xún)id%3==1的用戶(hù),server2執(zhí)行時(shí),只查詢(xún)id%3==2的用戶(hù)。
任務(wù)部署多節(jié)點(diǎn)引發(fā)重復(fù)執(zhí)行
在上面的基礎(chǔ)上,我們?cè)僭黾觭erver3,此時(shí),server3分不到任務(wù)分片,因?yàn)橹挥?片,已經(jīng)分完了。沒(méi)有分到任務(wù)分片的作業(yè)程序?qū)⒉粓?zhí)行。
如果此時(shí)server2掛了,那么server2的分片項(xiàng)會(huì)分配給server3,server3有了分片,就會(huì)替代server2執(zhí)行。
如果此時(shí)server3也掛了,只剩下server0和server1了,框架也會(huì)自動(dòng)把server3的分片隨機(jī)分配給server0或者server1,可能會(huì)這樣,server0-->0,server1-->1,2。
這種特性稱(chēng)之為彈性擴(kuò)容,即elastic-job名稱(chēng)的由來(lái)。
代碼演示
我們搭建環(huán)境通過(guò)示例代碼來(lái)演示上面的例子,elastic-job是不支持單機(jī)多實(shí)例的,通過(guò)zk的協(xié)調(diào)分片是以ip為單元的。很多同學(xué)上來(lái)可能就是通過(guò)單機(jī)多實(shí)例來(lái)學(xué)習(xí),結(jié)果導(dǎo)致分片和預(yù)期不一致。這里沒(méi)辦法,只能通過(guò)多機(jī)器或者虛擬機(jī),我們這里使用虛擬機(jī),另外,由于資源有限,我們這里僅僅只模擬兩臺(tái)機(jī)器。
節(jié)點(diǎn)說(shuō)明:
本地宿主機(jī)器
zookeeper、job
192.168.241.1
虛擬機(jī)
job
192.168.241.128
環(huán)境說(shuō)明:
Java
請(qǐng)使用JDK1.7及其以上版本。
Zookeeper
請(qǐng)使用Zookeeper3.4.6及其以上版本
Elastic-Job-Lite
2.0.5(2.x系列即可,最好是2.0.4及其以上,因?yàn)?.0.4版本有本人提交的少許代碼,(*^__^*) 嘻嘻……)
需求說(shuō)明:
通過(guò)兩臺(tái)機(jī)器演示動(dòng)態(tài)分片
step1. 引入框架的jar包
<!-- 引入elastic-job-lite核心模塊 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.0.5</version>
</dependency>
<!-- 使用springframework自定義命名空間時(shí)引入 -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.0.5</version>
</dependency>
step2. 編寫(xiě)job
package com.fanfan.sample001;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.util.Date;
/**
* Created by fanfan on 2016/12/20.
*/
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("------Thread ID: %s, 任務(wù)總片數(shù): %s, 當(dāng)前分片項(xiàng): %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
/**
* 實(shí)際開(kāi)發(fā)中,有了任務(wù)總片數(shù)和當(dāng)前分片項(xiàng),就可以對(duì)任務(wù)進(jìn)行分片執(zhí)行了
* 比如 SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem
*/
}
}
Step3. Spring配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd">
<!--配置作業(yè)注冊(cè)中心 -->
<reg:zookeeper id="regCenter" server-lists="192.168.241.1:2181" namespace="dd-job"
base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置作業(yè)-->
<job:simple id="mySimpleJob" class="com.fanfan.sample001.MySimpleJob" registry-center-ref="regCenter"
sharding-total-count="2" cron="0/2 * * * * ?" overwrite="true" />
</beans>
Case1. 單節(jié)點(diǎn)



Case2. 增加一個(gè)節(jié)點(diǎn)





Case3. 斷開(kāi)一個(gè)節(jié)點(diǎn)



作業(yè)類(lèi)型
elastic-job提供了三種類(lèi)型的作業(yè):Simple類(lèi)型作業(yè)、Dataflow類(lèi)型作業(yè)、Script類(lèi)型作業(yè)。這里主要講解前兩者。Script類(lèi)型作業(yè)意為腳本類(lèi)型作業(yè),支持shell,python,perl等所有類(lèi)型腳本,使用不多,可以參見(jiàn)github文檔。
SimpleJob需要實(shí)現(xiàn)SimpleJob接口,意為簡(jiǎn)單實(shí)現(xiàn),未經(jīng)過(guò)任何封裝,與quartz原生接口相似,比如示例代碼中所使用的job。
Dataflow類(lèi)型用于處理數(shù)據(jù)流,需實(shí)現(xiàn)DataflowJob接口。該接口提供2個(gè)方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)。
可通過(guò)DataflowJobConfiguration配置是否流式處理。
流式處理數(shù)據(jù)只有fetchData方法的返回值為null或集合長(zhǎng)度為空時(shí),作業(yè)才停止抓取,否則作業(yè)將一直運(yùn)行下去;非流式處理數(shù)據(jù)則只會(huì)在每次作業(yè)執(zhí)行過(guò)程中執(zhí)行一次fetchData方法和processData方法,隨即完成本次作業(yè)。
實(shí)際開(kāi)發(fā)中,Dataflow類(lèi)型的job還是很有好用的。
比如拿余額寶計(jì)息來(lái)說(shuō):
package com.fanfan.sample001;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import java.util.ArrayList;
import java.util.List;
/**
* Created by fanfan on 2016/12/23.
*/
public class MyDataFlowJob implements DataflowJob<User> {
/*
status
0:待處理
1:已處理
*/
@Override
public List<User> fetchData(ShardingContext shardingContext) {
List<User> users = null;
/**
* users = SELECT * FROM user WHERE status = 0 AND MOD(id, shardingTotalCount) = shardingItem Limit 0, 30
*/
return users;
}
@Override
public void processData(ShardingContext shardingContext, List<User> data) {
for (User user: data) {
System.out.println(String.format("用戶(hù) %s 開(kāi)始計(jì)息", user.getUserId()));
user.setStatus(1);
/**
* update user
*/
}
}
}
<job:dataflow id="myDataFlowJob" class="com.fanfan.sample001.MyDataFlowJob" registry-center-ref="regCenter"
sharding-total-count="2" cron="0 0 02 * * ?" streaming-process="true" overwrite="true" />
其它功能
上述介紹的是最精簡(jiǎn)常用的功能。elastic-job的功能集還不止這些,比如像作業(yè)事件追蹤、任務(wù)監(jiān)聽(tīng)等,另外,elastic-job-lite-console作為一個(gè)獨(dú)立的運(yùn)維平臺(tái)還提供了用來(lái)查詢(xún)和操作任務(wù)的web頁(yè)面。
這些增強(qiáng)的功能讀者可以在github/elastic-job上自行學(xué)習(xí),相信有了本篇博文的基礎(chǔ),再閱讀那些文檔就特別簡(jiǎn)單了。
————————————————
版權(quán)聲明:本文為CSDN博主「秋名車(chē)手」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/fanfan_v5/article/details/61310045
粉絲福利:Java從入門(mén)到入土學(xué)習(xí)路線圖
??????

??長(zhǎng)按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
