線程池的使用場景和代碼實現(xiàn)!
前言:
大家周末好,今天給大家?guī)硪黄夹g文章,是關于線程池的實現(xiàn)和使用場景;我相信大家在公司里面的代碼里面經常看到這個線程池的用法,或者甚至大家可能會聽到內存池、對象池、連接池等這些專業(yè)術語,反正就很多帶池的專業(yè)術語,不過你會發(fā)現(xiàn)他們都有一個共同的特點就是“屁股”末尾都帶一個“池”字,池字,簡單理解就是用來存東西的,舉個簡單例子來說,你比如游泳池里面可以用來存儲水!
好了簡單說了一下,后面的哪些什么內存池、連接池,后期復習都再給大家分享吧,今天我們的主題是線程池。
一、線程池的實現(xiàn):
1、為啥要用到線程池?
多線程編程,大家這個應該很熟悉了,上次有一位朋友問了一個問題,一個線程大概占用多大內存大小,一般按照POSIX標準來算的話,一個線程大概在8M左右,但是我們一般內存資源有限,在進行高并發(fā)的時候,比如說,多個客戶端同時向服務器端發(fā)送請求:

這個時候,你想一下給這么多客戶端都分配開一個大概8M的內存大小,這現(xiàn)實嘛,顯然行不通的嘛,我們來計算一下:
一個線程:8M
1024M可以開128個線程
16G內存大小可以開16x128,計算下來大概在2048個線程
所以百萬級個客戶端都分配開一個線程的話,那內存資源肯定是不夠的,所以這涉及到我們的線程池了,這也是為什么在這種場景下要使用線程池了!
為了幫助大家更好的理解線程池這個概念,我們還是舉一個生活當中的實際場景吧;去銀行存錢或者辦理相關業(yè)務,這個大家都不陌生吧,你到了銀行里面,一般來說的話,都要排隊在窗口等待前面的人把業(yè)務辦理完,才能夠輪到你來辦理你想要辦理的業(yè)務,而窗口里面就是幫你辦理各種業(yè)務的銀行工作人員,同時一般窗口辦理業(yè)務上面有一個提示電子信息,如果輪到了你,就會通知你,你就知道了輪到自己辦理業(yè)務了。
這里換個專業(yè)的角度來說(也不專業(yè)哈,只是一個打比方),你來辦理的這個業(yè)務就是一個任務(也就是一個線程,可以說成任務隊列,因為要排隊嘛,不可能一下子執(zhí)行那么多任務,任務隊列里面的任務必須一個一個執(zhí)行),而銀行工作人員相當于從任務隊列里面拿一個任務來執(zhí)行,你可以把銀行工作人員看成是執(zhí)行任務隊列;而電子顯示通知信息,你可以把它看成防止多個業(yè)務同時在一個窗口讓一個銀行工作人員來辦理,兩個窗口也就是兩個銀行工作人員同時辦理一個業(yè)務,也就是說這個電子顯示信息是一個管理組件,管理任務是否可以去辦理,管理著銀行工作人員是否開始辦理業(yè)務任務,不讓他們亂套了,合理有效的執(zhí)行任務。
那么你從上面可以看到,使用線程池的優(yōu)點了:
避免線程太多,使得內存耗盡
開始的時候,你可以把創(chuàng)建好的線程放入到線程池當中去,當我們要用的時候,就可以從線程池里面拿一個線程來用,用完這個線程的時候,再把這個線程放回到線程池里面;避免創(chuàng)建線程與銷毀的代價
2、線程池實現(xiàn)模板步驟:
其實這個線程池的實現(xiàn)大概流程步驟都差不多,如果大家平時仔細看公司代碼或者說自己去實現(xiàn)一個線程池的話,大概實現(xiàn)模板如下:
任務隊列(前來辦理業(yè)務的人)
執(zhí)行隊列(就是銀行工作人員執(zhí)行任務隊列里面的任務)
管理組件(管理任務有序的執(zhí)行)
3、線程池實現(xiàn)結構體定義:
任務隊列:
struct nTask
{
//用函數(shù)指針來存放不同的任務
void (*task_func)(struct nTask *task);
//這個參數(shù)用來做任務執(zhí)行的參數(shù)
void *user_data;
//鏈表節(jié)點的定義,這里采用鏈表的方式實現(xiàn)
struct nTask *prev;
struct nTask *next;
};
執(zhí)行隊列:
struct nWorker
{
pthread_t threadid;//線程id
int terminate;//表示是否終止任務
//表示銀行工作人員要執(zhí)行任務還要向執(zhí)行組件通告一下
struct nManager *manager;
//還是通過鏈表的方式來實現(xiàn)執(zhí)行隊列
struct nWorker *prev;
struct nWorker *next;
};
注意:這里如果沒有辦理業(yè)務的人來,銀行工作人員只能在哪里等待任務的到來,然后再執(zhí)行任務。
管理組件:
typedef struct nManager
{
struct nTask *task;
struct nWorker *workers;
pthread_mutex_t mutex;//互斥鎖
pthread_cond_t cond;//條件變量
}ThreadPool;
鏈表的插入和刪除模板:
//插入
#define LIST_INSERT(item,list) do{\
item->prev=NULL; \
item->next=list; \
if((list)!=NULL) list->prev=item;\
list=item;
}while(0)
//刪除
#define LIST_REMOVE(item,list) do{ \
if(item->prev != NULL) item->prev->next = item->next; \
if(item->next !=NULL) item->next->prev=item->prev; \
if(list == item)list = item->netx; \
item->prev=item->next=NULL;\
}while(0)
}
4、線程池接口定義如下:
1、線程池初始化接口:
int nThreadPoolCreate(ThreadPool *pool,int numWorkers)
{
//參數(shù)pool表示線程池,numWorkers表示線程池里面有多少個任務
}
2、線程池銷毀接口:
int nThreadPoolDestory(ThreadPool *pool,int nWorker)
{
}
3、往線程池里面添加任務接口:
int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
{
}
4、線程回調函數(shù):
void *nThreadPoolCallback(void *arg)
{
}
二、線程池工程代碼:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>
//鏈表插入
#define LIST_INSTER(item,list)do{ \
item->prev=NULL; \
item->next=next; \
if(list!=NULL) list->prev=item; \
list=item;
}while(0)
//刪除
#define LIST_REMOVE(item,list)do { \
if(item->prev!=NULL)item->prev->next=item->next; \
if(item->next!=NULL)itme->next->prev=item->prev;\
if(list==item)list=item->next;
item->prev=item->next=NULL;
}while(0)
//任務隊列
struct nTask
{
void(*task_funt)(struct nTask *task);
void *uset_data;
struct nTask *prev;
struct nTask *next;
};
//執(zhí)行隊列
struct nWorker
{
pthread_t threadid;
int terminate;
struct nManager *manager;
struct nWorker *prev;
struct nWorker *next;
};
//管理組件
typedef struct nManager
{
struct nTask *tasks;
struct nWoker *workers;
pthread_mutex_t mutex;
pthread_cond_t cond;
}ThreadPool;
//線程回調函數(shù)
void *nThreadPoolCallback(void *arg)
{
struct nWorker *worker=(struct nWorker*)arg;
while(1)
{
//判斷是否有任務
pthread_mutex_lock(&worker->manager-mutex);
while(worker->manager->tasks==NULL)
{
if(worker-terminate)
break;
pthread_cond_wait(&worker->manager->cond,&worker->manager->mutex);//如果沒有任務,一直等待任務的到來
}
if(worker->terminate)
{
pthread_mutex_unlock(&worker->manager->mutex);
break;
}
struct nTask *task = worker->manager->tasks;
LIST_REMOVE(task,worker->manager->tasks);
pthread_mutex_unlock(&worker->manager->mutex);
task->task_func(task);
}
free(worker);
}
//創(chuàng)建線程池
int nThreadPoolCreate(ThreadPool *pool, int numWorkers)
{
if(pool == NULL) return -1;
if(numWorkers < 1)numWorkers =1;
memset(&pool,0,sizeof(ThreadPool));
//開始初始化
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
memcpy(&pool->cond,&blank_cond,sizeof(pthread_cond_t));
pthread_mutex_t blank_mutex =PTHREAD_MUTEX_INITIALIZER;
memcpy(&pool->mutex,&blank_mutex,sizeof(pthread_mutex_t));
int i =0;//開線程的個數(shù),也就是執(zhí)行任務的個數(shù)
for(i=0;i < numWorkers;i++)
{
struct nWorker *worker =(struct nWorker*)malloc(sizeof(struct nWorker));
if(worker == NUll)
{
perror("malloc");
return -2;
}
memset(worker,0,sizeof(struct nWorker));
worker->manager=pool;
//創(chuàng)建線程
int ret=pthread_create(&worker->pthreadid,NULL,nThreadPoolCallback,worker);
if(ret)
{
perror("pthread_create");
free(worker);
return -3;
}
LIST_INSERT(worker,pool->workers);
}
}
//線程池銷毀
int nThreadPoolDestory(ThreadPool *pool,int nWorker)
{
struct nWorker *worker = NULL;
for(worker=pool->workers;worker!=NULL;worker=worker->next)
{
worker->terminate;
}
pthread_mutex_lock(&pool->mutex);
pthread_cond_broadcast(&pool->cond);//做一個廣播通知
pthread_mutex_unlock(&pool->mutex);
pool->workers = NULL;
pool->tasks = NULL;
}
//往線程池里面添加任務
int nThreadPoolPushTask(ThreadPool *pool,struct nTask *task)
{
pthread_mutex_lock(&pool->mutex);
LIST_INSERTER(task,pool->tasks);
pthread_cond_sigal(&pool->cond);// 發(fā)送一個信號,有人來辦理業(yè)務了
pthread_mutex_unlock(&pool-mutex);
}
#if 1
#define THREADPOOL_INIT_COUNT 20
#define TASK_INIT_SIZE 1000
void task_entry(struct nTask *task) { //type
//struct nTask *task = (struct nTask*)task;
int idx = *(int *)task->user_data;
printf("idx: %d\n", idx);
free(task->user_data);
free(task);
}
int main(void) {
ThreadPool pool = {0};
nThreadPoolCreate(&pool, THREADPOOL_INIT_COUNT);
// pool --> memset();
int i = 0;
for (i = 0;i < TASK_INIT_SIZE;i ++) {
struct nTask *task = (struct nTask *)malloc(sizeof(struct nTask));
if (task == NULL) {
perror("malloc");
exit(1);
}
memset(task, 0, sizeof(struct nTask));
task->task_func = task_entry;
task->user_data = malloc(sizeof(int));
*(int*)task->user_data = i;
nThreadPoolPushTask(&pool, task);
}
getchar();
}
代碼量稍微有點多,大家可以多多看看幾遍!
三、總結:
今天的分享暫時到這里了,今天線程池的分享花的時間比較多,才整理出來,下個禮拜開始音視頻的分享,后面的其他復習知識,會慢慢整理分享出來。
如果大家對音視頻方向感興趣,可以加我微信,同時我的朋友圈,也會經常分享一些感悟和干貨,以及和一些大佬的交流經驗分享,記得,加我微信,可以來圍觀朋友圈:tu18879499804
