043 线程池与线程封装

线程池与线程封装

1. 线程池

1. ThreadPool.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
using namespace std;

// 线程信息结构体:保存每个线程的基本信息
struct ThreadInfo
{
pthread_t tid; // 线程ID
string name; // 线程名称
};

static const int default_num = 5; // 默认线程数量

template <class T>
class ThreadPool
{
public:
// 加锁操作:保护共享资源不被多个线程同时访问
void Lock()
{
pthread_mutex_lock(&mutex_); // 获取互斥锁
}

// 解锁操作:释放互斥锁,让其他线程可以访问共享资源
void Unlock()
{
pthread_mutex_unlock(&mutex_); // 释放互斥锁
}

// 唤醒一个等待的线程:当有新任务时唤醒空闲的线程
void Wakeup()
{
pthread_cond_signal(&cond_); // 唤醒一个等待的线程
}

// 线程睡眠等待:让线程进入等待状态,节省CPU资源
void ThreadSleep()
{
pthread_cond_wait(&cond_, &mutex_); // 等待条件变量,会自动释放锁
}

// 判断任务队列是否为空
bool IsQueueEmpty()
{
return tasks_.empty(); // 返回任务队列是否为空
}

// 根据线程ID获取线程名称:用于日志输出,知道是哪个线程在工作
string GetThreadName(pthread_t tid)
{
for (const auto &ti : threads_)
{
if (ti.tid == tid)
return ti.name;
}

return "Unknown"; // 找不到返回未知
}

public:
// 线程执行函数:每个工作线程都会执行这个函数
static void *HandlerTask(void *args)
{
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); // 获取线程池对象指针
string name = tp->GetThreadName(pthread_self()); // 获取当前线程的名字

while (true) // 线程一直运行,不会退出
{
tp->Lock(); // 加锁保护任务队列

// 等待任务:如果任务队列为空,线程就睡觉等待
while (tp->IsQueueEmpty())
{
cout << name << " 等待任务..." << endl;
tp->ThreadSleep(); // 睡觉等待新任务
}

T t = tp->Pop(); // 从队列取出一个任务
tp->Unlock(); // 解锁,让其他线程可以访问队列

t(); // 执行任务(调用任务的operator())
cout << name << " 运行任务,结果是:" << t.get_result() << endl;
}

return nullptr;
}

// 启动线程池:创建所有的工作线程
void Start()
{
int num = threads_.size(); // 获取要创建的线程数量
cout << "启动 " << num << " 个线程..." << endl;

for (int i = 0; i < num; i++)
{
threads_[i].name = "[线程 " + to_string(i + 1) + "]"; // 给每个线程起名字
// 创建线程:&threads_[i].tid保存线程ID,HandlerTask是线程要执行的函数,this是传给函数的参数
pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this);
cout << "创建线程 " << threads_[i].name << endl;
}
}

// 从任务队列取出任务
T Pop()
{
T t = tasks_.front(); // 取出队列第一个任务
tasks_.pop(); // 从队列中删除这个任务
return t; // 返回任务
}

// 向任务队列添加任务:外部程序调用这个函数来提交任务
void Push(const T &t)
{
Lock(); // 加锁
tasks_.push(t); // 把新任务加入队列
Wakeup(); // 唤醒一个睡觉的线程来处理任务
Unlock(); // 解锁
}

// 获取线程池单例实例:保证整个程序只有一个线程池对象
static ThreadPool<T> *GetInstance()
{
// 第一次检查:如果已经创建了实例,就直接返回(不需要加锁,提高性能)
if (nullptr == tp_)
{
pthread_mutex_lock(&lock_); // 加锁:防止多个线程同时创建实例
// 第二次检查:再次确认还没有创建实例(防止在等待锁的时候其他线程已经创建了)
if (nullptr == tp_)
{
cout << "log:单例线程池首次创建完成!" << endl;
tp_ = new ThreadPool<T>(); // 创建唯一的线程池实例
}
pthread_mutex_unlock(&lock_); // 解锁
}

return tp_; // 返回线程池实例
}

private:
// 私有构造函数:防止外部直接创建对象,只能通过GetInstance获取
ThreadPool(int num = default_num) : threads_(num)
{
pthread_mutex_init(&mutex_, nullptr); // 初始化互斥锁
pthread_cond_init(&cond_, nullptr); // 初始化条件变量
}

