php Swoole實(shí)現(xiàn)毫秒級(jí)定時(shí)任務(wù)
項(xiàng)目開(kāi)發(fā)中,如果有定時(shí)任務(wù)的業(yè)務(wù)要求,我們會(huì)使用linux的crontab來(lái)解決,但是它的最小粒度是分鐘級(jí)別,如果要求粒度是秒級(jí)別的,甚至毫秒級(jí)別的,crontab就無(wú)法滿足,值得慶幸的是swoole提供的強(qiáng)大的毫秒定時(shí)器。
應(yīng)用場(chǎng)景舉例
我們可能會(huì)遇到這樣的場(chǎng)景:
場(chǎng)景一:每隔30秒獲取一次本機(jī)內(nèi)存使用率
場(chǎng)景二:2分鐘后執(zhí)行報(bào)表發(fā)送任務(wù)
場(chǎng)景三:每天凌晨2點(diǎn)鐘定時(shí)請(qǐng)求第三方接口,如果接口有數(shù)據(jù)返回則停止任務(wù),如果接口由于某種原因沒(méi)有響應(yīng)或者沒(méi)有數(shù)據(jù)返回則5分鐘后繼續(xù)嘗試請(qǐng)求該接口,嘗試5次后仍然失敗則停止該任務(wù)
以上的三個(gè)場(chǎng)景我們都可以歸納為定時(shí)任務(wù)的范疇。
Swoole毫秒定時(shí)器
Swoole提供了異步毫秒定時(shí)器函數(shù):
swoole_timer_tick(int $msec, callable $callback):設(shè)置一個(gè)間隔時(shí)鐘定時(shí)器,每隔$msec毫秒執(zhí)行一次$callback,類似于javascript中的setInterval()。
swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的時(shí)間$after_time_ms后執(zhí)行$callback_function,類似于javascript的setTimeout()。
swoole_timer_clear(int $timer_id):刪除指定id的定時(shí)器,類似于javascript的clearInterval()。
解決方案
對(duì)于場(chǎng)景一,經(jīng)常用在系統(tǒng)檢測(cè)統(tǒng)計(jì)方面,實(shí)時(shí)性要求比較高,但又能控制好頻率,多用于后臺(tái)服務(wù)器性能監(jiān)控,可以生成可視化圖表??梢允?0秒獲取一次內(nèi)存使用率,也可以是10秒,而crontab最小粒度只能設(shè)置為1分鐘。
//?啟用定時(shí)器,每30秒執(zhí)行一次
?swoole_timer_tick(30000,?function($timer)?use?($task_id)?{?
????$memPercent?=?$this->getMemoryUsage();?//計(jì)算內(nèi)存使用率
????echo?date('Y-m-d?H:i:s')?.?'當(dāng)前內(nèi)存使用率:'.$memPercent."\n";
?});對(duì)于場(chǎng)景二,直接定義xx時(shí)間后執(zhí)行某項(xiàng)任務(wù)的話,貌似crontab比較困難,而使用swoole的swoole_timer_after可以實(shí)現(xiàn):
swoole_timer_after(120000,?function()?use?($str)?{?//2分鐘后執(zhí)行
?????$this->sendReport();?//發(fā)送報(bào)表
?????echo?"send?report,?$str\n";
?});對(duì)于場(chǎng)景三,用來(lái)作嘗試請(qǐng)求,請(qǐng)求失敗后繼續(xù),如果成功則停止請(qǐng)求。用crontab也能解決,但是比較傻,比如設(shè)置每隔5分鐘請(qǐng)求一次,不管成功會(huì)失敗都會(huì)去執(zhí)行一次。而用swoole定時(shí)器則智能多了。
//?啟用定時(shí)器,每5分鐘執(zhí)行一次
//更多視頻教程,idea激活碼,微信搜索【碼農(nóng)編程進(jìn)階筆記】
swoole_timer_tick(5*60*1000,?function($timer)?use?($url)?{?
????$rs?=?$this->postUrl($url);
????if?($rs)?{
????????//業(yè)務(wù)代碼...
????????swoole_timer_clear($timer);?//?停止定時(shí)器
????????echo?date('Y-m-d?H:i:s').?"請(qǐng)求接口任務(wù)執(zhí)行成功\n";
????}?else?{
????????echo?date('Y-m-d?H:i:s').?"請(qǐng)求接口失敗,5分鐘后再次嘗試\n";
????}
});?示例代碼
新建文件\src\App\Task.php:
namespace?Helloweba\Swoole;
use?swoole_server;
/**
*?任務(wù)調(diào)度
*?更多視頻教程,idea激活碼,
*?微信搜索【碼農(nóng)編程進(jìn)階筆記】
*/
class?Task
{
????protected?$serv;
????protected?$host?=?'127.0.0.1';
????protected?$port?=?9506;
????//?進(jìn)程名稱
????protected?$taskName?=?'swooleTask';
????//?PID路徑
????protected?$pidPath?=?'/run/swooletask.pid';
????//?設(shè)置運(yùn)行時(shí)參數(shù)
????protected?$options?=?[
????????'worker_num'?=>?4,?//worker進(jìn)程數(shù),一般設(shè)置為CPU數(shù)的1-4倍
????????'daemonize'?=>?true,?//啟用守護(hù)進(jìn)程
????????'log_file'?=>?'/data/log/swoole-task.log',?//指定swoole錯(cuò)誤日志文件
????????'log_level'?=>?0,?//日志級(jí)別?范圍是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
????????'dispatch_mode'?=>?1,?//數(shù)據(jù)包分發(fā)策略,1-輪詢模式
????????'task_worker_num'?=>?4,?//task進(jìn)程的數(shù)量
????????'task_ipc_mode'?=>?3,?//使用消息隊(duì)列通信,并設(shè)置為爭(zhēng)搶模式
????];
????public?function?__construct($options?=?[])
????{
????????date_default_timezone_set('PRC');
????????//?構(gòu)建Server對(duì)象,監(jiān)聽(tīng)127.0.0.1:9506端口
????????$this->serv?=?new?swoole_server($this->host,?$this->port);
????????if?(!empty($options))?{
????????????$this->options?=?array_merge($this->options,?$options);
????????}
????????$this->serv->set($this->options);
????????//?注冊(cè)事件
????????$this->serv->on('Start',?[$this,?'onStart']);
????????$this->serv->on('Connect',?[$this,?'onConnect']);
????????$this->serv->on('Receive',?[$this,?'onReceive']);
????????$this->serv->on('Task',?[$this,?'onTask']);
????????$this->serv->on('Finish',?[$this,?'onFinish']);
????????$this->serv->on('Close',?[$this,?'onClose']);
????}
????public?function?start()
????{
????????//?Run?worker
????????$this->serv->start();
????}
????public?function?onStart($serv)
????{
????????//?設(shè)置進(jìn)程名
????????cli_set_process_title($this->taskName);
????????//記錄進(jìn)程id,腳本實(shí)現(xiàn)自動(dòng)重啟
????????$pid?=?"{$serv->master_pid}\n{$serv->manager_pid}";
????????file_put_contents($this->pidPath,?$pid);
????}
????//監(jiān)聽(tīng)連接進(jìn)入事件
????public?function?onConnect($serv,?$fd,?$from_id)
????{
????????$serv->send(?$fd,?"Hello?{$fd}!"?);
????}
????//?監(jiān)聽(tīng)數(shù)據(jù)接收事件
????public?function?onReceive(swoole_server?$serv,?$fd,?$from_id,?$data)
????{
????????echo?"Get?Message?From?Client?{$fd}:{$data}\n";
????????//$this->writeLog('接收客戶端參數(shù):'.$fd?.'-'.$data);
????????$res['result']?=?'success';
????????$serv->send($fd,?json_encode($res));?//?同步返回消息給客戶端
????????$serv->task($data);??//?執(zhí)行異步任務(wù)
????}
????/**
????*?@param?$serv?swoole_server?swoole_server對(duì)象
????*?@param?$task_id?int?任務(wù)id
????*?@param?$from_id?int?投遞任務(wù)的worker_id
????*?@param?$data?string?投遞的數(shù)據(jù)
????*/
????public?function?onTask(swoole_server?$serv,?$task_id,?$from_id,?$data)
????{
????????swoole_timer_tick(30000,?function($timer)?use?($task_id)?{?//?啟用定時(shí)器,每30秒執(zhí)行一次
????????????$memPercent?=?$this->getMemoryUsage();
????????????echo?date('Y-m-d?H:i:s')?.?'當(dāng)前內(nèi)存使用率:'.$memPercent."\n";
????????});
????}
????/**
????*?@param?$serv?swoole_server?swoole_server對(duì)象
????*?@param?$task_id?int?任務(wù)id
????*?@param?$data?string?任務(wù)返回的數(shù)據(jù)
????*/
????public?function?onFinish(swoole_server?$serv,?$task_id,?$data)
????{
????????//
????}
????//?監(jiān)聽(tīng)連接關(guān)閉事件
????public?function?onClose($serv,?$fd,?$from_id)?{
????????echo?"Client?{$fd}?close?connection\n";
????}
????public?function?stop()
????{
????????$this->serv->stop();
????}
????private?function?getMemoryUsage()
????{
????????//?MEMORY
????????if?(false?===?($str?=?@file("/proc/meminfo")))?return?false;
????????$str?=?implode("",?$str);
????????preg_match_all("/MemTotal\s{0,}\:+\s{0,}([\d\.]+).+?MemFree\s{0,}\:+\s{0,}([\d\.]+).+?Cached\s{0,}\:+\s{0,}([\d\.]+).+?SwapTotal\s{0,}\:+\s{0,}([\d\.]+).+?SwapFree\s{0,}\:+\s{0,}([\d\.]+)/s",?$str,?$buf);
????????//preg_match_all("/Buffers\s{0,}\:+\s{0,}([\d\.]+)/s",?$str,?$buffers);
????????$memTotal?=?round($buf[1][0]/1024,?2);
????????$memFree?=?round($buf[2][0]/1024,?2);
????????$memUsed?=?$memTotal?-?$memFree;
????????$memPercent?=?(floatval($memTotal)!=0)???round($memUsed/$memTotal*100,2):0;
????????return?$memPercent;
????}
}?我們以場(chǎng)景一為例,在onTask啟用定時(shí)任務(wù),每隔30秒計(jì)算一次內(nèi)存使用率。實(shí)際應(yīng)用中可以把計(jì)算好的內(nèi)存按時(shí)間寫(xiě)入數(shù)據(jù)庫(kù)等存儲(chǔ)中,然后可以根據(jù)前端需求用來(lái)渲染成統(tǒng)計(jì)圖表,如:

