From d02ce43fd18b29f15aa98e165b6e872f207d92fd Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Fri, 3 Oct 2025 12:44:34 -0500 Subject: [PATCH] broken build-adding forward/reverse reader threads --- .../include/file_manager/ring_buffer_base.hpp | 3 +- .../src/file_manager/ring_buffer_base.cpp | 83 +++++++++++++++---- 2 files changed, 69 insertions(+), 17 deletions(-) diff --git a/repertory/librepertory/include/file_manager/ring_buffer_base.hpp b/repertory/librepertory/include/file_manager/ring_buffer_base.hpp index 1026ee71..9040c34f 100644 --- a/repertory/librepertory/include/file_manager/ring_buffer_base.hpp +++ b/repertory/librepertory/include/file_manager/ring_buffer_base.hpp @@ -57,8 +57,9 @@ private: private: std::condition_variable chunk_notify_; mutable std::mutex chunk_mtx_; + std::unique_ptr forward_reader_thread_; std::mutex read_mtx_; - std::unique_ptr reader_thread_; + std::unique_ptr reverse_reader_thread_; std::size_t ring_begin_{}; std::size_t ring_end_{}; std::size_t ring_pos_{}; diff --git a/repertory/librepertory/src/file_manager/ring_buffer_base.cpp b/repertory/librepertory/src/file_manager/ring_buffer_base.cpp index fe7b4d1c..04f4d1f4 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_base.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_base.cpp @@ -70,8 +70,10 @@ auto ring_buffer_base::check_start() -> api_error { event_system::instance().raise( get_api_path(), get_source_path(), function_name); - reader_thread_ = - std::make_unique([this]() { reader_thread(); }); + forward_reader_thread_ = + std::make_unique([this]() { forward_reader_thread(); }); + reverse_reader_thread_ = + std::make_unique([this]() { reverse_reader_thread(); }); return api_error::success; } catch (const std::exception &ex) { utils::error::raise_api_path_error(function_name, get_api_path(), @@ -90,9 +92,9 @@ auto ring_buffer_base::close() -> bool { auto res = open_file_base::close(); - if (reader_thread_) { - reader_thread_->join(); - reader_thread_.reset(); + if (forward_reader_thread_) { + forward_reader_thread_->join(); + forward_reader_thread_.reset(); } return res; @@ -182,6 +184,58 @@ void ring_buffer_base::forward(std::size_t count) { update_position(count, true); } +void ring_buffer_base::forward_reader_thread() { + REPERTORY_USES_FUNCTION_NAME(); + + unique_mutex_lock chunk_lock(chunk_mtx_); + const auto notify_and_unlock = [this, &chunk_lock]() { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + auto last_pos = ring_pos_; + auto next_chunk = ring_pos_; + notify_and_unlock(); + + while (not get_stop_requested()) { + chunk_lock.lock(); + + if (last_pos == ring_pos_) { + ++next_chunk; + } else { + next_chunk = ring_pos_ + 1U; + last_pos = ring_pos_; + } + + if (next_chunk > ring_end_) { + next_chunk = ring_begin_; + } + + if (read_state_[next_chunk % read_state_.size()]) { + if (get_stop_requested()) { + notify_and_unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + last_pos = ring_pos_; + next_chunk = ring_pos_; + } + + notify_and_unlock(); + continue; + } + + notify_and_unlock(); + download_chunk(next_chunk, true); + } + + event_system::instance().raise( + get_api_path(), get_source_path(), api_error::download_stopped, + function_name); +} + auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> { recur_mutex_lock file_lock(get_mutex()); return read_state_; @@ -264,7 +318,7 @@ auto ring_buffer_base::read(std::size_t read_size, std::uint64_t read_offset, return get_stop_requested() ? api_error::download_stopped : res; } -void ring_buffer_base::reader_thread() { +void ring_buffer_base::reverse_reader_thread() { REPERTORY_USES_FUNCTION_NAME(); unique_mutex_lock chunk_lock(chunk_mtx_); @@ -273,22 +327,20 @@ void ring_buffer_base::reader_thread() { chunk_lock.unlock(); }; - auto last_pos = ring_pos_; + auto last_begin = ring_begin_; auto next_chunk = ring_pos_; notify_and_unlock(); while (not get_stop_requested()) { chunk_lock.lock(); - if (last_pos == ring_pos_) { - ++next_chunk; - } else { - next_chunk = ring_pos_ + 1U; - last_pos = ring_pos_; + if (last_begin != ring_begin_) { + last_begin = ring_begin_; + next_chunk = ring_pos_; } - if (next_chunk > ring_end_) { - next_chunk = ring_begin_; + if (next_chunk > ring_begin_) { + --next_chunk; } if (read_state_[next_chunk % read_state_.size()]) { @@ -299,7 +351,7 @@ void ring_buffer_base::reader_thread() { if (get_read_state().all()) { chunk_notify_.wait(chunk_lock); - last_pos = ring_pos_; + last_begin = ring_begin_; next_chunk = ring_pos_; } @@ -315,7 +367,6 @@ void ring_buffer_base::reader_thread() { get_api_path(), get_source_path(), api_error::download_stopped, function_name); } - void ring_buffer_base::reverse(std::size_t count) { update_position(count, false); }