@SEGA Project Sekai
240 字
1 分钟
RingBuffer的分析
RingBuffer的分析
先拿出部分源码
#pragma once
#include <memory>#include <vector>#include <thread>#include <atomic>#include <condition_variable>#include <utility> // std::move/std::forward#include <cassert> // assert
template<typename T>class RingBuffer {private: // ========== 核心修改:存储unique_ptr ========== std::vector<std::unique_ptr<T>> buffer_; size_t capacity_; size_t head_ = 0; // 写位置 size_t tail_ = 0; // 读位置 size_t count_ = 0; mutable std::mutex mutex_; std::condition_variable not_empty_; std::condition_variable not_full_; std::atomic<bool> stop_ = false; // 停止标志
// ========== 移动push:高效转移所有权 ========== bool push(T&& item, int timeout_ms = 10) { return emplace(timeout_ms, std::move(item)); }
// ========== 非阻塞try_push ========== bool try_push(const T& item) { return emplace(0, item); }
// ========== 返回unique_ptr的pop(推荐使用) ========== std::unique_ptr<T> pop_ptr(int timeout_ms = 10) { std::unique_lock<std::mutex> lock(mutex_);
// ========== 新增:等待条件(含停止判断) ========== auto waitCondition = [this]() { return count_ > 0 || needStop(); };
if (count_ == 0) { if (timeout_ms == 0) return nullptr; // ========== 新增:支持timeout_ms=-1 无限等待 ========== if (timeout_ms == -1) { not_empty_.wait(lock, waitCondition); } else { if (!not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), waitCondition)) { return nullptr; } } }
// ========== 新增:停止判断 ========== if (needStop() && count_ == 0) { return nullptr; }
// ========== 关键优化:转移指针所有权 ========== std::unique_ptr<T> result = std::move(buffer_[tail_]);
tail_ = (tail_ + 1) % capacity_; count_--;
not_full_.notify_one(); return result; }
// ========== 兼容原有接口的pop ========== bool pop(T& item, int timeout_ms = 10) { auto ptr = pop_ptr(timeout_ms); if (!ptr) return false;
// 移动对象内容到参数 item = std::move(*ptr); return true; }
// ========== 查看但不弹出(peek) ========== std::unique_ptr<T> peek(int timeout_ms = 10) { std::unique_lock<std::mutex> lock(mutex_);
auto waitCondition = [this]() { return count_ > 0 || needStop(); };
if (count_ == 0) { if (timeout_ms == 0) return nullptr; if (timeout_ms == -1) { not_empty_.wait(lock, waitCondition); } else { if (!not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), waitCondition)) { return nullptr; } } }
if (needStop() && count_ == 0) { return nullptr; }
// 创建对象的副本 if (!buffer_[tail_]) return nullptr; return std::make_unique<T>(*buffer_[tail_]); }
// ========== 辅助方法 ========== size_t size() const { std::lock_guard<std::mutex> lock(mutex_); return count_; }
size_t capacity() const { return capacity_; }
bool empty() const { std::lock_guard<std::mutex> lock(mutex_); return count_ == 0; }
bool full() const { std::lock_guard<std::mutex> lock(mutex_); return count_ == capacity_; }
void clear() { std::lock_guard<std::mutex> lock(mutex_);
// 释放所有指针 for (size_t i = 0; i < capacity_; ++i) { buffer_[i].reset(); }
head_ = tail_ = count_ = 0; not_full_.notify_all(); }
bool needStop() const { return stop_.load(std::memory_order_acquire); }
void stop() { stop_.store(true, std::memory_order_release); not_empty_.notify_all(); not_full_.notify_all(); }
void reset() { std::lock_guard<std::mutex> lock(mutex_); stop_.store(false, std::memory_order_release); clear(); }
// ========== 直接访问方法(谨慎使用) ========== const T* peek_raw() const { std::lock_guard<std::mutex> lock(mutex_); if (count_ == 0) return nullptr; return buffer_[tail_].get(); }
T* peek_raw() { std::lock_guard<std::mutex> lock(mutex_); if (count_ == 0) return nullptr; return buffer_[tail_].get(); }
// ========== 预留额外容量(动态扩容) ========== bool reserve(size_t new_capacity) { std::lock_guard<std::mutex> lock(mutex_);
if (new_capacity <= capacity_) { return false; // 只能扩容,不能缩容 }
// 创建新缓冲区 std::vector<std::unique_ptr<T>> new_buffer(new_capacity);
// 复制现有元素(保持顺序) for (size_t i = 0; i < count_; ++i) { size_t src_idx = (tail_ + i) % capacity_; size_t dst_idx = i; // 在新缓冲区中从0开始排列
if (buffer_[src_idx]) { new_buffer[dst_idx] = std::move(buffer_[src_idx]); } }
// 更新状态 buffer_ = std::move(new_buffer); capacity_ = new_capacity; head_ = count_; // 写位置在最后一个元素之后 tail_ = 0; // 读位置在开头
not_full_.notify_all(); return true; }
};我们可以看到,这个RingBuffer底层是基于
std::vector<T>的
它是一个加锁的环形缓冲区
构造函数
RingBuffer(size_t capacity) : capacity_(capacity) { // 只创建指针数组,不创建实际对象 buffer_.resize(capacity_);
// 初始化所有指针为nullptr for (size_t i = 0; i < capacity_; ++i) { buffer_[i] = nullptr; } }传
capacity构造底层的std::vector<std::unique_ptr<T>> buffer_,并以nullptr初始化。
emplace()
// ========== 核心emplace方法 ========== template<typename... Args> bool emplace(int timeout_ms = 10, Args&&... args) { std::unique_lock<std::mutex> lock(mutex_);
// 等待缓冲区未满 if (count_ >= capacity_) { if (timeout_ms == 0) return false; if (!not_full_.wait_for(lock, std::chrono::milliseconds(timeout_ms), [this]() { return count_ < capacity_; })) { return false; } }
// ========== 关键优化:动态创建对象 ========== // 如果当前位置已有对象,先释放 if (buffer_[head_]) { buffer_[head_].reset(); }
// 创建新对象(移动构造或拷贝构造) buffer_[head_] = std::make_unique<T>(std::forward<Args>(args)...);
// 更新指针和计数 head_ = (head_ + 1) % capacity_; count_++;
not_empty_.notify_one(); return true; }在这个部分,采用
template<typename... Args>我们为什么要这么做呢?
这是因为外部通常调用push(const T& item, int timeout_ms = 10)。这通常会关联到T的构造函数。
T的构造函数可能会有多参数,于是我们采用参数包template<typename... Args>的方式来传构造参数。
emplace中调用std::make_unique
std::forward<Args>(args)... 在拷贝或者是移动构造是怎样的形式呢
buffer_[head_] = std::make_unique<T>(std::forward<const T&>(item));展开后实则是这样的形式。
timeoutms 策略
timeoutms = 0;
auto ptr = buffer.pop_ptr(0); // 非阻塞调用
// 如果缓冲区为空:// 1. 立即返回 nullptr// 2. 不会等待,直接返回
// 适用场景:// - 实时系统,不能阻塞// - 轮询检查// - 游戏循环等需要保持响应的场景timeoutms = -1;
auto ptr = buffer.pop_ptr(-1); // 无限等待
// 行为:// 1. 如果缓冲区有数据,立即返回// 2. 如果缓冲区为空,一直等待直到:// a) 有数据被 push// b) stop() 被调用// 3. 永远不会因超时返回 nullptr
// 适用场景:// - 消费者线程需要持续处理数据// - 主事件循环// - 服务器监听线程timeoutms = N > 0
auto ptr = buffer.pop_ptr(100); // 等待100ms
// 行为:// 1. 如果缓冲区有数据,立即返回// 2. 如果缓冲区为空,等待最多100ms// 3. 100ms内如果有数据,立即返回// 4. 100ms后仍无数据,返回 nullptr
// 适用场景:// - 需要响应的GUI应用// - 网络请求超时控制// - 实时系统中的软实时要求tail_ = (tail_ + 1) % capacity_;
这是环形缓冲区的策略。
// 线性缓冲区的缺陷:buffer: [0][1][2][3][4] // 容量5head=0, tail=0, count=0
// 插入3个元素:[ A ][ B ][ C ][ ][ ]head↑ tail↑ count=3
// 弹出2个元素:[ ][ ][ C ][ ][ ] ↑head ↑tail?// 问题:tail应该指向哪?// 如果我们只是 tail++,会超出数组边界!
// 使用取模运算使索引"循环"buffer: [0][1][2][3][4] // 容量5
tail_ = (tail_ + 1) % capacity_;// 当 tail_ = 4 时,下一个 tail_ = (4+1)%5 = 0// 回到数组开头!这样就处理好循环的效果了,这是因为:数学上:数据在 [tail_, head_) 的循环区间内
RingBuffer的分析
https://rinzemoon.top/posts/rb/ringbuffer的实现解析/ 部分信息可能已经过时






