From e4a80e22f3b5b47dfc6095744f5487a095572b18 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sun, 22 Dec 2024 15:57:40 -0600 Subject: [PATCH] try to fix ring buffer --- .../file_manager/ring_buffer_open_file.hpp | 18 +-- .../file_manager/ring_buffer_open_file.cpp | 140 +++++++++--------- .../src/ring_buffer_open_file_test.cpp | 4 +- 3 files changed, 79 insertions(+), 83 deletions(-) diff --git a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp index 8fb6d2a3..0aa80c79 100644 --- a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp +++ b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp @@ -32,10 +32,6 @@ class i_upload_manager; class ring_buffer_open_file final : public open_file_base { public: - ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size, - std::uint8_t chunk_timeout, filesystem_item fsi, - i_provider &provider); - ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider, std::size_t ring_size); @@ -60,9 +56,9 @@ private: std::unique_ptr chunk_reverse_thread_; std::condition_variable chunk_notify_; mutable std::mutex chunk_mtx_; - std::size_t current_chunk_{}; - std::size_t first_chunk_{}; - std::size_t last_chunk_; + std::size_t ring_begin_{}; + std::size_t ring_end_{}; + std::size_t ring_pos_{}; stop_type stop_requested_{false}; private: @@ -87,16 +83,14 @@ public: void forward(std::size_t count); [[nodiscard]] auto get_current_chunk() const -> std::size_t { - return current_chunk_; + return ring_pos_; } [[nodiscard]] auto get_first_chunk() const -> std::size_t { - return first_chunk_; + return ring_begin_; } - [[nodiscard]] auto get_last_chunk() const -> std::size_t { - return last_chunk_; - } + [[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; } [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index 032cf245..ca709fb2 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -34,15 +34,6 @@ #include "utils/utils.hpp" namespace repertory { -ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, - std::uint64_t chunk_size, - std::uint8_t chunk_timeout, - filesystem_item fsi, - i_provider &provider) - : ring_buffer_open_file(std::move(buffer_directory), chunk_size, - chunk_timeout, std::move(fsi), provider, - (1024ULL * 1024ULL * 1024ULL) / chunk_size) {} - ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size, std::uint8_t chunk_timeout, @@ -65,7 +56,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, throw std::runtime_error("file size is less than ring buffer size"); } - last_chunk_ = ring_state_.size() - 1U; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); ring_state_.set(0U, ring_state_.size(), true); buffer_directory = utils::path::absolute(buffer_directory); @@ -131,8 +123,8 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error { chunk_notify_.notify_all(); chunk_lock.unlock(); - data_buffer buffer((chunk == (total_chunks_ - 1U)) ? last_chunk_size_ - : chunk_size_); + data_buffer buffer(chunk == (total_chunks_ - 1U) ? last_chunk_size_ + : chunk_size_); auto res = provider_.read_file_bytes(fsi_.api_path, buffer.size(), @@ -165,28 +157,28 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error { void ring_buffer_open_file::forward(std::size_t count) { mutex_lock chunk_lock(chunk_mtx_); - if ((current_chunk_ + count) > (total_chunks_ - 1U)) { - count = (total_chunks_ - 1U) - current_chunk_; + if ((ring_pos_ + count) > (total_chunks_ - 1U)) { + count = (total_chunks_ - 1U) - ring_pos_; } - if ((current_chunk_ + count) <= last_chunk_) { - current_chunk_ += count; + if ((ring_pos_ + count) <= ring_end_) { + ring_pos_ += count; } else { - const auto added = count - (last_chunk_ - current_chunk_); + auto added = count - (ring_end_ - ring_pos_); if (added >= ring_state_.size()) { ring_state_.set(0U, ring_state_.size(), true); - current_chunk_ += count; - first_chunk_ += added; - last_chunk_ = - std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); + ring_pos_ += count; + ring_begin_ += added; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); } else { for (std::size_t idx = 0U; idx < added; ++idx) { - ring_state_[(first_chunk_ + idx) % ring_state_.size()] = true; + ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true; } - first_chunk_ += added; - current_chunk_ += count; - last_chunk_ = - std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); + ring_begin_ += added; + ring_pos_ += count; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); } } @@ -211,28 +203,28 @@ auto ring_buffer_open_file::native_operation( void ring_buffer_open_file::reverse(std::size_t count) { mutex_lock chunk_lock(chunk_mtx_); - if (current_chunk_ < count) { - count = current_chunk_; + if (ring_pos_ < count) { + count = ring_pos_; } - if ((current_chunk_ - count) >= first_chunk_) { - current_chunk_ -= count; + if ((ring_pos_ - count) >= ring_begin_) { + ring_pos_ -= count; } else { - const auto removed = count - (current_chunk_ - first_chunk_); + auto removed = count - (ring_pos_ - ring_begin_); if (removed >= ring_state_.size()) { ring_state_.set(0U, ring_state_.size(), true); - current_chunk_ -= count; - first_chunk_ = current_chunk_; - last_chunk_ = - std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); + ring_pos_ -= count; + ring_begin_ = ring_pos_; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); } else { for (std::size_t idx = 0U; idx < removed; ++idx) { - ring_state_[(last_chunk_ - idx) % ring_state_.size()] = true; + ring_state_[(ring_end_ - idx) % ring_state_.size()] = true; } - first_chunk_ -= removed; - current_chunk_ -= count; - last_chunk_ = - std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); + ring_begin_ -= removed; + ring_pos_ -= count; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); } } @@ -246,6 +238,7 @@ auto ring_buffer_open_file::read(std::size_t read_size, return api_error::invalid_operation; } + data.clear(); reset_timeout(); read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset); @@ -253,46 +246,54 @@ auto ring_buffer_open_file::read(std::size_t read_size, return api_error::success; } - const auto start_chunk_index = - static_cast(read_offset / chunk_size_); - read_offset = read_offset - (start_chunk_index * chunk_size_); - data_buffer buffer(chunk_size_); + auto begin_chunk = static_cast(read_offset / chunk_size_); + read_offset = read_offset - (begin_chunk * chunk_size_); auto res = api_error::success; - for (std::size_t chunk = start_chunk_index; + for (std::size_t chunk = begin_chunk; (res == api_error::success) && (read_size > 0U); ++chunk) { - if (chunk > current_chunk_) { - forward(chunk - current_chunk_); - } else if (chunk < current_chunk_) { - reverse(current_chunk_ - chunk); + if (chunk > ring_pos_) { + forward(chunk - ring_pos_); + } else if (chunk < ring_pos_) { + reverse(ring_pos_ - chunk); } reset_timeout(); + res = download_chunk(chunk); if (res != api_error::success) { - continue; + return res; } - const auto to_read = std::min( - static_cast(chunk_size_ - read_offset), read_size); - res = do_io([this, &buffer, &chunk, &data, read_offset, - &to_read]() -> api_error { + reset_timeout(); + + auto to_read = std::min(static_cast(chunk_size_ - read_offset), + read_size); + res = do_io([&]() -> api_error { + data_buffer buffer(to_read); + std::size_t bytes_read{}; - auto ret = nf_->read(buffer, ((chunk % ring_state_.size()) * chunk_size_), - &bytes_read) - ? api_error::success - : api_error::os_error; - if (ret == api_error::success) { - data.insert( - data.end(), buffer.begin() + static_cast(read_offset), - buffer.begin() + static_cast(read_offset + to_read)); - reset_timeout(); + auto result = nf_->read(buffer, + (((chunk % ring_state_.size()) * chunk_size_) + + read_offset), + &bytes_read) + ? api_error::success + : api_error::os_error; + buffer.resize(bytes_read); + + if (result != api_error::success) { + return result; } - return ret; + reset_timeout(); + + data.insert(data.end(), buffer.begin(), buffer.end()); + read_size -= buffer.size(); + + return result; }); + read_offset = 0U; - read_size -= to_read; } return res; @@ -306,16 +307,17 @@ void ring_buffer_open_file::set(std::size_t first_chunk, throw std::runtime_error("first chunk must be less than total chunks"); } - first_chunk_ = first_chunk; - last_chunk_ = first_chunk_ + ring_state_.size() - 1U; + ring_begin_ = first_chunk; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - if (current_chunk > last_chunk_) { + if (current_chunk > ring_end_) { chunk_notify_.notify_all(); throw std::runtime_error( "current chunk must be less than or equal to last chunk"); } - current_chunk_ = current_chunk; + ring_pos_ = current_chunk; ring_state_.set(0U, ring_state_.size(), false); chunk_notify_.notify_all(); diff --git a/repertory/repertory_test/src/ring_buffer_open_file_test.cpp b/repertory/repertory_test/src/ring_buffer_open_file_test.cpp index 2dae1a0c..9f50b2d5 100644 --- a/repertory/repertory_test/src/ring_buffer_open_file_test.cpp +++ b/repertory/repertory_test/src/ring_buffer_open_file_test.cpp @@ -315,7 +315,7 @@ TEST(ring_buffer_open_file, can_reverse_full_ring) { } TEST(ring_buffer_open_file, read_full_file) { - auto &nf = test::create_random_file(test_chunk_size * 32u); + auto &nf = test::create_random_file(test_chunk_size * 33u + 11u); auto download_source_path = nf.get_path(); auto dest_path = test::generate_test_file_name("ring_buffer_open_file"); @@ -327,7 +327,7 @@ TEST(ring_buffer_open_file, read_full_file) { filesystem_item fsi; fsi.directory = false; fsi.api_path = "/test.txt"; - fsi.size = test_chunk_size * 32u; + fsi.size = test_chunk_size * 33u + 11u; fsi.source_path = test::generate_test_file_name("ring_buffer_open_file"); EXPECT_CALL(mp, read_file_bytes)