broken build-adding forward/reverse reader threads

This commit is contained in:
2025-10-03 12:44:34 -05:00
parent cb092d124e
commit d02ce43fd1
2 changed files with 69 additions and 17 deletions

View File

@@ -57,8 +57,9 @@ private:
private: private:
std::condition_variable chunk_notify_; std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_; mutable std::mutex chunk_mtx_;
std::unique_ptr<std::thread> forward_reader_thread_;
std::mutex read_mtx_; std::mutex read_mtx_;
std::unique_ptr<std::thread> reader_thread_; std::unique_ptr<std::thread> reverse_reader_thread_;
std::size_t ring_begin_{}; std::size_t ring_begin_{};
std::size_t ring_end_{}; std::size_t ring_end_{};
std::size_t ring_pos_{}; std::size_t ring_pos_{};

View File

@@ -70,8 +70,10 @@ auto ring_buffer_base::check_start() -> api_error {
event_system::instance().raise<download_begin>( event_system::instance().raise<download_begin>(
get_api_path(), get_source_path(), function_name); get_api_path(), get_source_path(), function_name);
reader_thread_ = forward_reader_thread_ =
std::make_unique<std::thread>([this]() { reader_thread(); }); std::make_unique<std::thread>([this]() { forward_reader_thread(); });
reverse_reader_thread_ =
std::make_unique<std::thread>([this]() { reverse_reader_thread(); });
return api_error::success; return api_error::success;
} catch (const std::exception &ex) { } catch (const std::exception &ex) {
utils::error::raise_api_path_error(function_name, get_api_path(), utils::error::raise_api_path_error(function_name, get_api_path(),
@@ -90,9 +92,9 @@ auto ring_buffer_base::close() -> bool {
auto res = open_file_base::close(); auto res = open_file_base::close();
if (reader_thread_) { if (forward_reader_thread_) {
reader_thread_->join(); forward_reader_thread_->join();
reader_thread_.reset(); forward_reader_thread_.reset();
} }
return res; return res;
@@ -182,6 +184,58 @@ void ring_buffer_base::forward(std::size_t count) {
update_position(count, true); update_position(count, true);
} }
void ring_buffer_base::forward_reader_thread() {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto notify_and_unlock = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
auto last_pos = ring_pos_;
auto next_chunk = ring_pos_;
notify_and_unlock();
while (not get_stop_requested()) {
chunk_lock.lock();
if (last_pos == ring_pos_) {
++next_chunk;
} else {
next_chunk = ring_pos_ + 1U;
last_pos = ring_pos_;
}
if (next_chunk > ring_end_) {
next_chunk = ring_begin_;
}
if (read_state_[next_chunk % read_state_.size()]) {
if (get_stop_requested()) {
notify_and_unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
last_pos = ring_pos_;
next_chunk = ring_pos_;
}
notify_and_unlock();
continue;
}
notify_and_unlock();
download_chunk(next_chunk, true);
}
event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped,
function_name);
}
auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> { auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex()); recur_mutex_lock file_lock(get_mutex());
return read_state_; return read_state_;
@@ -264,7 +318,7 @@ auto ring_buffer_base::read(std::size_t read_size, std::uint64_t read_offset,
return get_stop_requested() ? api_error::download_stopped : res; return get_stop_requested() ? api_error::download_stopped : res;
} }
void ring_buffer_base::reader_thread() { void ring_buffer_base::reverse_reader_thread() {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock chunk_lock(chunk_mtx_); unique_mutex_lock chunk_lock(chunk_mtx_);
@@ -273,22 +327,20 @@ void ring_buffer_base::reader_thread() {
chunk_lock.unlock(); chunk_lock.unlock();
}; };
auto last_pos = ring_pos_; auto last_begin = ring_begin_;
auto next_chunk = ring_pos_; auto next_chunk = ring_pos_;
notify_and_unlock(); notify_and_unlock();
while (not get_stop_requested()) { while (not get_stop_requested()) {
chunk_lock.lock(); chunk_lock.lock();
if (last_pos == ring_pos_) { if (last_begin != ring_begin_) {
++next_chunk; last_begin = ring_begin_;
} else { next_chunk = ring_pos_;
next_chunk = ring_pos_ + 1U;
last_pos = ring_pos_;
} }
if (next_chunk > ring_end_) { if (next_chunk > ring_begin_) {
next_chunk = ring_begin_; --next_chunk;
} }
if (read_state_[next_chunk % read_state_.size()]) { if (read_state_[next_chunk % read_state_.size()]) {
@@ -299,7 +351,7 @@ void ring_buffer_base::reader_thread() {
if (get_read_state().all()) { if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock); chunk_notify_.wait(chunk_lock);
last_pos = ring_pos_; last_begin = ring_begin_;
next_chunk = ring_pos_; next_chunk = ring_pos_;
} }
@@ -315,7 +367,6 @@ void ring_buffer_base::reader_thread() {
get_api_path(), get_source_path(), api_error::download_stopped, get_api_path(), get_source_path(), api_error::download_stopped,
function_name); function_name);
} }
void ring_buffer_base::reverse(std::size_t count) { void ring_buffer_base::reverse(std::size_t count) {
update_position(count, false); update_position(count, false);
} }