042 生产者 - 消费者模型

生产者 - 消费者模型(CP 问题)

1. 生产者-消费者模型(CP 问题)是什么?

这是并发编程中最经典的问题之一,主要描述 两个线程/进程之间的数据交换协作问题

  • 生产者:不断生产数据,放入缓冲区(仓库、通道)。
  • 消费者:不断从缓冲区中取出数据进行处理。

但问题在于:

  1. 缓冲区有 容量限制
  2. 多线程并发会导致 竞争访问资源

所以需要设计好 同步机制(比如互斥锁、条件变量、信号量等)保证:

  • 生产者不能在缓冲区满的时候继续放;
  • 消费者不能在缓冲区空的时候继续取;
  • 多个线程操作共享资源不会冲突。

image-20250803204315786

2. 什么是“解耦”?为什么要解耦?

1. 解耦的本质

解耦指的是将系统中的不同组件或模块之间的依赖关系降低,使它们能够独立地进行开发、修改和维护。在生产者 - 消费者模型中,解耦就是要让生产者和消费者之间的直接关联尽可能减少,各自可以独立地运行和变化,而不会因为一方的改变对另一方造成过大的影响。简单来说 解耦就是降低模块之间的依赖性,提高系统的可扩展性和灵活性

在 CP 问题中,供货商和消费者通过缓冲区(超市)进行 间接通信,实现了解耦。

2. 为什么要解耦?

  • 不解耦的弊端:供货商得知道哪个消费者要什么、什么时候要,耦合性太强,代码难维护。
  • 解耦后的优势
    • 可以任意增加/减少生产者或消费者数量(线程扩展方便)。
    • 各自只关心“缓冲区的状态”,不需要处理对方的逻辑。
    • 模块职责清晰,便于调试、优化和扩展。

3. 现实类比:供应链的“供货商 - 超市 - 消费者”

1. 现实类比

角色 类比 现实意义
供货商 生产者 批量生产商品的人
超市 缓冲区 Buffer / 仓库 商品流通的中转站,有货架/仓库
消费者 消费者 前来购物的顾客

2. “321 原则” 理解 CP 问题


1. “3 种关系”

  • 生产者 vs 生产者(互斥):不同的供货商之间存在竞争关系。例如,两家不同品牌的饮料供货商,他们都希望自己的产品能够在超市中占据更多的货架空间,获得更高的销量。为了实现这一目标,他们会在产品质量、价格、营销策略等方面展开竞争,这种竞争关系就是互斥的,因为在一定的市场份额下,一家供货商销量的增加往往意味着其他供货商销量的减少。
  • 消费者 vs 消费者(互斥):消费者之间也存在一定的互斥关系。比如在超市进行促销活动时,某些热门商品的数量有限,消费者之间就会为了抢购这些商品而产生竞争。例如,限量版的鞋子、热门的电子产品等,先到的消费者有更大的机会购买到,而后到的消费者可能就会错失购买机会。
  • 生产者 vs 消费者(互斥、同步):
    • 互斥:供货商希望以较高的价格出售商品以获取更多利润,而消费者则希望以较低的价格购买到心仪的商品,双方在价格方面存在利益冲突,这是互斥的表现。
    • 同步:供货商需要根据消费者的需求来生产商品,如果生产的商品不符合消费者的需求,就会造成库存积压。而消费者的购买行为也会影响供货商的生产计划,例如当某种商品的销量大增时,供货商可能会增加该商品的生产。所以生产者和消费者之间需要保持一定的同步关系,以维持市场的供需平衡。

供货商与消费者 不需要彼此知道对方是谁,通过中间的缓冲区(超市)就能协作完成“商品流转” —— 这就是 解耦

2. “2 种角色”

角色 行为
生产者(供货商) 负责生成产品,放入缓冲区
消费者(顾客) 负责从缓冲区获取产品,进行消费

注意:

  • 二者不直接通信,不依赖对方的状态;
  • 只是“对缓冲区”的操作要同步协调。

3. “1 个交易场所”

缓冲区就像是“交易中转站”,相当于现实生活的“超市”。

1 个交易场所:超市就是生产者(供货商)和消费者进行交易的场所。超市为供货商提供了销售渠道,将众多供货商的商品集中展示,方便消费者进行选购。

特点:

  • 有容量上限:超市货架就这么大,不能无限放。
  • 是中间角色:解耦了供货商和消费者之间的依赖。

4. 代码示例 —— 基于 BlockingQueue 的生产者消费者模型

