From d67e41bc1dd068d365e97b1589e7badc8bc6914e Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Thu, 26 Dec 2024 20:43:47 -0600 Subject: [PATCH] refactor --- .../include/file_manager/direct_open_file.hpp | 15 +- .../file_manager/ring_buffer_open_file.hpp | 23 ++- .../src/file_manager/direct_open_file.cpp | 106 ++++++----- .../file_manager/ring_buffer_open_file.cpp | 170 ++++++++++-------- 4 files changed, 169 insertions(+), 145 deletions(-) diff --git a/repertory/librepertory/include/file_manager/direct_open_file.hpp b/repertory/librepertory/include/file_manager/direct_open_file.hpp index 9e97b735..77bd93fd 100644 --- a/repertory/librepertory/include/file_manager/direct_open_file.hpp +++ b/repertory/librepertory/include/file_manager/direct_open_file.hpp @@ -42,8 +42,8 @@ public: direct_open_file(const direct_open_file &) noexcept = delete; direct_open_file(direct_open_file &&) noexcept = delete; auto operator=(direct_open_file &&) noexcept -> direct_open_file & = delete; - auto operator=(const direct_open_file &) noexcept - -> direct_open_file & = delete; + auto + operator=(const direct_open_file &) noexcept -> direct_open_file & = delete; public: static constexpr const auto ring_size{5U}; @@ -64,10 +64,12 @@ private: stop_type stop_requested_{false}; private: - void background_reader_thread(); + [[nodiscard]] auto check_start() -> api_error; auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; + void reader_thread(); + public: auto close() -> bool override; @@ -109,10 +111,9 @@ public: void set_api_path(const std::string &api_path) override; - [[nodiscard]] auto write(std::uint64_t /* write_offset */, - const data_buffer & /* data */, - std::size_t & /* bytes_written */) - -> api_error override { + [[nodiscard]] auto + write(std::uint64_t /* write_offset */, const data_buffer & /* data */, + std::size_t & /* bytes_written */) -> api_error override { return api_error::not_supported; } }; diff --git a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp index 29f92028..9bcd14f0 100644 --- a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp +++ b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp @@ -42,18 +42,17 @@ public: ring_buffer_open_file() = delete; ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete; ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete; - auto operator=(ring_buffer_open_file &&) noexcept - -> ring_buffer_open_file & = delete; + auto operator=(ring_buffer_open_file &&) noexcept -> ring_buffer_open_file & = + delete; auto operator=(const ring_buffer_open_file &) noexcept -> ring_buffer_open_file & = delete; private: boost::dynamic_bitset<> ring_state_; + std::string source_path_; std::size_t total_chunks_; private: - std::unique_ptr chunk_forward_thread_; - std::unique_ptr chunk_reverse_thread_; std::condition_variable chunk_notify_; mutable std::mutex chunk_mtx_; std::mutex read_mtx_; @@ -61,16 +60,15 @@ private: std::size_t ring_begin_{}; std::size_t ring_end_{}; std::size_t ring_pos_{}; - std::string source_path_; stop_type stop_requested_{false}; private: - void background_reader_thread(); - [[nodiscard]] auto check_allocation() -> api_error; auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; + void reader_thread(); + public: [[nodiscard]] static auto can_handle_file(std::uint64_t file_size, std::size_t chunk_size, @@ -104,8 +102,8 @@ public: return false; } - [[nodiscard]] auto native_operation(native_operation_callback callback) - -> api_error override; + [[nodiscard]] auto + native_operation(native_operation_callback callback) -> api_error override; [[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */, native_operation_callback /* callback */) @@ -126,10 +124,9 @@ public: void set_api_path(const std::string &api_path) override; - [[nodiscard]] auto write(std::uint64_t /* write_offset */, - const data_buffer & /* data */, - std::size_t & /* bytes_written */) - -> api_error override { + [[nodiscard]] auto + write(std::uint64_t /* write_offset */, const data_buffer & /* data */, + std::size_t & /* bytes_written */) -> api_error override { return api_error::not_supported; } }; diff --git a/repertory/librepertory/src/file_manager/direct_open_file.cpp b/repertory/librepertory/src/file_manager/direct_open_file.cpp index c152a311..b04f5ad4 100644 --- a/repertory/librepertory/src/file_manager/direct_open_file.cpp +++ b/repertory/librepertory/src/file_manager/direct_open_file.cpp @@ -42,12 +42,7 @@ direct_open_file::direct_open_file(std::uint64_t chunk_size, ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); ring_state_.set(0U, ring_state_.size(), true); - - reader_thread_ = - std::make_unique([this]() { background_reader_thread(); }); } - - event_system::instance().raise(fsi_.api_path, "direct"); } direct_open_file::~direct_open_file() { @@ -61,48 +56,14 @@ direct_open_file::~direct_open_file() { } } -void direct_open_file::background_reader_thread() { - unique_mutex_lock chunk_lock(chunk_mtx_); - auto next_chunk = ring_pos_; - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - while (not stop_requested_) { - chunk_lock.lock(); - - next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; - const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { - if (stop_requested_) { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - return; - } - - if (get_read_state().all()) { - chunk_notify_.wait(chunk_lock); - next_chunk = ring_pos_; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - if (not ring_state_[next_chunk % ring_state_.size()]) { - check_and_wait(); - continue; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - download_chunk(next_chunk, true); - - chunk_lock.lock(); - check_and_wait(); +auto direct_open_file::check_start() -> api_error { + if (fsi_.size == 0U || reader_thread_) { + return api_error::success; } - event_system::instance().raise(fsi_.api_path, "direct", - api_error::download_stopped); + event_system::instance().raise(fsi_.api_path, "direct"); + reader_thread_ = std::make_unique([this]() { reader_thread(); }); + return api_error::success; } auto direct_open_file::close() -> bool { @@ -115,8 +76,8 @@ auto direct_open_file::close() -> bool { return open_file_base::close(); } -auto direct_open_file::download_chunk(std::size_t chunk, bool skip_active) - -> api_error { +auto direct_open_file::download_chunk(std::size_t chunk, + bool skip_active) -> api_error { unique_mutex_lock chunk_lock(chunk_mtx_); const auto unlock_and_notify = [this, &chunk_lock]() { chunk_notify_.notify_all(); @@ -266,9 +227,12 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, auto begin_chunk{static_cast(read_offset / chunk_size_)}; read_offset = read_offset - (begin_chunk * chunk_size_); - auto res{api_error::success}; - unique_mutex_lock read_lock(read_mtx_); + auto res = check_start(); + if (res != api_error::success) { + return res; + } + for (std::size_t chunk = begin_chunk; not stop_requested_ && (res == api_error::success) && (read_size > 0U); ++chunk) { @@ -313,6 +277,50 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, return stop_requested_ ? api_error::download_stopped : res; } +void direct_open_file::reader_thread() { + unique_mutex_lock chunk_lock(chunk_mtx_); + auto next_chunk = ring_pos_; + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + while (not stop_requested_) { + chunk_lock.lock(); + + next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; + const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { + if (stop_requested_) { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + next_chunk = ring_pos_; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + if (not ring_state_[next_chunk % ring_state_.size()]) { + check_and_wait(); + continue; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + download_chunk(next_chunk, true); + + chunk_lock.lock(); + check_and_wait(); + } + + event_system::instance().raise(fsi_.api_path, "direct", + api_error::download_stopped); +} + void direct_open_file::set_api_path(const std::string &api_path) { mutex_lock chunk_lock(chunk_mtx_); open_file_base::set_api_path(api_path); diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index 8f26360b..712cde7a 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -21,7 +21,6 @@ */ #include "file_manager/ring_buffer_open_file.hpp" -#include "app_config.hpp" #include "events/event_system.hpp" #include "file_manager/events.hpp" #include "file_manager/open_file_base.hpp" @@ -43,6 +42,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, std::size_t ring_size) : open_file_base(chunk_size, chunk_timeout, fsi, provider), ring_state_(ring_size), + source_path_(utils::path::combine(buffer_directory, + {utils::create_uuid_string()})), total_chunks_(static_cast( utils::divide_with_ceiling(fsi_.size, chunk_size))) { if (ring_size < 5U) { @@ -55,31 +56,6 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U); ring_state_.set(0U, ring_size, true); - - buffer_directory = utils::path::absolute(buffer_directory); - if (not utils::file::directory(buffer_directory).create_directory()) { - throw std::runtime_error( - fmt::format("failed to create buffer directory|path|{}|err|{}", - buffer_directory, utils::get_last_error_code())); - } - - source_path_ = - utils::path::combine(buffer_directory, {utils::create_uuid_string()}); - nf_ = utils::file::file::open_or_create_file(source_path_); - if (not*nf_) { - throw std::runtime_error(fmt::format("failed to create buffer file|err|{}", - utils::get_last_error_code())); - } - - if (not nf_->truncate(ring_size * chunk_size)) { - nf_->close(); - throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}", - utils::get_last_error_code())); - } - - reader_thread_ = - std::make_unique([this]() { background_reader_thread(); }); - event_system::instance().raise(fsi_.api_path, source_path_); } ring_buffer_open_file::~ring_buffer_open_file() { @@ -89,6 +65,8 @@ ring_buffer_open_file::~ring_buffer_open_file() { if (nf_) { nf_->close(); + nf_.reset(); + if (not utils::file::file(source_path_).remove()) { utils::error::raise_api_path_error( function_name, fsi_.api_path, source_path_, @@ -102,50 +80,6 @@ ring_buffer_open_file::~ring_buffer_open_file() { } } -void ring_buffer_open_file::background_reader_thread() { - unique_mutex_lock chunk_lock(chunk_mtx_); - auto next_chunk = ring_pos_; - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - while (not stop_requested_) { - chunk_lock.lock(); - - next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; - const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { - if (stop_requested_) { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - return; - } - - if (get_read_state().all()) { - chunk_notify_.wait(chunk_lock); - next_chunk = ring_pos_; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - if (not ring_state_[next_chunk % ring_state_.size()]) { - check_and_wait(); - continue; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - download_chunk(next_chunk, true); - - chunk_lock.lock(); - check_and_wait(); - } - - event_system::instance().raise(fsi_.api_path, source_path_, - api_error::download_stopped); -} - auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size, std::size_t chunk_size, std::size_t ring_size) -> bool { @@ -153,7 +87,44 @@ auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size, } auto ring_buffer_open_file::check_allocation() -> api_error { + REPERTORY_USES_FUNCTION_NAME(); + if (nf_) { + return api_error::success; + } + + auto buffer_directory{utils::path::get_parent_path(source_path_)}; + if (not utils::file::directory(buffer_directory).create_directory()) { + utils::error::raise_api_path_error( + function_name, fsi_.api_path, source_path_, + fmt::format("failed to create buffer directory|path|{}|err|{}", + buffer_directory, utils::get_last_error_code())); + return api_error::os_error; + } + + nf_ = utils::file::file::open_or_create_file(source_path_); + if (not nf_ || not *nf_) { + utils::error::raise_api_path_error( + function_name, fsi_.api_path, source_path_, + fmt::format("failed to create buffer file|err|{}", + utils::get_last_error_code())); + return api_error::os_error; + } + + if (not nf_->truncate(ring_state_.size() * chunk_size_)) { + nf_->close(); + nf_.reset(); + + utils::error::raise_api_path_error( + function_name, fsi_.api_path, source_path_, + fmt::format("failed to resize buffer file|err|{}", + utils::get_last_error_code())); + return api_error::os_error; + } + + event_system::instance().raise(fsi_.api_path, source_path_); + reader_thread_ = std::make_unique([this]() { reader_thread(); }); + return api_error::success; } auto ring_buffer_open_file::close() -> bool { @@ -166,8 +137,8 @@ auto ring_buffer_open_file::close() -> bool { return open_file_base::close(); } -auto ring_buffer_open_file::download_chunk(std::size_t chunk, bool skip_active) - -> api_error { +auto ring_buffer_open_file::download_chunk(std::size_t chunk, + bool skip_active) -> api_error { unique_mutex_lock chunk_lock(chunk_mtx_); const auto unlock_and_notify = [this, &chunk_lock]() { chunk_notify_.notify_all(); @@ -316,8 +287,8 @@ void ring_buffer_open_file::reverse(std::size_t count) { } auto ring_buffer_open_file::read(std::size_t read_size, - std::uint64_t read_offset, data_buffer &data) - -> api_error { + std::uint64_t read_offset, + data_buffer &data) -> api_error { if (fsi_.directory) { return api_error::invalid_operation; } @@ -332,9 +303,12 @@ auto ring_buffer_open_file::read(std::size_t read_size, auto begin_chunk{static_cast(read_offset / chunk_size_)}; read_offset = read_offset - (begin_chunk * chunk_size_); - auto res{api_error::success}; - unique_mutex_lock read_lock(read_mtx_); + auto res = check_allocation(); + if (res != api_error::success) { + return res; + } + for (std::size_t chunk = begin_chunk; not stop_requested_ && (res == api_error::success) && (read_size > 0U); ++chunk) { @@ -395,6 +369,50 @@ auto ring_buffer_open_file::read(std::size_t read_size, return stop_requested_ ? api_error::download_stopped : res; } +void ring_buffer_open_file::reader_thread() { + unique_mutex_lock chunk_lock(chunk_mtx_); + auto next_chunk = ring_pos_; + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + while (not stop_requested_) { + chunk_lock.lock(); + + next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; + const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { + if (stop_requested_) { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + next_chunk = ring_pos_; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + if (not ring_state_[next_chunk % ring_state_.size()]) { + check_and_wait(); + continue; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + download_chunk(next_chunk, true); + + chunk_lock.lock(); + check_and_wait(); + } + + event_system::instance().raise(fsi_.api_path, source_path_, + api_error::download_stopped); +} + void ring_buffer_open_file::set(std::size_t first_chunk, std::size_t current_chunk) { mutex_lock chunk_lock(chunk_mtx_);