手寫線程池 - C++版
在《手寫線程池 - C語言版》中,已經(jīng)實(shí)現(xiàn)了 C 語言版的線程池,如果我們也學(xué)過 C++ 的話,可以將其改為 C++ 版本,這樣代碼不管是從使用還是從感觀上都會(huì)更簡潔一些。
對(duì)這些代碼做從 C 到 C++ 的遷移主要用到了 C++ 三大特性中的封裝,因此難度不大,對(duì)應(yīng) C++ 初學(xué)者來說有助于提高編碼水平和對(duì)面向?qū)ο蟮睦斫?,?duì)于熟練掌握了 C++ 的人來說就是張飛吃豆芽 -- 小菜一碟(so easy)。
關(guān)于線程的在此就不再過多闡述,對(duì)于前面文章中設(shè)計(jì)的線程池,按照面向?qū)ο蟮乃枷脒M(jìn)行拆分可以分為兩部分(純屬個(gè)人見解,有不同的想法也正常):任務(wù)隊(duì)列類 和線程池類。
1. 任務(wù)隊(duì)列
1.1 類聲明
// 定義任務(wù)結(jié)構(gòu)體
using callback = void(*)(void*);
struct Task
{
Task()
{
function = nullptr;
arg = nullptr;
}
Task(callback f, void* arg)
{
function = f;
this->arg = arg;
}
callback function;
void* arg;
};
// 任務(wù)隊(duì)列
class TaskQueue
{
public:
TaskQueue();
~TaskQueue();
// 添加任務(wù)
void addTask(Task& task);
void addTask(callback func, void* arg);
// 取出一個(gè)任務(wù)
Task takeTask();
// 獲取當(dāng)前隊(duì)列中任務(wù)個(gè)數(shù)
inline int taskNumber()
{
return m_queue.size();
}
private:
pthread_mutex_t m_mutex; // 互斥鎖
std::queue<Task> m_queue; // 任務(wù)隊(duì)列
};
其中 Task 是任務(wù)類,里邊有兩個(gè)成員,分別是兩個(gè)指針 void(*)(void*) 和 void*
另外一個(gè)類 TaskQueue 是任務(wù)隊(duì)列,提供了添加任務(wù)、取出任務(wù)、存儲(chǔ)任務(wù)、獲取任務(wù)個(gè)數(shù)、線程同步的功能。
1.2 類定義
TaskQueue::TaskQueue()
{
pthread_mutex_init(&m_mutex, NULL);
}
TaskQueue::~TaskQueue()
{
pthread_mutex_destroy(&m_mutex);
}
void TaskQueue::addTask(Task& task)
{
pthread_mutex_lock(&m_mutex);
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
}
void TaskQueue::addTask(callback func, void* arg)
{
pthread_mutex_lock(&m_mutex);
Task task;
task.function = func;
task.arg = arg;
m_queue.push(task);
pthread_mutex_unlock(&m_mutex);
}
Task TaskQueue::takeTask()
{
Task t;
pthread_mutex_lock(&m_mutex);
if (m_queue.size() > 0)
{
t = m_queue.front();
m_queue.pop();
}
pthread_mutex_unlock(&m_mutex);
return t;
}
2. 線程池
2.1 類聲明
class ThreadPool
{
public:
ThreadPool(int min, int max);
~ThreadPool();
// 添加任務(wù)
void addTask(Task task);
// 獲取忙線程的個(gè)數(shù)
int getBusyNumber();
// 獲取活著的線程個(gè)數(shù)
int getAliveNumber();
private:
// 工作的線程的任務(wù)函數(shù)
static void* worker(void* arg);
// 管理者線程的任務(wù)函數(shù)
static void* manager(void* arg);
void threadExit();
private:
pthread_mutex_t m_lock;
pthread_cond_t m_notEmpty;
pthread_t* m_threadIDs;
pthread_t m_managerID;
TaskQueue* m_taskQ;
int m_minNum;
int m_maxNum;
int m_busyNum;
int m_aliveNum;
int m_exitNum;
bool m_shutdown = false;
};
2.2 類定義
ThreadPool::ThreadPool(int minNum, int maxNum)
{
// 實(shí)例化任務(wù)隊(duì)列
m_taskQ = new TaskQueue;
do {
// 初始化線程池
m_minNum = minNum;
m_maxNum = maxNum;
m_busyNum = 0;
m_aliveNum = minNum;
// 根據(jù)線程的最大上限給線程數(shù)組分配內(nèi)存
m_threadIDs = new pthread_t[maxNum];
if (m_threadIDs == nullptr)
{
cout << "malloc thread_t[] 失敗...." << endl;;
break;
}
// 初始化
memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);
// 初始化互斥鎖,條件變量
if (pthread_mutex_init(&m_lock, NULL) != 0 ||
pthread_cond_init(&m_notEmpty, NULL) != 0)
{
cout << "init mutex or condition fail..." << endl;
break;
}
/////////////////// 創(chuàng)建線程 //////////////////
// 根據(jù)最小線程個(gè)數(shù), 創(chuàng)建線程
for (int i = 0; i < minNum; ++i)
{
pthread_create(&m_threadIDs[i], NULL, worker, this);
cout << "創(chuàng)建子線程, ID: " << to_string(m_threadIDs[i]) << endl;
}
// 創(chuàng)建管理者線程, 1個(gè)
pthread_create(&m_managerID, NULL, manager, this);
} while (0);
}
ThreadPool::~ThreadPool()
{
m_shutdown = 1;
// 銷毀管理者線程
pthread_join(m_managerID, NULL);
// 喚醒所有消費(fèi)者線程
for (int i = 0; i < m_aliveNum; ++i)
{
pthread_cond_signal(&m_notEmpty);
}
if (m_taskQ) delete m_taskQ;
if (m_threadIDs) delete[]m_threadIDs;
pthread_mutex_destroy(&m_lock);
pthread_cond_destroy(&m_notEmpty);
}
void ThreadPool::addTask(Task task)
{
if (m_shutdown)
{
return;
}
// 添加任務(wù),不需要加鎖,任務(wù)隊(duì)列中有鎖
m_taskQ->addTask(task);
// 喚醒工作的線程
pthread_cond_signal(&m_notEmpty);
}
int ThreadPool::getAliveNumber()
{
int threadNum = 0;
pthread_mutex_lock(&m_lock);
threadNum = m_aliveNum;
pthread_mutex_unlock(&m_lock);
return threadNum;
}
int ThreadPool::getBusyNumber()
{
int busyNum = 0;
pthread_mutex_lock(&m_lock);
busyNum = m_busyNum;
pthread_mutex_unlock(&m_lock);
return busyNum;
}
// 工作線程任務(wù)函數(shù)
void* ThreadPool::worker(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 一直不停的工作
while (true)
{
// 訪問任務(wù)隊(duì)列(共享資源)加鎖
pthread_mutex_lock(&pool->m_lock);
// 判斷任務(wù)隊(duì)列是否為空, 如果為空工作線程阻塞
while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown)
{
cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;
// 阻塞線程
pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);
// 解除阻塞之后, 判斷是否要銷毀線程
if (pool->m_exitNum > 0)
{
pool->m_exitNum--;
if (pool->m_aliveNum > pool->m_minNum)
{
pool->m_aliveNum--;
pthread_mutex_unlock(&pool->m_lock);
pool->threadExit();
}
}
}
// 判斷線程池是否被關(guān)閉了
if (pool->m_shutdown)
{
pthread_mutex_unlock(&pool->m_lock);
pool->threadExit();
}
// 從任務(wù)隊(duì)列中取出一個(gè)任務(wù)
Task task = pool->m_taskQ->takeTask();
// 工作的線程+1
pool->m_busyNum++;
// 線程池解鎖
pthread_mutex_unlock(&pool->m_lock);
// 執(zhí)行任務(wù)
cout << "thread " << to_string(pthread_self()) << " start working..." << endl;
task.function(task.arg);
delete task.arg;
task.arg = nullptr;
// 任務(wù)處理結(jié)束
cout << "thread " << to_string(pthread_self()) << " end working...";
pthread_mutex_lock(&pool->m_lock);
pool->m_busyNum--;
pthread_mutex_unlock(&pool->m_lock);
}
return nullptr;
}
// 管理者線程任務(wù)函數(shù)
void* ThreadPool::manager(void* arg)
{
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 如果線程池沒有關(guān)閉, 就一直檢測
while (!pool->m_shutdown)
{
// 每隔5s檢測一次
sleep(5);
// 取出線程池中的任務(wù)數(shù)和線程數(shù)量
// 取出工作的線程池?cái)?shù)量
pthread_mutex_lock(&pool->m_lock);
int queueSize = pool->m_taskQ->taskNumber();
int liveNum = pool->m_aliveNum;
int busyNum = pool->m_busyNum;
pthread_mutex_unlock(&pool->m_lock);
// 創(chuàng)建線程
const int NUMBER = 2;
// 當(dāng)前任務(wù)個(gè)數(shù)>存活的線程數(shù) && 存活的線程數(shù)<最大線程個(gè)數(shù)
if (queueSize > liveNum && liveNum < pool->m_maxNum)
{
// 線程池加鎖
pthread_mutex_lock(&pool->m_lock);
int num = 0;
for (int i = 0; i < pool->m_maxNum && num < NUMBER
&& pool->m_aliveNum < pool->m_maxNum; ++i)
{
if (pool->m_threadIDs[i] == 0)
{
pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);
num++;
pool->m_aliveNum++;
}
}
pthread_mutex_unlock(&pool->m_lock);
}
// 銷毀多余的線程
// 忙線程*2 < 存活的線程數(shù)目 && 存活的線程數(shù) > 最小線程數(shù)量
if (busyNum * 2 < liveNum && liveNum > pool->m_minNum)
{
pthread_mutex_lock(&pool->m_lock);
pool->m_exitNum = NUMBER;
pthread_mutex_unlock(&pool->m_lock);
for (int i = 0; i < NUMBER; ++i)
{
pthread_cond_signal(&pool->m_notEmpty);
}
}
}
return nullptr;
}
// 線程退出
void ThreadPool::threadExit()
{
pthread_t tid = pthread_self();
for (int i = 0; i < m_maxNum; ++i)
{
if (m_threadIDs[i] == tid)
{
cout << "threadExit() function: thread "
<< to_string(pthread_self()) << " exiting..." << endl;
m_threadIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}
文章鏈接:https://subingwen.com/linux/threadpool-cpp/
