上篇文章主要讲了线程池的使用
Qt中的线程池QThreadPool
本篇文章使用Qt的条件变量 QWaitCondition ,实现一个简单的线程池
关于 QWaitCondition 的使用,可以参照 线程的互斥和同步(7)- Qt的条件变量QWaitCondition
先来说一下简单的思路,线程池初始化时创建一定数量的线程(8个) ,所有的线程处于挂起状态。
当任务添加到列表时,唤醒一个线程执行任务。如果当前未执行任务数>线程池中最大线程数时,等待有空闲的线程再执行。
这里定义了四个类来实现线程池:
完整代码下载:
https://github.com/douzhongqiang/threadCode/tree/master/CThreadPool
接下来看一下代码:
线程池类 CThreadPool 中的定义,头文件:
class CThreadPool
{
public:
CThreadPool();
~CThreadPool();
// 添加任务
void addTask(CRunnable* runnable);
private:
// 线程池
QVector<std::shared_ptr<CThread>> m_threads;
// 线程数目
int m_nThreadCount = 8;
// 数据
CThreadPoolData m_poolData;
};
函数 addTask 为添加任务接口,m_threads 为线程数组,存储所有的线程对象。
m_poolData 表示需要用到的数据,它的定义如下:
// 线程池相关数据类
class CThreadPoolData
{
public:
CThreadPoolData();
~CThreadPoolData();
// 任务列表
QList<std::shared_ptr<CRunnable>> m_taskList;
// 互斥锁和条件变量
QMutex m_mutex;
QWaitCondition m_waitCondition;
// 结束线程控制
std::atomic<bool> m_isRunning;
};
线程池的实现:
CThreadPool::CThreadPool()
{
// 设置线程池数目
m_nThreadCount = QThread::idealThreadCount();
// 创建线程池数组,并初始化
for (int i=0; i<m_nThreadCount; ++i)
{
CThread *pThread = new CThread(m_poolData);
m_threads.push_back(std::shared_ptr<CThread>(pThread));
pThread->start();
}
}
CThreadPool::~CThreadPool()
{
// 等待线程全部退出
m_poolData.m_isRunning = false;
// 全部唤醒
m_poolData.m_waitCondition.wakeAll();
// 等待所有线程退出
for (auto iter = m_threads.begin(); iter != m_threads.end(); ++iter)
(*iter)->wait();
}
void CThreadPool::addTask(CRunnable* runnable)
{
QMutexLocker locker(&m_poolData.m_mutex);
// 添加到任务列表
m_poolData.m_taskList.push_back(std::shared_ptr<CRunnable>(runnable));
// 唤醒一个线程,如果当前没有休眠线程则该信号会被忽略
m_poolData.m_waitCondition.wakeOne();
}
构造函数中,创建了 QThread::idealThreadCount() 个线程,放入到了线程对象数组中,并开启所有线程。
析构函数中,唤醒所有的线程,并等待线程结束。变量 m_isRunning 是每个线程中,退出 while() 循环的控制变量。
addTask 函数,将任务存入到任务列表中,并唤醒一个线程执行。如果当前没有挂起的线程,该信号会被忽略。
CThread 是继承自 QThread 的线程类,它的头文件定义:
class CThread : public QThread
{
Q_OBJECT
public:
CThread(CThreadPoolData& data);
~CThread();
void run(void) override;
private:
CThreadPoolData& m_data;
};
cpp文件:
CThread::CThread(CThreadPoolData& data)
:m_data(data)
{
}
CThread::~CThread()
{
}
void CThread::run(void)
{
while (m_data.m_isRunning)
{
// 加锁并等待被唤醒
QMutexLocker locker(&m_data.m_mutex);
m_data.m_waitCondition.wait(&m_data.m_mutex);
// 判断队列是否为空
while (!m_data.m_taskList.isEmpty())
{
auto runnable = m_data.m_taskList.takeFirst();
locker.unlock();
// Do Task(无锁状态执行)
runnable->run();
locker.relock();
}
}
}
run函数中,等待被唤醒,如果被唤醒,从任务队列中取出任务,并执行。
CRunnable 为任务基类,它的定义比较简单
class CRunnable
{
public:
CRunnable();
virtual ~CRunnable();
virtual void run(void) = 0;
};
接下来使用一个例子,测试一下这个线程池:
测试任务 TestTaskRunnable
头文件:
#include "CRunnable.h"
class TestTaskRunnable : public CRunnable
{
public:
void run(void) override;
};
cpp文件
std::atomic<int> g_number(0);
void TestTaskRunnable::run(void)
{
std::cout << "Number is " << g_number++ \
<< ", Thread Id is " << QThread::currentThreadId() \
<< std::endl;
}
这里任务执行的内容比较简单,将全部变量 g_number 自增1,并打印线程ID。
main 函数中调用
int main(int argc, char *argv[])
{
CThreadPool threadpool;
for (int i = 0; i < 20; ++i)
{
TestTaskRunnable* taskRunnable = new TestTaskRunnable;
threadpool.addTask(taskRunnable);
}
system("pause");
return 0;
}
因为这里添加任务时使用的带引用计数的智能指针,无需我们考虑内存释放的问题。
运行结果:
Number is 0, Thread Id is 0000000000004DF8
Number is 8, Thread Id is 0000000000004DF8
Number is 9, Thread Id is 0000000000004DF8
Number is 7, Thread Id is 0000000000000468
Number is 4, Thread Id is 00000000000034D4
Number is 2, Thread Id is 0000000000001680
Number is 10, Thread Id is 0000000000004DF8
Number is 13, Thread Id is 0000000000004DF8
Number is 14, Thread Id is 0000000000004DF8
Number is 12, Thread Id is 0000000000001680
Number is 15, Thread Id is 0000000000001680
Number is 11, Thread Id is 00000000000034D4
Number is 1, Thread Id is 0000000000003D80
Number is 5, Thread Id is 0000000000003168
Number is 3, Thread Id is 000000000000057C
Number is 16, Thread Id is 00000000000034D4
Number is 6, Thread Id is 0000000000004220
Number is 19, Thread Id is 0000000000001680
Number is 18, Thread Id is 0000000000003168
Number is 17, Thread Id is 0000000000004DF8