mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4mobile wallpaper 5
240 字
1 分钟
RingBuffer的分析
2026-02-10
统计加载中...

RingBuffer的分析#

先拿出部分源码

#pragma once
#include <memory>
#include <vector>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <utility> // std::move/std::forward
#include <cassert> // assert
template<typename T>
class RingBuffer {
private:
// ========== 核心修改:存储unique_ptr ==========
std::vector<std::unique_ptr<T>> buffer_;
size_t capacity_;
size_t head_ = 0; // 写位置
size_t tail_ = 0; // 读位置
size_t count_ = 0;
mutable std::mutex mutex_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
std::atomic<bool> stop_ = false; // 停止标志
// ========== 移动push:高效转移所有权 ==========
bool push(T&& item, int timeout_ms = 10) {
return emplace(timeout_ms, std::move(item));
}
// ========== 非阻塞try_push ==========
bool try_push(const T& item) {
return emplace(0, item);
}
// ========== 返回unique_ptr的pop(推荐使用) ==========
std::unique_ptr<T> pop_ptr(int timeout_ms = 10) {
std::unique_lock<std::mutex> lock(mutex_);
// ========== 新增:等待条件(含停止判断) ==========
auto waitCondition = [this]() { return count_ > 0 || needStop(); };
if (count_ == 0) {
if (timeout_ms == 0) return nullptr;
// ========== 新增:支持timeout_ms=-1 无限等待 ==========
if (timeout_ms == -1) {
not_empty_.wait(lock, waitCondition);
} else {
if (!not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), waitCondition)) {
return nullptr;
}
}
}
// ========== 新增:停止判断 ==========
if (needStop() && count_ == 0) {
return nullptr;
}
// ========== 关键优化:转移指针所有权 ==========
std::unique_ptr<T> result = std::move(buffer_[tail_]);
tail_ = (tail_ + 1) % capacity_;
count_--;
not_full_.notify_one();
return result;
}
// ========== 兼容原有接口的pop ==========
bool pop(T& item, int timeout_ms = 10) {
auto ptr = pop_ptr(timeout_ms);
if (!ptr) return false;
// 移动对象内容到参数
item = std::move(*ptr);
return true;
}
// ========== 查看但不弹出(peek) ==========
std::unique_ptr<T> peek(int timeout_ms = 10) {
std::unique_lock<std::mutex> lock(mutex_);
auto waitCondition = [this]() { return count_ > 0 || needStop(); };
if (count_ == 0) {
if (timeout_ms == 0) return nullptr;
if (timeout_ms == -1) {
not_empty_.wait(lock, waitCondition);
} else {
if (!not_empty_.wait_for(lock, std::chrono::milliseconds(timeout_ms), waitCondition)) {
return nullptr;
}
}
}
if (needStop() && count_ == 0) {
return nullptr;
}
// 创建对象的副本
if (!buffer_[tail_]) return nullptr;
return std::make_unique<T>(*buffer_[tail_]);
}
// ========== 辅助方法 ==========
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return count_;
}
size_t capacity() const {
return capacity_;
}
bool empty() const {
std::lock_guard<std::mutex> lock(mutex_);
return count_ == 0;
}
bool full() const {
std::lock_guard<std::mutex> lock(mutex_);
return count_ == capacity_;
}
void clear() {
std::lock_guard<std::mutex> lock(mutex_);
// 释放所有指针
for (size_t i = 0; i < capacity_; ++i) {
buffer_[i].reset();
}
head_ = tail_ = count_ = 0;
not_full_.notify_all();
}
bool needStop() const {
return stop_.load(std::memory_order_acquire);
}
void stop() {
stop_.store(true, std::memory_order_release);
not_empty_.notify_all();
not_full_.notify_all();
}
void reset() {
std::lock_guard<std::mutex> lock(mutex_);
stop_.store(false, std::memory_order_release);
clear();
}
// ========== 直接访问方法(谨慎使用) ==========
const T* peek_raw() const {
std::lock_guard<std::mutex> lock(mutex_);
if (count_ == 0) return nullptr;
return buffer_[tail_].get();
}
T* peek_raw() {
std::lock_guard<std::mutex> lock(mutex_);
if (count_ == 0) return nullptr;
return buffer_[tail_].get();
}
// ========== 预留额外容量(动态扩容) ==========
bool reserve(size_t new_capacity) {
std::lock_guard<std::mutex> lock(mutex_);
if (new_capacity <= capacity_) {
return false; // 只能扩容,不能缩容
}
// 创建新缓冲区
std::vector<std::unique_ptr<T>> new_buffer(new_capacity);
// 复制现有元素(保持顺序)
for (size_t i = 0; i < count_; ++i) {
size_t src_idx = (tail_ + i) % capacity_;
size_t dst_idx = i; // 在新缓冲区中从0开始排列
if (buffer_[src_idx]) {
new_buffer[dst_idx] = std::move(buffer_[src_idx]);
}
}
// 更新状态
buffer_ = std::move(new_buffer);
capacity_ = new_capacity;
head_ = count_; // 写位置在最后一个元素之后
tail_ = 0; // 读位置在开头
not_full_.notify_all();
return true;
}
};