image-20250803204513691

1. BlockingQueue.hpp

阻塞队列类模板,提供线程安全的任务存储和同步机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;

template <class T>
class BlockingQueue
{
private:
queue<T> q_; // STL队列,存储实际数据
int max_capacity; // 队列最大容量
pthread_mutex_t mutex_; // 互斥锁,保护共享资源
pthread_cond_t not_empty_; // 非空条件变量,消费者等待
pthread_cond_t not_full_; // 非满条件变量,生产者等待

public:
static const int DEFAULT_MAX_CAPACITY = 10; // 队列默认初始容量

// 构造函数:初始化队列和同步变量
BlockingQueue(int max_capacity = DEFAULT_MAX_CAPACITY)
:max_capacity(max_capacity)
{
pthread_mutex_init(&mutex_, nullptr); // 初始化互斥锁
pthread_cond_init(&not_empty_, nullptr); // 初始化非空条件变量
pthread_cond_init(&not_full_, nullptr); // 初始化非满条件变量
}

// 析构函数:清理资源
~BlockingQueue()
{
pthread_mutex_destroy(&mutex_); // 销毁互斥锁
pthread_cond_destroy(&not_empty_); // 销毁非空条件变量
pthread_cond_destroy(&not_full_); // 销毁非满条件变量
}

// 生产者添加元素进行入队
void push(const T& item)
{
pthread_mutex_lock(&mutex_); // 加锁,保护共享资源

// 队列已满,等待非满条件变量
// 使用while防止虚假唤醒
while (q_.size() >= max_capacity)
{
pthread_cond_wait(&not_full_, &mutex_); // 等待队列有空间
}

q_.push(item); // 入队
pthread_cond_signal(&not_empty_); // 唤醒等待的消费者线程
pthread_mutex_unlock(&mutex_); // 解锁
}

// 消费者取出元素进行出队
T pop()
{
pthread_mutex_lock(&mutex_); // 加锁

// 队列空则等待
// 使用while防止虚假唤醒
while (q_.empty())
{
pthread_cond_wait(&not_empty_, &mutex_); // 等待队列有数据
}

T item = q_.front(); // 获取队首元素
q_.pop(); // 出队

pthread_cond_signal(&not_full_); // 唤醒等待的生产者线程
pthread_mutex_unlock(&mutex_); // 解锁

return item; // 返回元素
}
};

2. Task.hpp

任务类,封装数学运算任务的执行和结果处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#pragma once
#include <iostream>
#include <string>
using namespace std;

// 错误代码枚举
enum
{
SUCCESS = 0, // 成功
DIV_ERROR = 1, // 除零错误
MOD_ERROR = 2, // 取模零错误
};

class Task
{
private:
int _x, _y; // 两个操作数
int _ret; // 运算结果
char _op; // 操作符
int _code; // 错误代码

public:
// 构造函数:初始化任务参数
Task(int x = 0, int y = 0, char op = '+')
:_x(x),
_y(y),
_op(op),
_ret(0),
_code(SUCCESS)
{
}

// 执行运算任务
void run()
{
switch(_op)
{
case '+':
_ret = _x + _y;
break;
case '-':
_ret = _x - _y;
break;
case '*':
_ret = _x * _y;
break;
case '/':
if(_y == 0)
{
_code = DIV_ERROR; // 除零错误
_ret = 0;
}
else
{
_ret = _x / _y;
}
break;
case '%':
if(_y == 0)
{
_code = MOD_ERROR; // 取模零错误
_ret = 0;
}
else
{
_ret = _x % _y;
}
break;
default:
_code = MOD_ERROR; // 未知操作符错误
_ret = 0;
break;
}
}

// 重载函数调用操作符,使Task对象可以像函数一样调用
void operator()()
{
run(); // 执行运算
}

// 获取任务描述字符串
string get_task() const
{
return to_string(_x) + _op + to_string(_y) + "= ???"; // 格式:x op y = ???
}

// 获取运算结果字符串
string get_ret() const
{
string ret = to_string(_x) + _op + to_string(_y) + "=" + to_string(_ret) +
" [错误代码]:" + to_string(_code);
return ret;
}
};

3. main.cpp

主程序,创建多个生产者线程生成随机任务,多个消费者线程处理任务并输出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include "BlockingQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
#include <sys/time.h>

const string ops = "+-*/%"; // 操作符
BlockingQueue<Task>* TaskQueue; // 全局任务队列指针
pthread_mutex_t g_print_mutex = PTHREAD_MUTEX_INITIALIZER; // 全局打印互斥锁,确保输出尽量不交错

