From 827d0b537150efc9fd9bddf077ed12388f90d8cf Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sat, 28 Dec 2024 11:25:15 -0600 Subject: [PATCH] refactor --- .../include/file_manager/direct_open_file.hpp | 75 +--- .../file_manager/ring_buffer_common.hpp | 151 +++++++ .../file_manager/ring_buffer_open_file.hpp | 77 +--- .../src/file_manager/direct_open_file.cpp | 296 +------------ .../src/file_manager/ring_buffer_common.cpp | 372 ++++++++++++++++ .../file_manager/ring_buffer_open_file.cpp | 408 +++--------------- 6 files changed, 633 insertions(+), 746 deletions(-) create mode 100644 repertory/librepertory/include/file_manager/ring_buffer_common.hpp create mode 100644 repertory/librepertory/src/file_manager/ring_buffer_common.cpp diff --git a/repertory/librepertory/include/file_manager/direct_open_file.hpp b/repertory/librepertory/include/file_manager/direct_open_file.hpp index 8de2b7e0..f19b6d71 100644 --- a/repertory/librepertory/include/file_manager/direct_open_file.hpp +++ b/repertory/librepertory/include/file_manager/direct_open_file.hpp @@ -22,7 +22,7 @@ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ -#include "file_manager/open_file_base.hpp" +#include "file_manager/ring_buffer_common.hpp" #include "types/repertory.hpp" @@ -30,7 +30,7 @@ namespace repertory { class i_provider; class i_upload_manager; -class direct_open_file final : public open_file_base { +class direct_open_file final : public ring_buffer_common { public: direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider); @@ -45,52 +45,28 @@ public: auto operator=(const direct_open_file &) noexcept -> direct_open_file & = delete; -public: - static constexpr const auto ring_size{5U}; - private: - std::size_t total_chunks_; + std::array ring_data_; -private: - mutable std::mutex chunk_mtx_; - std::condition_variable chunk_notify_; - std::mutex read_mtx_; - std::unique_ptr reader_thread_; - std::size_t ring_begin_{}; - std::array ring_data_; - std::size_t ring_end_{}; - std::size_t ring_pos_{}; - boost::dynamic_bitset<> ring_state_{ring_size}; - stop_type stop_requested_{false}; +protected: + [[nodiscard]] auto on_check_start() -> bool override; -private: - [[nodiscard]] auto check_start() -> api_error; - - auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; - - void reader_thread(); - - void update_position(std::size_t count, bool is_forward); - -public: - auto close() -> bool override; - - void forward(std::size_t count); - - [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; - - [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; - - [[nodiscard]] auto get_total_chunks() const -> std::size_t { - return total_chunks_; + [[nodiscard]] auto + on_chunk_downloaded(std::size_t /* chunk */, + const data_buffer & /* buffer */) -> api_error override { + return api_error::success; } - [[nodiscard]] auto is_complete() const -> bool override { return false; } + [[nodiscard]] auto + on_read_chunk(std::size_t chunk, std::size_t read_size, + std::uint64_t read_offset, data_buffer &data, + std::size_t &bytes_read) -> api_error override; - [[nodiscard]] auto is_write_supported() const -> bool override { - return false; - } + [[nodiscard]] auto use_buffer(std::size_t chunk, + std::function func) + -> api_error override; +public: [[nodiscard]] auto native_operation(native_operation_callback /* callback */) -> api_error override { return api_error::not_supported; @@ -101,23 +77,6 @@ public: -> api_error override { return api_error::not_supported; } - - [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, - data_buffer &data) -> api_error override; - - [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { - return api_error::not_supported; - } - - void reverse(std::size_t count); - - void set_api_path(const std::string &api_path) override; - - [[nodiscard]] auto - write(std::uint64_t /* write_offset */, const data_buffer & /* data */, - std::size_t & /* bytes_written */) -> api_error override { - return api_error::not_supported; - } }; } // namespace repertory diff --git a/repertory/librepertory/include/file_manager/ring_buffer_common.hpp b/repertory/librepertory/include/file_manager/ring_buffer_common.hpp new file mode 100644 index 00000000..865f7f0b --- /dev/null +++ b/repertory/librepertory/include/file_manager/ring_buffer_common.hpp @@ -0,0 +1,151 @@ +/* + Copyright <2018-2024> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_COMMON_HPP_ +#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_COMMON_HPP_ + +#include "file_manager/open_file_base.hpp" + +#include "types/repertory.hpp" +#include "utils/file.hpp" + +namespace repertory { +class i_provider; +class i_upload_manager; + +class ring_buffer_common : public open_file_base { +public: + ring_buffer_common(std::uint64_t chunk_size, std::uint8_t chunk_timeout, + filesystem_item fsi, i_provider &provider, + std::size_t ring_size, bool disable_io); + + ~ring_buffer_common() override = default; + +public: + ring_buffer_common() = delete; + ring_buffer_common(const ring_buffer_common &) noexcept = delete; + ring_buffer_common(ring_buffer_common &&) noexcept = delete; + auto + operator=(ring_buffer_common &&) noexcept -> ring_buffer_common & = delete; + auto operator=(const ring_buffer_common &) noexcept -> ring_buffer_common & = + delete; + +public: + static constexpr const auto min_ring_size{5U}; + +private: + boost::dynamic_bitset<> ring_state_; + std::size_t total_chunks_; + +private: + std::condition_variable chunk_notify_; + mutable std::mutex chunk_mtx_; + std::mutex read_mtx_; + std::unique_ptr reader_thread_; + std::size_t ring_begin_{}; + std::size_t ring_end_{}; + std::size_t ring_pos_{}; + stop_type stop_requested_{false}; + +private: + [[nodiscard]] auto check_start() -> api_error; + + auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; + + void reader_thread(); + + void update_position(std::size_t count, bool is_forward); + +protected: + [[nodiscard]] auto has_reader_thread() const -> bool { + return reader_thread_ != nullptr; + } + + [[nodiscard]] auto get_ring_size() const -> std::size_t { + return ring_state_.size(); + } + + [[nodiscard]] virtual auto on_check_start() -> bool = 0; + + [[nodiscard]] virtual auto + on_chunk_downloaded(std::size_t chunk, + const data_buffer &buffer) -> api_error = 0; + + [[nodiscard]] virtual auto + on_read_chunk(std::size_t chunk, std::size_t read_size, + std::uint64_t read_offset, data_buffer &data, + std::size_t &bytes_read) -> api_error = 0; + + [[nodiscard]] virtual auto + use_buffer(std::size_t chunk, + std::function func) -> api_error = 0; + +public: + auto close() -> bool override; + + void forward(std::size_t count); + + [[nodiscard]] auto get_current_chunk() const -> std::size_t { + return ring_pos_; + } + + [[nodiscard]] auto get_first_chunk() const -> std::size_t { + return ring_begin_; + } + + [[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; } + + [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; + + [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; + + [[nodiscard]] auto get_total_chunks() const -> std::size_t { + return total_chunks_; + } + + [[nodiscard]] auto is_complete() const -> bool override { return false; } + + [[nodiscard]] auto is_write_supported() const -> bool override { + return false; + } + + [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, + data_buffer &data) -> api_error override; + + [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { + return api_error::not_supported; + } + + void reverse(std::size_t count); + + void set(std::size_t first_chunk, std::size_t current_chunk); + + void set_api_path(const std::string &api_path) override; + + [[nodiscard]] auto + write(std::uint64_t /* write_offset */, const data_buffer & /* data */, + std::size_t & /* bytes_written */) -> api_error override { + return api_error::not_supported; + } +}; +} // namespace repertory + +#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_COMMON_HPP_ diff --git a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp index 684897aa..eac354ee 100644 --- a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp +++ b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp @@ -22,7 +22,7 @@ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_ -#include "file_manager/open_file_base.hpp" +#include "file_manager/ring_buffer_common.hpp" #include "types/repertory.hpp" #include "utils/file.hpp" @@ -31,7 +31,7 @@ namespace repertory { class i_provider; class i_upload_manager; -class ring_buffer_open_file final : public open_file_base { +class ring_buffer_open_file final : public ring_buffer_common { public: ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, @@ -51,61 +51,31 @@ public: private: boost::dynamic_bitset<> ring_state_; std::string source_path_; - std::size_t total_chunks_; private: - std::condition_variable chunk_notify_; - mutable std::mutex chunk_mtx_; std::unique_ptr nf_; - std::mutex read_mtx_; - std::unique_ptr reader_thread_; - std::size_t ring_begin_{}; - std::size_t ring_end_{}; - std::size_t ring_pos_{}; - stop_type stop_requested_{false}; -private: - [[nodiscard]] auto check_start() -> api_error; +protected: + [[nodiscard]] auto on_check_start() -> bool override; - auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; + [[nodiscard]] auto + on_chunk_downloaded(std::size_t chunk, + const data_buffer &buffer) -> api_error override; - void reader_thread(); + [[nodiscard]] auto + on_read_chunk(std::size_t chunk, std::size_t read_size, + std::uint64_t read_offset, data_buffer &data, + std::size_t &bytes_read) -> api_error override; - void update_position(std::size_t count, bool is_forward); + [[nodiscard]] auto use_buffer(std::size_t chunk, + std::function func) + -> api_error override; public: [[nodiscard]] static auto can_handle_file(std::uint64_t file_size, std::size_t chunk_size, std::size_t ring_size) -> bool; - auto close() -> bool override; - - void forward(std::size_t count); - - [[nodiscard]] auto get_current_chunk() const -> std::size_t { - return ring_pos_; - } - - [[nodiscard]] auto get_first_chunk() const -> std::size_t { - return ring_begin_; - } - - [[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; } - - [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; - - [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; - - [[nodiscard]] auto get_total_chunks() const -> std::size_t { - return total_chunks_; - } - - [[nodiscard]] auto is_complete() const -> bool override { return false; } - - [[nodiscard]] auto is_write_supported() const -> bool override { - return false; - } - [[nodiscard]] auto native_operation(native_operation_callback callback) -> api_error override; @@ -115,23 +85,8 @@ public: return api_error::not_supported; } - [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, - data_buffer &data) -> api_error override; - - [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { - return api_error::not_supported; - } - - void reverse(std::size_t count); - - void set(std::size_t first_chunk, std::size_t current_chunk); - - void set_api_path(const std::string &api_path) override; - - [[nodiscard]] auto - write(std::uint64_t /* write_offset */, const data_buffer & /* data */, - std::size_t & /* bytes_written */) -> api_error override { - return api_error::not_supported; + [[nodiscard]] auto get_source_path() const -> std::string override { + return source_path_; } }; } // namespace repertory diff --git a/repertory/librepertory/src/file_manager/direct_open_file.cpp b/repertory/librepertory/src/file_manager/direct_open_file.cpp index e2466187..67467051 100644 --- a/repertory/librepertory/src/file_manager/direct_open_file.cpp +++ b/repertory/librepertory/src/file_manager/direct_open_file.cpp @@ -21,301 +21,43 @@ */ #include "file_manager/direct_open_file.hpp" -#include "events/event_system.hpp" -#include "file_manager/events.hpp" #include "file_manager/open_file_base.hpp" #include "providers/i_provider.hpp" #include "types/repertory.hpp" -#include "utils/common.hpp" namespace repertory { direct_open_file::direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider) - : open_file_base(chunk_size, chunk_timeout, fsi, provider, true), - total_chunks_(static_cast( - utils::divide_with_ceiling(fsi.size, chunk_size))) { - if (fsi.size > 0U) { - ring_state_.resize(std::min(total_chunks_, ring_state_.size())); - - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - ring_state_.set(0U, ring_state_.size(), false); - } -} + : ring_buffer_common(chunk_size, chunk_timeout, fsi, provider, + min_ring_size, true) {} direct_open_file::~direct_open_file() { REPERTORY_USES_FUNCTION_NAME(); close(); - - if (reader_thread_) { - reader_thread_->join(); - reader_thread_.reset(); - } } -auto direct_open_file::check_start() -> api_error { - if (get_file_size() == 0U || reader_thread_) { - return api_error::success; - } +auto direct_open_file::on_check_start() -> bool { + return (get_file_size() == 0U || has_reader_thread()); +} - event_system::instance().raise(get_api_path(), "direct"); - reader_thread_ = std::make_unique([this]() { reader_thread(); }); +auto direct_open_file::on_read_chunk(std::size_t chunk, std::size_t read_size, + std::uint64_t read_offset, + data_buffer &data, + std::size_t &bytes_read) -> api_error { + auto &buffer = ring_data_.at(chunk % get_ring_size()); + auto begin = + std::next(buffer.begin(), static_cast(read_offset)); + auto end = std::next(begin, static_cast(read_size)); + data.insert(data.end(), begin, end); + bytes_read = read_size; return api_error::success; } -auto direct_open_file::close() -> bool { - stop_requested_ = true; - - unique_mutex_lock chunk_lock(chunk_mtx_); - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - return open_file_base::close(); -} - -auto direct_open_file::download_chunk(std::size_t chunk, - bool skip_active) -> api_error { - unique_mutex_lock chunk_lock(chunk_mtx_); - const auto unlock_and_notify = [this, &chunk_lock]() { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - const auto unlock_and_return = - [&unlock_and_notify](api_error res) -> api_error { - unlock_and_notify(); - return res; - }; - - if (chunk < ring_begin_ || chunk > ring_end_) { - return unlock_and_return(api_error::invalid_ring_buffer_position); - } - - if (get_active_downloads().find(chunk) != get_active_downloads().end()) { - if (skip_active) { - return unlock_and_return(api_error::success); - } - - auto active_download = get_active_downloads().at(chunk); - unlock_and_notify(); - - return active_download->wait(); - } - - if (ring_state_[chunk % ring_state_.size()]) { - return unlock_and_return(api_error::success); - } - - auto active_download{std::make_shared()}; - get_active_downloads()[chunk] = active_download; - - auto &buffer = ring_data_.at(chunk % ring_state_.size()); - auto data_offset{chunk * get_chunk_size()}; - auto data_size{ - chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), - }; - unlock_and_notify(); - - auto res{ - get_provider().read_file_bytes(get_api_path(), data_size, data_offset, - buffer, stop_requested_), - }; - - chunk_lock.lock(); - if (chunk < ring_begin_ || chunk > ring_end_) { - res = api_error::invalid_ring_buffer_position; - unlock_and_notify(); - active_download->notify(res); - return res; - } - - if (res == api_error::success) { - ring_state_[chunk % ring_state_.size()] = true; - auto progress = - (static_cast(chunk + 1U) / static_cast(total_chunks_)) * - 100.0; - event_system::instance().raise(get_api_path(), "direct", - progress); - } - - get_active_downloads().erase(chunk); - unlock_and_notify(); - - active_download->notify(res); - return res; -} - -void direct_open_file::forward(std::size_t count) { - return update_position(count, true); -} - -auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_; -} - -auto direct_open_file::get_read_state(std::size_t chunk) const -> bool { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_[chunk % ring_state_.size()]; -} - -void direct_open_file::reverse(std::size_t count) { - return update_position(count, false); -} - -auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, - data_buffer &data) -> api_error { - if (is_directory()) { - return api_error::invalid_operation; - } - - reset_timeout(); - - read_size = - utils::calculate_read_size(get_file_size(), read_size, read_offset); - if (read_size == 0U) { - return api_error::success; - } - - auto begin_chunk{static_cast(read_offset / get_chunk_size())}; - read_offset = read_offset - (begin_chunk * get_chunk_size()); - - unique_mutex_lock read_lock(read_mtx_); - auto res = check_start(); - if (res != api_error::success) { - return res; - } - - for (std::size_t chunk = begin_chunk; - not stop_requested_ && (res == api_error::success) && (read_size > 0U); - ++chunk) { - reset_timeout(); - - if (chunk > ring_pos_) { - forward(chunk - ring_pos_); - } else if (chunk < ring_pos_) { - reverse(ring_pos_ - chunk); - } - - res = download_chunk(chunk, false); - if (res != api_error::success) { - if (res == api_error::invalid_ring_buffer_position) { - read_lock.unlock(); - - // TODO limit retry - return read(read_size, read_offset, data); - } - - return res; - } - - reset_timeout(); - - auto to_read{ - std::min(static_cast(get_chunk_size() - read_offset), - read_size), - }; - - auto &buffer = ring_data_.at(chunk % ring_state_.size()); - auto begin = - std::next(buffer.begin(), static_cast(read_offset)); - auto end = std::next(begin, static_cast(to_read)); - data.insert(data.end(), begin, end); - - read_offset = 0U; - read_size -= to_read; - } - - return stop_requested_ ? api_error::download_stopped : res; -} - -void direct_open_file::reader_thread() { - unique_mutex_lock chunk_lock(chunk_mtx_); - auto next_chunk = ring_pos_; - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - while (not stop_requested_) { - chunk_lock.lock(); - - next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; - const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { - if (stop_requested_) { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - return; - } - - if (get_read_state().all()) { - chunk_notify_.wait(chunk_lock); - next_chunk = ring_pos_; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - if (ring_state_[next_chunk % ring_state_.size()]) { - check_and_wait(); - continue; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - download_chunk(next_chunk, true); - } - - event_system::instance().raise(get_api_path(), "direct", - api_error::download_stopped); -} - -void direct_open_file::set_api_path(const std::string &api_path) { - mutex_lock chunk_lock(chunk_mtx_); - open_file_base::set_api_path(api_path); - chunk_notify_.notify_all(); -} - -void direct_open_file::update_position(std::size_t count, bool is_forward) { - mutex_lock chunk_lock(chunk_mtx_); - - if (is_forward) { - if ((ring_pos_ + count) > (total_chunks_ - 1U)) { - count = (total_chunks_ - 1U) - ring_pos_; - } - } else { - count = std::min(ring_pos_, count); - } - - if (is_forward ? (ring_pos_ + count) <= ring_end_ - : (ring_pos_ - count) >= ring_begin_) { - ring_pos_ += is_forward ? count : -count; - } else { - auto delta = is_forward ? count - (ring_end_ - ring_pos_) - : count - (ring_pos_ - ring_begin_); - - if (delta >= ring_state_.size()) { - ring_state_.set(0U, ring_state_.size(), false); - ring_pos_ += is_forward ? count : -count; - ring_begin_ += is_forward ? delta : -delta; - } else { - for (std::size_t idx = 0U; idx < delta; ++idx) { - if (is_forward) { - ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; - } else { - ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; - } - } - ring_begin_ += is_forward ? delta : -delta; - ring_pos_ += is_forward ? count : -count; - } - - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - } - - chunk_notify_.notify_all(); +auto direct_open_file::use_buffer(std::size_t chunk, + std::function func) + -> api_error { + return func(ring_data_.at(chunk % get_ring_size())); } } // namespace repertory diff --git a/repertory/librepertory/src/file_manager/ring_buffer_common.cpp b/repertory/librepertory/src/file_manager/ring_buffer_common.cpp new file mode 100644 index 00000000..58ddcbc7 --- /dev/null +++ b/repertory/librepertory/src/file_manager/ring_buffer_common.cpp @@ -0,0 +1,372 @@ +/* + Copyright <2018-2024> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#include "file_manager/ring_buffer_common.hpp" + +#include "events/event_system.hpp" +#include "file_manager/events.hpp" +#include "file_manager/open_file_base.hpp" +#include "platform/platform.hpp" +#include "providers/i_provider.hpp" +#include "types/repertory.hpp" +#include "utils/common.hpp" +#include "utils/error_utils.hpp" + +namespace repertory { +ring_buffer_common::ring_buffer_common(std::uint64_t chunk_size, + std::uint8_t chunk_timeout, + filesystem_item fsi, + i_provider &provider, + std::size_t ring_size, bool disable_io) + : open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io), + ring_state_(ring_size), + total_chunks_(static_cast( + utils::divide_with_ceiling(fsi.size, chunk_size))) { + if (disable_io) { + if (fsi.size > 0U) { + ring_state_.resize(std::min(total_chunks_, ring_state_.size())); + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + ring_state_.set(0U, ring_state_.size(), false); + } + } else { + if (ring_size < min_ring_size) { + throw std::runtime_error("ring size must be greater than or equal to 5"); + } + + ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U); + ring_state_.set(0U, ring_size, false); + } +} + +auto ring_buffer_common::check_start() -> api_error { + REPERTORY_USES_FUNCTION_NAME(); + + try { + if (on_check_start()) { + return api_error::success; + } + + event_system::instance().raise(get_api_path(), + get_source_path()); + reader_thread_ = + std::make_unique([this]() { reader_thread(); }); + return api_error::success; + } catch (const std::exception &ex) { + utils::error::raise_api_path_error(function_name, get_api_path(), + get_source_path(), ex, + "failed to start"); + return api_error::error; + } +} + +auto ring_buffer_common::close() -> bool { + stop_requested_ = true; + + unique_mutex_lock chunk_lock(chunk_mtx_); + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + auto res = open_file_base::close(); + + if (reader_thread_) { + reader_thread_->join(); + reader_thread_.reset(); + } + + return res; +} + +auto ring_buffer_common::download_chunk(std::size_t chunk, + bool skip_active) -> api_error { + unique_mutex_lock chunk_lock(chunk_mtx_); + const auto unlock_and_notify = [this, &chunk_lock]() { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + const auto unlock_and_return = + [&unlock_and_notify](api_error res) -> api_error { + unlock_and_notify(); + return res; + }; + + if (chunk < ring_begin_ || chunk > ring_end_) { + return unlock_and_return(api_error::invalid_ring_buffer_position); + } + + if (get_active_downloads().find(chunk) != get_active_downloads().end()) { + if (skip_active) { + return unlock_and_return(api_error::success); + } + + auto active_download = get_active_downloads().at(chunk); + unlock_and_notify(); + + return active_download->wait(); + } + + if (ring_state_[chunk % ring_state_.size()]) { + return unlock_and_return(api_error::success); + } + + auto active_download{std::make_shared()}; + get_active_downloads()[chunk] = active_download; + + return use_buffer(chunk, [&](data_buffer &buffer) -> api_error { + auto data_offset{chunk * get_chunk_size()}; + auto data_size{ + chunk == (total_chunks_ - 1U) ? get_last_chunk_size() + : get_chunk_size(), + }; + unlock_and_notify(); + + auto result{ + get_provider().read_file_bytes(get_api_path(), data_size, data_offset, + buffer, stop_requested_), + }; + + chunk_lock.lock(); + if (chunk < ring_begin_ || chunk > ring_end_) { + result = api_error::invalid_ring_buffer_position; + unlock_and_notify(); + active_download->notify(result); + return result; + } + + if (result == api_error::success) { + result = on_chunk_downloaded(chunk, buffer); + if (result == api_error::success) { + ring_state_[chunk % ring_state_.size()] = true; + auto progress = (static_cast(chunk + 1U) / + static_cast(total_chunks_)) * + 100.0; + event_system::instance().raise( + get_api_path(), get_source_path(), progress); + } + } + + get_active_downloads().erase(chunk); + unlock_and_notify(); + + active_download->notify(result); + return result; + }); +} + +void ring_buffer_common::forward(std::size_t count) { + update_position(count, true); +} + +auto ring_buffer_common::get_read_state() const -> boost::dynamic_bitset<> { + recur_mutex_lock file_lock(get_mutex()); + return ring_state_; +} + +auto ring_buffer_common::get_read_state(std::size_t chunk) const -> bool { + recur_mutex_lock file_lock(get_mutex()); + return ring_state_[chunk % ring_state_.size()]; +} + +auto ring_buffer_common::read(std::size_t read_size, std::uint64_t read_offset, + data_buffer &data) -> api_error { + if (is_directory()) { + return api_error::invalid_operation; + } + + reset_timeout(); + + read_size = + utils::calculate_read_size(get_file_size(), read_size, read_offset); + if (read_size == 0U) { + return api_error::success; + } + + auto begin_chunk{static_cast(read_offset / get_chunk_size())}; + read_offset = read_offset - (begin_chunk * get_chunk_size()); + + unique_mutex_lock read_lock(read_mtx_); + auto res = check_start(); + if (res != api_error::success) { + return res; + } + + for (std::size_t chunk = begin_chunk; + not stop_requested_ && (res == api_error::success) && (read_size > 0U); + ++chunk) { + reset_timeout(); + + if (chunk > ring_pos_) { + forward(chunk - ring_pos_); + } else if (chunk < ring_pos_) { + reverse(ring_pos_ - chunk); + } + + res = download_chunk(chunk, false); + if (res != api_error::success) { + if (res == api_error::invalid_ring_buffer_position) { + read_lock.unlock(); + + // TODO limit retry + return read(read_size, read_offset, data); + } + + return res; + } + + reset_timeout(); + + std::size_t bytes_read{}; + res = on_read_chunk( + chunk, + std::min(static_cast(get_chunk_size() - read_offset), + read_size), + read_offset, data, bytes_read); + if (res != api_error::success) { + return res; + } + + reset_timeout(); + + read_size -= bytes_read; + read_offset = 0U; + } + + return stop_requested_ ? api_error::download_stopped : res; +} + +void ring_buffer_common::reader_thread() { + unique_mutex_lock chunk_lock(chunk_mtx_); + auto next_chunk = ring_pos_; + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + while (not stop_requested_) { + chunk_lock.lock(); + + next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; + const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { + if (stop_requested_) { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + next_chunk = ring_pos_; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + if (ring_state_[next_chunk % ring_state_.size()]) { + check_and_wait(); + continue; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + download_chunk(next_chunk, true); + } + + event_system::instance().raise( + get_api_path(), get_source_path(), api_error::download_stopped); +} + +void ring_buffer_common::reverse(std::size_t count) { + update_position(count, false); +} + +void ring_buffer_common::set(std::size_t first_chunk, + std::size_t current_chunk) { + mutex_lock chunk_lock(chunk_mtx_); + if (first_chunk >= total_chunks_) { + chunk_notify_.notify_all(); + throw std::runtime_error("first chunk must be less than total chunks"); + } + + ring_begin_ = first_chunk; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + + if (current_chunk > ring_end_) { + chunk_notify_.notify_all(); + throw std::runtime_error( + "current chunk must be less than or equal to last chunk"); + } + + ring_pos_ = current_chunk; + ring_state_.set(0U, ring_state_.size(), true); + + chunk_notify_.notify_all(); +} + +void ring_buffer_common::set_api_path(const std::string &api_path) { + mutex_lock chunk_lock(chunk_mtx_); + open_file_base::set_api_path(api_path); + chunk_notify_.notify_all(); +} + +void ring_buffer_common::update_position(std::size_t count, bool is_forward) { + mutex_lock chunk_lock(chunk_mtx_); + + if (is_forward) { + if ((ring_pos_ + count) > (total_chunks_ - 1U)) { + count = (total_chunks_ - 1U) - ring_pos_; + } + } else { + count = std::min(ring_pos_, count); + } + + if (is_forward ? (ring_pos_ + count) <= ring_end_ + : (ring_pos_ - count) >= ring_begin_) { + ring_pos_ += is_forward ? count : -count; + } else { + auto delta = is_forward ? count - (ring_end_ - ring_pos_) + : count - (ring_pos_ - ring_begin_); + + if (delta >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), false); + ring_pos_ += is_forward ? count : -count; + ring_begin_ += is_forward ? delta : -delta; + } else { + for (std::size_t idx = 0U; idx < delta; ++idx) { + if (is_forward) { + ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; + } else { + ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; + } + } + ring_begin_ += is_forward ? delta : -delta; + ring_pos_ += is_forward ? count : -count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); +} +} // namespace repertory diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index 6cae6b05..2bbbcd8d 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -21,15 +21,12 @@ */ #include "file_manager/ring_buffer_open_file.hpp" -#include "events/event_system.hpp" -#include "file_manager/events.hpp" #include "file_manager/open_file_base.hpp" #include "platform/platform.hpp" #include "providers/i_provider.hpp" #include "types/repertory.hpp" #include "utils/common.hpp" #include "utils/error_utils.hpp" -#include "utils/file_utils.hpp" #include "utils/path.hpp" namespace repertory { @@ -39,24 +36,16 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, filesystem_item fsi, i_provider &provider, std::size_t ring_size) - : open_file_base(chunk_size, chunk_timeout, fsi, provider, false), + : ring_buffer_common(chunk_size, chunk_timeout, fsi, provider, ring_size, + false), ring_state_(ring_size), source_path_(utils::path::combine(buffer_directory, { utils::create_uuid_string(), - })), - total_chunks_(static_cast( - utils::divide_with_ceiling(fsi.size, chunk_size))) { - if (ring_size < 5U) { - throw std::runtime_error("ring size must be greater than or equal to 5"); - } - + })) { if (not can_handle_file(fsi.size, chunk_size, ring_size)) { throw std::runtime_error("file size is less than ring buffer size"); } - - ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U); - ring_state_.set(0U, ring_size, false); } ring_buffer_open_file::~ring_buffer_open_file() { @@ -64,20 +53,17 @@ ring_buffer_open_file::~ring_buffer_open_file() { close(); - if (nf_) { - nf_->close(); - nf_.reset(); - - if (not utils::file::file(source_path_).remove()) { - utils::error::raise_api_path_error( - function_name, get_api_path(), source_path_, - utils::get_last_error_code(), "failed to delete file"); - } + if (not nf_) { + return; } - if (reader_thread_) { - reader_thread_->join(); - reader_thread_.reset(); + nf_->close(); + nf_.reset(); + + if (not utils::file::file(source_path_).remove()) { + utils::error::raise_api_path_error( + function_name, get_api_path(), source_path_, + utils::get_last_error_code(), "failed to delete file"); } } @@ -87,358 +73,80 @@ auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size, return file_size >= (static_cast(ring_size) * chunk_size); } -auto ring_buffer_open_file::check_start() -> api_error { +auto ring_buffer_open_file::native_operation( + i_open_file::native_operation_callback callback) -> api_error { + return do_io([&]() -> api_error { return callback(nf_->get_handle()); }); +} + +auto ring_buffer_open_file::on_check_start() -> bool { REPERTORY_USES_FUNCTION_NAME(); if (nf_) { - return api_error::success; + return true; } auto buffer_directory{utils::path::get_parent_path(source_path_)}; if (not utils::file::directory(buffer_directory).create_directory()) { - utils::error::raise_api_path_error( - function_name, get_api_path(), source_path_, + throw std::runtime_error( fmt::format("failed to create buffer directory|path|{}|err|{}", buffer_directory, utils::get_last_error_code())); - return api_error::os_error; } nf_ = utils::file::file::open_or_create_file(source_path_); if (not nf_ || not *nf_) { - utils::error::raise_api_path_error( - function_name, get_api_path(), source_path_, - fmt::format("failed to create buffer file|err|{}", - utils::get_last_error_code())); - return api_error::os_error; + throw std::runtime_error(fmt::format("failed to create buffer file|err|{}", + utils::get_last_error_code())); } if (not nf_->truncate(ring_state_.size() * get_chunk_size())) { nf_->close(); nf_.reset(); - utils::error::raise_api_path_error( - function_name, get_api_path(), source_path_, - fmt::format("failed to resize buffer file|err|{}", - utils::get_last_error_code())); + throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}", + utils::get_last_error_code())); + } + + return false; +} + +auto ring_buffer_open_file::on_chunk_downloaded( + std::size_t chunk, const data_buffer &buffer) -> api_error { + return do_io([&]() -> api_error { + std::size_t bytes_written{}; + if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(), + &bytes_written)) { + return api_error::success; + } + return api_error::os_error; - } - - event_system::instance().raise(get_api_path(), source_path_); - reader_thread_ = std::make_unique([this]() { reader_thread(); }); - return api_error::success; + }); } -auto ring_buffer_open_file::close() -> bool { - stop_requested_ = true; +auto ring_buffer_open_file::on_read_chunk( + std::size_t chunk, std::size_t read_size, std::uint64_t read_offset, + data_buffer &data, std::size_t &bytes_read) -> api_error { + data_buffer buffer(read_size); + auto res = do_io([&]() -> api_error { + return nf_->read(buffer, + (((chunk % ring_state_.size()) * get_chunk_size()) + + read_offset), + &bytes_read) + ? api_error::success + : api_error::os_error; + }); - unique_mutex_lock chunk_lock(chunk_mtx_); - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - return open_file_base::close(); -} - -auto ring_buffer_open_file::download_chunk(std::size_t chunk, - bool skip_active) -> api_error { - unique_mutex_lock chunk_lock(chunk_mtx_); - const auto unlock_and_notify = [this, &chunk_lock]() { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - const auto unlock_and_return = - [&unlock_and_notify](api_error res) -> api_error { - unlock_and_notify(); - return res; - }; - - if (chunk < ring_begin_ || chunk > ring_end_) { - return unlock_and_return(api_error::invalid_ring_buffer_position); - } - - if (get_active_downloads().find(chunk) != get_active_downloads().end()) { - if (skip_active) { - return unlock_and_return(api_error::success); - } - - auto active_download = get_active_downloads().at(chunk); - unlock_and_notify(); - - return active_download->wait(); - } - - if (ring_state_[chunk % ring_state_.size()]) { - return unlock_and_return(api_error::success); - } - - auto active_download{std::make_shared()}; - get_active_downloads()[chunk] = active_download; - - data_buffer buffer; - auto data_offset{chunk * get_chunk_size()}; - auto data_size{ - chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), - }; - unlock_and_notify(); - - auto res{ - get_provider().read_file_bytes(get_api_path(), data_size, data_offset, - buffer, stop_requested_), - }; - - chunk_lock.lock(); - if (chunk < ring_begin_ || chunk > ring_end_) { - res = api_error::invalid_ring_buffer_position; - unlock_and_notify(); - active_download->notify(res); - return res; - } - - if (res == api_error::success) { - res = do_io([&]() -> api_error { - std::size_t bytes_written{}; - if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(), - &bytes_written)) { - return api_error::success; - } - - return api_error::os_error; - }); - - if (res == api_error::success) { - ring_state_[chunk % ring_state_.size()] = true; - auto progress = (static_cast(chunk + 1U) / - static_cast(total_chunks_)) * - 100.0; - event_system::instance().raise(get_api_path(), - source_path_, progress); - } - } - - get_active_downloads().erase(chunk); - unlock_and_notify(); - - active_download->notify(res); - return res; -} - -void ring_buffer_open_file::forward(std::size_t count) { - update_position(count, true); -} - -auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_; -} - -auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_[chunk % ring_state_.size()]; -} - -auto ring_buffer_open_file::native_operation( - i_open_file::native_operation_callback callback) -> api_error { - return do_io([&]() -> api_error { return callback(nf_->get_handle()); }); -} - -auto ring_buffer_open_file::read(std::size_t read_size, - std::uint64_t read_offset, - data_buffer &data) -> api_error { - if (is_directory()) { - return api_error::invalid_operation; - } - - reset_timeout(); - - read_size = - utils::calculate_read_size(get_file_size(), read_size, read_offset); - if (read_size == 0U) { - return api_error::success; - } - - auto begin_chunk{static_cast(read_offset / get_chunk_size())}; - read_offset = read_offset - (begin_chunk * get_chunk_size()); - - unique_mutex_lock read_lock(read_mtx_); - auto res = check_start(); if (res != api_error::success) { return res; } - for (std::size_t chunk = begin_chunk; - not stop_requested_ && (res == api_error::success) && (read_size > 0U); - ++chunk) { - reset_timeout(); - - if (chunk > ring_pos_) { - forward(chunk - ring_pos_); - } else if (chunk < ring_pos_) { - reverse(ring_pos_ - chunk); - } - - res = download_chunk(chunk, false); - if (res != api_error::success) { - if (res == api_error::invalid_ring_buffer_position) { - read_lock.unlock(); - - // TODO limit retry - return read(read_size, read_offset, data); - } - - return res; - } - - reset_timeout(); - - auto to_read{ - std::min(static_cast(get_chunk_size() - read_offset), - read_size), - }; - - res = do_io([&]() -> api_error { - data_buffer buffer(to_read); - - std::size_t bytes_read{}; - auto result = - nf_->read( - buffer, - (((chunk % ring_state_.size()) * get_chunk_size()) + read_offset), - &bytes_read) - ? api_error::success - : api_error::os_error; - - if (result != api_error::success) { - return result; - } - - reset_timeout(); - - data.insert(data.end(), buffer.begin(), buffer.end()); - read_size -= bytes_read; - - return result; - }); - - read_offset = 0U; - } - - return stop_requested_ ? api_error::download_stopped : res; + data.insert(data.end(), buffer.begin(), buffer.end()); + return api_error::success; } -void ring_buffer_open_file::reader_thread() { - unique_mutex_lock chunk_lock(chunk_mtx_); - auto next_chunk = ring_pos_; - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - while (not stop_requested_) { - chunk_lock.lock(); - - next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; - const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { - if (stop_requested_) { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - return; - } - - if (get_read_state().all()) { - chunk_notify_.wait(chunk_lock); - next_chunk = ring_pos_; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - if (ring_state_[next_chunk % ring_state_.size()]) { - check_and_wait(); - continue; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - download_chunk(next_chunk, true); - } - - event_system::instance().raise(get_api_path(), source_path_, - api_error::download_stopped); -} - -void ring_buffer_open_file::reverse(std::size_t count) { - update_position(count, false); -} - -void ring_buffer_open_file::set(std::size_t first_chunk, - std::size_t current_chunk) { - mutex_lock chunk_lock(chunk_mtx_); - if (first_chunk >= total_chunks_) { - chunk_notify_.notify_all(); - throw std::runtime_error("first chunk must be less than total chunks"); - } - - ring_begin_ = first_chunk; - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - - if (current_chunk > ring_end_) { - chunk_notify_.notify_all(); - throw std::runtime_error( - "current chunk must be less than or equal to last chunk"); - } - - ring_pos_ = current_chunk; - ring_state_.set(0U, ring_state_.size(), true); - - chunk_notify_.notify_all(); -} - -void ring_buffer_open_file::set_api_path(const std::string &api_path) { - mutex_lock chunk_lock(chunk_mtx_); - open_file_base::set_api_path(api_path); - chunk_notify_.notify_all(); -} - -void ring_buffer_open_file::update_position(std::size_t count, - bool is_forward) { - mutex_lock chunk_lock(chunk_mtx_); - - if (is_forward) { - if ((ring_pos_ + count) > (total_chunks_ - 1U)) { - count = (total_chunks_ - 1U) - ring_pos_; - } - } else { - count = std::min(ring_pos_, count); - } - - if (is_forward ? (ring_pos_ + count) <= ring_end_ - : (ring_pos_ - count) >= ring_begin_) { - ring_pos_ += is_forward ? count : -count; - } else { - auto delta = is_forward ? count - (ring_end_ - ring_pos_) - : count - (ring_pos_ - ring_begin_); - - if (delta >= ring_state_.size()) { - ring_state_.set(0U, ring_state_.size(), false); - ring_pos_ += is_forward ? count : -count; - ring_begin_ += is_forward ? delta : -delta; - } else { - for (std::size_t idx = 0U; idx < delta; ++idx) { - if (is_forward) { - ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; - } else { - ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; - } - } - ring_begin_ += is_forward ? delta : -delta; - ring_pos_ += is_forward ? count : -count; - } - - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - } - - chunk_notify_.notify_all(); +auto ring_buffer_open_file::use_buffer( + std::size_t /* chunk */, + std::function func) -> api_error { + data_buffer buffer; + return func(buffer); } } // namespace repertory