From 75a4676eac0e06695e3633a2cb153f1395a50403 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Mon, 23 Dec 2024 07:46:25 -0600 Subject: [PATCH] ring buffer background reader --- .../file_manager/ring_buffer_open_file.hpp | 18 +- .../librepertory/include/types/repertory.hpp | 27 +-- .../file_manager/ring_buffer_open_file.cpp | 155 ++++++++++++------ .../librepertory/src/types/repertory.cpp | 9 +- 4 files changed, 131 insertions(+), 78 deletions(-) diff --git a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp index 6d9eb91c..9e158b9f 100644 --- a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp +++ b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp @@ -42,8 +42,8 @@ public: ring_buffer_open_file() = delete; ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete; ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete; - auto operator=(ring_buffer_open_file &&) noexcept -> ring_buffer_open_file & = - delete; + auto operator=(ring_buffer_open_file &&) noexcept + -> ring_buffer_open_file & = delete; auto operator=(const ring_buffer_open_file &) noexcept -> ring_buffer_open_file & = delete; @@ -64,11 +64,9 @@ private: stop_type stop_requested_{false}; private: - auto download_chunk(std::size_t chunk) -> api_error; + auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; - void forward_reader_thread(std::size_t count); - - void reverse_reader_thread(std::size_t count); + void background_reader_thread(); protected: [[nodiscard]] auto is_download_complete() const -> bool override { @@ -108,8 +106,8 @@ public: return false; } - [[nodiscard]] auto - native_operation(native_operation_callback callback) -> api_error override; + [[nodiscard]] auto native_operation(native_operation_callback callback) + -> api_error override; [[nodiscard]] auto native_operation(std::uint64_t, native_operation_callback) -> api_error override { @@ -129,8 +127,8 @@ public: void set_api_path(const std::string &api_path) override; - [[nodiscard]] auto write(std::uint64_t, const data_buffer &, - std::size_t &) -> api_error override { + [[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &) + -> api_error override { return api_error::not_supported; } }; diff --git a/repertory/librepertory/include/types/repertory.hpp b/repertory/librepertory/include/types/repertory.hpp index 36f40576..1230b161 100644 --- a/repertory/librepertory/include/types/repertory.hpp +++ b/repertory/librepertory/include/types/repertory.hpp @@ -194,6 +194,7 @@ enum class api_error { invalid_handle, invalid_operation, invalid_ring_buffer_multiple, + invalid_ring_buffer_position, invalid_ring_buffer_size, invalid_version, item_exists, @@ -215,31 +216,33 @@ enum class api_error { [[nodiscard]] auto api_error_from_string(std::string_view str) -> api_error; -[[nodiscard]] auto -api_error_to_string(const api_error &error) -> const std::string &; +[[nodiscard]] auto api_error_to_string(const api_error &error) + -> const std::string &; enum class database_type { rocksdb, sqlite, }; -[[nodiscard]] auto database_type_from_string( - std::string type, - database_type default_type = database_type::rocksdb) -> database_type; - [[nodiscard]] auto -database_type_to_string(const database_type &type) -> std::string; +database_type_from_string(std::string type, + database_type default_type = database_type::rocksdb) + -> database_type; + +[[nodiscard]] auto database_type_to_string(const database_type &type) + -> std::string; enum class download_type { direct, fallback, ring_buffer, }; -[[nodiscard]] auto download_type_from_string( - std::string type, - download_type default_type = download_type::fallback) -> download_type; - [[nodiscard]] auto -download_type_to_string(const download_type &type) -> std::string; +download_type_from_string(std::string type, + download_type default_type = download_type::fallback) + -> download_type; + +[[nodiscard]] auto download_type_to_string(const download_type &type) + -> std::string; enum class exit_code : std::int32_t { success = 0, diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index 0fcf512b..25b3a552 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -19,6 +19,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +#include + #include "file_manager/ring_buffer_open_file.hpp" #include "app_config.hpp" @@ -69,7 +71,7 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, source_path_ = utils::path::combine(buffer_directory, {utils::create_uuid_string()}); nf_ = utils::file::file::open_or_create_file(source_path_); - if (not *nf_) { + if (not*nf_) { throw std::runtime_error(fmt::format("failed to create buffer file|err|{}", utils::get_last_error_code())); } @@ -105,57 +107,76 @@ auto ring_buffer_open_file::close() -> bool { return open_file_base::close(); } -auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error { +auto ring_buffer_open_file::download_chunk(std::size_t chunk, bool skip_active) + -> api_error { unique_mutex_lock chunk_lock(chunk_mtx_); - if (active_downloads_.find(chunk) != active_downloads_.end()) { - auto active_download = active_downloads_.at(chunk); + const auto unlock_and_notify = [this, &chunk_lock]() { chunk_notify_.notify_all(); chunk_lock.unlock(); + }; + + const auto unlock_and_return = + [&unlock_and_notify](api_error res) -> api_error { + unlock_and_notify(); + return res; + }; + + if (chunk < ring_begin_ || chunk > ring_end_) { + return unlock_and_return(api_error::invalid_ring_buffer_position); + } + + if (active_downloads_.find(chunk) != active_downloads_.end()) { + if (skip_active) { + return unlock_and_return(api_error::success); + } + + auto active_download = active_downloads_.at(chunk); + unlock_and_notify(); return active_download->wait(); } - if (ring_state_[chunk % ring_state_.size()]) { - auto active_download{std::make_shared()}; - active_downloads_[chunk] = active_download; - ring_state_[chunk % ring_state_.size()] = false; - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - data_buffer buffer; - auto data_offset{chunk * chunk_size_}; - auto data_size{ - chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_, - }; - - auto res{ - provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer, - stop_requested_), - }; - if (res == api_error::success) { - res = do_io([&]() -> api_error { - std::size_t bytes_written{}; - if (nf_->write(buffer, (chunk % ring_state_.size()) * chunk_size_, - &bytes_written)) { - return api_error::success; - } - - return api_error::os_error; - }); - } - - active_download->notify(res); - - chunk_lock.lock(); - active_downloads_.erase(chunk); - chunk_notify_.notify_all(); - return res; + if (not ring_state_[chunk % ring_state_.size()]) { + return unlock_and_return(api_error::success); } - chunk_notify_.notify_all(); - chunk_lock.unlock(); + auto active_download{std::make_shared()}; + active_downloads_[chunk] = active_download; + ring_state_[chunk % ring_state_.size()] = false; + unlock_and_notify(); - return api_error::success; + data_buffer buffer; + auto data_offset{chunk * chunk_size_}; + auto data_size{ + chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_, + }; + + auto res{ + provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer, + stop_requested_), + }; + + chunk_lock.lock(); + if (res == api_error::success) { + res = (chunk >= ring_begin_ && chunk <= ring_end_) + ? do_io([&]() -> api_error { + std::size_t bytes_written{}; + if (nf_->write(buffer, + (chunk % ring_state_.size()) * chunk_size_, + &bytes_written)) { + return api_error::success; + } + + return api_error::os_error; + }) + : api_error::invalid_ring_buffer_position; + } + + active_downloads_.erase(chunk); + unlock_and_notify(); + + active_download->notify(res); + return res; } void ring_buffer_open_file::forward(std::size_t count) { @@ -179,6 +200,7 @@ void ring_buffer_open_file::forward(std::size_t count) { ring_begin_ += added; ring_pos_ += count; } + ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); } @@ -204,9 +226,7 @@ auto ring_buffer_open_file::native_operation( void ring_buffer_open_file::reverse(std::size_t count) { mutex_lock chunk_lock(chunk_mtx_); - if (ring_pos_ < count) { - count = ring_pos_; - } + count = std::min(ring_pos_, count); if ((ring_pos_ - count) >= ring_begin_) { ring_pos_ -= count; @@ -232,16 +252,14 @@ void ring_buffer_open_file::reverse(std::size_t count) { } auto ring_buffer_open_file::read(std::size_t read_size, - std::uint64_t read_offset, - data_buffer &data) -> api_error { + std::uint64_t read_offset, data_buffer &data) + -> api_error { if (fsi_.directory) { return api_error::invalid_operation; } reset_timeout(); - mutex_lock lock(read_mtx_); - read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset); if (read_size == 0U) { return api_error::success; @@ -251,6 +269,8 @@ auto ring_buffer_open_file::read(std::size_t read_size, read_offset = read_offset - (begin_chunk * chunk_size_); auto res{api_error::success}; + + unique_mutex_lock read_lock(read_mtx_); for (std::size_t chunk = begin_chunk; (res == api_error::success) && (read_size > 0U); ++chunk) { if (chunk > ring_pos_) { @@ -259,10 +279,16 @@ auto ring_buffer_open_file::read(std::size_t read_size, reverse(ring_pos_ - chunk); } - reset_timeout(); - - res = download_chunk(chunk); + res = download_chunk(chunk, false); if (res != api_error::success) { + if (not stop_requested_ && + res == api_error::invalid_ring_buffer_position) { + read_lock.unlock(); + + // TODO limit retry + return read(read_size, read_offset, data); + } + return res; } @@ -331,4 +357,29 @@ void ring_buffer_open_file::set_api_path(const std::string &api_path) { open_file_base::set_api_path(api_path); chunk_notify_.notify_all(); } + +void ring_buffer_open_file::background_reader_thread() { + unique_mutex_lock read_lock(read_mtx_); + read_lock.unlock(); + + while (not stop_requested_) { + read_lock.lock(); + auto next_chunk = + ring_pos_ + 1U >= ring_state_.size() ? 0U : ring_pos_ + 1U; + if (not ring_state_[next_chunk % ring_state_.size()]) { + read_lock.unlock(); + continue; + } + + read_lock.unlock(); + download_chunk(next_chunk, true); + + unique_mutex_lock chunk_lock(chunk_mtx_); + if (ring_state_.none()) { + chunk_notify_.wait(chunk_lock); + } + + chunk_notify_.notify_all(); + } +} } // namespace repertory diff --git a/repertory/librepertory/src/types/repertory.cpp b/repertory/librepertory/src/types/repertory.cpp index c8ed6b70..43aca09c 100644 --- a/repertory/librepertory/src/types/repertory.cpp +++ b/repertory/librepertory/src/types/repertory.cpp @@ -25,8 +25,8 @@ #include "utils/string.hpp" namespace repertory { -auto database_type_from_string(std::string type, - database_type default_type) -> database_type { +auto database_type_from_string(std::string type, database_type default_type) + -> database_type { type = utils::string::to_lower(utils::string::trim(type)); if (type == "rocksdb") { return database_type::rocksdb; @@ -50,8 +50,8 @@ auto database_type_to_string(const database_type &type) -> std::string { } } -auto download_type_from_string(std::string type, - download_type default_type) -> download_type { +auto download_type_from_string(std::string type, download_type default_type) + -> download_type { type = utils::string::to_lower(utils::string::trim(type)); if (type == "direct") { return download_type::direct; @@ -106,6 +106,7 @@ static const std::unordered_map LOOKUP = { {api_error::invalid_handle, "invalid_handle"}, {api_error::invalid_operation, "invalid_operation"}, {api_error::invalid_ring_buffer_multiple, "invalid_ring_buffer_multiple"}, + {api_error::invalid_ring_buffer_position, "invalid_ring_buffer_position"}, {api_error::invalid_ring_buffer_size, "invalid_ring_buffer_size"}, {api_error::invalid_version, "invalid_version"}, {api_error::item_exists, "item_exists"},