// 获取当前时间戳(毫秒),用于标识消息的产生时间
long long get_timestamp()
{
struct timeval tv; // 时间结构体
gettimeofday(&tv, nullptr); // 获取当前时间
return tv.tv_sec * 1000 + tv.tv_usec / 1000; // 转换为毫秒时间戳
}

// 线程安全的打印函数,添加时间戳便于观察执行顺序
void safe_print(const string& message)
{
pthread_mutex_lock(&g_print_mutex); // 加锁,防止输出交错
cout << "[" << get_timestamp() << "] " << message << endl; // 带时间戳的输出
pthread_mutex_unlock(&g_print_mutex); // 解锁
}

// 消费者线程函数
void* consumer(void* arg)
{
int id = *(int*)arg; // 获取线程ID

while(true) // 持续消费
{
Task task = TaskQueue->pop(); // 从队列取出任务(可能阻塞等待)
task(); // 执行任务运算

// 构造消费完成的消息
string s = "[消费者 " + to_string(id) + "] 完成了任务:" + task.get_task() + ",结果为:" + task.get_ret();
safe_print(s); // 线程安全的打印,带时间戳
usleep(500000); // 500ms延迟,控制消费速度
}

pthread_exit(nullptr); // 线程退出
}

// 生产者线程函数
void* producer(void* arg)
{
int id = *(int*)arg; // 获取线程ID
srand(time(nullptr) + id * 1000); // 设置随机种子,避免重复

while(true) // 持续生产
{
int x = rand() % 50 + 1; // 生成随机数 [1, 50],范围更小便于观察
int y = rand() % 50 + 1; // 生成随机数 [1, 50]
char op = ops[rand() % ops.size()]; // 随机选择操作符

Task task(x, y, op); // 创建任务对象

// 先打印生产消息,再放入队列
string s = "[生产者 " + to_string(id) + "] 生产了任务:" + task.get_task();
safe_print(s); // 线程安全的打印,带时间戳

TaskQueue->push(task); // 将任务放入队列(可能阻塞等待)
usleep(1000000); // 1秒延迟,控制生产速度
}

pthread_exit(nullptr); // 线程退出
}

int main()
{
TaskQueue = new BlockingQueue<Task>(5); // 创建容量为5的阻塞队列

int ids[5] = { 1, 2, 3, 4, 5 }; // 线程ID数组

// 创建2个消费者线程
cout << "创建 2 个消费者线程" << endl;
pthread_t Consumers[2];
for(int i = 0; i < 2; i++)
{
pthread_create(&Consumers[i], nullptr, consumer, &ids[i]); // 创建消费者线程
}

// 创建3个生产者线程
cout << "创建 3 个生产者线程" << endl;
pthread_t Producers[3];
for(int i = 0; i < 3; i++)
{
pthread_create(&Producers[i], nullptr, producer, &ids[i+2]); // 创建生产者线程
}

// 等待所有线程结束(实际程序不会退出)
for(int i = 0; i < 2; i++)
{
pthread_join(Consumers[i], nullptr); // 等待消费者线程
}
for(int i = 0; i < 3; i++)
{
pthread_join(Producers[i], nullptr); // 等待生产者线程
}

delete TaskQueue; // 释放队列内存
return 0;
}

5. POSIX 信号量

1. 什么是 POSIX 信号量?有什么用?

POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步,属于 POSIX 标准的一部分(定义在 <semaphore.h> 头文件中),主要用于控制对共享资源的访问,避免竞争条件,实现互斥与同步。

2. POSIX 信号量函数

同样的,这部分函数和之前的 pthread_mutex_init 系列函数十分相似,这里就不详细讲解了,类比使用即可。注意头文件是 <semaphore.h>

作用 函数原型 参数说明 返回值
初始化 信号量 int sem_init(sem_t *sem, int pshared, unsigned int value); sem:信号量指针
pshared:0 表示用于线程间共享;非 0 表示进程间共享
value:初始值,表示资源初始数量
0 表示成功,非 0 表示失败(可用 errno 获取错误)
销毁 信号量 int sem_destroy(sem_t *sem); sem:信号量指针 0 成功,非 0 失败
等待(P 操作) int sem_wait(sem_t *sem); sem:信号量指,如果 sem > 0,则减一并继续执行;值为 0,则阻塞等待 0 成功,-1 失败
非阻塞等待 int sem_trywait(sem_t *sem); sem:信号量指针 如果资源不足,不会阻塞,而是立即返回错误(适合用于非阻塞检测场景) 0 成功,-1 失败
发布(增加)信号量(V 操作) int sem_post(sem_t *sem); sem:信号量指针 将信号量值加 1,如果有等待线程,将唤醒其中一个 0 成功,-1 失败

