<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          go-zero微服務(wù)實戰(zhàn)系列(八、如何處理每秒上萬次的下單請求)

          共 8885字,需瀏覽 18分鐘

           ·

          2022-07-26 05:48

          在前幾篇的文章中,我們花了很大的篇幅介紹如何利用緩存優(yōu)化系統(tǒng)的讀性能,究其原因在于我們的產(chǎn)品大多是一個讀多寫少的場景,尤其是在產(chǎn)品的初期,可能多數(shù)的用戶只是過來查看商品,真正下單的用戶非常少。但隨著業(yè)務(wù)的發(fā)展,我們就會遇到一些高并發(fā)寫請求的場景,秒殺搶購就是最典型的高并發(fā)寫場景。在秒殺搶購開始后用戶就會瘋狂的刷新頁面讓自己盡早的看到商品,所以秒殺場景同時也是高并發(fā)讀場景。那么應(yīng)對高并發(fā)讀寫場景我們怎么進行優(yōu)化呢?

          處理熱點數(shù)據(jù)

          秒殺的數(shù)據(jù)通常都是熱點數(shù)據(jù),處理熱點數(shù)據(jù)一般有幾種思路:一是優(yōu)化,二是限制,三是隔離。

          優(yōu)化

          優(yōu)化熱點數(shù)據(jù)最有效的辦法就是緩存熱點數(shù)據(jù),我們可以把熱點數(shù)據(jù)緩存到內(nèi)存緩存中。

          限制

          限制更多的是一種保護機制,當秒殺開始后用戶就會不斷地刷新頁面獲取數(shù)據(jù),這時候我們可以限制單用戶的請求次數(shù),比如一秒鐘只能請求一次,超過限制直接返回錯誤,返回的錯誤盡量對用戶友好,比如 "店小二正在忙" 等友好提示。

          隔離

          秒殺系統(tǒng)設(shè)計的第一個原則就是將這種熱點數(shù)據(jù)隔離出來,不要讓1%的請求影響到另外的99%,隔離出來后也更方便對這1%的請求做針對性的優(yōu)化。具體到實現(xiàn)上,我們需要做服務(wù)隔離,即秒殺功能獨立為一個服務(wù),通知要做數(shù)據(jù)隔離,秒殺所調(diào)用的大部分是熱點數(shù)據(jù),我們需要使用單獨的Redis集群和單獨的Mysql,目的也是不想讓1%的數(shù)據(jù)有機會影響99%的數(shù)據(jù)。

          流量削峰

          針對秒殺場景,它的特點是在秒殺開始那一剎那瞬間涌入大量的請求,這就會導致一個特別高的流量峰值。但最終能夠搶到商品的人數(shù)是固定的,也就是不管是100人還是10000000人發(fā)起請求的結(jié)果都是一樣的,并發(fā)度越高,無效的請求也就越多。但是從業(yè)務(wù)角度來說,秒殺活動是希望有更多的人來參與的,也就是秒殺開始的時候希望有更多的人來刷新頁面,但是真正開始下單時,請求并不是越多越好。因此我們可以設(shè)計一些規(guī)則,讓并發(fā)請求更多的延緩,甚至可以過濾掉一些無效的請求。

          削峰本質(zhì)上是要更多的延緩用戶請求的發(fā)出,以便減少和過濾掉一些無效的請求,它遵從請求數(shù)要盡量少的原則。我們最容易想到的解決方案是用消息隊列來緩沖瞬時的流量,把同步的直接調(diào)用轉(zhuǎn)換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑的將消息推送出去,如下圖所示:

          采用消息隊列異步處理后,那么秒殺的結(jié)果是不太好同步返回的,所以我們的思路是當用戶發(fā)起秒殺請求后,同步返回響應(yīng)用戶 "秒殺結(jié)果正在計算中..." 的提示信息,當計算完之后我們?nèi)绾畏祷亟Y(jié)果給用戶呢?其實也是有多種方案的。

          • 一是在頁面中采用輪詢的方式定時主動去服務(wù)端查詢結(jié)果,例如每秒請求一次服務(wù)端看看有沒有處理結(jié)果,這種方式的缺點是服務(wù)端的請求數(shù)會增加不少。
          • 二是主動push的方式,這種就要求服務(wù)端和客戶端保持長連接了,服務(wù)端處理完請求后主動push給客戶端,這種方式的缺點是服務(wù)端的連接數(shù)會比較多。

          還有一個問題就是如果異步的請求失敗了該怎么辦?我覺得對于秒殺場景來說,失敗了就直接丟棄就好了,最壞的結(jié)果就是這個用戶沒有搶到而已。如果想要盡量的保證公平的話,那么失敗了以后也可以做重試。

          如何保證消息只被消費一次

          kafka是能夠保證"At Least Once"的機制的,即消息不會丟失,但有可能會導致重復消費,消息一旦被重復消費那么就會造成業(yè)務(wù)邏輯處理的錯誤,那么我們?nèi)绾伪苊庀⒌闹貜拖M呢?

          我們只要保證即使消費到了重復的消息,從消費的最終結(jié)果來看和只消費一次的結(jié)果等同就好了,也就是保證在消息的生產(chǎn)和消費的過程是冪等的。什么是冪等呢?如果我們消費一條消息的時候,要給現(xiàn)有的庫存數(shù)量減1,那么如果消費兩條相同的消息就給庫存的數(shù)量減2,這就不是冪等的。而如果消費一條消息后處理邏輯是將庫存的數(shù)量設(shè)置為0,或者是如果當前庫存的數(shù)量為10時則減1,這樣在消費多條消息時所得到的結(jié)果就是相同的,這就是冪等的。說白了就是一件事無論你做多少次和做一次產(chǎn)生的結(jié)果都是一樣的,那么這就是冪等性。

          我們可以在消息被消費后,把唯一id存儲在數(shù)據(jù)庫中,這里的唯一id可以使用用戶id和商品id的組合,在處理下一條消息之前先從數(shù)據(jù)庫中查詢這個id看是否被消費過,如果消費過就放棄。偽代碼如下:

          isConsume := getByID(id)
          if isConsume {
            return  

          process(message)
          save(id)

          還有一種方式是通過數(shù)據(jù)庫中的唯一索引來保證冪等性,不過這個要看具體的業(yè)務(wù),在這里不再贅述。

          代碼實現(xiàn)

          整個秒殺流程圖如下:

          使用kafka作為消息隊列,所以要先在本地安裝kafka,我使用的是mac可以用homebrew直接安裝,kafka依賴zookeeper也會自動安裝

          brew install kafka

          安裝完后通過brew services start啟動zookeeper和kafka,kafka默認偵聽在9092端口

          brew services start zookeeper

          brew services start kafka

          seckill-rpc的SeckillOrder方法實現(xiàn)秒殺邏輯,我們先限制用戶的請求次數(shù),比如限制用戶每秒只能請求一次,這里使用go-zero提供的PeriodLimit功能實現(xiàn),如果超出限制直接返回

          code, _ := l.limiter.Take(strconv.FormatInt(in.UserId, 10))
          if code == limit.OverQuota {
            return nil, status.Errorf(codes.OutOfRange, "Number of requests exceeded the limit")
          }

          接著查看當前搶購商品的庫存,如果庫存不足就直接返回,如果庫存足夠的話則認為可以進入下單流程,發(fā)消息到kafka,這里kafka使用go-zero提供的kq庫,非常簡單易用,為秒殺新建一個Topic,配置初始化和邏輯如下:

          Kafka:
            Addrs:
              - 127.0.0.1:9092
            SeckillTopic: seckill-topic
          KafkaPusher: kq.NewPusher(c.Kafka.Addrs, c.Kafka.SeckillTopic)
          p, err := l.svcCtx.ProductRPC.Product(l.ctx, &product.ProductItemRequest{ProductId: in.ProductId})
          if err != nil {
            return nil, err
          }
          if p.Stock <= 0 {
            return nil, status.Errorf(codes.OutOfRange, "Insufficient stock")
          }
          kd, err := json.Marshal(&KafkaData{Uid: in.UserId, Pid: in.ProductId})
          if err != nil {
            return nil, err
          }
          if err := l.svcCtx.KafkaPusher.Push(string(kd)); err != nil {
            return nil, err
          }

          seckill-rmq消費seckill-rpc生產(chǎn)的數(shù)據(jù)進行下單操作,我們新建seckill-rmq服務(wù),結(jié)構(gòu)如下:

          tree ./rmq

          ./rmq
          ├── etc
          │   └── seckill.yaml
          ├── internal
          │   ├── config
          │   │   └── config.go
          │   └── service
          │       └── service.go
          └── seckill.go

          4 directories, 4 files

          依然是使用kq初始化啟動服務(wù),這里我們需要注冊一個ConsumeHand方法,該方法用以消費kafka數(shù)據(jù)

          srv := service.NewService(c)
          queue := kq.MustNewQueue(c.Kafka, kq.WithHandle(srv.Consume))
          defer queue.Stop()

          fmt.Println("seckill started!!!")
          queue.Start()

          在Consume方法中,消費到數(shù)據(jù)后先反序列化,然后調(diào)用product-rpc查看當前商品的庫存,如果庫存足夠的話我們認為可以下單,調(diào)用order-rpc進行創(chuàng)建訂單操作,最后再更新庫存

          func (s *Service) Consume(_ string, value string) error {
            logx.Infof("Consume value: %s\n", value)
            var data KafkaData
            if err := json.Unmarshal([]byte(value), &data); err != nil {
              return err
            }
            p, err := s.ProductRPC.Product(context.Background(), &product.ProductItemRequest{ProductId: data.Pid})
            if err != nil {
              return err
            }
            if p.Stock <= 0 {
              return nil
            }
            _, err = s.OrderRPC.CreateOrder(context.Background(), &order.CreateOrderRequest{Uid: data.Uid, Pid: data.Pid})
            if err != nil {
              logx.Errorf("CreateOrder uid: %d pid: %d error: %v", data.Uid, data.Pid, err)
              return err
            }
            _, err = s.ProductRPC.UpdateProductStock(context.Background(), &product.UpdateProductStockRequest{ProductId: data.Pid, Num: 1})
            if err != nil {
              logx.Errorf("UpdateProductStock uid: %d pid: %d error: %v", data.Uid, data.Pid, err)
              return err
            }
            // TODO notify user of successful order placement
            return nil
          }

          在創(chuàng)建訂單過程中涉及到兩張表orders和orderitem,所以我們要使用本地事務(wù)進行插入,代碼如下:

          func (m *customOrdersModel) CreateOrder(ctx context.Context, oid string, uid, pid int64) error {
            _, err := m.ExecCtx(ctx, func(ctx context.Context, conn sqlx.SqlConn) (sql.Result, error) {
              err := conn.TransactCtx(ctx, func(ctx context.Context, session sqlx.Session) error {
                _, err := session.ExecCtx(ctx, "INSERT INTO orders(id, userid) VALUES(?,?)", oid, uid)
                if err != nil {
                  return err
                }
                _, err = session.ExecCtx(ctx, "INSERT INTO orderitem(orderid, userid, proid) VALUES(?,?,?)""", uid, pid)
                return err
              })
              return nil, err
            })
            return err
          }

          訂單號生成邏輯如下,這里使用時間加上自增數(shù)進行訂單生成

          var num int64

          func genOrderID(t time.Time) string {
            s := t.Format("20060102150405")
            m := t.UnixNano()/1e6 - t.UnixNano()/1e9*1e3
            ms := sup(m, 3)
            p := os.Getpid() % 1000
            ps := sup(int64(p), 3)
            i := atomic.AddInt64(&num, 1)
            r := i % 10000
            rs := sup(r, 4)
            n := fmt.Sprintf("%s%s%s%s", s, ms, ps, rs)
            return n
          }

          func sup(i int64, n int) string {
            m := fmt.Sprintf("%d", i)
            for len(m) < n {
              m = fmt.Sprintf("0%s", m)
            }
            return m
          }

          最后分別啟動product-rpc、order-rpc、seckill-rpc和seckill-rmq服務(wù)還有zookeeper、kafka、mysql和redis,啟動后我們調(diào)用seckill-rpc進行秒殺下單

          grpcurl -plaintext -d '{"user_id": 111, "product_id": 10}' 127.0.0.1:9889 seckill.Seckill.SeckillOrder

          在seckill-rmq中打印了消費記錄,輸出如下

          {"@timestamp":"2022-06-26T10:11:42.997+08:00","caller":"service/service.go:35","content":"Consume value: {\"uid\":111,\"pid\":10}\n","level":"info"}

          這個時候查看orders表中已經(jīng)創(chuàng)建了訂單,同時商品庫存減一

          結(jié)束語

          本質(zhì)上秒殺是一個高并發(fā)讀和高并發(fā)寫的場景,上面我們介紹了秒殺的注意事項以及優(yōu)化點,我們這個秒殺場景相對來說比較簡單,但其實也沒有一個通用的秒殺的框架,我們需要根據(jù)實際的業(yè)務(wù)場景進行優(yōu)化,不同量級的請求優(yōu)化的手段也不盡相同。這里我們只展示了服務(wù)端的相關(guān)優(yōu)化,但對于秒殺場景來說整個請求鏈路都是需要優(yōu)化的,比如對于靜態(tài)數(shù)據(jù)我們可以使用CDN做加速,為了防止流量洪峰我們可以在前端設(shè)置答題功能等等。

          希望本篇文章對你有所幫助,謝謝。

          代碼倉庫: https://github.com/zhoushuguang/lebron



          推薦閱讀


          福利

          我為大家整理了一份從入門到進階的Go學習資料禮包,包含學習建議:入門看什么,進階看什么。關(guān)注公眾號 「polarisxu」,回復 ebook 獲取;還可以回復「進群」,和數(shù)萬 Gopher 交流學習。

          瀏覽 56
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  在线观看高清无码 | 东京热视频一区 | 欧美一级A片在免费看 | 日韩一区二区不卡视频 | 黄视频大几吧操逼 |