<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手寫線程池 - C++版

          共 19268字,需瀏覽 39分鐘

           ·

          2021-09-28 07:03

          《手寫線程池 - C語言版》中,已經(jīng)實(shí)現(xiàn)了 C 語言版的線程池,如果我們也學(xué)過 C++ 的話,可以將其改為 C++ 版本,這樣代碼不管是從使用還是從感觀上都會(huì)更簡潔一些。

          對(duì)這些代碼做從 C 到 C++ 的遷移主要用到了 C++ 三大特性中的封裝,因此難度不大,對(duì)應(yīng) C++ 初學(xué)者來說有助于提高編碼水平和對(duì)面向?qū)ο蟮睦斫?,?duì)于熟練掌握了 C++ 的人來說就是張飛吃豆芽 -- 小菜一碟(so easy)。

          關(guān)于線程的在此就不再過多闡述,對(duì)于前面文章中設(shè)計(jì)的線程池,按照面向?qū)ο蟮乃枷脒M(jìn)行拆分可以分為兩部分(純屬個(gè)人見解,有不同的想法也正常):任務(wù)隊(duì)列類 和線程池類。

          1. 任務(wù)隊(duì)列

          1.1 類聲明

          // 定義任務(wù)結(jié)構(gòu)體
          using callback = void(*)(void*);
          struct Task
          {
              Task()
              {
                  function = nullptr;
                  arg = nullptr;
              }
              Task(callback f, void* arg)
              {
                  function = f;
                  this->arg = arg;
              }
              callback function;
              void* arg;
          };

          // 任務(wù)隊(duì)列
          class TaskQueue
          {
          public:
              TaskQueue();
              ~TaskQueue();

              // 添加任務(wù)
              void addTask(Task& task);
              void addTask(callback func, void* arg);

              // 取出一個(gè)任務(wù)
              Task takeTask();

              // 獲取當(dāng)前隊(duì)列中任務(wù)個(gè)數(shù)
              inline int taskNumber()
              {
                  return m_queue.size();
              }

          private:
              pthread_mutex_t m_mutex;    // 互斥鎖
              std::queue<Task> m_queue;   // 任務(wù)隊(duì)列
          };

          其中 Task 是任務(wù)類,里邊有兩個(gè)成員,分別是兩個(gè)指針 void(*)(void*)void*

          另外一個(gè)類 TaskQueue 是任務(wù)隊(duì)列,提供了添加任務(wù)、取出任務(wù)、存儲(chǔ)任務(wù)、獲取任務(wù)個(gè)數(shù)、線程同步的功能。

          1.2 類定義

          TaskQueue::TaskQueue()
          {
              pthread_mutex_init(&m_mutex, NULL);
          }

          TaskQueue::~TaskQueue()
          {
              pthread_mutex_destroy(&m_mutex);
          }

          void TaskQueue::addTask(Task& task)
          {
              pthread_mutex_lock(&m_mutex);
              m_queue.push(task);
              pthread_mutex_unlock(&m_mutex);
          }

          void TaskQueue::addTask(callback func, void* arg)
          {
              pthread_mutex_lock(&m_mutex);
              Task task;
              task.function = func;
              task.arg = arg;
              m_queue.push(task);
              pthread_mutex_unlock(&m_mutex);
          }

          Task TaskQueue::takeTask()
          {
              Task t;
              pthread_mutex_lock(&m_mutex);
              if (m_queue.size() > 0)
              {
                  t = m_queue.front();
                  m_queue.pop();
              }
              pthread_mutex_unlock(&m_mutex);
              return t;
          }

          2. 線程池

          2.1 類聲明

          class ThreadPool
          {
          public:
              ThreadPool(int min, int max);
              ~ThreadPool();

              // 添加任務(wù)
              void addTask(Task task);
              // 獲取忙線程的個(gè)數(shù)
              int getBusyNumber();
              // 獲取活著的線程個(gè)數(shù)
              int getAliveNumber();

          private:
              // 工作的線程的任務(wù)函數(shù)
              static void* worker(void* arg);
              // 管理者線程的任務(wù)函數(shù)
              static void* manager(void* arg);
              void threadExit();

          private:
              pthread_mutex_t m_lock;
              pthread_cond_t m_notEmpty;
              pthread_t* m_threadIDs;
              pthread_t m_managerID;
              TaskQueue* m_taskQ;
              int m_minNum;
              int m_maxNum;
              int m_busyNum;
              int m_aliveNum;
              int m_exitNum;
              bool m_shutdown = false;
          };

          2.2 類定義

          ThreadPool::ThreadPool(int minNum, int maxNum)
          {
              // 實(shí)例化任務(wù)隊(duì)列
              m_taskQ = new TaskQueue;
              do {
                  // 初始化線程池
                  m_minNum = minNum;
                  m_maxNum = maxNum;
                  m_busyNum = 0;
                  m_aliveNum = minNum;

                  // 根據(jù)線程的最大上限給線程數(shù)組分配內(nèi)存
                  m_threadIDs = new pthread_t[maxNum];
                  if (m_threadIDs == nullptr)
                  {
                      cout << "malloc thread_t[] 失敗...." << endl;;
                      break;
                  }
                  // 初始化
                  memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);
                  // 初始化互斥鎖,條件變量
                  if (pthread_mutex_init(&m_lock, NULL) != 0 ||
                      pthread_cond_init(&m_notEmpty, NULL) != 0)
                  {
                      cout << "init mutex or condition fail..." << endl;
                      break;
                  }

                  /////////////////// 創(chuàng)建線程 //////////////////
                  // 根據(jù)最小線程個(gè)數(shù), 創(chuàng)建線程
                  for (int i = 0; i < minNum; ++i)
                  {
                      pthread_create(&m_threadIDs[i], NULL, worker, this);
                      cout << "創(chuàng)建子線程, ID: " << to_string(m_threadIDs[i]) << endl;
                  }
                  // 創(chuàng)建管理者線程, 1個(gè)
                  pthread_create(&m_managerID, NULL, manager, this);
              } while (0);
          }

          ThreadPool::~ThreadPool()
          {
              m_shutdown = 1;
              // 銷毀管理者線程
              pthread_join(m_managerID, NULL);
              // 喚醒所有消費(fèi)者線程
              for (int i = 0; i < m_aliveNum; ++i)
              {
                  pthread_cond_signal(&m_notEmpty);
              }

              if (m_taskQ) delete m_taskQ;
              if (m_threadIDs) delete[]m_threadIDs;
              pthread_mutex_destroy(&m_lock);
              pthread_cond_destroy(&m_notEmpty);
          }

          void ThreadPool::addTask(Task task)
          {
              if (m_shutdown)
              {
                  return;
              }
              // 添加任務(wù),不需要加鎖,任務(wù)隊(duì)列中有鎖
              m_taskQ->addTask(task);
              // 喚醒工作的線程
              pthread_cond_signal(&m_notEmpty);
          }

          int ThreadPool::getAliveNumber()
          {
              int threadNum = 0;
              pthread_mutex_lock(&m_lock);
              threadNum = m_aliveNum;
              pthread_mutex_unlock(&m_lock);
              return threadNum;
          }

          int ThreadPool::getBusyNumber()
          {
              int busyNum = 0;
              pthread_mutex_lock(&m_lock);
              busyNum = m_busyNum;
              pthread_mutex_unlock(&m_lock);
              return busyNum;
          }


          // 工作線程任務(wù)函數(shù)
          void* ThreadPool::worker(void* arg)
          {
              ThreadPool* pool = static_cast<ThreadPool*>(arg);
              // 一直不停的工作
              while (true)
              {
                  // 訪問任務(wù)隊(duì)列(共享資源)加鎖
                  pthread_mutex_lock(&pool->m_lock);
                  // 判斷任務(wù)隊(duì)列是否為空, 如果為空工作線程阻塞
                  while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown)
                  {
                      cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;
                      // 阻塞線程
                      pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);

                      // 解除阻塞之后, 判斷是否要銷毀線程
                      if (pool->m_exitNum > 0)
                      {
                          pool->m_exitNum--;
                          if (pool->m_aliveNum > pool->m_minNum)
                          {
                              pool->m_aliveNum--;
                              pthread_mutex_unlock(&pool->m_lock);
                              pool->threadExit();
                          }
                      }
                  }
                  // 判斷線程池是否被關(guān)閉了
                  if (pool->m_shutdown)
                  {
                      pthread_mutex_unlock(&pool->m_lock);
                      pool->threadExit();
                  }

                  // 從任務(wù)隊(duì)列中取出一個(gè)任務(wù)
                  Task task = pool->m_taskQ->takeTask();
                  // 工作的線程+1
                  pool->m_busyNum++;
                  // 線程池解鎖
                  pthread_mutex_unlock(&pool->m_lock);
                  // 執(zhí)行任務(wù)
                  cout << "thread " << to_string(pthread_self()) << " start working..." << endl;
                  task.function(task.arg);
                  delete task.arg;
                  task.arg = nullptr;

                  // 任務(wù)處理結(jié)束
                  cout << "thread " << to_string(pthread_self()) << " end working...";
                  pthread_mutex_lock(&pool->m_lock);
                  pool->m_busyNum--;
                  pthread_mutex_unlock(&pool->m_lock);
              }

              return nullptr;
          }


          // 管理者線程任務(wù)函數(shù)
          void* ThreadPool::manager(void* arg)
          {
              ThreadPool* pool = static_cast<ThreadPool*>(arg);
              // 如果線程池沒有關(guān)閉, 就一直檢測
              while (!pool->m_shutdown)
              {
                  // 每隔5s檢測一次
                  sleep(5);
                  // 取出線程池中的任務(wù)數(shù)和線程數(shù)量
                  //  取出工作的線程池?cái)?shù)量
                  pthread_mutex_lock(&pool->m_lock);
                  int queueSize = pool->m_taskQ->taskNumber();
                  int liveNum = pool->m_aliveNum;
                  int busyNum = pool->m_busyNum;
                  pthread_mutex_unlock(&pool->m_lock);

                  // 創(chuàng)建線程
                  const int NUMBER = 2;
                  // 當(dāng)前任務(wù)個(gè)數(shù)>存活的線程數(shù) && 存活的線程數(shù)<最大線程個(gè)數(shù)
                  if (queueSize > liveNum && liveNum < pool->m_maxNum)
                  {
                      // 線程池加鎖
                      pthread_mutex_lock(&pool->m_lock);
                      int num = 0;
                      for (int i = 0; i < pool->m_maxNum && num < NUMBER
                          && pool->m_aliveNum < pool->m_maxNum; ++i)
                      {
                          if (pool->m_threadIDs[i] == 0)
                          {
                              pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);
                              num++;
                              pool->m_aliveNum++;
                          }
                      }
                      pthread_mutex_unlock(&pool->m_lock);
                  }

                  // 銷毀多余的線程
                  // 忙線程*2 < 存活的線程數(shù)目 && 存活的線程數(shù) > 最小線程數(shù)量
                  if (busyNum * 2 < liveNum && liveNum > pool->m_minNum)
                  {
                      pthread_mutex_lock(&pool->m_lock);
                      pool->m_exitNum = NUMBER;
                      pthread_mutex_unlock(&pool->m_lock);
                      for (int i = 0; i < NUMBER; ++i)
                      {
                          pthread_cond_signal(&pool->m_notEmpty);
                      }
                  }
              }
              return nullptr;
          }

          // 線程退出
          void ThreadPool::threadExit()
          {
              pthread_t tid = pthread_self();
              for (int i = 0; i < m_maxNum; ++i)
              {
                  if (m_threadIDs[i] == tid)
                  {
                      cout << "threadExit() function: thread " 
                          << to_string(pthread_self()) << " exiting..." << endl;
                      m_threadIDs[i] = 0;
                      break;
                  }
              }
              pthread_exit(NULL);
          }

          文章鏈接:https://subingwen.com/linux/threadpool-cpp/

          瀏覽 36
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  色色色色网 | 翔田千里av | 欧美性爱69 | 99视频免费在线观看 | 欧美射精视频 |