这里把信号量的工作流程(P/V 操作)单拎出来进行强调:

操作 含义 行为描述
sem_wait() P 操作(wait) 当前线程尝试获取资源: 若信号量值 > 0,获取成功(减 1) 若 = 0,则阻塞等待
sem_post() V 操作(signal) 当前线程释放资源: 信号量值 +1,若有等待线程,唤醒一个

3. 代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;

sem_t sem;

void* worker(void* arg)
{
int id = *(int*)arg;

sem_wait(&sem); // 请求信号量(如果值为0,则阻塞等待)
cout << "Thread:" << id << " 获取到了信号量" << endl;

sleep(1); // 模拟工作

sem_post(&sem); // 释放信号量
cout << "Thread:" << id << " 释放了信号量" << endl;

return nullptr;
}

int main()
{
sem_init(&sem, 0, 3); // 初始化信号量,最多允许3个线程同时进入

pthread_t threads[10];
int ids[10];
for(int i = 0; i < 10; i++)
{
ids[i] = i;
pthread_create(&threads[i], nullptr, worker, &ids[i]);
}

for(int i = 0; i < 10; i++)
{
pthread_join(threads[i], nullptr);
}

sem_destroy(&sem); // 清理资源
return 0;
}

6. 基于环形队列的生产消费模型

1. 什么是环形队列生产消费模型?

基于环形队列的生产者-消费者模型 也是一种常见的并发编程设计,其核心是通过环形队列(循环队列)作为缓冲区,协调 “生产者” 和 “消费者” 两个角色的工作:生产者负责生成数据并放入队列,消费者负责从队列中取出数据并处理。从而实现生产者与消费者之间的数据传递和解耦。需要注意的是双方互不直接通信,而是通过队列进行 异步协作

image-20250803203650239

2. 特点

  • 环形队列是一个 固定容量、首尾相接的循环缓冲区
  • 使用两个指针:
    • head / front → 消费者读取位置。
    • tail / rear → 生产者写入位置。
  • 每次写或读都会环形移动(取模)。

3. 环形队列中判空 / 判满的处理(重点)

由于 frontrear 都在循环移动,当队列为空或为满时,都会出现 front == rear 的情况,因此环形队列最关键的难点就是 如何区分“队列空”与“队列满”

image-20250803203221100

image-20250803203514374

4. 常见解决方案

