<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          手寫線程池 - C語言版

          共 22250字,需瀏覽 45分鐘

           ·

          2021-09-28 07:03

          1. 線程池原理

          我們使用線程的時候就去創(chuàng)建一個線程,這樣實現(xiàn)起來非常簡便,但是就會有一個問題:如果并發(fā)的線程數(shù)量很多,并且每個線程都是執(zhí)行一個時間很短的任務就結束了,這樣頻繁創(chuàng)建線程就會大大降低系統(tǒng)的效率,因為頻繁創(chuàng)建線程和銷毀線程需要時間。

          那么有沒有一種辦法使得線程可以復用,就是執(zhí)行完一個任務,并不被銷毀,而是可以繼續(xù)執(zhí)行其他的任務呢?

          線程池是一種多線程處理形式,處理過程中將任務添加到隊列,然后在創(chuàng)建線程后自動啟動這些任務。線程池線程都是后臺線程。每個線程都使用默認的堆棧大小,以默認的優(yōu)先級運行,并處于多線程單元中。如果某個線程在托管代碼中空閑(如正在等待某個事件), 則線程池將插入另一個輔助線程來使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊列中包含掛起的工作,則線程池將在一段時間后創(chuàng)建另一個輔助線程但線程的數(shù)目永遠不會超過最大值。超過最大值的線程可以排隊,但他們要等到其他線程完成后才啟動。

          在各個編程語言的語種中都有線程池的概念,并且很多語言中直接提供了線程池,作為程序猿直接使用就可以了,下面給大家介紹一下線程池的實現(xiàn)原理:

          線程池的組成主要分為 3 個部分,這三部分配合工作就可以得到一個完整的線程池:

          1. 任務隊列,存儲需要處理的任務,由工作的線程來處理這些任務
          • 通過線程池提供的 API 函數(shù),將一個待處理的任務添加到任務隊列,或者從任務隊列中刪除
          • 已處理的任務會被從任務隊列中刪除
          • 線程池的使用者,也就是調(diào)用線程池函數(shù)往任務隊列中添加任務的線程就是生產(chǎn)者線程
          1. 工作的線程(任務隊列任務的消費者) ,N個
          • 線程池中維護了一定數(shù)量的工作線程,他們的作用是是不停的讀任務隊列,從里邊取出任務并處理
          • 工作的線程相當于是任務隊列的消費者角色,
          • 如果任務隊列為空,工作的線程將會被阻塞 (使用條件變量 / 信號量阻塞)
          • 如果阻塞之后有了新的任務,由生產(chǎn)者將阻塞解除,工作線程開始工作
          1. 管理者線程(不處理任務隊列中的任務),1個
          • 它的任務是周期性的對任務隊列中的任務數(shù)量以及處于忙狀態(tài)的工作線程個數(shù)進行檢測
          • 當任務過多的時候,可以適當?shù)膭?chuàng)建一些新的工作線程
          • 當任務過少的時候,可以適當?shù)匿N毀一些工作的線程

          2. 任務隊列

          // 任務結構體
          typedef struct Task
          {
              void (*function)(void* arg);
              void* arg;
          }Task;

          3. 線程池定義

          // 線程池結構體
          struct ThreadPool
          {
              // 任務隊列
              Task* taskQ;
              int queueCapacity;  // 容量
              int queueSize;      // 當前任務個數(shù)
              int queueFront;     // 隊頭 -> 取數(shù)據(jù)
              int queueRear;      // 隊尾 -> 放數(shù)據(jù)

              pthread_t managerID;    // 管理者線程ID
              pthread_t *threadIDs;   // 工作的線程ID
              int minNum;             // 最小線程數(shù)量
              int maxNum;             // 最大線程數(shù)量
              int busyNum;            // 忙的線程的個數(shù)
              int liveNum;            // 存活的線程的個數(shù)
              int exitNum;            // 要銷毀的線程個數(shù)
              pthread_mutex_t mutexPool;  // 鎖整個的線程池
              pthread_mutex_t mutexBusy;  // 鎖busyNum變量
              pthread_cond_t notFull;     // 任務隊列是不是滿了
              pthread_cond_t notEmpty;    // 任務隊列是不是空了

              int shutdown;           // 是不是要銷毀線程池, 銷毀為1, 不銷毀為0
          };

          4. 頭文件聲明

          #ifndef _THREADPOOL_H
          #define _THREADPOOL_H

          typedef struct ThreadPool ThreadPool;
          // 創(chuàng)建線程池并初始化
          ThreadPool *threadPoolCreate(int min, int max, int queueSize);

          // 銷毀線程池
          int threadPoolDestroy(ThreadPool* pool);

          // 給線程池添加任務
          void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

          // 獲取線程池中工作的線程的個數(shù)
          int threadPoolBusyNum(ThreadPool* pool);

          // 獲取線程池中活著的線程的個數(shù)
          int threadPoolAliveNum(ThreadPool* pool);

          //////////////////////
          // 工作的線程(消費者線程)任務函數(shù)
          void* worker(void* arg);
          // 管理者線程任務函數(shù)
          void* manager(void* arg);
          // 單個線程退出
          void threadExit(ThreadPool* pool);
          #endif  // _THREADPOOL_H

          5. 源文件定義

          ThreadPool* threadPoolCreate(int min, int max, int queueSize)
          {
              ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
              do 
              {
                  if (pool == NULL)
                  {
                      printf("malloc threadpool fail...\n");
                      break;
                  }

                  pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
                  if (pool->threadIDs == NULL)
                  {
                      printf("malloc threadIDs fail...\n");
                      break;
                  }
                  memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
                  pool->minNum = min;
                  pool->maxNum = max;
                  pool->busyNum = 0;
                  pool->liveNum = min;    // 和最小個數(shù)相等
                  pool->exitNum = 0;

                  if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
                      pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
                      pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
                      pthread_cond_init(&pool->notFull, NULL) != 0)
                  {
                      printf("mutex or condition init fail...\n");
                      break;
                  }

                  // 任務隊列
                  pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
                  pool->queueCapacity = queueSize;
                  pool->queueSize = 0;
                  pool->queueFront = 0;
                  pool->queueRear = 0;

                  pool->shutdown = 0;

                  // 創(chuàng)建線程
                  pthread_create(&pool->managerID, NULL, manager, pool);
                  for (int i = 0; i < min; ++i)
                  {
                      pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                  }
                  return pool;
              } while (0);

              // 釋放資源
              if (pool && pool->threadIDs) free(pool->threadIDs);
              if (pool && pool->taskQ) free(pool->taskQ);
              if (pool) free(pool);

              return NULL;
          }

          int threadPoolDestroy(ThreadPool* pool)
          {
              if (pool == NULL)
              {
                  return -1;
              }

              // 關閉線程池
              pool->shutdown = 1;
              // 阻塞回收管理者線程
              pthread_join(pool->managerID, NULL);
              // 喚醒阻塞的消費者線程
              for (int i = 0; i < pool->liveNum; ++i)
              {
                  pthread_cond_signal(&pool->notEmpty);
              }
              // 釋放堆內(nèi)存
              if (pool->taskQ)
              {
                  free(pool->taskQ);
              }
              if (pool->threadIDs)
              {
                  free(pool->threadIDs);
              }

              pthread_mutex_destroy(&pool->mutexPool);
              pthread_mutex_destroy(&pool->mutexBusy);
              pthread_cond_destroy(&pool->notEmpty);
              pthread_cond_destroy(&pool->notFull);

              free(pool);
              pool = NULL;

              return 0;
          }


          void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
          {
              pthread_mutex_lock(&pool->mutexPool);
              while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
              {
                  // 阻塞生產(chǎn)者線程
                  pthread_cond_wait(&pool->notFull, &pool->mutexPool);
              }
              if (pool->shutdown)
              {
                  pthread_mutex_unlock(&pool->mutexPool);
                  return;
              }
              // 添加任務
              pool->taskQ[pool->queueRear].function = func;
              pool->taskQ[pool->queueRear].arg = arg;
              pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
              pool->queueSize++;

              pthread_cond_signal(&pool->notEmpty);
              pthread_mutex_unlock(&pool->mutexPool);
          }

          int threadPoolBusyNum(ThreadPool* pool)
          {
              pthread_mutex_lock(&pool->mutexBusy);
              int busyNum = pool->busyNum;
              pthread_mutex_unlock(&pool->mutexBusy);
              return busyNum;
          }

          int threadPoolAliveNum(ThreadPool* pool)
          {
              pthread_mutex_lock(&pool->mutexPool);
              int aliveNum = pool->liveNum;
              pthread_mutex_unlock(&pool->mutexPool);
              return aliveNum;
          }

          void* worker(void* arg)
          {
              ThreadPool* pool = (ThreadPool*)arg;

              while (1)
              {
                  pthread_mutex_lock(&pool->mutexPool);
                  // 當前任務隊列是否為空
                  while (pool->queueSize == 0 && !pool->shutdown)
                  {
                      // 阻塞工作線程
                      pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

                      // 判斷是不是要銷毀線程
                      if (pool->exitNum > 0)
                      {
                          pool->exitNum--;
                          if (pool->liveNum > pool->minNum)
                          {
                              pool->liveNum--;
                              pthread_mutex_unlock(&pool->mutexPool);
                              threadExit(pool);
                          }
                      }
                  }

                  // 判斷線程池是否被關閉了
                  if (pool->shutdown)
                  {
                      pthread_mutex_unlock(&pool->mutexPool);
                      threadExit(pool);
                  }

                  // 從任務隊列中取出一個任務
                  Task task;
                  task.function = pool->taskQ[pool->queueFront].function;
                  task.arg = pool->taskQ[pool->queueFront].arg;
                  // 移動頭結點
                  pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
                  pool->queueSize--;
                  // 解鎖
                  pthread_cond_signal(&pool->notFull);
                  pthread_mutex_unlock(&pool->mutexPool);

                  printf("thread %ld start working...\n", pthread_self());
                  pthread_mutex_lock(&pool->mutexBusy);
                  pool->busyNum++;
                  pthread_mutex_unlock(&pool->mutexBusy);
                  task.function(task.arg);
                  free(task.arg);
                  task.arg = NULL;

                  printf("thread %ld end working...\n", pthread_self());
                  pthread_mutex_lock(&pool->mutexBusy);
                  pool->busyNum--;
                  pthread_mutex_unlock(&pool->mutexBusy);
              }
              return NULL;
          }

          void* manager(void* arg)
          {
              ThreadPool* pool = (ThreadPool*)arg;
              while (!pool->shutdown)
              {
                  // 每隔3s檢測一次
                  sleep(3);

                  // 取出線程池中任務的數(shù)量和當前線程的數(shù)量
                  pthread_mutex_lock(&pool->mutexPool);
                  int queueSize = pool->queueSize;
                  int liveNum = pool->liveNum;
                  pthread_mutex_unlock(&pool->mutexPool);

                  // 取出忙的線程的數(shù)量
                  pthread_mutex_lock(&pool->mutexBusy);
                  int busyNum = pool->busyNum;
                  pthread_mutex_unlock(&pool->mutexBusy);

                  // 添加線程
                  // 任務的個數(shù)>存活的線程個數(shù) && 存活的線程數(shù)<最大線程數(shù)
                  if (queueSize > liveNum && liveNum < pool->maxNum)
                  {
                      pthread_mutex_lock(&pool->mutexPool);
                      int counter = 0;
                      for (int i = 0; i < pool->maxNum && counter < NUMBER
                          && pool->liveNum < pool->maxNum; ++i)
                      {
                          if (pool->threadIDs[i] == 0)
                          {
                              pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                              counter++;
                              pool->liveNum++;
                          }
                      }
                      pthread_mutex_unlock(&pool->mutexPool);
                  }
                  // 銷毀線程
                  // 忙的線程*2 < 存活的線程數(shù) && 存活的線程>最小線程數(shù)
                  if (busyNum * 2 < liveNum && liveNum > pool->minNum)
                  {
                      pthread_mutex_lock(&pool->mutexPool);
                      pool->exitNum = NUMBER;
                      pthread_mutex_unlock(&pool->mutexPool);
                      // 讓工作的線程自殺
                      for (int i = 0; i < NUMBER; ++i)
                      {
                          pthread_cond_signal(&pool->notEmpty);
                      }
                  }
              }
              return NULL;
          }

          void threadExit(ThreadPool* pool)
          {
              pthread_t tid = pthread_self();
              for (int i = 0; i < pool->maxNum; ++i)
              {
                  if (pool->threadIDs[i] == tid)
                  {
                      pool->threadIDs[i] = 0;
                      printf("threadExit() called, %ld exiting...\n", tid);
                      break;
                  }
              }
              pthread_exit(NULL);
          }

          6. 測試代碼

          void taskFunc(void* arg)
          {
              int num = *(int*)arg;
              printf("thread %ld is working, number = %d\n",
                  pthread_self(), num);
              sleep(1);
          }

          int main()
          {
              // 創(chuàng)建線程池
              ThreadPool* pool = threadPoolCreate(3, 10, 100);
              for (int i = 0; i < 100; ++i)
              {
                  int* num = (int*)malloc(sizeof(int));
                  *num = i + 100;
                  threadPoolAdd(pool, taskFunc, num);
              }

              sleep(30);

              threadPoolDestroy(pool);
              return 0;
          }

          文章鏈接: https://subingwen.cn/linux/threadpool/#1-%E7%BA%BF%E7%A8%8B%E6%B1%A0%E5%8E%9F%E7%90%86

          瀏覽 35
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          評論
          圖片
          表情
          推薦
          點贊
          評論
          收藏
          分享

          手機掃一掃分享

          分享
          舉報
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  黄色三级网站 | 日日骚影院 | 国产精品婷婷午夜在线观看 | 在线伊人91 | 69精品无码一区二区三区 |