optimized ring buffer

This commit is contained in:
2025-10-03 17:01:59 -05:00
parent d02ce43fd1
commit c338d35c4f
2 changed files with 73 additions and 70 deletions

View File

@@ -70,7 +70,7 @@ private:
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
void reader_thread();
void reader_thread(bool is_forward);
void update_position(std::size_t count, bool is_forward);
@@ -78,7 +78,7 @@ private:
protected:
[[nodiscard]] auto has_reader_thread() const -> bool {
return reader_thread_ != nullptr;
return forward_reader_thread_ != nullptr;
}
[[nodiscard]] auto get_ring_size() const -> std::size_t {

View File

@@ -71,9 +71,9 @@ auto ring_buffer_base::check_start() -> api_error {
event_system::instance().raise<download_begin>(
get_api_path(), get_source_path(), function_name);
forward_reader_thread_ =
std::make_unique<std::thread>([this]() { forward_reader_thread(); });
std::make_unique<std::thread>([this]() { reader_thread(true); });
reverse_reader_thread_ =
std::make_unique<std::thread>([this]() { reverse_reader_thread(); });
std::make_unique<std::thread>([this]() { reader_thread(false); });
return api_error::success;
} catch (const std::exception &ex) {
utils::error::raise_api_path_error(function_name, get_api_path(),
@@ -86,15 +86,25 @@ auto ring_buffer_base::check_start() -> api_error {
auto ring_buffer_base::close() -> bool {
stop_requested_ = true;
std::unique_ptr<std::thread> forward_reader_thread{nullptr};
std::unique_ptr<std::thread> reverse_reader_thread{nullptr};
unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all();
std::swap(forward_reader_thread, forward_reader_thread_);
std::swap(reverse_reader_thread, reverse_reader_thread_);
chunk_lock.unlock();
auto res = open_file_base::close();
if (forward_reader_thread_) {
forward_reader_thread_->join();
forward_reader_thread_.reset();
if (forward_reader_thread) {
forward_reader_thread->join();
forward_reader_thread.reset();
}
if (reverse_reader_thread) {
reverse_reader_thread->join();
reverse_reader_thread.reset();
}
return res;
@@ -184,58 +194,6 @@ void ring_buffer_base::forward(std::size_t count) {
update_position(count, true);
}
void ring_buffer_base::forward_reader_thread() {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto notify_and_unlock = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
auto last_pos = ring_pos_;
auto next_chunk = ring_pos_;
notify_and_unlock();
while (not get_stop_requested()) {
chunk_lock.lock();
if (last_pos == ring_pos_) {
++next_chunk;
} else {
next_chunk = ring_pos_ + 1U;
last_pos = ring_pos_;
}
if (next_chunk > ring_end_) {
next_chunk = ring_begin_;
}
if (read_state_[next_chunk % read_state_.size()]) {
if (get_stop_requested()) {
notify_and_unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
last_pos = ring_pos_;
next_chunk = ring_pos_;
}
notify_and_unlock();
continue;
}
notify_and_unlock();
download_chunk(next_chunk, true);
}
event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped,
function_name);
}
auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex());
return read_state_;
@@ -318,7 +276,7 @@ auto ring_buffer_base::read(std::size_t read_size, std::uint64_t read_offset,
return get_stop_requested() ? api_error::download_stopped : res;
}
void ring_buffer_base::reverse_reader_thread() {
void ring_buffer_base::reader_thread(bool is_forward) {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock chunk_lock(chunk_mtx_);
@@ -327,15 +285,50 @@ void ring_buffer_base::reverse_reader_thread() {
chunk_lock.unlock();
};
auto last_begin = ring_begin_;
const auto has_unread_forward = [this]() -> bool {
auto ring_size = read_state_.size();
if (ring_pos_ >= ring_end_) {
return false;
}
for (auto idx = ring_pos_ + 1U; idx <= ring_end_; ++idx) {
if (not read_state_[idx % ring_size]) {
return true;
}
}
return false;
};
const auto has_unread_reverse = [this]() -> bool {
auto ring_size = read_state_.size();
for (auto idx = ring_pos_; idx-- > ring_begin_;) {
if (not read_state_[idx % ring_size]) {
return true;
}
}
return false;
};
auto last_marker = is_forward ? ring_pos_ : ring_begin_;
auto next_chunk = ring_pos_;
notify_and_unlock();
while (not get_stop_requested()) {
chunk_lock.lock();
if (last_begin != ring_begin_) {
last_begin = ring_begin_;
if (is_forward) {
if (last_marker == ring_pos_) {
++next_chunk;
} else {
next_chunk = ring_pos_ + 1U;
last_marker = ring_pos_;
}
if (next_chunk > ring_end_) {
next_chunk = ring_pos_;
}
} else if (last_marker != ring_begin_) {
last_marker = ring_begin_;
next_chunk = ring_pos_;
}
@@ -346,13 +339,20 @@ void ring_buffer_base::reverse_reader_thread() {
if (read_state_[next_chunk % read_state_.size()]) {
if (get_stop_requested()) {
notify_and_unlock();
return;
break;
}
if (get_read_state().all()) {
auto has_unread =
is_forward ? has_unread_forward() : has_unread_reverse();
if (not has_unread) {
chunk_notify_.wait(chunk_lock);
last_begin = ring_begin_;
next_chunk = ring_pos_;
if (is_forward) {
last_marker = ring_pos_;
next_chunk = ring_pos_;
} else {
last_marker = ring_begin_;
next_chunk = ring_pos_;
}
}
notify_and_unlock();
@@ -363,10 +363,13 @@ void ring_buffer_base::reverse_reader_thread() {
download_chunk(next_chunk, true);
}
event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped,
function_name);
if (is_forward) {
event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped,
function_name);
}
}
void ring_buffer_base::reverse(std::size_t count) {
update_position(count, false);
}