From bc0e216b7500b1e3bfce9ade3bf1cdaed8b4f85b Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Thu, 26 Dec 2024 07:57:59 -0600 Subject: [PATCH] refactor direct_open_file --- .../include/file_manager/direct_open_file.hpp | 70 ++-- .../src/file_manager/direct_open_file.cpp | 312 ++++++++++++++++-- .../file_manager/ring_buffer_open_file.cpp | 10 +- 3 files changed, 338 insertions(+), 54 deletions(-) diff --git a/repertory/librepertory/include/file_manager/direct_open_file.hpp b/repertory/librepertory/include/file_manager/direct_open_file.hpp index 088f2573..a8ab3c1f 100644 --- a/repertory/librepertory/include/file_manager/direct_open_file.hpp +++ b/repertory/librepertory/include/file_manager/direct_open_file.hpp @@ -19,8 +19,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ -#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ +#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_ +#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_ #include "file_manager/open_file_base.hpp" @@ -45,11 +45,29 @@ public: auto operator=(const direct_open_file &) noexcept -> direct_open_file & = delete; +public: + static constexpr const auto ring_size{5U}; + private: - std::atomic last_progress_{0U}; + boost::dynamic_bitset<> ring_state_; std::size_t total_chunks_; + +private: + std::condition_variable chunk_notify_; + mutable std::mutex chunk_mtx_; + std::mutex read_mtx_; + std::unique_ptr reader_thread_; + std::size_t ring_begin_{}; + std::array ring_data_; + std::size_t ring_end_{}; + std::size_t ring_pos_{}; stop_type stop_requested_{false}; +private: + auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; + + void background_reader_thread(); + protected: [[nodiscard]] auto is_download_complete() const -> bool override { return false; @@ -58,33 +76,39 @@ protected: public: auto close() -> bool override; + void forward(std::size_t count); + + [[nodiscard]] auto get_current_chunk() const -> std::size_t { + return ring_pos_; + } + + [[nodiscard]] auto get_first_chunk() const -> std::size_t { + return ring_begin_; + } + + [[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; } + + [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; + + [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; + + [[nodiscard]] auto get_total_chunks() const -> std::size_t { + return total_chunks_; + } + [[nodiscard]] auto is_complete() const -> bool override { return true; } [[nodiscard]] auto is_write_supported() const -> bool override { return false; } - [[nodiscard]] auto get_read_state() const - -> boost::dynamic_bitset<> override { - return {}; - } - - [[nodiscard]] auto get_read_state(std::size_t /* chunk */) const - -> bool override { - return false; - } - - [[nodiscard]] auto get_total_chunks() const -> std::uint64_t { - return total_chunks_; - } - [[nodiscard]] auto native_operation(native_operation_callback /* callback */) -> api_error override { return api_error::not_supported; } [[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */, - native_operation_callback /*callback*/) + native_operation_callback /* callback */) -> api_error override { return api_error::not_supported; } @@ -92,10 +116,16 @@ public: [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, data_buffer &data) -> api_error override; - [[nodiscard]] auto resize(std::uint64_t /*size*/) -> api_error override { + [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { return api_error::not_supported; } + void reverse(std::size_t count); + + void set(std::size_t first_chunk, std::size_t current_chunk); + + void set_api_path(const std::string &api_path) override; + [[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &) -> api_error override { return api_error::not_supported; @@ -103,4 +133,4 @@ public: }; } // namespace repertory -#endif // REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ +#endif // REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_ diff --git a/repertory/librepertory/src/file_manager/direct_open_file.cpp b/repertory/librepertory/src/file_manager/direct_open_file.cpp index 3da03e50..3fd75490 100644 --- a/repertory/librepertory/src/file_manager/direct_open_file.cpp +++ b/repertory/librepertory/src/file_manager/direct_open_file.cpp @@ -27,32 +27,234 @@ #include "providers/i_provider.hpp" #include "types/repertory.hpp" #include "utils/common.hpp" -#include "utils/time.hpp" namespace repertory { direct_open_file::direct_open_file(std::uint64_t chunk_size, - std::uint8_t chunk_timeout, - filesystem_item fsi, i_provider &provider) - : open_file_base(chunk_size, chunk_timeout, fsi, provider, true), + std::uint8_t chunk_timeout, + filesystem_item fsi, i_provider &provider) + : open_file_base(chunk_size, chunk_timeout, fsi, provider), + ring_state_(ring_size), total_chunks_(static_cast( - utils::divide_with_ceiling(fsi.size, chunk_size))) { - event_system::instance().raise(fsi_.api_path, ""); + utils::divide_with_ceiling(fsi_.size, chunk_size))) { + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + ring_state_.set(0U, ring_state_.size(), true); + + if (fsi_.size > 0U) { + reader_thread_ = + std::make_unique([this]() { background_reader_thread(); }); + } + + event_system::instance().raise(fsi_.api_path, "direct"); } -direct_open_file::~direct_open_file() { close(); } +direct_open_file::~direct_open_file() { + REPERTORY_USES_FUNCTION_NAME(); + + close(); + + if (reader_thread_) { + reader_thread_->join(); + reader_thread_.reset(); + } +} + +void direct_open_file::background_reader_thread() { + unique_mutex_lock chunk_lock(chunk_mtx_); + auto next_chunk = ring_pos_; + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + while (not stop_requested_) { + chunk_lock.lock(); + + next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; + const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { + if (stop_requested_) { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + next_chunk = ring_pos_; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + if (not ring_state_[next_chunk % ring_state_.size()]) { + check_and_wait(); + continue; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + download_chunk(next_chunk, true); + + chunk_lock.lock(); + check_and_wait(); + } + + event_system::instance().raise(fsi_.api_path, "direct", + api_error::download_stopped); +} auto direct_open_file::close() -> bool { stop_requested_ = true; - last_progress_ = 0U; - auto ret = open_file_base::close(); - event_system::instance().raise(fsi_.api_path, "", - api_error::download_stopped); - return ret; + unique_mutex_lock chunk_lock(chunk_mtx_); + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + return open_file_base::close(); +} + +auto direct_open_file::download_chunk(std::size_t chunk, bool skip_active) + -> api_error { + unique_mutex_lock chunk_lock(chunk_mtx_); + const auto unlock_and_notify = [this, &chunk_lock]() { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + const auto unlock_and_return = + [&unlock_and_notify](api_error res) -> api_error { + unlock_and_notify(); + return res; + }; + + if (chunk < ring_begin_ || chunk > ring_end_) { + return unlock_and_return(api_error::invalid_ring_buffer_position); + } + + if (active_downloads_.find(chunk) != active_downloads_.end()) { + if (skip_active) { + return unlock_and_return(api_error::success); + } + + auto active_download = active_downloads_.at(chunk); + unlock_and_notify(); + + return active_download->wait(); + } + + if (not ring_state_[chunk % ring_state_.size()]) { + return unlock_and_return(api_error::success); + } + + auto active_download{std::make_shared()}; + active_downloads_[chunk] = active_download; + ring_state_[chunk % ring_state_.size()] = false; + unlock_and_notify(); + + auto &buffer = ring_data_.at(chunk % ring_state_.size()); + auto data_offset{chunk * chunk_size_}; + auto data_size{ + chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_, + }; + + event_system::instance().raise( + fsi_.api_path, "direct", chunk, get_read_state().size(), + get_read_state().count()); + + auto res{ + provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer, + stop_requested_), + }; + + chunk_lock.lock(); + if (res == api_error::success) { + auto progress = + (static_cast(chunk + 1U) / static_cast(total_chunks_)) * + 100.0; + event_system::instance().raise(fsi_.api_path, "direct", + progress); + } + + event_system::instance().raise( + fsi_.api_path, "direct", chunk, get_read_state().size(), + get_read_state().count(), res); + + active_downloads_.erase(chunk); + unlock_and_notify(); + + active_download->notify(res); + return res; +} + +void direct_open_file::forward(std::size_t count) { + mutex_lock chunk_lock(chunk_mtx_); + if ((ring_pos_ + count) > (total_chunks_ - 1U)) { + count = (total_chunks_ - 1U) - ring_pos_; + } + + if ((ring_pos_ + count) <= ring_end_) { + ring_pos_ += count; + } else { + auto added = count - (ring_end_ - ring_pos_); + if (added >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), true); + ring_pos_ += count; + ring_begin_ += added; + } else { + for (std::size_t idx = 0U; idx < added; ++idx) { + ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true; + } + ring_begin_ += added; + ring_pos_ += count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); +} + +auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> { + recur_mutex_lock file_lock(file_mtx_); + auto read_state = ring_state_; + return read_state.flip(); +} + +auto direct_open_file::get_read_state(std::size_t chunk) const -> bool { + recur_mutex_lock file_lock(file_mtx_); + return not ring_state_[chunk % ring_state_.size()]; +} + +void direct_open_file::reverse(std::size_t count) { + mutex_lock chunk_lock(chunk_mtx_); + count = std::min(ring_pos_, count); + + if ((ring_pos_ - count) >= ring_begin_) { + ring_pos_ -= count; + } else { + auto removed = count - (ring_pos_ - ring_begin_); + if (removed >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), true); + ring_pos_ -= count; + ring_begin_ = ring_pos_; + } else { + for (std::size_t idx = 0U; idx < removed; ++idx) { + ring_state_[(ring_end_ - idx) % ring_state_.size()] = true; + } + ring_begin_ -= removed; + ring_pos_ -= count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); } auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, - data_buffer &data) -> api_error { + data_buffer &data) -> api_error { if (fsi_.directory) { return api_error::invalid_operation; } @@ -64,23 +266,79 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, return api_error::success; } - auto res = provider_.read_file_bytes(fsi_.api_path, read_size, read_offset, - data, stop_requested_); - if (res != api_error::success) { - return res; - } + auto begin_chunk{static_cast(read_offset / chunk_size_)}; + read_offset = read_offset - (begin_chunk * chunk_size_); - reset_timeout(); - if ((utils::time::get_time_now() - last_progress_.load()) > - (2U * utils::time::NANOS_PER_SECOND)) { - last_progress_ = utils::time::get_time_now(); - auto progress = (static_cast(read_offset + read_size) / - static_cast(fsi_.size)) * - 100.0; - event_system::instance().raise(fsi_.api_path, "", - progress); + auto res{api_error::success}; + + unique_mutex_lock read_lock(read_mtx_); + for (std::size_t chunk = begin_chunk; + (res == api_error::success) && (read_size > 0U); ++chunk) { + if (chunk > ring_pos_) { + forward(chunk - ring_pos_); + } else if (chunk < ring_pos_) { + reverse(ring_pos_ - chunk); + } + + res = download_chunk(chunk, false); + if (res != api_error::success) { + if (not stop_requested_ && + res == api_error::invalid_ring_buffer_position) { + read_lock.unlock(); + + // TODO limit retry + return read(read_size, read_offset, data); + } + + return res; + } + + reset_timeout(); + + auto to_read{ + std::min(static_cast(chunk_size_ - read_offset), + read_size), + }; + + auto &source_buffer = ring_data_.at(chunk % ring_state_.size()); + auto begin = std::next(source_buffer.begin(), + static_cast(read_offset)); + auto end = std::next(begin, static_cast(to_read)); + data.insert(data.end(), begin, end); + + read_offset = 0U; } return res; } + +void direct_open_file::set(std::size_t first_chunk, + std::size_t current_chunk) { + mutex_lock chunk_lock(chunk_mtx_); + if (first_chunk >= total_chunks_) { + chunk_notify_.notify_all(); + throw std::runtime_error("first chunk must be less than total chunks"); + } + + ring_begin_ = first_chunk; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + + if (current_chunk > ring_end_) { + chunk_notify_.notify_all(); + throw std::runtime_error( + "current chunk must be less than or equal to last chunk"); + } + + ring_pos_ = current_chunk; + ring_state_.set(0U, ring_state_.size(), false); + + chunk_notify_.notify_all(); +} + +void direct_open_file::set_api_path(const std::string &api_path) { + mutex_lock chunk_lock(chunk_mtx_); + open_file_base::set_api_path(api_path); + chunk_notify_.notify_all(); +} } // namespace repertory diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index b6556609..23d6ea9e 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -45,12 +45,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, ring_state_(ring_size), total_chunks_(static_cast( utils::divide_with_ceiling(fsi_.size, chunk_size))) { - if ((ring_size % 2U) != 0U) { - throw std::runtime_error("ring size must be a multiple of 2"); - } - - if (ring_size < 4U) { - throw std::runtime_error("ring size must be greater than or equal to 4"); + if (ring_size < 5U) { + throw std::runtime_error("ring size must be greater than or equal to 5"); } if (not can_handle_file(fsi_.size, chunk_size, ring_size)) { @@ -149,7 +145,7 @@ void ring_buffer_open_file::background_reader_thread() { auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size, std::size_t chunk_size, std::size_t ring_size) -> bool { - return file_size >= (static_cast(ring_size) * chunk_size * 2U); + return file_size >= (static_cast(ring_size) * chunk_size); } auto ring_buffer_open_file::close() -> bool {