// 私有析构函数:防止外部随意销毁线程池
~ThreadPool()
{
pthread_mutex_destroy(&mutex_); // 销毁互斥锁
pthread_cond_destroy(&cond_); // 销毁条件变量
}

// 禁用拷贝构造函数:防止线程池对象被复制(因为线程不能被复制)
// ThreadPool tp1; ThreadPool tp2 = tp1; // 这样就不允许
ThreadPool(const ThreadPool<T> &) = delete;

// 禁用赋值操作符:防止线程池对象被赋值(因为线程不能被赋值)
// ThreadPool tp1, tp2; tp2 = tp1; // 这样就不允许
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

private:
vector<ThreadInfo> threads_; // 保存所有工作线程的信息
queue<T> tasks_; // 任务队列:存放等待执行的任务

pthread_mutex_t mutex_; // 互斥锁:保护任务队列不被多个线程同时修改
pthread_cond_t cond_; // 条件变量:用于线程间的等待和唤醒

// 单例指针:指向唯一的线程池实例
// static修饰:这个变量属于类本身,不属于任何对象,所有对象共享这一个变量
static ThreadPool<T> *tp_;

// 单例保护锁:保护创建单例实例的过程,防止多个线程同时创建
// static修饰:这个锁属于类本身,用于保护静态成员变量tp_
static pthread_mutex_t lock_;
};

// 下面是静态成员变量定义:必须在类外单独定义,这些变量在程序启动时就存在,不依赖于任何对象

// 开始时为空,第一次调用GetInstance时才会创建
template <class T>
ThreadPool<T> *ThreadPool<T>::tp_ = nullptr;

// 使用PTHREAD_MUTEX_INITIALIZER初始化
template <class T>
pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;

2. Task.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#pragma once
#include <iostream>
#include <string>
using namespace std;

const string opers = "+-*/%"; // 支持的操作符集合

// 错误代码枚举定义
enum
{
SUCCESS = 0, // 成功
DIV_ERROR = 1, // 除零错误
MOD_ERROR = 2, // 取模零错误
UNKNOWN_ERROR = 3 // 未知操作符错误
};

class Task
{
public:
// 默认构造函数
Task()
:_x(0), _y(0), _op('+'), _ret(0), _code(SUCCESS)
{}

// 带参数构造函数:初始化任务参数
Task(int x, int y, char op = '+')
:_x(x),
_y(y),
_op(op),
_ret(0),
_code(SUCCESS)
{
// 验证操作符是否合法
if(op != '+' && op != '-' && op != '*' && op != '/' && op != '%')
{
_op = '+'; // 默认为加法
_code = UNKNOWN_ERROR; // 设置错误码
}
}

// 执行运算任务
void run()
{
// 重置结果和错误码(避免重复调用时的问题)
_ret = 0;
_code = SUCCESS;

switch(_op)
{
case '+':
_ret = _x + _y;
break;
case '-':
_ret = _x - _y;
break;
case '*':
_ret = _x * _y;
break;
case '/':
if(_y == 0)
{
_code = DIV_ERROR; // 除零错误
_ret = 0;
}
else
{
_ret = _x / _y;
}
break;
case '%':
if(_y == 0)
{
_code = MOD_ERROR; // 取模零错误
_ret = 0;
}
else
{
_ret = _x % _y;
}
break;
default:
_code = UNKNOWN_ERROR; // 未知操作符错误
_ret = 0;
break;
}
}

// 重载函数调用操作符,使Task对象可以像函数一样调用
void operator()()
{
run(); // 执行运算
}

// 获取任务描述字符串
string get_task() const
{
return to_string(_x) + _op + to_string(_y) + "= ???"; // 格式:x op y = ???
}

// 获取运算结果字符串
string get_ret() const
{
string ret = to_string(_x) + _op + to_string(_y) + "=" + to_string(_ret) +
" [错误代码:" + to_string(_code) + "]";
return ret;
}

// 获取操作符
char get_operator() const
{
return _op;
}

// 获取第一个操作数
int get_first_operand() const
{
return _x;
}

// 获取第二个操作数
int get_second_operand() const
{
return _y;
}

// 获取运算结果
int get_result() const
{
return _ret;
}

// 获取错误代码
int get_error_code() const
{
return _code;
}

// 析构函数
~Task()
{
}

private:
int _x, _y; // 两个操作数
int _ret; // 运算结果
char _op; // 操作符
int _code; // 错误代码
};

