<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          fastrpc高性能跨平臺c++協(xié)程rpc框架

          聯(lián)合創(chuàng)作 · 2023-09-23 10:18


          用過go erlang gevent的親們應(yīng)該都會知道協(xié)程在應(yīng)用中帶來的方便。

          如果對協(xié)程不理解的同學,通過閱讀下面例子可以快速了解我們框架的協(xié)程的意義,已了解的可以跳過這部分。

          協(xié)程例子:假設(shè)我們要發(fā)個Get請求獲取百度首頁內(nèi)容;

          php同步方式:$result = file_get_contents("http://www.baidu.com"), php果然是世界上最好的語言,多么簡潔。

          然后java和c++的同學開始不屑了: "呵呵, 同步,鄙視你不解釋。"

          好了那就異步吧,php也有異步的方式啦,例如使用騰訊的swoole框架,

          <?php
          
          $cli = new swoole_http_client('127.0.0.1', 8993);
          
          $cli->get('ww.baidu.com', function ($cli) {
          
              echo $cli->body;
          
          });
          

           執(zhí)行到get請求時,當前進程不阻塞等待,可以繼續(xù)去處理后面的事情,等到響應(yīng)結(jié)果返回再去執(zhí)行回調(diào)function里的代碼。

          這樣是不是就完美了呢,我們來看看異步的問題:

          這里這個代碼被拆分成兩個塊調(diào)用$cli->get函數(shù)之前塊和回調(diào)函數(shù)里面的塊,問題就出在這里,現(xiàn)在假設(shè)有一個需求,我要給發(fā)Get請求之前的一個html變量追加百度返回的內(nèi)容,那么我必須這么做:

          <?php
          
          $html = "init";
          
          $cli = new swoole_http_client('127.0.0.1', 8993);
          
          $cli->html = $html;
          
          $cli->get('w.baidu.com', function ($cli) {
          
              echo $cli->html.$cli->body;
          
          });
          

          也就是說get之前和回調(diào)函數(shù)里是兩個不同的代碼區(qū)域,要在兩個不同代碼區(qū)域連續(xù)使用一個局部變量,那么我們的做法就是必須傳值(傳值有很多種方式了,這里不一一列舉).

          也許有的同學會說就傳個值而已嘛,也沒什么不好的呀。好那么我們來看看一段實際項目中的代碼(不用真的去看懂):

          //代碼塊1
          
          $url = $_SERVER['HTTP_HOST'] . $_SERVER["REQUEST_URI"];
          
          $this->history_add( $wid, $sku_id, 2 );
          
          $smarty = get_smarty();
          
          $sku = new Sku( $sku_id );
          
          $goods = new Goods( $sku->goods_id );
          
          $smarty->assign('pinlei_name', $goods->goods_name );
          
          $smarty->assign("title_name", PageListWorksItem::trans_goods_name( $goods->goods_name ));
          
          $this->goods_id = $goods->goods_id;
          
          $work = new Works( $wid ); 
          
          $work_data = $work->get_data();
          
          $s = new ImgArgs( $work_data );
          
          //插入點 在這里插入獲取百度首頁內(nèi)容
          
          //代碼塊2
          
          $merge_img_url = $s->get_product_merge_img_url(  $sku_id , 2  ); 
          
          $work_data['img_url'] = $merge_img_url;
          
          $work_data['real_price'] = $work_data['money'] + $sku->price;
          
          // $reply_count = WorksComment::get_works_comment_list_count2(  $sku->goods_id );
          
          $reply_count = WorksComment::get_works_comment_list_count(  $sku->goods_id );
          
          $smarty->assign('reply_count', $reply_count );
          
          ...
          
          后面代碼省略,其中以上所有對象和變量在后續(xù)代碼中都使用到
          

           在上面插入點這個地方 我們需要調(diào)用異步發(fā)請求去獲取百度首頁內(nèi)容,然后在代碼塊2中使用到百度的響應(yīng)結(jié)果,

          這樣我們就要把代碼塊2這部分全部遷移到回調(diào)函數(shù)中,接著修改的同學就要抓狂了,要把代碼塊1中一個一個變量去檢查,看后續(xù)是否用到,然后一個一個去傳值給回調(diào)函數(shù)。

          同理,在中間插入點,我們要做其它操作,例如查mysql 查redis時,同樣會出現(xiàn)上述 同步和異步的選擇困難問題。

           好,到這里使用js的同學表示不服了,這有什么難,看我大js:

          function asynGetBaidu(){ 
          
                  var url = "xxx";
          
                  var html = "init";
          
                  $.get(url, {}, function(data,status) {
          
                      alert(html+data);
          
                  });
          
          }
          

           你看,不是不需要傳值在回調(diào)內(nèi)也可以使用外部變量嗎?

          這是因為,js在底層閉包的自動傳值機制,把asynGetBaidu函數(shù)內(nèi)的變量都可以在回調(diào)函數(shù)內(nèi)作用。

          那么問題又來了,如果原來函數(shù)是有返回值的,我們在這加了異步get后返回值還能有效嗎?

           function asynGetBaidu(){ 
          
                  var url = "xxx";
          
                  var html = "init";
          
                  $.get(url, {}, function(data,status) {
          
                      html = html+data;
          
                      return html;
          
                  });
          
          }
          
          var res = asynGetBaidu();
          
          alert(res);
          

           彈出結(jié)果是undefine。有人又說了,真無聊,網(wǎng)頁端可以通過全局變量,或網(wǎng)頁標簽取返回值就行了,這里返回值沒什么意思啦。

          是的對網(wǎng)頁端來說jquery功能完全夠用了,但是對服務(wù)器轉(zhuǎn)發(fā)請求來說,我們又能設(shè)多少個這樣的全局變量呢?(最新的es6 es7已經(jīng)支持協(xié)程新特性,可查詢koa的 yield 或 async/await使用

          那怎么辦呢?我們假設(shè)場景是這樣: 在服務(wù)器然后端 processer函數(shù)用來處理客戶端來的http請求,然后再轉(zhuǎn)向百度獲取信息,打印返回給客戶端。

          來看看經(jīng)過我們封裝后python的實現(xiàn)(人生苦短,我用python):

          import gevent.monkey
          
          gevent.monkey.patch_socket() 
          
          def asynGetBaidu():
          
              html = "init"
          
              ...
          
              f=urllib.urlopen("ww.baidu.com")
          
              ...
          
              return html + f.read()
          
              
          
          def processer(self, request, response):
          
              response = asynGetBaidu()
          
              print response
          

           坑誰呢,這明明就是同步發(fā)GET請求。

          沒錯這就是協(xié)程的作用了,同步的編碼方式,異步的效果。上述代碼要在我們fastpy框架下使用才會真的起到異步的效果。

          每當有一個http請求到來時,框架會新開一個協(xié)程,這個協(xié)程執(zhí)行processer函數(shù),當執(zhí)行到urllib.urlopen時,因為有io阻塞會主動放棄對cpu的使用權(quán),讓給誰呢?讓給下一個協(xié)程,下個協(xié)程又會執(zhí)行processer去處理下一個http請求。

          等到baidu有響應(yīng)結(jié)果了,框架引擎會恢復之前暫停的協(xié)程,繼續(xù)執(zhí)行processer內(nèi)剩余的代碼,這樣我們想在asynGetBaidu中插入多少段請求別的服務(wù)的代碼都只需要像同步一樣編寫,不需要調(diào)整上下文代碼結(jié)構(gòu)。

          試想想,urlopen處如果是mysql query或redis的get,都可以0代價的同步自動轉(zhuǎn)換為異步,多么方便的一件事啊。

           協(xié)程主要原理:當代碼執(zhí)行帶io阻塞時,把當前進程的調(diào)用的服務(wù)函數(shù)里的局部變量和函數(shù)執(zhí)行堆??截惖絼e的地方,等到io返回就緒后,再把原來的堆棧內(nèi)容拷貝回來,然后既可以實現(xiàn)從當前代碼斷點繼續(xù)執(zhí)行,其實和異步中等待執(zhí)行回調(diào)的方式是類似的,只不過這樣封裝后就不用再顯示聲明回調(diào)函數(shù)了。

           

          說到底,我們其實就是在抄go語言的特性啦,而且抄的遠遠不如那樣, 是不是感覺我們在為go做廣告。

          有人會說了,go語言那么好,那你們?yōu)楹尾挥冒 ?--回答:在國內(nèi)不好找工作啊。

          言歸正傳,看看我們團隊c++和python的兩個協(xié)程解決方案:

          python解決方案: fastpy -————本框架是在gevent基礎(chǔ)上封裝而成,主要面向web應(yīng)用:

          源代碼只有800多行 項目地址: https://git.oschina.net/feimat/fastpy

          性能比較如下

          tornado 4kqps 多進程1wqps
          nginx+tornado 9kqps
          nginx+uwsgi 8kqps

          django和webpy 原生性能較差

          本server 2w qps
          歡迎加入qq群339711102,一起探討優(yōu)化哦

          快速入門:

          1、啟動:
             指定監(jiān)聽端口即可啟動
             python fastpy.py 8992
             (如果需要使用gevent協(xié)程功能請先安裝gevent,參考鏈接http://www.xue163.com/exploit/138/1381297.html

          2、快速編寫cgi,支持運行時修改,無需重啟server

             在fastpy.py同一目錄下
             隨便建一個python 文件
             例如:
             example.py:

             #-*- coding:utf-8 -*-
          
             import sys
          
             #定義一個同名example類
          
             #定義一個tt函數(shù):
          
             reload(sys)
          
             sys.setdefaultencoding('utf8')
          
             FastpyAutoUpdate=True
          
             class example():
          
                 def tt(self, request, response_head):
          
                     #print request.form
          
                     #print request.getdic
          
                     #fileitem = request.filedic["upload_file"]
          
                     #fileitem.filename
          
                     #fileitem.file.read()
          
                     return "ccb"+request.path
          

             則訪問該函數(shù)的url為 http://ip:port/example.tt
             協(xié)程的使用上面已經(jīng)演示過這里不再重復,詳情請看代碼示例
              cgi所有使用例子:
              sample.py  上傳文件和form表單使用等基本api的例子
              example.py 使用單例模式和線程+異步返回的例子
              WithGevent/dbsample.py  使用gevent+pymysql實現(xiàn)異步讀寫數(shù)據(jù)庫的例子(gevent下線程池實現(xiàn))
              WithGevent/sample.py    使用gevent實現(xiàn)異步發(fā)送http請求的例子
              sendfile/sendfile.py         多線程文件上傳服務(wù)端代碼
              sendfile/sendfile_client.py  多線程文件上傳客戶端代碼
              proxy_server/proxy.py        正向代理服務(wù)器代碼
              跨平臺/   跨平臺版本的fastpy.py

          3、支持超大文件上傳下載
             默認靜態(tài)文件(包括html,js、css、圖片、文件等)放在static文件夾下
             html和js、css會自動壓縮加速
             例如把a.jpg放到static文件夾下
             訪問的url為 http://ip:port/static/a.jpg
             支持etag 客戶端緩存功能
             (server 使用sendfile進行文件發(fā)送,不占內(nèi)存且快速)

          4、支持網(wǎng)頁模板編寫
             模版引擎代碼只有十幾行 在WithGevent/common/core.py 文件里的
             class FeimaTpl
             模版用法很簡單
             1、用于寫python代碼的控制塊 <% for item in data { %>
             <% %> 中間支持python的 if else for while等程序控制塊,
             不同是模版用{ }來代替python 晦澀的縮進來區(qū)分控制塊范圍
             2、取值塊 <%=item["id"]>
             <%= %> 里寫的是python的變量指即可,可在1中控制塊內(nèi)使用
            
             下面看個例子
             創(chuàng)建一個模板 a.html

              <html>
          
                  <HEAD><TITLE><%=title%></TITLE></HEAD>
          
                  <BODY>
          
                      <% for item in data{ %>
          
                      <%=item["id"]%>,<%= item["name"] %>
          
                      array data:
          
                      <% for i in item["array"] {%><%= i %><%}%>
          
                      </br>
          
                      <%}%>
          
                  </BODY>
          
              </html>
          

             則對應(yīng)的使用

             from common import core
          
             tpl = core.FeimaTpl(filepath="./a.html")
          
             d = []
          
             d.append({"id":1,"name":"name1","array":[2,4,5,6]})
          
             d.append({"id":2,"name":"name2","array":[1,3,5,7,9]})
          
             tpl.assign("title", "my title")
          
             tpl.assign("data", d)
          
             print tpl.render()
          

             

             則生成:
              <html>
                  <HEAD><TITLE>my title</TITLE></HEAD>
                  <BODY>
                      1,name1
                      array data:
                      2456
                      </br>   
                      2,name2
                      array data:
                      13579
                      </br>
                  </BODY>
              </html>

          5、支持http/https透明代理
             python proxy.py 8995
             啟動后再瀏覽器配置代理即可使用,可以彌補nginx 不支持https代理的不足

           

           

          c++解決方案: fastrpc——本框架就是為c++語言從應(yīng)用的角度,封裝了盡量易用協(xié)程特性,包括了:

          1、協(xié)程下同步編碼異步化(使得mysql/redis/socket在不用修改任何代碼情況下同步自動轉(zhuǎn)異步)

          2、協(xié)程下定時器

          3、協(xié)程下生產(chǎn)者消費者隊列

          4、甚至協(xié)程下的線程池等,

          同時結(jié)合了rpc server、http server和游戲中的應(yīng)用,提供完整的協(xié)程示例解決方案。

           項目地址: https://git.oschina.net/feimat/fastrpc

          下面是例子

          演示例子包括:

          1、同步、異步方式

          2、client發(fā)請求的協(xié)程方式

          3、server轉(zhuǎn)發(fā)請求的協(xié)程方式

          4、http的請求處理

          5、將他人的同步接口變異步

          6、協(xié)程怎樣和多線程切換使用

          7、定時器使用和單向推送演示

          8、網(wǎng)絡(luò)異常處理等回調(diào)的使用

           

          1、同步、異步方式

          首先定義 protobuf 文件 (rpc作用和protobuf的作用請自行百度,這里不詳述)

          package echo;
          
          message EchoRequest {
          
          required string message = 1;
          
          };
          
          message EchoResponse {
          
          required string response = 1;
          
          };
          
          service EchoService
          
          {
          
          rpc Echo(EchoRequest) returns (EchoResponse);
          
          };
          
          option cc_generic_services = true;
          

          使用protoc 生成代碼 echo.pb.h echo.pb.cc

          然后服務(wù)端實現(xiàn) service:

          #include "rpc_server.h"
          
          #include "echo.pb.h" 
          
          class EchoServiceImpl : public echo::EchoService {
          
              virtual void Echo(::google::protobuf::RpcController* controller,
          
                                const ::echo::EchoRequest* request,
          
                                ::echo::EchoResponse* response,
          
                                ::google::protobuf::Closure* done) {
          
                  response->set_response(response->response()+" server hello");
          
                  if (done) {
          
                      done->Run();
          
                  }
          
              }
          
          }
          

          服務(wù)端啟動:

          int main(int argc, char *argv[])
          
          {
          
              RpcServer server("127.0.0.1", 8996);
          
              ::google::protobuf::Service *rpc_service = new EchoServiceImpl(&server);
          
              server.RegiService(rpc_service);
          
              server.start();
          
              return 0;
          
          }
          

          客戶端如何請求:

          同步方式

          RpcClient client("192.168.1.13", 8999, 10000,true); // 1host 2port 3超時時間 4是否使用多線程模式
          
          echo::EchoService::Stub stub(&client);
          
          echo::EchoRequest req;
          
          req.set_message("client hello");
          
          echo::EchoResponse res;
          
          CliController controller;
          
          stub.Echo(&controller,&req,&res,NULL); // 最后一個參數(shù)為回調(diào),回調(diào)為空是同步,不為空是異步
          
          // controller 用于記錄運行信息,如超時、錯誤內(nèi)容
          
          std::cout << "is timeout:" << controller.IsTimeOut() << "error:" << controller.ErrorText();
          

          異步方式

          Test test; 
          
          echo::EchoRequest* request = new echo::EchoRequest();
          
          request->set_message("client hello");
          
          echo::EchoResponse* response = new echo::EchoResponse();
          
          Closure* callback_callback = NULL; // 可以遞歸無限回調(diào)
          
          Closure* callback = pbext::NewCallback(&test,&Test::echo_done,request,response);
          
          stub->Echo(NULL,request,response,callback); // 異步
          
          class Test {
          
          public:
          
              void echo_done(echo::EchoRequest* request, echo::EchoResponse* response, Closure* done) {
          
                  std::string res = response->response();
          
                  printf("async: %s\n", res.c_str());
          
                  if (done) done->Run(); // 如果有下個回調(diào),就繼續(xù)執(zhí)行下個回調(diào)
          
                  delete request; // 沒用智能指針c++要記得釋放
          
                  deleted response;
          
              }
          
          };
          

          2、client發(fā)請求的協(xié)程方式

              上面使用中同步調(diào)用會阻塞一個線程, 異步模式的話又要注冊回調(diào),這樣會導致代碼不可觀。

          協(xié)程模式客戶端:

           void cro_job(echo::EchoService_Stub::Stub* stub, int i) {
          
              std::stringstream ss;
          
              ss << i;
          
              echo::EchoRequest req;
          
              req.set_message("cli hello  "+ ss.str());
          
              echo::EchoResponse res;
          
              stub->Echo(NULL, &req, &res, NULL); // 同步編碼,異步的效果
          
           }
          
              stub = new echo::EchoService_Stub::Stub(client);
          
              for (int i =0; i < try_time; ++i) {
          
                  // 這個函數(shù)內(nèi)都是可以同步轉(zhuǎn)異步的
          
                  ::google::protobuf::Closure* routine =
          
                      ::google::protobuf::NewCallback(&cro_job,
          
                                                      stub, i);
          
                  ProcessWithNewCro(routine);
          
              }
          

           

          3、server轉(zhuǎn)發(fā)請求的協(xié)程方式

          這里client 請求 server1 再請求server2

          客戶端和上面代碼一樣使用即可,

          Server1調(diào)用 rpc例子

          class EchoServiceImpl : public echo::EchoService {
          
              virtual void Echo(::google::protobuf::RpcController* controller,
          
                                const ::echo::EchoRequest* request,
          
                                ::echo::EchoResponse* response,
          
                                ::google::protobuf::Closure* done) {
          
                  // 再向server2發(fā)請求這里是協(xié)程同步會放權(quán),不用擔心阻塞
          
                  echo_service->Echo(NULL, request, response, NULL);
          
                  printf("recv request from client and send to server2\n");
          
                  response->set_response(response->response()+" add server1 echo");
          
                  if (done) {
          
                      done->Run();
          
                  }
          
              }
          
          public:
          
          RpcClient* m_rpc_client;
          
          }
          
          int main(int argc, char *argv[])
          
          {
          
              RpcServer server("127.0.0.1", 8996);
          
              ::google::protobuf::Service *rpc_service = new EchoServiceImpl(&server);
          
              // 創(chuàng)建rpcclient和server2(8998端口)通信
          
              rpc_service->m_rpc_client = new RpcClient("127.0.0.1", 8996, 5000);  
          
              server.RegiService(rpc_service);
          
              server.start();
          
              return 0;
          
          }
          

          在剛剛的server echo實現(xiàn)里,再使用rpcclient 透傳請求給server2。這里使用協(xié)程同步方式,

          不會造成線程阻塞,和異步回調(diào)取結(jié)果是一樣的效果。

          注意:本框架默認業(yè)務(wù)邏輯worker是單線程協(xié)程的模式,要使用多線程協(xié)程,請參考后面的

          線程池使用例子。

          4、處理http請求例子

          注冊httphandler

          class MyHttpHandler : public HttpHandler {
          
          public:
          
              MyHttpHandler(::google::protobuf::RpcChannel* a_rpc_client) {
          
                  m_rpc_client = (RpcClient*)a_rpc_client;
          
                  echo_service = new echo::EchoService_Stub::Stub(m_rpc_client);
          
              }
          
              virtual void Init(CASyncSvr* svr) {}
          
              void test_tp_run(HttpRequest* request,
          
                      ::google::protobuf::Closure *done) {
          
                  
          
                  ::echo::EchoRequest req;
          
                  ::echo::EchoResponse res;
          
                  req.set_message("browse req");
          
                  
          
                  // 再向server2發(fā)請求這里是協(xié)程同步會放權(quán),不用擔心阻塞
          
                  echo_service->Echo(NULL, &req, &res, NULL);
          
                  
          
                  CHttpParser* ps = request->ps;
          
                  ps->parse_form_body();
          
                  std::string kk = ps->get_param("kk");
          
                  string str_cmd = ps->get_object();
          
                  string get_uri = ps->get_uri();
          
                  std::stringstream ss;
          
                  ss << "kk:" << kk << "<br/>"
          
                      << "cmd:" << str_cmd << "<br/>"
          
                      << "uri:" << get_uri << "<br/>"
          
                      << "rpc res:" << res.DebugString();
          
                  
          
                  std::string content_type = "text/html";
          
                  std::string add_head = "Connection: keep-alive\r\n";
          
                  CHttpResponseMaker::make_string(res.response(),request->response,content_type,add_head);
          
                  if (done) done->Run();
          
              }
          
              virtual void Finish(CASyncSvr* svr) {}
          
              RpcClient* m_rpc_client;
          
              echo::EchoService_Stub::Stub* echo_service; 
          
           }
          
          int main(int argc, char *argv[])
          
          {
          
              RpcServer server("127.0.0.1", 8996);
          
              ::google::protobuf::Service *rpc_service = new EchoServiceImpl(&server);
          
              // 創(chuàng)建rpcclient和server2(8998端口)通信
          
              rpc_service->m_rpc_client = new RpcClient("127.0.0.1", 8996, 5000);  
          
              server.RegiService(rpc_service);
          
              HttpHandler *http_handler = new MyHttpHandler(rpc_service->m_rpc_client);
          
              server.RegiHttpHandler(http_handler);
          
              server.start();
          
              return 0;
          
          }
          

          瀏覽器訪問地址為 http://127.0.0.1:8999/static/index.html?kk=23423424

          支持http頭解釋、url解釋和postform解析,一般作為網(wǎng)頁http接入接口然后再通過rpc和內(nèi)部服務(wù)器之間通信

          5、將他人的同步接口變異步

             在很多時候,我們業(yè)務(wù)中可能需要用到別人的同步接口,例如mysql查詢數(shù)據(jù)庫,這個

          時候協(xié)程里就會有阻塞,會一定程度上影響性能(python框架里是通過加這兩句實現(xiàn):import gevent.monkey gevent.monkey.patch_socket() )。

             我們這里提供了對sys sockethook,當執(zhí)行到系統(tǒng)的socket操作時,會hook住然后放權(quán),把事件觸發(fā)

          交給epoll處理,當有事件過來時再resume執(zhí)行,這樣就不會造成線程阻塞并且可以開很多的同步并發(fā)了。

              // 開始sys hook后
          
              // 系統(tǒng)的socket操作函數(shù)遇到阻塞會自動放權(quán)
          
              // 不用修改任何代碼,可將同步socket變異步
          
              co_enable_hook_sys();
          
              XSocket sock;
          
              sock.open(SOCK_STREAM);
          
              if (sock.connect(XSockAddr("127.0.0.1", 8998), 1000) < 0) {
          
                  printf("connect fail\n");
          
                  abort();
          
              }
          
              std::stringstream ss;
          
              ss << "GET http://127.0.0.1:8998/aaa?kk=" << i << " HTTP/1.1\r\n"
          
                  << "Host: 127.0.0.1:8998\r\n"
          
                  << "\r\n";
          
              std::string send_str = ss.str();
          
              printf("%d send %d\n", sock.get_handle(), i);
          
              sock.send_n(send_str.c_str(), send_str.size(), 100000);
          
              string res;
          
              sock.recv_one_http(res, 2000);
          
              printf("%d recv %s\n", sock.get_handle(), res.c_str());
          
              if (++recv_c == try_time) {
          
                  printf("send all finish\n");
          
              }
          
              sock.close();
          
              co_disable_hook_sys();
          

          6、協(xié)程和多線程混合編程(線程池的使用)

             目前我們只提供了socket的hook。如果我們使用中需要用到一些非c socket的同步接口

          例如c++調(diào)用python, 在python內(nèi)部會調(diào)用到一些同步接口。

          這個時候我們做常用的做法是起另外一個線程去做然后yield,做完了再回來告訴我,繼續(xù)往下做。

          我們針對協(xié)程提供了線程池接口:

          class EchoServiceImpl : public echo::EchoService {
          
              void TestThreadPool(::echo::EchoResponse* response) {
          
                  response->set_response(response->response() + " add tp echo");
          
              }
          
              virtual void Echo(::google::protobuf::RpcController* controller,
          
                                const ::echo::EchoRequest* request,
          
                                ::echo::EchoResponse* response,
          
                                ::google::protobuf::Closure* done) {
          
                  echo_service->Echo(NULL, request, response, NULL);
          
                  printf("recv request from client and send to server2\n");
          
                  response->set_response(response->response()+" add server1 echo");
          
                  // 切換到多線程執(zhí)行,放權(quán),等線程池執(zhí)行完后resume
          
                  tp_mgr->TPRun(this, &EchoServiceImpl::TestThreadPool, response);
          
                  if (done) {done->Run();}
          
              }
          
          public:
          
              RpcServer* _rpc_server;
          
              RpcClient* m_rpc_client;
          
              echo::EchoService_Stub::Stub* echo_service;
          
              TPMgr* tp_mgr;
          

          上面例子中我們起了一個線程池,當要支付時就把他扔到線程去做,當前協(xié)程yield放權(quán),等到線程池

          把支付任務(wù)處理完了,再resume,繼續(xù)進行下面的任務(wù)。

          另外在線程池內(nèi)部也會起協(xié)程,所以內(nèi)部調(diào)用rpcclient也是會自動同步轉(zhuǎn)異步的哦。

          線程池還有異步接口TPAsynRun. (注意還有AsynRun這個宏,用于給當前進程異步執(zhí)行指定函數(shù)用)

          可以看到多線程和協(xié)程之間切換編程多么輕松。

          7、單向推送例子與定時器使用

          這里的功能需求是 client 向server發(fā)去一個請求后,server會注冊一個定時任務(wù),每隔一秒向client發(fā)一個消息,連續(xù)發(fā)5次。

          首先客戶端要注冊單向推送處理handler

          // mes name: echo.EchoResponse
          
          void ext_processer(std::string mes_name, std::string data, void* param) {
          
              ::google::protobuf::Message* response = PbMgr::CreateMessage(mes_name);
          
              if (response) {
          
                  response->ParseFromString(data);
          
                  std::cout << "recv push mes, name:" << mes_name
          
                      << " data:" << response->DebugString();
          
              }
          
          }
          
          RpcClient client("192.168.1.13", 8999, 10000,true); // 1host 2port 3超時時間 4是否使用多線程模式
          
          echo::EchoService::Stub stub(&client); 
          
          client.RegiExtProcesser(ext_processer, NULL);
          

          服務(wù)器則要在收到請求后注冊定時任務(wù)

          class EchoServiceImpl : public echo::EchoService {
          
              void PeriodPush(CASyncSvr* svr, unsigned cli_flow) {
          
                  ::echo::EchoResponse response;
          
                  response.set_response("period push mes");
          
                  RpcServer::PushToClient(svr, cli_flow, &response); // 這是服務(wù)器向客戶端定時推消息
          
              }
          
              virtual void Echo(::google::protobuf::RpcController* controller,
          
                                const ::echo::EchoRequest* request,
          
                                ::echo::EchoResponse* response,
          
                                ::google::protobuf::Closure* done) {
          
                  echo_service->Echo(NULL, request, response, NULL);
          
                  // 演示定時推送
          
                  RpcController* p_con = (RpcController*)controller;
          
                  unsigned cli_flow = p_con->_cli_flow;
          
                  CASyncSvr* svr = p_con->_svr;
          
                  Closure<void>* period_job =
          
                      NewPermanentClosure(this, &EchoServiceImpl::PeriodPush, svr, cli_flow); // 注意這里一定要用permanentclosure
          
                  timer_mgr.AddJob(1000, period_job, 5);
          
              
          
                  tp_mgr->TPRun(this, &EchoServiceImpl::TestThreadPool, response);
          
              
          
                  if (done) {
          
                      done->Run();
          
                  }
          
              }
          
          public:
          
              TimerMgr timer_mgr;
          

          然后就客戶端發(fā)出一次請求后就可以看到陸續(xù)收到5個推送消息,每個相隔一秒

          recv push mes, name:echo.EchoResponse data:response: "period push mes"

          由上面我們看到協(xié)程結(jié)合線程池、定時器編程是多么輕松。

          自由的控制定時任務(wù)由哪些線程或協(xié)程按分什么順序執(zhí)行,完全面向?qū)ο蟮木幋a風格,同步的編碼方式,獲得異步的效果。

          8、斷開事件處理

          功能需求是假設(shè)是游戲客戶端,在客戶端異常斷開時,服務(wù)器應(yīng)該要觸發(fā)事件,以便服務(wù)器在這里改變游戲角色的在線狀態(tài),另外客戶端也應(yīng)該在與服務(wù)器斷開是有提示或做相應(yīng)重連處理

          服務(wù)器注冊斷開事件處理函數(shù)

          int close_handler(CASyncSvr* svr, unsigned cli_flow, void* param) {
          
              std::stringstream ss;
          
              ss << "svr_id:" << svr->_svr_id
          
                  << " cli:" << cli_flow
          
                  << " param:" << *((int*)param) << "\n";
          
              //printf(ss.str().c_str());
          
              return 0;
          
          }
          
          int main(int argc, char *argv[])
          
          {
          
              RpcServer server("127.0.0.1", 8996);
          
              int p = 123; // 附帶參數(shù)
          
              server.RegiClientCloseHandler(close_handler, &p);
          
          }
          

           客戶端和上面服務(wù)器一樣定義處理函數(shù)和注冊即可

          以上所有handler 包括http、close 和單向推送的handler都是在協(xié)程里面處理的,也就是說handler里面都是可以使用 rpc client協(xié)程同步模式去向別的服務(wù)器發(fā)請求,而不會阻塞線程的

          兩個框架項目地址: https://git.oschina.net/feimat

          歡迎加入qq群339711102,一起探討優(yōu)化哦


          瀏覽 26
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          編輯 分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          編輯 分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  无吗一区区三区四区 | 插BB免费看 | 成人在线看网站 | 围内精品久久久久久久久久变脸 | 偷窥AV在线 |