php-nsqNSQ 的 PHP 客戶端
php-nsq
php-nsq 是nsq的php客戶端,采用c擴展編寫,性能和穩(wěn)定性。
安裝 :
請?zhí)崆鞍惭blibevent
Dependencies: libevent (apt-get install libevent-dev ,yum install libevent-devel) 1. sudo phpize 2. ./configure 3. make 4. make install add in your php.ini: extension = nsq.so;
pub例子:
$nsqdAddr = array(
"127.0.0.1:4150",
"127.0.0.1:4154"
);
$nsq = new Nsq();
$isTrue = $nsq->connectNsqd($nsqdAddr);
for($i = 0; $i < 10000; $i++){
$nsq->publish("test", "nihao");
}
$nsq->closeNsqdConnection();
// Deferred publish
//function : deferredPublish(string topic,string message, int millisecond);
//millisecond default : [0 < millisecond < 3600000]
$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
$deferred->deferredPublish("test", "message daly", 3000);
}
$deferred->closeNsqdConnection();
sub例子:
<?php
//sub
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new Nsq();
$config = array(
"topic" => "test",
"channel" => "struggle",
"rdy" => 2, //optional , default 1
"connect_num" => 1, //optional , default 1
"retry_delay_time" => 5000, //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
"auto_finish" => true, //default true
);
$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){
echo $msg->payload;
echo $msg->attempts;
echo $msg->message_id;
echo $msg->timestamp;
});
Nsq 類方法:
connectNsqd($nsqdAddrArr)
pub的時候連接nsq,你也可以利用此函數(shù)做健康檢查closeNsqdConnection()
關閉nsq的連接publish($topic,$msg)
消息發(fā)送deferredPublish($topic,$msg,$msec)
延遲消息發(fā)送subscribe($nsq_lookupd,$config,$callback)
消息訂閱
Message 類方法與屬性:
timestamp
消息時間戳attempts
消息的重試次數(shù),(從1開始)message_id
消息idpayload
消息內容finish($bev,$msg->message_id)
主動的 ack消息方法touch($bev,$msg->message_id)
如果你消息執(zhí)行太長,可以利用次函數(shù)告知nsq 你還活著,一般用于執(zhí)行頻率比較規(guī)律的場景。
Tips :
1.如果callback內需要外部變量,可以采用以下use的寫法:
$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev) use ($you_variable){
echo $msg->payload;
echo $msg->attempts;
echo $msg->message_id;
echo $msg->timestamp;
});
2.消息重試,只要拋異常就可以,切記不要陷入死循環(huán),超過自己覺得可以的次數(shù) 要return:
subscribe($nsq_lookupd, $config, function($msg){
try{
echo $msg->payload . " " . "attempts:".$msg->attempts."\n";
//do something
}catch(Exception $e){
if($msg->attempts < 3){
//the message will be retried after you configure retry_delay_time
throw new Exception("");
}else{
echo $e->getMessage();
return;
}
}
});
3.如果你想增加 客戶端的心跳時間與消息的超時時間 :
第一步 在nsqd啟動時要加入相關參數(shù),這個參數(shù)是最大的限制,比如--max-heartbeat-interval=1m30s 心跳時間最大不能超過1分30秒: nsqd --lookupd-tcp-address=127.0.0.1:4160 --max-heartbeat-interval=1m30s --msg-timeout=10m30s 第二步 因為第一步是指定最大時間,所以還需要第二步在客戶端指定所需要的值 具體請看 example目錄中的identify開頭的文件例子。
4.如果你想增強消費能力,可以加大rdy參數(shù)
5.你可以用supervisor管理,但是因為是多進程消費,你需要在supervisor job的配置文件 添加:
stopasgroup=true killasgroup=true
Changes
3.0
修復因libevent 超過4096消息被截斷問題
增加identify指令功能,可以增加客戶端心跳時間 與 消息超時時間
2.4.0
修復 pub bug
修復 sub coredump
修覆蓋 touch bug
增加等待,當剛初始化的topic沒消息時
2.3.1
pub支持域名
修復 pub coredump