3. Main.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <signal.h>

// pthread_spinlock_t slock; // 全局自旋锁
volatile bool running = true; // 信号处理标志,用于优雅退出

// 信号处理函数:捕获Ctrl+C等退出信号
void signalHandler(int signum)
{
cout << "\n接收到信号 " << signum << ",正在退出..." << endl;
running = false;
}

int main()
{
// 注册信号处理函数,捕获Ctrl+C (SIGINT) 和终止信号 (SIGTERM)
signal(SIGINT, signalHandler);
signal(SIGTERM, signalHandler);

cout << "线程池启动中..." << endl;
sleep(1);

ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance(); // 获取线程池单例实例并启动线程池
tp->Start(); // 启动工作线程

srand(time(nullptr) ^ getpid()); // 设置随机种子

while(running) // 持续生成任务
{
// 1. 构建随机任务
int x = rand() % 20 + 1; // 生成[1,20]的随机数
usleep(10); // 微秒延迟,增加随机性
int y = rand() % 10; // 生成[0,9]的随机数
char op = opers[rand() % opers.size()]; // 随机选择操作符

Task t(x, y, op); // 创建任务对象

// 2. 将任务提交给线程池处理
tp->Push(t); // 添加任务到线程池

cout << "[主线程] 创建任务: " << t.get_task() << endl;

sleep(1); // 每1秒生成一个任务
}

cout << "程序正常退出" << endl;
return 0;
}

2. C++ 语言层面上的线程封装 demo(简易)

MyThread.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
#include <ctime>
using namespace std;

typedef void (*callback_t)(); // 无参数无返回值的函数指针
static int thread_num = 0; // 全局线程编号计数器

class Thread
{
public:
static void* thread_func(void* arg) // 线程函数
{
Thread* thread = static_cast<Thread*>(arg); // 转换为线程指针
thread->Enter_callback();

}
public:
Thread(callback_t cb)
:tid_(0), // 初始化线程ID为0
name_(""), // 初始化线程名称为空
start_timestamp_(0), // 初始化启动时间戳为0
isrunning_(false), // 初始化运行状态为false
cb_(cb) // 保存回调函数
{

}

~Thread() // 析构函数
{

}

void run() // 启动线程
{
name_ = "thread-" + to_string(thread_num++); // 设置线程名称
start_timestamp_ = time(nullptr); // 记录启动时间戳
isrunning_ = true; // 设置运行状态为true
pthread_create(&tid_, nullptr, thread_func, this); // 创建线程
}

void Enter_callback()
{
cb_(); // 执行回调函数
}

bool is_runing() // 判断线程是否运行
{
return isrunning_; // 返回运行状态
}

uint64_t start_timestamp() // 获取启动时间戳
{
return start_timestamp_; // 返回启动时间戳
}

string name() // 获取线程名称
{
return name_; // 返回线程名称
}

void join() // 等待线程结束
{
pthread_join(tid_, nullptr); // 等待线程结束
isrunning_ = false; // 设置运行状态为false
}

private:
pthread_t tid_; // 线程ID
string name_; // 线程名称
uint64_t start_timestamp_; // 启动时间戳
bool isrunning_; // 运行状态标志

callback_t cb_; // 回调函数指针
};

2. Main.cc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include "MyThread.hpp"
#include <vector>
#include <unistd.h>

int threads_num = 3;

void print()
{
int count = 0;
while(count < threads_num)
{
cout << "我是一个 C++ 语言层面上封装的线程!" << "代号是:" << count++ << endl;
sleep(1);
}

cout << "所有线程都结束了!" << endl;
}

int main()
{
vector<Thread> threads;
for (int i = 0; i < threads_num; ++i)
{
threads.push_back(Thread(print));
}

cout << "开始启动所有线程喽~" << endl;

for(auto &x : threads)
{
x.run();
cout << "线程 " << x.name() << " 启动成功!其时间戳的值是:" << x.start_timestamp() << endl;
}

for(auto &x : threads)
{
x.join();
}

cout << "所有线程都结束了!" << endl;

return 0;
}

当然,这个 demo 仅实现了两个函数的封装,也没有进行加锁,会导致数据竞争,整体来说并不完美,仅为了展示如何实现底层封装。