<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Netty 如何做到單機(jī)秒級接收 35 萬個對象?

          共 12125字,需瀏覽 25分鐘

           ·

          2021-06-22 16:54


          -     前言     - 


          單純netty結(jié)合protostuff進(jìn)行rpc對象傳輸?shù)膁emo網(wǎng)上有很多,大部分都是一個模子刻出來的,一開始我也是抄了一個,本地測試暢通無阻,未發(fā)生任何異常。


          部署預(yù)發(fā)環(huán)境,進(jìn)行壓測后,問題巨多,各種報錯層出不窮。當(dāng)然,壓測時我用的數(shù)據(jù)量大、發(fā)送請求非常密集,單機(jī)是每秒前100ms發(fā)送2萬個對象,其他900ms歇息,死循環(huán)發(fā)送,共計40臺機(jī)器作為客戶端,同時往2臺netty Server服務(wù)器發(fā)送對象,那么平均每個server每秒大概要接收40萬個對象,由于后面還有業(yè)務(wù)邏輯,邏輯每秒只能處理35萬實測。


          對于網(wǎng)上的代碼,進(jìn)行了多次修改,反復(fù)測試,最終是達(dá)到了不報錯無異常,單機(jī)秒級接收35萬個對象以上,故寫篇文章記錄一下,文中代碼會和線上邏輯保持一致。



          -     Protostuff 序列化和反序列化     - 


          這個沒什么特殊的,網(wǎng)上找個工具類就好了。


          引入pom:


          <protostuff.version>1.7.2</protostuff.version>
          <dependency>
              <groupId>io.protostuff</groupId>
              <artifactId>protostuff-core</artifactId>
              <version>${protostuff.version}</version>
          </dependency>

          <dependency>
              <groupId>io.protostuff</groupId>
              <artifactId>protostuff-runtime</artifactId>
              <version>${protostuff.version}</version>
          </dependency>

          public class ProtostuffUtils {
              /**
               * 避免每次序列化都重新申請Buffer空間
               * 這句話在實際生產(chǎn)上沒有意義,耗時減少的極小,但高并發(fā)下,如果還用這個buffer,會報異常說buffer還沒清空,就又被使用了
               */

          //    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
              /**
               * 緩存Schema
               */

              private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
           
              /**
               * 序列化方法,把指定對象序列化成字節(jié)數(shù)組
               *
               * @param obj
               * @param <T>
               * @return
               */

              @SuppressWarnings("unchecked")
              public static <T> byte[] serialize(T obj) {
                  Class<T> clazz = (Class<T>) obj.getClass();
                  Schema<T> schema = getSchema(clazz);
                  LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
                  byte[] data;
                  try {
                      data = ProtobufIOUtil.toByteArray(obj, schema, buffer);
          //            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
                  } finally {
                      buffer.clear();
                  }
           
                  return data;
              }
           
              /**
               * 反序列化方法,將字節(jié)數(shù)組反序列化成指定Class類型
               *
               * @param data
               * @param clazz
               * @param <T>
               * @return
               */

              public static <T> deserialize(byte[] data, Class<T> clazz) {
                  Schema<T> schema = getSchema(clazz);
                  T obj = schema.newMessage();
                  ProtobufIOUtil.mergeFrom(data, obj, schema);
          //        ProtostuffIOUtil.mergeFrom(data, obj, schema);
                  return obj;
              }
           
              @SuppressWarnings("unchecked")
              private static <T> Schema<T> getSchema(Class<T> clazz) {
                  Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
                  if (Objects.isNull(schema)) {
                      //這個schema通過RuntimeSchema進(jìn)行懶創(chuàng)建并緩存
                      //所以可以一直調(diào)用RuntimeSchema.getSchema(),這個方法是線程安全的
                      schema = RuntimeSchema.getSchema(clazz);
                      if (Objects.nonNull(schema)) {
                          schemaCache.put(clazz, schema);
                      }
                  }
           
                  return schema;
              }
          }


          此處有坑,就是最上面大部分網(wǎng)上代碼都是用了static的buffer。在單線程情況下沒有問題。在多線程情況下,非常容易出現(xiàn)buffer一次使用后尚未被clear,就再次被另一個線程使用,會拋異常。而所謂的避免每次都申請buffer空間,實測性能影響極其微小。


          另里面兩次ProtostuffIOUtil都改成了ProtobufIOUtil,因為也是出過異常,修改后未見有異常。



          -     自定義序列化方式     - 


          解碼器decoder:


          import com.jd.platform.hotkey.common.model.HotKeyMsg;
          import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
          import io.netty.buffer.ByteBuf;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.handler.codec.ByteToMessageDecoder;
           
          import java.util.List;
           
          /**
           * @author wuweifeng
           * @version 1.0
           * @date 2020-07-29
           */

          public class MsgDecoder extends ByteToMessageDecoder {
              @Override
              protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) {
                  try {
           
                      byte[] body = new byte[in.readableBytes()];  //傳輸正常
                      in.readBytes(body);
           
                      list.add(ProtostuffUtils.deserialize(body, HotKeyMsg.class));
           
          //            if (in.readableBytes() < 4) {
          //                return;
          //            }
          //            in.markReaderIndex();
          //            int dataLength = in.readInt();
          //            if (dataLength < 0) {
          //                channelHandlerContext.close();
          //            }
          //            if (in.readableBytes() < dataLength) {
          //                in.resetReaderIndex();
          //                return;
          //            }
          //
          //            byte[] data = new byte[dataLength];
          //            in.readBytes(data);
          //
          //            Object obj = ProtostuffUtils.deserialize(data, HotKeyMsg.class);
          //            list.add(obj);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }

          編碼器 encoder:


          import com.jd.platform.hotkey.common.model.HotKeyMsg;
          import com.jd.platform.hotkey.common.tool.Constant;
          import com.jd.platform.hotkey.common.tool.ProtostuffUtils;
          import io.netty.buffer.ByteBuf;
          import io.netty.channel.ChannelHandlerContext;
          import io.netty.handler.codec.MessageToByteEncoder;
           
          /**
           * @author wuweifeng
           * @version 1.0
           * @date 2020-07-30
           */

          public class MsgEncoder extends MessageToByteEncoder {
           
              @Override
              public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
                  if (in instanceof HotKeyMsg) {
                      byte[] bytes = ProtostuffUtils.serialize(in);
                      byte[] delimiter = Constant.DELIMITER.getBytes();
           
                      byte[] total = new byte[bytes.length + delimiter.length];
                      System.arraycopy(bytes, 0, total, 0, bytes.length);
                      System.arraycopy(delimiter, 0, total, bytes.length, delimiter.length);
           
                      out.writeBytes(total);
                  }
              }
          }


          先看Decoder解碼器,這個是用來netty收到消息后,進(jìn)行解碼,將字節(jié)轉(zhuǎn)為對象(自定義的HotKeyMsg)用的。里面有一堆被我注釋掉了,注釋掉的,應(yīng)該在網(wǎng)上找到的帖子都是那么寫的。這種方式本身在普通場景下是沒問題的,解碼還算正常,但是當(dāng)上幾十萬時非常容易出現(xiàn)粘包問題。所以我是在這個解碼器前增加了一個DelimiterBasedFrameDecoder分隔符解碼器。


          當(dāng)收到消息時,先過這個分隔符解碼器,之后到MsgDecoder那里時,就是已經(jīng)分隔好的一個對象字節(jié)流了,就可以直接用proto工具類進(jìn)行反序列化的。Constant.DELIMITER是我自定義的一個特殊字符串,用來做分隔符。


          再看encoder,編碼器,首先將要傳輸?shù)膶ο笥肞rotostuffUtils序列化為byte[],然后在尾巴上掛上我自定義的那個分隔符。這樣在對外發(fā)送對象時,就會走這個編碼器,并被加上分隔符。


          對應(yīng)的server端代碼大概是這樣:



          之后在Handler里就可以直接使用這個傳輸?shù)膶ο罅恕?/span>


          再看client端:



          和Server端是一樣的,也是這幾個編解碼器,沒有區(qū)別。因為netty和server之間通訊,我都是用的同一個對象定義。



          同理handler也是一樣的。


          -     單機(jī)和集群     - 


          以上都寫完后,其實就可以測試了,我們可以啟動一個client,一個server,然后搞個死循環(huán)往Server發(fā)這個對象了,然后你在server端在收到這個對象后,再直接把這個對象也寫回來,原樣發(fā)送到客戶端。會發(fā)現(xiàn)運(yùn)行的很順暢,每秒發(fā)N萬個沒問題,編解碼都正常,client和server端都比較正常,當(dāng)前前提是ProtoBuf的工具類和我的一樣,不要共享那個buffer。網(wǎng)上找的文章基本上到這樣也就結(jié)束了,隨便發(fā)幾個消息沒問題也就算OK。然而實際上,這種代碼上線后,會坑的不要不要的。


          其實本地測試也很容易,再啟動幾個客戶端,都連同一個Server,然后給他死循環(huán)發(fā)對象,再看看兩端會不會有異常。這種情況下,和第一種的區(qū)別其實客戶端沒什么變化,Server端就有變化了,之前同時只給一個client發(fā)消息,現(xiàn)在同時給兩個client發(fā)消息,這一步如果不謹(jǐn)慎就會出問題了,建議自行嘗試。


          之后,我們再加點料,我啟動兩個Server,分別用兩個端口,線上其實是兩臺不同的server服務(wù)器,client會同時往兩臺server死循環(huán)發(fā)對象,如下圖代碼。


          發(fā)消息,我們常用的就是channel.writeAndFlush(),大家可以把那個sync去掉,然后跑一下代碼看看。會發(fā)現(xiàn)異常拋的一坨一坨的。我們明明是往兩個不同的channel發(fā)消息,只不過時間是同時,結(jié)果就是發(fā)生了嚴(yán)重的粘包。server端收到的消息很多都是不規(guī)范的,會大量報錯。如果在兩個channel發(fā)送間隔100ms,情況就解決了。當(dāng)然,最終我們可以使用sync同步發(fā)送,這樣就不會拋異常了。



          以上代碼經(jīng)測試,40臺client,2臺Server,平均每個server每秒大概接收40萬個對象,可以持續(xù)穩(wěn)定運(yùn)行。

          者:天涯淚小武

          來源:

          https://blog.csdn.net/tianyaleixiaowu/article/details/107714868

          瀏覽 38
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  国产精品视频播放豆花网址 | 午夜成人一区二区 | 作爱视频免费 | 一起草视频网 | 亚洲特级毛片精品久久 |