方法 原理 判空条件 判满条件 优缺点
1. 多空一法(保留一个空位,即浪费一个存储单元 规定队列容量为 N 时,最多存 N-1 个元素。 head == tail (tail + 1) % size == head ✅ 简单高效,逻辑清晰,边界状态安全
❌ 实际可用容量为 size - 1,浪费一个单元空间
2. 引入计数器 count 使用一个 int count 变量单独记录当前队列中元素数量。 count == 0 count == size ✅ 空间完全利用
❌ 需要额外同步 count 变量,线程并发时处理更复杂
3. 标志(记)位法(flag) 设置一个标志位 bool full 来判断当前状态。 head == tail && !full head == tail && full ✅ 读写位置逻辑简洁清晰
❌ 实现复杂,易出错

5. 代码示例

1. RingQueue.hpp

基于信号量和互斥锁实现的线程安全环形队列模板类,使用两个信号量分别控制数据资源和空间资源,两个互斥锁保护生产者和消费者的并发访问位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#pragma once
#include <iostream>
#include <vector>
#include <pthread.h>
#include <semaphore.h>
using namespace std;

const static int default_capacity = 5; // 环形队列默认容量

template <class T>
class RingQueue
{
private:
// P操作:等待信号量(资源减少)
void P(sem_t& sem)
{
sem_wait(&sem); // 等待信号量,原子操作
}

// V操作:释放信号量(资源增加)
void V(sem_t& sem)
{
sem_post(&sem); // 释放信号量,原子操作
}

// 加锁操作
void Lock(pthread_mutex_t& mutex)
{
pthread_mutex_lock(&mutex); // 加锁(获取互斥锁),原子操作
}

// 解锁操作
void Unlock(pthread_mutex_t& mutex)
{
pthread_mutex_unlock(&mutex); // 解锁(释放互斥锁),原子操作
}

public:
RingQueue(int capacity = default_capacity)
:ringqueue_(capacity),
capacity_(capacity),
c_index_(0),
p_index_(0)
{
sem_init(&cdata_sem_, 0, 0); // 初始化消费者数据信号量为0(初始无数据)
sem_init(&pspace_sem_, 0, capacity_); // 初始化生产者空间信号量为cap(初始全部空间)

pthread_mutex_init(&c_mutex_, nullptr); // 初始化消费者互斥锁
pthread_mutex_init(&p_mutex_, nullptr); // 初始化生产者互斥锁
}

~RingQueue()
{
sem_destroy(&cdata_sem_); // 销毁消费者数据信号量
sem_destroy(&pspace_sem_); // 销毁生产者空间信号量

pthread_mutex_destroy(&c_mutex_); // 销毁消费者互斥锁
pthread_mutex_destroy(&p_mutex_); // 销毁生产者互斥锁
}

// 生产者:向环形队列添加元素
void Push(const T& in)
{
P(pspace_sem_); // 等待可用空间(空间资源-1)

Lock(p_mutex_); // 加锁保护生产者写入位置
ringqueue_[p_index_] = in; // 在生产者位置写入数据

// 位置后移,维持环形特性
p_index_++; // 生产者下标后移
p_index_ %= capacity_; // 取模实现环形
Unlock(p_mutex_); // 解锁

V(cdata_sem_); // 增加数据资源(唤醒消费者)
}

// 消费者:从环形队列取出元素
void Pop(T* out)
{
P(cdata_sem_); // 等待可用数据(数据资源-1)

Lock(c_mutex_); // 加锁保护消费者读取位置
*out = ringqueue_[c_index_]; // 从消费者位置读取数据

// 位置后移,维持环形特性
c_index_++; // 消费者下标后移
c_index_ %= capacity_; // 取模实现环形
Unlock(c_mutex_); // 解锁

V(pspace_sem_); // 增加空间资源(唤醒生产者)
}

private:
vector<T> ringqueue_; // 存储数据的环形数组
int capacity_; // 队列容量

int c_index_; // 消费者当前位置下标
int p_index_; // 生产者当前位置下标

sem_t cdata_sem_; // 消费者关注的数据资源信号量(有多少数据可消费)
sem_t pspace_sem_; // 生产者关注的空间资源信号量(有多少空间可生产)

pthread_mutex_t c_mutex_; // 消费者互斥锁,保护消费者位置
pthread_mutex_t p_mutex_; // 生产者互斥锁,保护生产者位置
};

2. Task.hpp

封装数学运算任务的类,支持加减乘除取模五种运算,包含错误处理机制,提供任务执行和结果获取功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#pragma once
#include <iostream>
#include <string>
using namespace std;

const string opers = "+-*/%"; // 支持的操作符集合

// 错误代码枚举定义
enum
{
SUCCESS = 0, // 成功
DIV_ERROR = 1, // 除零错误
MOD_ERROR = 2, // 取模零错误
UNKNOWN_ERROR = 3 // 未知操作符错误
};

class Task
{
public:
// 默认构造函数
Task()
:_x(0), _y(0), _op('+'), _ret(0), _code(SUCCESS)
{}

// 带参数构造函数:初始化任务参数
Task(int x, int y, char op = '+')
:_x(x),
_y(y),
_op(op),
_ret(0),
_code(SUCCESS)
{
// 验证操作符是否合法
if(op != '+' && op != '-' && op != '*' && op != '/' && op != '%')
{
_op = '+'; // 默认为加法
_code = UNKNOWN_ERROR; // 设置错误码
}
}

// 执行运算任务
void run()
{
// 重置结果和错误码(避免重复调用时的问题)
_ret = 0;
_code = SUCCESS;

switch(_op)
{
case '+':
_ret = _x + _y;
break;
case '-':
_ret = _x - _y;
break;
case '*':
_ret = _x * _y;
break;
case '/':
if(_y == 0)
{
_code = DIV_ERROR; // 除零错误
_ret = 0;
}
else
{
_ret = _x / _y;
}
break;
case '%':
if(_y == 0)
{
_code = MOD_ERROR; // 取模零错误
_ret = 0;
}
else
{
_ret = _x % _y;
}
break;
default:
_code = UNKNOWN_ERROR; // 未知操作符错误
_ret = 0;
break;
}
}

// 重载函数调用操作符,使Task对象可以像函数一样调用
void operator()()
{
run(); // 执行运算
}

// 获取任务描述字符串
string get_task() const
{
return to_string(_x) + _op + to_string(_y) + "= ???"; // 格式:x op y = ???
}

// 获取运算结果字符串
string get_ret() const
{
string ret = to_string(_x) + _op + to_string(_y) + "=" + to_string(_ret) +
" [错误代码:" + to_string(_code) + "]";
return ret;
}

// 获取操作符
char get_operator() const
{
return _op;
}

// 获取第一个操作数
int get_first_operand() const
{
return _x;
}

// 获取第二个操作数
int get_second_operand() const
{
return _y;
}

// 获取运算结果
int get_result() const
{
return _ret;
}

// 获取错误代码
int get_error_code() const
{
return _code;
}

// 析构函数
~Task()
{
}

private:
int _x, _y; // 两个操作数
int _ret; // 运算结果
char _op; // 操作符
int _code; // 错误代码
};

3. Main.cc

创建多个生产者和消费者线程,生产者生成随机运算任务放入环形队列,消费者从队列取出任务并执行计算,演示多线程协作的生产消费模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
#include <iostream>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"
using namespace std;

// 线程数据结构,传递给线程函数
struct ThreadData
{
RingQueue<Task> *rq; // 环形队列指针
string threadname; // 线程名称
};

// 生产者线程函数
void *Productor(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args); // 获取线程数据
RingQueue<Task> *rq = td->rq; // 获取环形队列
string name = td->threadname; // 获取线程名称
int len = opers.size(); // 操作符个数

while (true) // 持续生产
{
// 1. 获取数据:生成随机任务
int data1 = rand() % 10 + 1; // 生成[1,10]随机数
usleep(10); // 微秒延迟
int data2 = rand() % 10; // 生成[0,9]随机数
char op = opers[rand() % len]; // 随机选择操作符
Task t(data1, data2, op); // 创建任务对象

// 2. 生产数据:将任务放入环形队列
rq->Push(t); // 生产任务

// 修复打印格式,添加换行符使输出更清晰
cout << name << " 生产任务:" << t.get_task() << endl;

sleep(1); // 1秒生产间隔
}
return nullptr;
}