我们可以看到,这个RingBuffer底层是基于std::vector<T>

它是一个加锁的环形缓冲区

构造函数#

RingBuffer(size_t capacity) : capacity_(capacity) {
// 只创建指针数组,不创建实际对象
buffer_.resize(capacity_);
// 初始化所有指针为nullptr
for (size_t i = 0; i < capacity_; ++i) {
buffer_[i] = nullptr;
}
}

capacity构造底层的std::vector<std::unique_ptr<T>> buffer_,并以nullptr初始化。

emplace()#

// ========== 核心emplace方法 ==========
template<typename... Args>
bool emplace(int timeout_ms = 10, Args&&... args) {
std::unique_lock<std::mutex> lock(mutex_);
// 等待缓冲区未满
if (count_ >= capacity_) {
if (timeout_ms == 0) return false;
if (!not_full_.wait_for(lock, std::chrono::milliseconds(timeout_ms),
[this]() { return count_ < capacity_; })) {
return false;
}
}
// ========== 关键优化:动态创建对象 ==========
// 如果当前位置已有对象,先释放
if (buffer_[head_]) {
buffer_[head_].reset();
}
// 创建新对象(移动构造或拷贝构造)
buffer_[head_] = std::make_unique<T>(std::forward<Args>(args)...);
// 更新指针和计数
head_ = (head_ + 1) % capacity_;
count_++;
not_empty_.notify_one();
return true;
}

在这个部分,采用template<typename... Args>我们为什么要这么做呢?

这是因为外部通常调用push(const T& item, int timeout_ms = 10)。这通常会关联到T的构造函数。

T的构造函数可能会有多参数,于是我们采用参数包template<typename... Args>的方式来传构造参数。 emplace中调用std::make_unique 来进行拷贝构造。

std::forward<Args>(args)... 在拷贝或者是移动构造是怎样的形式呢

buffer_[head_] = std::make_unique<T>(std::forward<const T&>(item));

展开后实则是这样的形式。

timeoutms 策略#

timeoutms = 0;

auto ptr = buffer.pop_ptr(0); // 非阻塞调用
// 如果缓冲区为空:
// 1. 立即返回 nullptr
// 2. 不会等待,直接返回
// 适用场景:
// - 实时系统,不能阻塞
// - 轮询检查
// - 游戏循环等需要保持响应的场景

timeoutms = -1;

auto ptr = buffer.pop_ptr(-1); // 无限等待
// 行为:
// 1. 如果缓冲区有数据,立即返回
// 2. 如果缓冲区为空,一直等待直到:
// a) 有数据被 push
// b) stop() 被调用
// 3. 永远不会因超时返回 nullptr
// 适用场景:
// - 消费者线程需要持续处理数据
// - 主事件循环
// - 服务器监听线程

timeoutms = N > 0

auto ptr = buffer.pop_ptr(100); // 等待100ms
// 行为:
// 1. 如果缓冲区有数据,立即返回
// 2. 如果缓冲区为空,等待最多100ms
// 3. 100ms内如果有数据,立即返回
// 4. 100ms后仍无数据,返回 nullptr
// 适用场景:
// - 需要响应的GUI应用
// - 网络请求超时控制
// - 实时系统中的软实时要求

tail_ = (tail_ + 1) % capacity_;#

这是环形缓冲区的策略。

// 线性缓冲区的缺陷:
buffer: [0][1][2][3][4] // 容量5
head=0, tail=0, count=0
// 插入3个元素:
[ A ][ B ][ C ][ ][ ]
head↑ tail↑
count=3
// 弹出2个元素:
[ ][ ][ C ][ ][ ]
↑head ↑tail?
// 问题:tail应该指向哪?
// 如果我们只是 tail++,会超出数组边界!
// 使用取模运算使索引"循环"
buffer: [0][1][2][3][4] // 容量5
tail_ = (tail_ + 1) % capacity_;
// 当 tail_ = 4 时,下一个 tail_ = (4+1)%5 = 0
// 回到数组开头!

这样就处理好循环的效果了,这是因为:数学上:数据在 [tail_, head_) 的循环区间内

分享

如果这篇文章对你有帮助,欢迎分享给更多人!

RingBuffer的分析
https://rinzemoon.top/posts/rb/ringbuffer的实现解析/
作者
泠时月
发布于
2026-02-10
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时

封面
Sample Song
Sample Artist
封面
Sample Song
Sample Artist
0:00 / 0:00