fastrpc高性能跨平臺c++協(xié)程rpc框架
用過go erlang gevent的親們應(yīng)該都會知道協(xié)程在應(yīng)用中帶來的方便。
如果對協(xié)程不理解的同學,通過閱讀下面例子可以快速了解我們框架的協(xié)程的意義,已了解的可以跳過這部分。
協(xié)程例子:假設(shè)我們要發(fā)個Get請求獲取百度首頁內(nèi)容;
php同步方式:$result = file_get_contents("http://www.baidu.com"), php果然是世界上最好的語言,多么簡潔。
然后java和c++的同學開始不屑了: "呵呵, 同步,鄙視你不解釋。"
好了那就異步吧,php也有異步的方式啦,例如使用騰訊的swoole框架,
<?php
$cli = new swoole_http_client('127.0.0.1', 8993);
$cli->get('ww.baidu.com', function ($cli) {
echo $cli->body;
});
執(zhí)行到get請求時,當前進程不阻塞等待,可以繼續(xù)去處理后面的事情,等到響應(yīng)結(jié)果返回再去執(zhí)行回調(diào)function里的代碼。
這樣是不是就完美了呢,我們來看看異步的問題:
這里這個代碼被拆分成兩個塊調(diào)用$cli->get函數(shù)之前塊和回調(diào)函數(shù)里面的塊,問題就出在這里,現(xiàn)在假設(shè)有一個需求,我要給發(fā)Get請求之前的一個html變量追加百度返回的內(nèi)容,那么我必須這么做:
<?php
$html = "init";
$cli = new swoole_http_client('127.0.0.1', 8993);
$cli->html = $html;
$cli->get('w.baidu.com', function ($cli) {
echo $cli->html.$cli->body;
});
也就是說get之前和回調(diào)函數(shù)里是兩個不同的代碼區(qū)域,要在兩個不同代碼區(qū)域連續(xù)使用一個局部變量,那么我們的做法就是必須傳值(傳值有很多種方式了,這里不一一列舉).
也許有的同學會說就傳個值而已嘛,也沒什么不好的呀。好那么我們來看看一段實際項目中的代碼(不用真的去看懂):
//代碼塊1
$url = $_SERVER['HTTP_HOST'] . $_SERVER["REQUEST_URI"];
$this->history_add( $wid, $sku_id, 2 );
$smarty = get_smarty();
$sku = new Sku( $sku_id );
$goods = new Goods( $sku->goods_id );
$smarty->assign('pinlei_name', $goods->goods_name );
$smarty->assign("title_name", PageListWorksItem::trans_goods_name( $goods->goods_name ));
$this->goods_id = $goods->goods_id;
$work = new Works( $wid );
$work_data = $work->get_data();
$s = new ImgArgs( $work_data );
//插入點 在這里插入獲取百度首頁內(nèi)容
//代碼塊2
$merge_img_url = $s->get_product_merge_img_url( $sku_id , 2 );
$work_data['img_url'] = $merge_img_url;
$work_data['real_price'] = $work_data['money'] + $sku->price;
// $reply_count = WorksComment::get_works_comment_list_count2( $sku->goods_id );
$reply_count = WorksComment::get_works_comment_list_count( $sku->goods_id );
$smarty->assign('reply_count', $reply_count );
...
后面代碼省略,其中以上所有對象和變量在后續(xù)代碼中都使用到
在上面插入點這個地方 我們需要調(diào)用異步發(fā)請求去獲取百度首頁內(nèi)容,然后在代碼塊2中使用到百度的響應(yīng)結(jié)果,
這樣我們就要把代碼塊2這部分全部遷移到回調(diào)函數(shù)中,接著修改的同學就要抓狂了,要把代碼塊1中一個一個變量去檢查,看后續(xù)是否用到,然后一個一個去傳值給回調(diào)函數(shù)。
同理,在中間插入點,我們要做其它操作,例如查mysql 查redis時,同樣會出現(xiàn)上述 同步和異步的選擇困難問題。
好,到這里使用js的同學表示不服了,這有什么難,看我大js:
function asynGetBaidu(){
var url = "xxx";
var html = "init";
$.get(url, {}, function(data,status) {
alert(html+data);
});
}
你看,不是不需要傳值在回調(diào)內(nèi)也可以使用外部變量嗎?
這是因為,js在底層閉包的自動傳值機制,把asynGetBaidu函數(shù)內(nèi)的變量都可以在回調(diào)函數(shù)內(nèi)作用。
那么問題又來了,如果原來函數(shù)是有返回值的,我們在這加了異步get后返回值還能有效嗎?
function asynGetBaidu(){
var url = "xxx";
var html = "init";
$.get(url, {}, function(data,status) {
html = html+data;
return html;
});
}
var res = asynGetBaidu();
alert(res);
彈出結(jié)果是undefine。有人又說了,真無聊,網(wǎng)頁端可以通過全局變量,或網(wǎng)頁標簽取返回值就行了,這里返回值沒什么意思啦。
是的對網(wǎng)頁端來說jquery功能完全夠用了,但是對服務(wù)器轉(zhuǎn)發(fā)請求來說,我們又能設(shè)多少個這樣的全局變量呢?(最新的es6 es7已經(jīng)支持協(xié)程新特性,可查詢koa的 yield 或 async/await使用)
那怎么辦呢?我們假設(shè)場景是這樣: 在服務(wù)器然后端 processer函數(shù)用來處理客戶端來的http請求,然后再轉(zhuǎn)向百度獲取信息,打印返回給客戶端。
來看看經(jīng)過我們封裝后python的實現(xiàn)(人生苦短,我用python):
import gevent.monkey
gevent.monkey.patch_socket()
def asynGetBaidu():
html = "init"
...
f=urllib.urlopen("ww.baidu.com")
...
return html + f.read()
def processer(self, request, response):
response = asynGetBaidu()
print response
坑誰呢,這明明就是同步發(fā)GET請求。
沒錯這就是協(xié)程的作用了,同步的編碼方式,異步的效果。上述代碼要在我們fastpy框架下使用才會真的起到異步的效果。
每當有一個http請求到來時,框架會新開一個協(xié)程,這個協(xié)程執(zhí)行processer函數(shù),當執(zhí)行到urllib.urlopen時,因為有io阻塞會主動放棄對cpu的使用權(quán),讓給誰呢?讓給下一個協(xié)程,下個協(xié)程又會執(zhí)行processer去處理下一個http請求。
等到baidu有響應(yīng)結(jié)果了,框架引擎會恢復之前暫停的協(xié)程,繼續(xù)執(zhí)行processer內(nèi)剩余的代碼,這樣我們想在asynGetBaidu中插入多少段請求別的服務(wù)的代碼都只需要像同步一樣編寫,不需要調(diào)整上下文代碼結(jié)構(gòu)。
試想想,urlopen處如果是mysql query或redis的get,都可以0代價的同步自動轉(zhuǎn)換為異步,多么方便的一件事啊。
協(xié)程主要原理:當代碼執(zhí)行帶io阻塞時,把當前進程的調(diào)用的服務(wù)函數(shù)里的局部變量和函數(shù)執(zhí)行堆??截惖絼e的地方,等到io返回就緒后,再把原來的堆棧內(nèi)容拷貝回來,然后既可以實現(xiàn)從當前代碼斷點繼續(xù)執(zhí)行,其實和異步中等待執(zhí)行回調(diào)的方式是類似的,只不過這樣封裝后就不用再顯示聲明回調(diào)函數(shù)了。
說到底,我們其實就是在抄go語言的特性啦,而且抄的遠遠不如那樣, 是不是感覺我們在為go做廣告。
有人會說了,go語言那么好,那你們?yōu)楹尾挥冒 ?--回答:在國內(nèi)不好找工作啊。
言歸正傳,看看我們團隊c++和python的兩個協(xié)程解決方案:
python解決方案: fastpy -————本框架是在gevent基礎(chǔ)上封裝而成,主要面向web應(yīng)用:
源代碼只有800多行 項目地址: https://git.oschina.net/feimat/fastpy
性能比較如下
tornado 4kqps 多進程1wqps
nginx+tornado 9kqps
nginx+uwsgi 8kqps
django和webpy 原生性能較差
本server 2w qps
歡迎加入qq群339711102,一起探討優(yōu)化哦
快速入門:
1、啟動:
指定監(jiān)聽端口即可啟動
python fastpy.py 8992
(如果需要使用gevent協(xié)程功能請先安裝gevent,參考鏈接http://www.xue163.com/exploit/138/1381297.html)
2、快速編寫cgi,支持運行時修改,無需重啟server
在fastpy.py同一目錄下
隨便建一個python 文件
例如:
example.py:
#-*- coding:utf-8 -*-
import sys
#定義一個同名example類
#定義一個tt函數(shù):
reload(sys)
sys.setdefaultencoding('utf8')
FastpyAutoUpdate=True
class example():
def tt(self, request, response_head):
#print request.form
#print request.getdic
#fileitem = request.filedic["upload_file"]
#fileitem.filename
#fileitem.file.read()
return "ccb"+request.path
則訪問該函數(shù)的url為 http://ip:port/example.tt
協(xié)程的使用上面已經(jīng)演示過這里不再重復,詳情請看代碼示例
cgi所有使用例子:
sample.py 上傳文件和form表單使用等基本api的例子
example.py 使用單例模式和線程+異步返回的例子
WithGevent/dbsample.py 使用gevent+pymysql實現(xiàn)異步讀寫數(shù)據(jù)庫的例子(gevent下線程池實現(xiàn))
WithGevent/sample.py 使用gevent實現(xiàn)異步發(fā)送http請求的例子
sendfile/sendfile.py 多線程文件上傳服務(wù)端代碼
sendfile/sendfile_client.py 多線程文件上傳客戶端代碼
proxy_server/proxy.py 正向代理服務(wù)器代碼
跨平臺/ 跨平臺版本的fastpy.py
3、支持超大文件上傳下載
默認靜態(tài)文件(包括html,js、css、圖片、文件等)放在static文件夾下
html和js、css會自動壓縮加速
例如把a.jpg放到static文件夾下
訪問的url為 http://ip:port/static/a.jpg
支持etag 客戶端緩存功能
(server 使用sendfile進行文件發(fā)送,不占內(nèi)存且快速)
4、支持網(wǎng)頁模板編寫
模版引擎代碼只有十幾行 在WithGevent/common/core.py 文件里的
class FeimaTpl
模版用法很簡單
1、用于寫python代碼的控制塊 <% for item in data { %>
<% %> 中間支持python的 if else for while等程序控制塊,
不同是模版用{ }來代替python 晦澀的縮進來區(qū)分控制塊范圍
2、取值塊 <%=item["id"]>
<%= %> 里寫的是python的變量指即可,可在1中控制塊內(nèi)使用
下面看個例子
創(chuàng)建一個模板 a.html
<html>
<HEAD><TITLE><%=title%></TITLE></HEAD>
<BODY>
<% for item in data{ %>
<%=item["id"]%>,<%= item["name"] %>
array data:
<% for i in item["array"] {%><%= i %><%}%>
</br>
<%}%>
</BODY>
</html>
則對應(yīng)的使用
from common import core
tpl = core.FeimaTpl(filepath="./a.html")
d = []
d.append({"id":1,"name":"name1","array":[2,4,5,6]})
d.append({"id":2,"name":"name2","array":[1,3,5,7,9]})
tpl.assign("title", "my title")
tpl.assign("data", d)
print tpl.render()
則生成:
<html>
<HEAD><TITLE>my title</TITLE></HEAD>
<BODY>
1,name1
array data:
2456
</br>
2,name2
array data:
13579
</br>
</BODY>
</html>
5、支持http/https透明代理
python proxy.py 8995
啟動后再瀏覽器配置代理即可使用,可以彌補nginx 不支持https代理的不足
c++解決方案: fastrpc——本框架就是為c++語言從應(yīng)用的角度,封裝了盡量易用協(xié)程特性,包括了:
1、協(xié)程下同步編碼異步化(使得mysql/redis/socket在不用修改任何代碼情況下同步自動轉(zhuǎn)異步)
2、協(xié)程下定時器
3、協(xié)程下生產(chǎn)者消費者隊列
4、甚至協(xié)程下的線程池等,
同時結(jié)合了rpc server、http server和游戲中的應(yīng)用,提供完整的協(xié)程示例解決方案。
項目地址: https://git.oschina.net/feimat/fastrpc
下面是例子
演示例子包括:
1、同步、異步方式
2、client發(fā)請求的協(xié)程方式
3、server轉(zhuǎn)發(fā)請求的協(xié)程方式
4、http的請求處理
5、將他人的同步接口變異步
6、協(xié)程怎樣和多線程切換使用
7、定時器使用和單向推送演示
8、網(wǎng)絡(luò)異常處理等回調(diào)的使用
1、同步、異步方式
首先定義 protobuf 文件 (rpc作用和protobuf的作用請自行百度,這里不詳述)
package echo;
message EchoRequest {
required string message = 1;
};
message EchoResponse {
required string response = 1;
};
service EchoService
{
rpc Echo(EchoRequest) returns (EchoResponse);
};
option cc_generic_services = true;
使用protoc 生成代碼 echo.pb.h echo.pb.cc
然后服務(wù)端實現(xiàn) service:
#include "rpc_server.h"
#include "echo.pb.h"
class EchoServiceImpl : public echo::EchoService {
virtual void Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done) {
response->set_response(response->response()+" server hello");
if (done) {
done->Run();
}
}
}
服務(wù)端啟動:
int main(int argc, char *argv[])
{
RpcServer server("127.0.0.1", 8996);
::google::protobuf::Service *rpc_service = new EchoServiceImpl(&server);
server.RegiService(rpc_service);
server.start();
return 0;
}
客戶端如何請求:
同步方式
RpcClient client("192.168.1.13", 8999, 10000,true); // 1host 2port 3超時時間 4是否使用多線程模式
echo::EchoService::Stub stub(&client);
echo::EchoRequest req;
req.set_message("client hello");
echo::EchoResponse res;
CliController controller;
stub.Echo(&controller,&req,&res,NULL); // 最后一個參數(shù)為回調(diào),回調(diào)為空是同步,不為空是異步
// controller 用于記錄運行信息,如超時、錯誤內(nèi)容
std::cout << "is timeout:" << controller.IsTimeOut() << "error:" << controller.ErrorText();
異步方式
Test test;
echo::EchoRequest* request = new echo::EchoRequest();
request->set_message("client hello");
echo::EchoResponse* response = new echo::EchoResponse();
Closure* callback_callback = NULL; // 可以遞歸無限回調(diào)
Closure* callback = pbext::NewCallback(&test,&Test::echo_done,request,response);
stub->Echo(NULL,request,response,callback); // 異步
class Test {
public:
void echo_done(echo::EchoRequest* request, echo::EchoResponse* response, Closure* done) {
std::string res = response->response();
printf("async: %s\n", res.c_str());
if (done) done->Run(); // 如果有下個回調(diào),就繼續(xù)執(zhí)行下個回調(diào)
delete request; // 沒用智能指針c++要記得釋放
deleted response;
}
};
2、client發(fā)請求的協(xié)程方式
上面使用中同步調(diào)用會阻塞一個線程, 異步模式的話又要注冊回調(diào),這樣會導致代碼不可觀。
協(xié)程模式客戶端:
void cro_job(echo::EchoService_Stub::Stub* stub, int i) {
std::stringstream ss;
ss << i;
echo::EchoRequest req;
req.set_message("cli hello "+ ss.str());
echo::EchoResponse res;
stub->Echo(NULL, &req, &res, NULL); // 同步編碼,異步的效果
}
stub = new echo::EchoService_Stub::Stub(client);
for (int i =0; i < try_time; ++i) {
// 這個函數(shù)內(nèi)都是可以同步轉(zhuǎn)異步的
::google::protobuf::Closure* routine =
::google::protobuf::NewCallback(&cro_job,
stub, i);
ProcessWithNewCro(routine);
}
3、server轉(zhuǎn)發(fā)請求的協(xié)程方式
這里client 請求 server1 再請求server2
客戶端和上面代碼一樣使用即可,
Server1調(diào)用 rpc例子
class EchoServiceImpl : public echo::EchoService {
virtual void Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done) {
// 再向server2發(fā)請求這里是協(xié)程同步會放權(quán),不用擔心阻塞
echo_service->Echo(NULL, request, response, NULL);
printf("recv request from client and send to server2\n");
response->set_response(response->response()+" add server1 echo");
if (done) {
done->Run();
}
}
public:
RpcClient* m_rpc_client;
}
int main(int argc, char *argv[])
{
RpcServer server("127.0.0.1", 8996);
::google::protobuf::Service *rpc_service = new EchoServiceImpl(&server);
// 創(chuàng)建rpcclient和server2(8998端口)通信
rpc_service->m_rpc_client = new RpcClient("127.0.0.1", 8996, 5000);
server.RegiService(rpc_service);
server.start();
return 0;
}
在剛剛的server echo實現(xiàn)里,再使用rpcclient 透傳請求給server2。這里使用協(xié)程同步方式,
不會造成線程阻塞,和異步回調(diào)取結(jié)果是一樣的效果。
注意:本框架默認業(yè)務(wù)邏輯worker是單線程協(xié)程的模式,要使用多線程協(xié)程,請參考后面的
線程池使用例子。
4、處理http請求例子
注冊httphandler
class MyHttpHandler : public HttpHandler {
public:
MyHttpHandler(::google::protobuf::RpcChannel* a_rpc_client) {
m_rpc_client = (RpcClient*)a_rpc_client;
echo_service = new echo::EchoService_Stub::Stub(m_rpc_client);
}
virtual void Init(CASyncSvr* svr) {}
void test_tp_run(HttpRequest* request,
::google::protobuf::Closure *done) {
::echo::EchoRequest req;
::echo::EchoResponse res;
req.set_message("browse req");
// 再向server2發(fā)請求這里是協(xié)程同步會放權(quán),不用擔心阻塞
echo_service->Echo(NULL, &req, &res, NULL);
CHttpParser* ps = request->ps;
ps->parse_form_body();
std::string kk = ps->get_param("kk");
string str_cmd = ps->get_object();
string get_uri = ps->get_uri();
std::stringstream ss;
ss << "kk:" << kk << "<br/>"
<< "cmd:" << str_cmd << "<br/>"
<< "uri:" << get_uri << "<br/>"
<< "rpc res:" << res.DebugString();
std::string content_type = "text/html";
std::string add_head = "Connection: keep-alive\r\n";
CHttpResponseMaker::make_string(res.response(),request->response,content_type,add_head);
if (done) done->Run();
}
virtual void Finish(CASyncSvr* svr) {}
RpcClient* m_rpc_client;
echo::EchoService_Stub::Stub* echo_service;
}
int main(int argc, char *argv[])
{
RpcServer server("127.0.0.1", 8996);
::google::protobuf::Service *rpc_service = new EchoServiceImpl(&server);
// 創(chuàng)建rpcclient和server2(8998端口)通信
rpc_service->m_rpc_client = new RpcClient("127.0.0.1", 8996, 5000);
server.RegiService(rpc_service);
HttpHandler *http_handler = new MyHttpHandler(rpc_service->m_rpc_client);
server.RegiHttpHandler(http_handler);
server.start();
return 0;
}
瀏覽器訪問地址為 http://127.0.0.1:8999/static/index.html?kk=23423424
支持http頭解釋、url解釋和postform解析,一般作為網(wǎng)頁http接入接口然后再通過rpc和內(nèi)部服務(wù)器之間通信
5、將他人的同步接口變異步
在很多時候,我們業(yè)務(wù)中可能需要用到別人的同步接口,例如mysql查詢數(shù)據(jù)庫,這個
時候協(xié)程里就會有阻塞,會一定程度上影響性能(python框架里是通過加這兩句實現(xiàn):import gevent.monkey gevent.monkey.patch_socket() )。
我們這里提供了對sys sockethook,當執(zhí)行到系統(tǒng)的socket操作時,會hook住然后放權(quán),把事件觸發(fā)
交給epoll處理,當有事件過來時再resume執(zhí)行,這樣就不會造成線程阻塞并且可以開很多的同步并發(fā)了。
// 開始sys hook后
// 系統(tǒng)的socket操作函數(shù)遇到阻塞會自動放權(quán)
// 不用修改任何代碼,可將同步socket變異步
co_enable_hook_sys();
XSocket sock;
sock.open(SOCK_STREAM);
if (sock.connect(XSockAddr("127.0.0.1", 8998), 1000) < 0) {
printf("connect fail\n");
abort();
}
std::stringstream ss;
ss << "GET http://127.0.0.1:8998/aaa?kk=" << i << " HTTP/1.1\r\n"
<< "Host: 127.0.0.1:8998\r\n"
<< "\r\n";
std::string send_str = ss.str();
printf("%d send %d\n", sock.get_handle(), i);
sock.send_n(send_str.c_str(), send_str.size(), 100000);
string res;
sock.recv_one_http(res, 2000);
printf("%d recv %s\n", sock.get_handle(), res.c_str());
if (++recv_c == try_time) {
printf("send all finish\n");
}
sock.close();
co_disable_hook_sys();
6、協(xié)程和多線程混合編程(線程池的使用)
目前我們只提供了socket的hook。如果我們使用中需要用到一些非c socket的同步接口
例如c++調(diào)用python, 在python內(nèi)部會調(diào)用到一些同步接口。
這個時候我們做常用的做法是起另外一個線程去做然后yield,做完了再回來告訴我,繼續(xù)往下做。
我們針對協(xié)程提供了線程池接口:
class EchoServiceImpl : public echo::EchoService {
void TestThreadPool(::echo::EchoResponse* response) {
response->set_response(response->response() + " add tp echo");
}
virtual void Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done) {
echo_service->Echo(NULL, request, response, NULL);
printf("recv request from client and send to server2\n");
response->set_response(response->response()+" add server1 echo");
// 切換到多線程執(zhí)行,放權(quán),等線程池執(zhí)行完后resume
tp_mgr->TPRun(this, &EchoServiceImpl::TestThreadPool, response);
if (done) {done->Run();}
}
public:
RpcServer* _rpc_server;
RpcClient* m_rpc_client;
echo::EchoService_Stub::Stub* echo_service;
TPMgr* tp_mgr;
上面例子中我們起了一個線程池,當要支付時就把他扔到線程去做,當前協(xié)程yield放權(quán),等到線程池
把支付任務(wù)處理完了,再resume,繼續(xù)進行下面的任務(wù)。
另外在線程池內(nèi)部也會起協(xié)程,所以內(nèi)部調(diào)用rpcclient也是會自動同步轉(zhuǎn)異步的哦。
線程池還有異步接口TPAsynRun. (注意還有AsynRun這個宏,用于給當前進程異步執(zhí)行指定函數(shù)用)
可以看到多線程和協(xié)程之間切換編程多么輕松。
7、單向推送例子與定時器使用
這里的功能需求是 client 向server發(fā)去一個請求后,server會注冊一個定時任務(wù),每隔一秒向client發(fā)一個消息,連續(xù)發(fā)5次。
首先客戶端要注冊單向推送處理handler
// mes name: echo.EchoResponse
void ext_processer(std::string mes_name, std::string data, void* param) {
::google::protobuf::Message* response = PbMgr::CreateMessage(mes_name);
if (response) {
response->ParseFromString(data);
std::cout << "recv push mes, name:" << mes_name
<< " data:" << response->DebugString();
}
}
RpcClient client("192.168.1.13", 8999, 10000,true); // 1host 2port 3超時時間 4是否使用多線程模式
echo::EchoService::Stub stub(&client);
client.RegiExtProcesser(ext_processer, NULL);
服務(wù)器則要在收到請求后注冊定時任務(wù)
class EchoServiceImpl : public echo::EchoService {
void PeriodPush(CASyncSvr* svr, unsigned cli_flow) {
::echo::EchoResponse response;
response.set_response("period push mes");
RpcServer::PushToClient(svr, cli_flow, &response); // 這是服務(wù)器向客戶端定時推消息
}
virtual void Echo(::google::protobuf::RpcController* controller,
const ::echo::EchoRequest* request,
::echo::EchoResponse* response,
::google::protobuf::Closure* done) {
echo_service->Echo(NULL, request, response, NULL);
// 演示定時推送
RpcController* p_con = (RpcController*)controller;
unsigned cli_flow = p_con->_cli_flow;
CASyncSvr* svr = p_con->_svr;
Closure<void>* period_job =
NewPermanentClosure(this, &EchoServiceImpl::PeriodPush, svr, cli_flow); // 注意這里一定要用permanentclosure
timer_mgr.AddJob(1000, period_job, 5);
tp_mgr->TPRun(this, &EchoServiceImpl::TestThreadPool, response);
if (done) {
done->Run();
}
}
public:
TimerMgr timer_mgr;
然后就客戶端發(fā)出一次請求后就可以看到陸續(xù)收到5個推送消息,每個相隔一秒
recv push mes, name:echo.EchoResponse data:response: "period push mes"
由上面我們看到協(xié)程結(jié)合線程池、定時器編程是多么輕松。
自由的控制定時任務(wù)由哪些線程或協(xié)程按分什么順序執(zhí)行,完全面向?qū)ο蟮木幋a風格,同步的編碼方式,獲得異步的效果。
8、斷開事件處理
功能需求是假設(shè)是游戲客戶端,在客戶端異常斷開時,服務(wù)器應(yīng)該要觸發(fā)事件,以便服務(wù)器在這里改變游戲角色的在線狀態(tài),另外客戶端也應(yīng)該在與服務(wù)器斷開是有提示或做相應(yīng)重連處理
服務(wù)器注冊斷開事件處理函數(shù)
int close_handler(CASyncSvr* svr, unsigned cli_flow, void* param) {
std::stringstream ss;
ss << "svr_id:" << svr->_svr_id
<< " cli:" << cli_flow
<< " param:" << *((int*)param) << "\n";
//printf(ss.str().c_str());
return 0;
}
int main(int argc, char *argv[])
{
RpcServer server("127.0.0.1", 8996);
int p = 123; // 附帶參數(shù)
server.RegiClientCloseHandler(close_handler, &p);
}
客戶端和上面服務(wù)器一樣定義處理函數(shù)和注冊即可
以上所有handler 包括http、close 和單向推送的handler都是在協(xié)程里面處理的,也就是說handler里面都是可以使用 rpc client協(xié)程同步模式去向別的服務(wù)器發(fā)請求,而不會阻塞線程的
兩個框架項目地址: https://git.oschina.net/feimat
歡迎加入qq群339711102,一起探討優(yōu)化哦