// 消费者线程函数
void *Consumer(void *args)
{
ThreadData *td = static_cast<ThreadData*>(args); // 获取线程数据
RingQueue<Task> *rq = td->rq; // 获取环形队列
string name = td->threadname; // 获取线程名称

while (true) // 持续消费
{
// 1. 消费数据:从环形队列取出任务
Task t; // 创建任务对象
rq->Pop(&t); // 消费任务(阻塞等待)

// 2. 处理数据:执行任务并输出结果
t(); // 执行运算

// 修复打印格式,添加换行符使输出更清晰
cout << name << " 消费任务:" << t.get_task() << " 结果:" << t.get_ret() << endl;
}
return nullptr;
}

int main()
{
cout << "=== 环形队列生产者-消费者模型 ===" << endl;

srand(time(nullptr) ^ getpid()); // 设置随机种子
RingQueue<Task> *rq = new RingQueue<Task>(10); // 创建容量为10的环形队列(小一点便于测试)

pthread_t consumers[3], producers[2]; // 创建多个消费者和生产者

// 创建2个生产者线程
cout << "创建 2 个生产者线程..." << endl;
for (int i = 0; i < 2; i++)
{
ThreadData *td = new ThreadData(); // 创建线程数据
td->rq = rq; // 设置环形队列
td->threadname = "[生产者-" + to_string(i) + "]"; // 设置线程名称

pthread_create(&producers[i], nullptr, Productor, td); // 创建生产者线程
}

// 创建3个消费者线程
cout << "创建 3 个消费者线程..." << endl;
for (int i = 0; i < 3; i++)
{
ThreadData *td = new ThreadData(); // 创建线程数据
td->rq = rq; // 设置环形队列
td->threadname = "[消费者-" + to_string(i) + "]"; // 设置线程名称

pthread_create(&consumers[i], nullptr, Consumer, td); // 创建消费者线程
}

// 等待线程结束(实际程序不会退出)
for (int i = 0; i < 2; i++)
{
pthread_join(producers[i], nullptr); // 等待生产者线程
}
for (int i = 0; i < 3; i++)
{
pthread_join(consumers[i], nullptr); // 等待消费者线程
}

delete rq; // 释放内存
return 0;
}