接著服務(wù)端代碼 public\taskServer.php :
?
require?dirname(__DIR__)?.?'/vendor/autoload.php';
use?Helloweba\Swoole\Task;
$opt?=?[
????'daemonize'?=>?false
];
?$ser?=?new?Task($opt);
?$ser->start();?客戶端代碼 public\taskClient.php :
?
class?Client
{
????private?$client;
????public?function?__construct()?{
????????$this->client?=?new?swoole_client(SWOOLE_SOCK_TCP);
????}
????public?function?connect()?{
????????if(?!$this->client->connect("127.0.0.1",?9506?,?1)?)?{
????????????echo?"Error:?{$this->client->errMsg}[{$this->client->errCode}]\n";
????????}
????????fwrite(STDOUT,?"請(qǐng)輸入消息?Please?input?msg:");
????????$msg?=?trim(fgets(STDIN));
????????$this->client->send(?$msg?);
????????$message?=?$this->client->recv();
????????echo?"Get?Message?From?Server:{$message}\n";
????}
}
??$client?=?new?Client();
??$client->connect();?驗(yàn)證效果
1.啟動(dòng)服務(wù)端:
php?taskServer.php2.客戶端輸入:
另開(kāi)命令行窗口,執(zhí)行
[root@localhost?public]#?php?taskClient.php?
請(qǐng)輸入消息?Please?input?msg:hello
Get?Message?From?Server:{"result":"success"}
[root@localhost?public]#?3.服務(wù)端返回:

如果返回上圖中的結(jié)果,則定時(shí)任務(wù)正常運(yùn)行,我們會(huì)發(fā)現(xiàn)每隔30秒會(huì)輸出一條信息。
往期推薦
