@SEGA Project Sekai
507 字
1 分钟
LeetCode 1188
LeetCode 1188 的无条件变量解法
首先我们看题目要求
/** * LeetCode 1188. 设计有限阻塞队列 (Design Bounded Blocking Queue) * 题目要求: * 1. 实现一个线程安全的有限阻塞队列,包含以下方法: * - BoundedBlockingQueue(int capacity):构造函数,初始化队列最大容量 * - void enqueue(int element):入队操作,队列满时阻塞线程,直到队列有空闲空间 * - int dequeue():出队操作,返回并移除队尾元素;队列空时阻塞线程,直到队列有元素 * - int size():返回当前队列元素个数(线程安全) * 2. 测试场景:多线程并发访问,线程分为仅调用enqueue的生产者、仅调用dequeue的消费者 * 3. 约束: * - 1 <= capacity <= 30 * - 1 <= 生产者线程数 <= 8 * - 1 <= 消费者线程数 <= 8 */由于LeetCode此题需要会员,我让ai给我扒了一下题目
class BoundedBlockingQueue {private: int _capacity; // 队列最大容量 vector<int> _q; // 底层存储容器 mutex mtx; // 保护临界区的互斥锁 mutex log_mtx; // 日志打印锁,避免输出错乱
public: // 构造函数:初始化队列容量 BoundedBlockingQueue(int capacity) : _capacity(capacity) { _q.reserve(capacity); // 预留空间,减少vector扩容开销 }
// 入队:队列满时阻塞(无条件变量,用优化版忙等) void enqueue(int element) { while (true) { // 第一步:加锁检查队列是否有空间 { lock_guard<mutex> lock(mtx); if (_q.size() < _capacity) { // 有空间,执行入队 _q.push_back(element); return; // 入队成功,退出 } } // 锁自动释放,避免阻塞其他线程
// 第二步:队列满,短暂休眠后重试(优化忙等,减少CPU占用) this_thread::sleep_for(chrono::milliseconds(1)); // 1ms休眠,替代yield } }
// 出队:队列空时阻塞(无条件变量,用优化版忙等) int dequeue() { while (true) { // 第一步:加锁检查队列是否有元素 { lock_guard<mutex> lock(mtx); if (!_q.empty()) { // 有元素,执行出队(取队尾+移除队尾,符合题目要求) int val = _q.back(); _q.pop_back(); // 真正移除元素,修复你之前的核心错误 return val; // 出队成功,退出 } } // 锁自动释放
// 第二步:队列空,短暂休眠后重试 this_thread::sleep_for(chrono::milliseconds(1)); } }
// 返回队列大小(线程安全) int size() { lock_guard<mutex> lock(mtx); // RAII加锁,自动释放 return _q.size(); // 直接返回真实大小,无需自定义计数 }};
// ===================== 测试代码(无修改) =====================atomic<int> produced_count(0);atomic<int> consumed_count(0);
const int QUEUE_CAPACITY = 5; // 队列容量const int PRODUCER_NUM = 3; // 生产者线程数const int CONSUMER_NUM = 2; // 消费者线程数const int TOTAL_ELEMENTS = 20; // 总生产元素数
// 生产者函数void producer(BoundedBlockingQueue& queue) { while (produced_count < TOTAL_ELEMENTS) { int val = produced_count++; queue.enqueue(val); this_thread::sleep_for(chrono::milliseconds(50)); // 模拟生产耗时 }}
// 消费者函数void consumer(BoundedBlockingQueue& queue) { while (consumed_count < TOTAL_ELEMENTS) { int val = queue.dequeue(); consumed_count++; this_thread::sleep_for(chrono::milliseconds(100)); // 模拟消费耗时 }}
int main() { BoundedBlockingQueue queue(QUEUE_CAPACITY);
// 创建生产者线程 vector<thread> producers; for (int i = 0; i < PRODUCER_NUM; ++i) { producers.emplace_back(producer, ref(queue)); }
// 创建消费者线程 vector<thread> consumers; for (int i = 0; i < CONSUMER_NUM; ++i) { consumers.emplace_back(consumer, ref(queue)); }
// 等待所有线程完成 for (auto& t : producers) t.join(); for (auto& t : consumers) t.join();
// 输出最终测试结果 cout << "===== 最终测试结果 =====" << endl; bool is_success = (produced_count == TOTAL_ELEMENTS) && (consumed_count == TOTAL_ELEMENTS) && (queue.size() == 0);
if (is_success) { cout << "✅ 测试通过:所有元素正常生产并消费,队列最终为空" << endl; } else { cout << "❌ 测试失败:" << endl; cout << " - 计划生产/消费数:" << TOTAL_ELEMENTS << endl; cout << " - 实际生产数:" << produced_count << endl; cout << " - 实际消费数:" << consumed_count << endl; cout << " - 队列最终大小:" << queue.size() << endl; }
return 0;}解析
正常的解法需要我们用条件变量去维护,不用std::this_thread::yield()去阻塞线程。
但是我在思考一种不用条件变量的原始一些的办法,原方法采用yield去阻塞线程。但是这样会耗费CPU,并不是标准的解法。
void enqueue(int element) { while (true) { // 第一步:加锁检查队列是否有空间 { lock_guard<mutex> lock(mtx); if (_q.size() < _capacity) { // 有空间,执行入队 _q.push_back(element); return; // 入队成功,退出 } } // 锁自动释放,避免阻塞其他线程
// 第二步:队列满,短暂休眠后重试(优化忙等,减少CPU占用) this_thread::sleep_for(chrono::milliseconds(1)); // 1ms休眠,替代yield } }
// 出队:队列空时阻塞(无条件变量,用优化版忙等) int dequeue() { while (true) { // 第一步:加锁检查队列是否有元素 { lock_guard<mutex> lock(mtx); if (!_q.empty()) { // 有元素,执行出队(取队尾+移除队尾,符合题目要求) int val = _q.back(); _q.pop_back(); // 移除元素 return val; // 出队成功,退出 } } // 锁自动释放
// 第二步:队列空,短暂休眠后重试 this_thread::sleep_for(chrono::milliseconds(1)); } }
// 返回队列大小(线程安全) int size() { lock_guard<mutex> lock(mtx); // RAII加锁,自动释放 return _q.size(); // 直接返回真实大小,无需自定义计数 }我们看到,BoundedBlockingQueue是基于vector<int>的阻塞队列。采用std::mutex的锁作为线程安全的基础。
我们看到,enque和deque都采用while循环进行加锁操作,退出作用区域的时候RAII机制会自动释放锁。
根据底层vector<int> _q的条件判断就不用细说了,采用while的原因是这样的
问题根源:虚假唤醒(Spurious Wakeup)
操作系统的条件变量存在「虚假唤醒」—— 线程可能被无缘无故唤醒(比如系统调度、信号干扰),或多个线程被同时唤醒:
- 比如队列容量 = 5,已装满,2 个生产者线程因 “队列满” 等待;
- 消费者消费 1 个元素,调用
notify_one(),但操作系统可能同时唤醒这 2 个生产者线程(虚假唤醒); - 第一个生产者入队后,队列又满(5 个);
- 第二个生产者如果是
if,不会重新检查,直接入队→队列大小 = 6,违反 “有限容量” 要求; - 如果是
while,第二个生产者唤醒后会重新检查条件(_q.size() >= _capacity),发现仍满,继续等待,避免错误。
void enqueue(int element) { unique_lock<mutex> lock(mtx); // while:唤醒后重新检查,条件不满足就继续等 while (_q.size() >= _capacity) { not_full.wait(lock); } _q.push_back(element); // 此时条件一定满足! not_empty.notify_one();}根据条件判断去进行唤醒操作,更准确,而且不会出现其他状况外的事情。
当然我们现在使用的是无条件变量的方式,这样的话
// 正确:while循环,直到条件满足void enqueue(int element) { while (true) { // 循环重试 lock_guard<mutex> lock(mtx); if (_q.size() < _capacity) { // 此处if可省略,直接while判断 _q.push_back(element); return; } } // 解锁后休眠,再循环→直到有空间 cout << "Waiting to enqueue..." << endl; this_thread::sleep_for(1ms);}其实也是同样的道理
LeetCode 1188
https://rinzemoon.top/posts/l1188/leet1188/ 部分信息可能已经过时






