From b312fa77953334c9fab5b7409a319291a9f45161 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sat, 28 Dec 2024 08:24:39 -0600 Subject: [PATCH] revert --- .../include/file_manager/direct_open_file.hpp | 69 ++- .../file_manager/ring_buffer_open_file.hpp | 88 +++- .../include/file_manager/ring_file_base.hpp | 152 ------- .../src/file_manager/direct_open_file.cpp | 303 ++++++++++++- .../file_manager/ring_buffer_open_file.cpp | 413 +++++++++++++++--- .../src/file_manager/ring_file_base.cpp | 386 ---------------- 6 files changed, 771 insertions(+), 640 deletions(-) delete mode 100644 repertory/librepertory/include/file_manager/ring_file_base.hpp delete mode 100644 repertory/librepertory/src/file_manager/ring_file_base.cpp diff --git a/repertory/librepertory/include/file_manager/direct_open_file.hpp b/repertory/librepertory/include/file_manager/direct_open_file.hpp index 3fd01e5d..bb6381b6 100644 --- a/repertory/librepertory/include/file_manager/direct_open_file.hpp +++ b/repertory/librepertory/include/file_manager/direct_open_file.hpp @@ -22,7 +22,7 @@ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ -#include "file_manager/ring_file_base.hpp" +#include "file_manager/open_file_base.hpp" #include "types/repertory.hpp" @@ -30,7 +30,7 @@ namespace repertory { class i_provider; class i_upload_manager; -class direct_open_file final : public ring_file_base { +class direct_open_file final : public open_file_base { public: direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider); @@ -45,24 +45,48 @@ public: auto operator=(const direct_open_file &) noexcept -> direct_open_file & = delete; +public: + static constexpr const auto ring_size{5U}; + private: - std::array ring_data_; + std::size_t total_chunks_; -protected: - [[nodiscard]] auto handle_read_buffer( - std::size_t chunk, - std::function func) -> api_error override; +private: + mutable std::mutex chunk_mtx_; + std::condition_variable chunk_notify_; + std::mutex read_mtx_; + std::unique_ptr reader_thread_; + std::size_t ring_begin_{}; + std::array ring_data_; + std::size_t ring_end_{}; + std::size_t ring_pos_{}; + boost::dynamic_bitset<> ring_state_{ring_size}; + stop_type stop_requested_{false}; - [[nodiscard]] auto on_check_start() -> bool override; +private: + [[nodiscard]] auto check_start() -> api_error; - [[nodiscard]] auto - use_buffer(std::size_t chunk, - std::function func) - -> api_error override; + auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; + + void reader_thread(); public: - [[nodiscard]] auto get_source_path() const -> std::string override { - return "direct"; + auto close() -> bool override; + + void forward(std::size_t count); + + [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; + + [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; + + [[nodiscard]] auto get_total_chunks() const -> std::size_t { + return total_chunks_; + } + + [[nodiscard]] auto is_complete() const -> bool override { return false; } + + [[nodiscard]] auto is_write_supported() const -> bool override { + return false; } [[nodiscard]] auto native_operation(native_operation_callback /* callback */) @@ -75,6 +99,23 @@ public: -> api_error override { return api_error::not_supported; } + + [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, + data_buffer &data) -> api_error override; + + [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { + return api_error::not_supported; + } + + void reverse(std::size_t count); + + void set_api_path(const std::string &api_path) override; + + [[nodiscard]] auto + write(std::uint64_t /* write_offset */, const data_buffer & /* data */, + std::size_t & /* bytes_written */) -> api_error override { + return api_error::not_supported; + } }; } // namespace repertory diff --git a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp index f2e0b135..e63cd335 100644 --- a/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp +++ b/repertory/librepertory/include/file_manager/ring_buffer_open_file.hpp @@ -22,16 +22,16 @@ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_ -#include "file_manager/ring_file_base.hpp" +#include "file_manager/open_file_base.hpp" #include "types/repertory.hpp" -#include "utils/types/file/i_file.hpp" +#include "utils/file.hpp" namespace repertory { class i_provider; class i_upload_manager; -class ring_buffer_open_file final : public ring_file_base { +class ring_buffer_open_file final : public open_file_base { public: ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, @@ -49,39 +49,87 @@ public: -> ring_buffer_open_file & = delete; private: + boost::dynamic_bitset<> ring_state_; std::string source_path_; + std::size_t total_chunks_; private: + std::condition_variable chunk_notify_; + mutable std::mutex chunk_mtx_; std::unique_ptr nf_; + std::mutex read_mtx_; + std::unique_ptr reader_thread_; + std::size_t ring_begin_{}; + std::size_t ring_end_{}; + std::size_t ring_pos_{}; + stop_type stop_requested_{false}; -protected: - [[nodiscard]] auto handle_read_buffer( - std::size_t chunk, - std::function func) -> api_error override; +private: + [[nodiscard]] auto check_start() -> api_error; - [[nodiscard]] auto on_check_start() -> bool override; + auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; - [[nodiscard]] auto - on_chunk_downloaded(std::size_t chunk, - const data_buffer &buffer) -> api_error override; - - [[nodiscard]] auto - use_buffer(std::size_t chunk, - std::function func) - -> api_error override; + void reader_thread(); public: + [[nodiscard]] static auto can_handle_file(std::uint64_t file_size, + std::size_t chunk_size, + std::size_t ring_size) -> bool; + + auto close() -> bool override; + + void forward(std::size_t count); + + [[nodiscard]] auto get_current_chunk() const -> std::size_t { + return ring_pos_; + } + + [[nodiscard]] auto get_first_chunk() const -> std::size_t { + return ring_begin_; + } + + [[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; } + + [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; + + [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; + + [[nodiscard]] auto get_total_chunks() const -> std::size_t { + return total_chunks_; + } + + [[nodiscard]] auto is_complete() const -> bool override { return false; } + + [[nodiscard]] auto is_write_supported() const -> bool override { + return false; + } + + [[nodiscard]] auto + native_operation(native_operation_callback callback) -> api_error override; + [[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */, native_operation_callback /* callback */) -> api_error override { return api_error::not_supported; } - [[nodiscard]] auto - native_operation(native_operation_callback callback) -> api_error override; + [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, + data_buffer &data) -> api_error override; - [[nodiscard]] auto get_source_path() const -> std::string override { - return source_path_; + [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { + return api_error::not_supported; + } + + void reverse(std::size_t count); + + void set(std::size_t first_chunk, std::size_t current_chunk); + + void set_api_path(const std::string &api_path) override; + + [[nodiscard]] auto + write(std::uint64_t /* write_offset */, const data_buffer & /* data */, + std::size_t & /* bytes_written */) -> api_error override { + return api_error::not_supported; } }; } // namespace repertory diff --git a/repertory/librepertory/include/file_manager/ring_file_base.hpp b/repertory/librepertory/include/file_manager/ring_file_base.hpp deleted file mode 100644 index 0339a1a1..00000000 --- a/repertory/librepertory/include/file_manager/ring_file_base.hpp +++ /dev/null @@ -1,152 +0,0 @@ -/* - Copyright <2018-2024> - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ -#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_ -#define REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_ - -#include "file_manager/open_file_base.hpp" - -#include "types/repertory.hpp" - -namespace repertory { -class i_provider; -class i_upload_manager; - -class ring_file_base : public open_file_base { -public: - ring_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout, - filesystem_item fsi, i_provider &provider, - std::size_t ring_size, bool disable_io); - - ~ring_file_base() override = default; - -public: - static constexpr const auto min_ring_size{5U}; - -public: - ring_file_base() = delete; - ring_file_base(const ring_file_base &) noexcept = delete; - ring_file_base(ring_file_base &&) noexcept = delete; - auto operator=(ring_file_base &&) noexcept -> ring_file_base & = delete; - auto operator=(const ring_file_base &) noexcept -> ring_file_base & = delete; - -private: - boost::dynamic_bitset<> ring_state_; - std::size_t total_chunks_; - -private: - std::condition_variable chunk_notify_; - mutable std::mutex chunk_mtx_; - std::mutex read_mtx_; - std::unique_ptr reader_thread_; - std::size_t ring_begin_{}; - std::size_t ring_end_{}; - std::size_t ring_pos_{}; - stop_type stop_requested_{false}; - -private: - [[nodiscard]] auto check_start() -> api_error; - - auto download_chunk(std::size_t chunk, bool skip_active) -> api_error; - - void reader_thread(); - - void update_position(std::size_t count, bool is_forward); - -protected: - [[nodiscard]] auto get_read_state_size() const -> std::size_t; - - [[nodiscard]] virtual auto handle_read_buffer( - std::size_t chunk, - std::function func) -> api_error = 0; - - [[nodiscard]] auto has_reader_thread() -> bool { - return reader_thread_ != nullptr; - } - - [[nodiscard]] virtual auto on_check_start() -> bool = 0; - - [[nodiscard]] virtual auto - on_chunk_downloaded(std::size_t /* chunk */, - const data_buffer & /* buffer */) -> api_error { - return api_error::success; - } - - [[nodiscard]] virtual auto - use_buffer(std::size_t chunk, - std::function func) - -> api_error = 0; - -public: - [[nodiscard]] static auto can_handle_file(std::uint64_t file_size, - std::size_t chunk_size, - std::size_t ring_size) -> bool; - - auto close() -> bool override; - - void forward(std::size_t count); - - [[nodiscard]] auto get_current_chunk() const -> std::size_t { - return ring_pos_; - } - - [[nodiscard]] auto get_first_chunk() const -> std::size_t { - return ring_begin_; - } - - [[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; } - - [[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override; - - [[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override; - - [[nodiscard]] auto get_total_chunks() const -> std::size_t { - return total_chunks_; - } - - [[nodiscard]] auto is_complete() const -> bool override { return false; } - - [[nodiscard]] auto is_write_supported() const -> bool override { - return false; - } - - [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, - data_buffer &data) -> api_error override; - - [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override { - return api_error::not_supported; - } - - void reverse(std::size_t count); - - void set(std::size_t first_chunk, std::size_t current_chunk); - - void set_api_path(const std::string &api_path) override; - - [[nodiscard]] auto - write(std::uint64_t /* write_offset */, const data_buffer & /* data */, - std::size_t & /* bytes_written */) -> api_error override { - return api_error::not_supported; - } -}; -} // namespace repertory - -#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_ diff --git a/repertory/librepertory/src/file_manager/direct_open_file.cpp b/repertory/librepertory/src/file_manager/direct_open_file.cpp index 5a5321ee..9c3a6f49 100644 --- a/repertory/librepertory/src/file_manager/direct_open_file.cpp +++ b/repertory/librepertory/src/file_manager/direct_open_file.cpp @@ -21,28 +21,303 @@ */ #include "file_manager/direct_open_file.hpp" +#include "events/event_system.hpp" +#include "file_manager/events.hpp" +#include "file_manager/open_file_base.hpp" +#include "providers/i_provider.hpp" +#include "types/repertory.hpp" +#include "utils/common.hpp" + namespace repertory { direct_open_file::direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider) - : ring_file_base(chunk_size, chunk_timeout, fsi, provider, min_ring_size, - true) {} + : open_file_base(chunk_size, chunk_timeout, fsi, provider, true), + total_chunks_(static_cast( + utils::divide_with_ceiling(fsi.size, chunk_size))) { + if (fsi.size > 0U) { + ring_state_.resize(std::min(total_chunks_, ring_state_.size())); -direct_open_file::~direct_open_file() { close(); } - -auto direct_open_file::handle_read_buffer( - std::size_t chunk, - std::function func) -> api_error { - return func(ring_data_.at(chunk % get_read_state_size())); + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + ring_state_.set(0U, ring_state_.size(), false); + } } -auto direct_open_file::on_check_start() -> bool { - return (get_file_size() == 0U || has_reader_thread()); +direct_open_file::~direct_open_file() { + REPERTORY_USES_FUNCTION_NAME(); + + close(); + + if (reader_thread_) { + reader_thread_->join(); + reader_thread_.reset(); + } } -auto direct_open_file::use_buffer( - std::size_t chunk, - std::function func) -> api_error { - return func(ring_data_.at(chunk % get_read_state_size())); +auto direct_open_file::check_start() -> api_error { + if (get_file_size() == 0U || reader_thread_) { + return api_error::success; + } + + event_system::instance().raise(get_api_path(), "direct"); + reader_thread_ = std::make_unique([this]() { reader_thread(); }); + return api_error::success; +} + +auto direct_open_file::close() -> bool { + stop_requested_ = true; + + unique_mutex_lock chunk_lock(chunk_mtx_); + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + return open_file_base::close(); +} + +auto direct_open_file::download_chunk(std::size_t chunk, + bool skip_active) -> api_error { + unique_mutex_lock chunk_lock(chunk_mtx_); + const auto unlock_and_notify = [this, &chunk_lock]() { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + const auto unlock_and_return = + [&unlock_and_notify](api_error res) -> api_error { + unlock_and_notify(); + return res; + }; + + if (chunk < ring_begin_ || chunk > ring_end_) { + return unlock_and_return(api_error::invalid_ring_buffer_position); + } + + if (active_downloads_.find(chunk) != active_downloads_.end()) { + if (skip_active) { + return unlock_and_return(api_error::success); + } + + auto active_download = active_downloads_.at(chunk); + unlock_and_notify(); + + return active_download->wait(); + } + + if (ring_state_[chunk % ring_state_.size()]) { + return unlock_and_return(api_error::success); + } + + auto active_download{std::make_shared()}; + active_downloads_[chunk] = active_download; + + auto &buffer = ring_data_.at(chunk % ring_state_.size()); + auto data_offset{chunk * get_chunk_size()}; + auto data_size{ + chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), + }; + + auto res{ + get_provider().read_file_bytes(get_api_path(), data_size, data_offset, + buffer, stop_requested_), + }; + + if (res == api_error::success) { + ring_state_[chunk % ring_state_.size()] = true; + auto progress = + (static_cast(chunk + 1U) / static_cast(total_chunks_)) * + 100.0; + event_system::instance().raise(get_api_path(), "direct", + progress); + } + + active_downloads_.erase(chunk); + unlock_and_notify(); + + active_download->notify(res); + return res; +} + +void direct_open_file::forward(std::size_t count) { + mutex_lock chunk_lock(chunk_mtx_); + if ((ring_pos_ + count) > (total_chunks_ - 1U)) { + count = (total_chunks_ - 1U) - ring_pos_; + } + + if ((ring_pos_ + count) <= ring_end_) { + ring_pos_ += count; + } else { + auto added = count - (ring_end_ - ring_pos_); + if (added >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), false); + ring_pos_ += count; + ring_begin_ += added; + } else { + for (std::size_t idx = 0U; idx < added; ++idx) { + ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; + } + ring_begin_ += added; + ring_pos_ += count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); +} + +auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> { + recur_mutex_lock file_lock(get_mutex()); + return ring_state_; +} + +auto direct_open_file::get_read_state(std::size_t chunk) const -> bool { + recur_mutex_lock file_lock(get_mutex()); + return ring_state_[chunk % ring_state_.size()]; +} + +void direct_open_file::reverse(std::size_t count) { + mutex_lock chunk_lock(chunk_mtx_); + count = std::min(ring_pos_, count); + + if ((ring_pos_ - count) >= ring_begin_) { + ring_pos_ -= count; + } else { + auto removed = count - (ring_pos_ - ring_begin_); + if (removed >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), false); + ring_pos_ -= count; + ring_begin_ = ring_pos_; + } else { + for (std::size_t idx = 0U; idx < removed; ++idx) { + ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; + } + ring_begin_ -= removed; + ring_pos_ -= count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); +} + +auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, + data_buffer &data) -> api_error { + if (is_directory()) { + return api_error::invalid_operation; + } + + reset_timeout(); + + read_size = + utils::calculate_read_size(get_file_size(), read_size, read_offset); + if (read_size == 0U) { + return api_error::success; + } + + auto begin_chunk{static_cast(read_offset / get_chunk_size())}; + read_offset = read_offset - (begin_chunk * get_chunk_size()); + + unique_mutex_lock read_lock(read_mtx_); + auto res = check_start(); + if (res != api_error::success) { + return res; + } + + for (std::size_t chunk = begin_chunk; + not stop_requested_ && (res == api_error::success) && (read_size > 0U); + ++chunk) { + reset_timeout(); + + if (chunk > ring_pos_) { + forward(chunk - ring_pos_); + } else if (chunk < ring_pos_) { + reverse(ring_pos_ - chunk); + } + + res = download_chunk(chunk, false); + if (res != api_error::success) { + if (not stop_requested_ && + res == api_error::invalid_ring_buffer_position) { + read_lock.unlock(); + + // TODO limit retry + return read(read_size, read_offset, data); + } + + return res; + } + + reset_timeout(); + + auto to_read{ + std::min(static_cast(get_chunk_size() - read_offset), + read_size), + }; + + auto &buffer = ring_data_.at(chunk % ring_state_.size()); + auto begin = + std::next(buffer.begin(), static_cast(read_offset)); + auto end = std::next(begin, static_cast(to_read)); + data.insert(data.end(), begin, end); + + read_offset = 0U; + read_size -= to_read; + } + + return stop_requested_ ? api_error::download_stopped : res; +} + +void direct_open_file::reader_thread() { + unique_mutex_lock chunk_lock(chunk_mtx_); + auto next_chunk = ring_pos_; + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + while (not stop_requested_) { + chunk_lock.lock(); + + next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; + const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { + if (stop_requested_) { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + next_chunk = ring_pos_; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + if (ring_state_[next_chunk % ring_state_.size()]) { + check_and_wait(); + continue; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + download_chunk(next_chunk, true); + + chunk_lock.lock(); + check_and_wait(); + } + + event_system::instance().raise(get_api_path(), "direct", + api_error::download_stopped); +} + +void direct_open_file::set_api_path(const std::string &api_path) { + mutex_lock chunk_lock(chunk_mtx_); + open_file_base::set_api_path(api_path); + chunk_notify_.notify_all(); } } // namespace repertory diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index da77ca10..9f61a586 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -21,10 +21,16 @@ */ #include "file_manager/ring_buffer_open_file.hpp" +#include "events/event_system.hpp" +#include "file_manager/events.hpp" +#include "file_manager/open_file_base.hpp" #include "platform/platform.hpp" +#include "providers/i_provider.hpp" +#include "types/repertory.hpp" #include "utils/common.hpp" #include "utils/error_utils.hpp" -#include "utils/file.hpp" +#include "utils/file_utils.hpp" +#include "utils/path.hpp" namespace repertory { ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, @@ -33,12 +39,25 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, filesystem_item fsi, i_provider &provider, std::size_t ring_size) - : ring_file_base(chunk_size, chunk_timeout, fsi, provider, ring_size, - false), + : open_file_base(chunk_size, chunk_timeout, fsi, provider, false), + ring_state_(ring_size), source_path_(utils::path::combine(buffer_directory, { utils::create_uuid_string(), - })) {} + })), + total_chunks_(static_cast( + utils::divide_with_ceiling(fsi.size, chunk_size))) { + if (ring_size < 5U) { + throw std::runtime_error("ring size must be greater than or equal to 5"); + } + + if (not can_handle_file(fsi.size, chunk_size, ring_size)) { + throw std::runtime_error("file size is less than ring buffer size"); + } + + ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U); + ring_state_.set(0U, ring_size, false); +} ring_buffer_open_file::~ring_buffer_open_file() { REPERTORY_USES_FUNCTION_NAME(); @@ -48,93 +67,379 @@ ring_buffer_open_file::~ring_buffer_open_file() { if (nf_) { nf_->close(); nf_.reset(); + + if (not utils::file::file(source_path_).remove()) { + utils::error::raise_api_path_error( + function_name, get_api_path(), source_path_, + utils::get_last_error_code(), "failed to delete file"); + } } - if (utils::file::file(get_source_path()).remove()) { - return; + if (reader_thread_) { + reader_thread_->join(); + reader_thread_.reset(); } - - utils::error::raise_api_path_error( - function_name, get_api_path(), get_source_path(), - utils::get_last_error_code(), "failed to delete file"); } -auto ring_buffer_open_file::handle_read_buffer( - std::size_t /* chunk */, - std::function func) -> api_error { - data_buffer buffer; - return func(buffer); +auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size, + std::size_t chunk_size, + std::size_t ring_size) -> bool { + return file_size >= (static_cast(ring_size) * chunk_size); } -auto ring_buffer_open_file::on_check_start() -> bool { +auto ring_buffer_open_file::check_start() -> api_error { REPERTORY_USES_FUNCTION_NAME(); if (nf_) { - return true; + return api_error::success; } - auto buffer_directory{utils::path::get_parent_path(get_source_path())}; + auto buffer_directory{utils::path::get_parent_path(source_path_)}; if (not utils::file::directory(buffer_directory).create_directory()) { - throw std::runtime_error( + utils::error::raise_api_path_error( + function_name, get_api_path(), source_path_, fmt::format("failed to create buffer directory|path|{}|err|{}", buffer_directory, utils::get_last_error_code())); + return api_error::os_error; } - nf_ = utils::file::file::open_or_create_file(get_source_path()); + nf_ = utils::file::file::open_or_create_file(source_path_); if (not nf_ || not *nf_) { - throw std::runtime_error(fmt::format("failed to create buffer file|err|{}", - utils::get_last_error_code())); + utils::error::raise_api_path_error( + function_name, get_api_path(), source_path_, + fmt::format("failed to create buffer file|err|{}", + utils::get_last_error_code())); + return api_error::os_error; } - if (not nf_->truncate(get_read_state_size() * get_chunk_size())) { - throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}", - utils::get_last_error_code())); - + if (not nf_->truncate(ring_state_.size() * get_chunk_size())) { nf_->close(); nf_.reset(); + + utils::error::raise_api_path_error( + function_name, get_api_path(), source_path_, + fmt::format("failed to resize buffer file|err|{}", + utils::get_last_error_code())); + return api_error::os_error; } - return false; + event_system::instance().raise(get_api_path(), source_path_); + reader_thread_ = std::make_unique([this]() { reader_thread(); }); + return api_error::success; } -auto ring_buffer_open_file::on_chunk_downloaded( - std::size_t chunk, const data_buffer &buffer) -> api_error { - return do_io([&]() -> api_error { - std::size_t bytes_written{}; - if (nf_->write(buffer, (chunk % get_read_state_size()) * get_chunk_size(), - &bytes_written)) { - return api_error::success; +auto ring_buffer_open_file::close() -> bool { + stop_requested_ = true; + + unique_mutex_lock chunk_lock(chunk_mtx_); + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + return open_file_base::close(); +} + +auto ring_buffer_open_file::download_chunk(std::size_t chunk, + bool skip_active) -> api_error { + unique_mutex_lock chunk_lock(chunk_mtx_); + const auto unlock_and_notify = [this, &chunk_lock]() { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + const auto unlock_and_return = + [&unlock_and_notify](api_error res) -> api_error { + unlock_and_notify(); + return res; + }; + + if (chunk < ring_begin_ || chunk > ring_end_) { + return unlock_and_return(api_error::invalid_ring_buffer_position); + } + + if (get_active_downloads().find(chunk) != get_active_downloads().end()) { + if (skip_active) { + return unlock_and_return(api_error::success); } - return api_error::os_error; - }); + auto active_download = get_active_downloads().at(chunk); + unlock_and_notify(); + + return active_download->wait(); + } + + if (ring_state_[chunk % ring_state_.size()]) { + return unlock_and_return(api_error::success); + } + + auto active_download{std::make_shared()}; + get_active_downloads()[chunk] = active_download; + + data_buffer buffer; + auto data_offset{chunk * get_chunk_size()}; + auto data_size{ + chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), + }; + + auto res{ + get_provider().read_file_bytes(get_api_path(), data_size, data_offset, + buffer, stop_requested_), + }; + + if (res == api_error::success) { + res = do_io([&]() -> api_error { + std::size_t bytes_written{}; + if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(), + &bytes_written)) { + return api_error::success; + } + + return api_error::os_error; + }); + + if (res == api_error::success) { + ring_state_[chunk % ring_state_.size()] = true; + auto progress = (static_cast(chunk + 1U) / + static_cast(total_chunks_)) * + 100.0; + event_system::instance().raise(get_api_path(), + source_path_, progress); + } + } + + get_active_downloads().erase(chunk); + unlock_and_notify(); + + active_download->notify(res); + return res; } -auto ring_buffer_open_file::native_operation(native_operation_callback callback) - -> api_error { +void ring_buffer_open_file::forward(std::size_t count) { + mutex_lock chunk_lock(chunk_mtx_); + if ((ring_pos_ + count) > (total_chunks_ - 1U)) { + count = (total_chunks_ - 1U) - ring_pos_; + } + + if ((ring_pos_ + count) <= ring_end_) { + ring_pos_ += count; + } else { + auto added = count - (ring_end_ - ring_pos_); + if (added >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), false); + ring_pos_ += count; + ring_begin_ += added; + } else { + for (std::size_t idx = 0U; idx < added; ++idx) { + ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; + } + ring_begin_ += added; + ring_pos_ += count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); +} + +auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> { + recur_mutex_lock file_lock(get_mutex()); + return ring_state_; +} + +auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool { + recur_mutex_lock file_lock(get_mutex()); + return ring_state_[chunk % ring_state_.size()]; +} + +auto ring_buffer_open_file::native_operation( + i_open_file::native_operation_callback callback) -> api_error { return do_io([&]() -> api_error { return callback(nf_->get_handle()); }); } -auto ring_buffer_open_file::use_buffer( - std::size_t chunk, - std::function func) -> api_error { - data_buffer buffer; - buffer.resize(get_chunk_size()); +auto ring_buffer_open_file::read(std::size_t read_size, + std::uint64_t read_offset, + data_buffer &data) -> api_error { + if (is_directory()) { + return api_error::invalid_operation; + } - auto res = do_io([&]() -> api_error { - std::size_t bytes_read{}; - auto result = - nf_->read(buffer, (chunk % get_read_state_size()) * get_chunk_size(), - &bytes_read) - ? api_error::success - : api_error::os_error; - buffer.resize(bytes_read); - return result; - }); + reset_timeout(); + + read_size = + utils::calculate_read_size(get_file_size(), read_size, read_offset); + if (read_size == 0U) { + return api_error::success; + } + + auto begin_chunk{static_cast(read_offset / get_chunk_size())}; + read_offset = read_offset - (begin_chunk * get_chunk_size()); + + unique_mutex_lock read_lock(read_mtx_); + auto res = check_start(); if (res != api_error::success) { return res; } - return func(buffer); + for (std::size_t chunk = begin_chunk; + not stop_requested_ && (res == api_error::success) && (read_size > 0U); + ++chunk) { + reset_timeout(); + + if (chunk > ring_pos_) { + forward(chunk - ring_pos_); + } else if (chunk < ring_pos_) { + reverse(ring_pos_ - chunk); + } + + res = download_chunk(chunk, false); + if (res != api_error::success) { + if (not stop_requested_ && + res == api_error::invalid_ring_buffer_position) { + read_lock.unlock(); + + // TODO limit retry + return read(read_size, read_offset, data); + } + + return res; + } + + reset_timeout(); + + auto to_read{ + std::min(static_cast(get_chunk_size() - read_offset), + read_size), + }; + + res = do_io([&]() -> api_error { + data_buffer buffer(to_read); + + std::size_t bytes_read{}; + auto result = + nf_->read( + buffer, + (((chunk % ring_state_.size()) * get_chunk_size()) + read_offset), + &bytes_read) + ? api_error::success + : api_error::os_error; + + if (result != api_error::success) { + return result; + } + + reset_timeout(); + + data.insert(data.end(), buffer.begin(), buffer.end()); + read_size -= bytes_read; + + return result; + }); + + read_offset = 0U; + } + + return stop_requested_ ? api_error::download_stopped : res; +} + +void ring_buffer_open_file::reader_thread() { + unique_mutex_lock chunk_lock(chunk_mtx_); + auto next_chunk = ring_pos_; + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + while (not stop_requested_) { + chunk_lock.lock(); + + next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; + const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { + if (stop_requested_) { + chunk_notify_.notify_all(); + chunk_lock.unlock(); + return; + } + + if (get_read_state().all()) { + chunk_notify_.wait(chunk_lock); + next_chunk = ring_pos_; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + }; + + if (ring_state_[next_chunk % ring_state_.size()]) { + check_and_wait(); + continue; + } + + chunk_notify_.notify_all(); + chunk_lock.unlock(); + + download_chunk(next_chunk, true); + + chunk_lock.lock(); + check_and_wait(); + } + + event_system::instance().raise(get_api_path(), source_path_, + api_error::download_stopped); +} + +void ring_buffer_open_file::reverse(std::size_t count) { + mutex_lock chunk_lock(chunk_mtx_); + count = std::min(ring_pos_, count); + + if ((ring_pos_ - count) >= ring_begin_) { + ring_pos_ -= count; + } else { + auto removed = count - (ring_pos_ - ring_begin_); + if (removed >= ring_state_.size()) { + ring_state_.set(0U, ring_state_.size(), false); + ring_pos_ -= count; + ring_begin_ = ring_pos_; + } else { + for (std::size_t idx = 0U; idx < removed; ++idx) { + ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; + } + ring_begin_ -= removed; + ring_pos_ -= count; + } + + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + } + + chunk_notify_.notify_all(); +} + +void ring_buffer_open_file::set(std::size_t first_chunk, + std::size_t current_chunk) { + mutex_lock chunk_lock(chunk_mtx_); + if (first_chunk >= total_chunks_) { + chunk_notify_.notify_all(); + throw std::runtime_error("first chunk must be less than total chunks"); + } + + ring_begin_ = first_chunk; + ring_end_ = + std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); + + if (current_chunk > ring_end_) { + chunk_notify_.notify_all(); + throw std::runtime_error( + "current chunk must be less than or equal to last chunk"); + } + + ring_pos_ = current_chunk; + ring_state_.set(0U, ring_state_.size(), true); + + chunk_notify_.notify_all(); +} + +void ring_buffer_open_file::set_api_path(const std::string &api_path) { + mutex_lock chunk_lock(chunk_mtx_); + open_file_base::set_api_path(api_path); + chunk_notify_.notify_all(); } } // namespace repertory diff --git a/repertory/librepertory/src/file_manager/ring_file_base.cpp b/repertory/librepertory/src/file_manager/ring_file_base.cpp deleted file mode 100644 index 989d4806..00000000 --- a/repertory/librepertory/src/file_manager/ring_file_base.cpp +++ /dev/null @@ -1,386 +0,0 @@ -/* - Copyright <2018-2024> - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ -#include "file_manager/ring_file_base.hpp" - -#include "events/event_system.hpp" -#include "file_manager/events.hpp" -#include "file_manager/open_file_base.hpp" -#include "platform/platform.hpp" -#include "providers/i_provider.hpp" -#include "types/repertory.hpp" -#include "utils/common.hpp" -#include "utils/error_utils.hpp" - -namespace repertory { -ring_file_base::ring_file_base(std::uint64_t chunk_size, - std::uint8_t chunk_timeout, filesystem_item fsi, - i_provider &provider, std::size_t ring_size, - bool disable_io) - : open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io), - ring_state_(ring_size), - total_chunks_(static_cast( - utils::divide_with_ceiling(fsi.size, chunk_size))) { - if (disable_io) { - if (fsi.size > 0U) { - ring_state_.resize(std::min(total_chunks_, ring_state_.size())); - } - } else { - if (ring_state_.size() < min_ring_size) { - throw std::runtime_error(fmt::format( - "ring size must be greater than or equal to {}", min_ring_size)); - } - - if (not can_handle_file(fsi.size, chunk_size, get_read_state_size())) { - throw std::runtime_error("file size is less than ring buffer size"); - } - } - - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - ring_state_.set(0U, ring_state_.size(), false); -} - -auto ring_file_base::can_handle_file(std::uint64_t file_size, - std::size_t chunk_size, - std::size_t ring_size) -> bool { - return file_size >= (static_cast(ring_size) * chunk_size); -} - -auto ring_file_base::check_start() -> api_error { - REPERTORY_USES_FUNCTION_NAME(); - - try { - if (on_check_start()) { - return api_error::success; - } - - event_system::instance().raise(get_api_path(), - get_source_path()); - reader_thread_ = - std::make_unique([this]() { reader_thread(); }); - return api_error::success; - } catch (const std::exception &ex) { - utils::error::raise_api_path_error(function_name, get_api_path(), - get_source_path(), ex, - "failed to start"); - } - - return api_error::error; -} - -auto ring_file_base::close() -> bool { - stop_requested_ = true; - - unique_mutex_lock chunk_lock(chunk_mtx_); - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - auto ret = open_file_base::close(); - - if (reader_thread_) { - reader_thread_->join(); - reader_thread_.reset(); - } - - return ret; -} - -auto ring_file_base::download_chunk(std::size_t chunk, - bool skip_active) -> api_error { - unique_mutex_lock chunk_lock(chunk_mtx_); - const auto unlock_and_notify = [this, &chunk_lock]() { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - const auto unlock_and_return = - [&unlock_and_notify](api_error res) -> api_error { - unlock_and_notify(); - return res; - }; - - if (chunk < ring_begin_ || chunk > ring_end_) { - return unlock_and_return(api_error::invalid_ring_buffer_position); - } - - if (get_active_downloads().find(chunk) != get_active_downloads().end()) { - if (skip_active) { - return unlock_and_return(api_error::success); - } - - auto active_download = get_active_downloads().at(chunk); - unlock_and_notify(); - - return active_download->wait(); - } - - if (ring_state_[chunk % ring_state_.size()]) { - return unlock_and_return(api_error::success); - } - - auto active_download{std::make_shared()}; - get_active_downloads()[chunk] = active_download; - - auto data_offset{chunk * get_chunk_size()}; - auto data_size{ - chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), - }; - - auto res = handle_read_buffer(chunk, [&](auto &&buffer) { - auto result{ - get_provider().read_file_bytes(get_api_path(), data_size, data_offset, - buffer, stop_requested_), - }; - - if (result != api_error::success) { - return result; - } - result = on_chunk_downloaded(chunk, buffer); - if (result != api_error::success) { - return result; - } - - ring_state_[chunk % ring_state_.size()] = true; - auto progress = - (static_cast(chunk + 1U) / static_cast(total_chunks_)) * - 100.0; - event_system::instance().raise( - get_api_path(), get_source_path(), progress); - return api_error::success; - }); - - get_active_downloads().erase(chunk); - unlock_and_notify(); - - active_download->notify(res); - return res; -} - -void ring_file_base::forward(std::size_t count) { - update_position(count, true); -} - -auto ring_file_base::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_; -} - -auto ring_file_base::get_read_state(std::size_t chunk) const -> bool { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_[chunk % ring_state_.size()]; -} - -auto ring_file_base::get_read_state_size() const -> std::size_t { - recur_mutex_lock file_lock(get_mutex()); - return ring_state_.size(); -} - -auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset, - data_buffer &data) -> api_error { - if (is_directory()) { - return api_error::invalid_operation; - } - - reset_timeout(); - - read_size = - utils::calculate_read_size(get_file_size(), read_size, read_offset); - if (read_size == 0U) { - return api_error::success; - } - - auto begin_chunk{static_cast(read_offset / get_chunk_size())}; - read_offset = read_offset - (begin_chunk * get_chunk_size()); - - unique_mutex_lock read_lock(read_mtx_); - auto res = check_start(); - if (res != api_error::success) { - return res; - } - - for (std::size_t chunk = begin_chunk; - not stop_requested_ && (res == api_error::success) && (read_size > 0U); - ++chunk) { - reset_timeout(); - - if (chunk > ring_pos_) { - forward(chunk - ring_pos_); - } else if (chunk < ring_pos_) { - reverse(ring_pos_ - chunk); - } - - res = download_chunk(chunk, false); - if (res != api_error::success) { - if (not stop_requested_ && - res == api_error::invalid_ring_buffer_position) { - read_lock.unlock(); - - // TODO limit retry - return read(read_size, read_offset, data); - } - - return res; - } - - reset_timeout(); - - auto to_read{ - std::min(static_cast(get_chunk_size() - read_offset), - read_size), - }; - - res = use_buffer( - chunk, [&data, &read_offset, &to_read](auto &&buffer) -> api_error { - auto begin = - std::next(buffer.begin(), static_cast(read_offset)); - auto end = std::next(begin, static_cast(to_read)); - data.insert(data.end(), begin, end); - return api_error::success; - }); - - reset_timeout(); - read_size -= to_read; - read_offset = 0U; - } - - return stop_requested_ ? api_error::download_stopped : res; -} - -void ring_file_base::reader_thread() { - unique_mutex_lock chunk_lock(chunk_mtx_); - auto next_chunk = ring_pos_; - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - while (not stop_requested_) { - chunk_lock.lock(); - - next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U; - const auto check_and_wait = [this, &chunk_lock, &next_chunk]() { - if (stop_requested_) { - chunk_notify_.notify_all(); - chunk_lock.unlock(); - return; - } - - if (get_read_state().all()) { - chunk_notify_.wait(chunk_lock); - next_chunk = ring_pos_; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - }; - - if (ring_state_[next_chunk % ring_state_.size()]) { - check_and_wait(); - continue; - } - - chunk_notify_.notify_all(); - chunk_lock.unlock(); - - download_chunk(next_chunk, true); - - chunk_lock.lock(); - check_and_wait(); - } - - event_system::instance().raise( - get_api_path(), get_source_path(), api_error::download_stopped); -} - -void ring_file_base::reverse(std::size_t count) { - update_position(count, false); -} - -void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) { - mutex_lock chunk_lock(chunk_mtx_); - if (first_chunk >= total_chunks_) { - chunk_notify_.notify_all(); - throw std::runtime_error("first chunk must be less than total chunks"); - } - - ring_begin_ = first_chunk; - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - - if (current_chunk > ring_end_) { - chunk_notify_.notify_all(); - throw std::runtime_error( - "current chunk must be less than or equal to last chunk"); - } - - ring_pos_ = current_chunk; - ring_state_.set(0U, ring_state_.size(), true); - - chunk_notify_.notify_all(); -} - -void ring_file_base::set_api_path(const std::string &api_path) { - mutex_lock chunk_lock(chunk_mtx_); - open_file_base::set_api_path(api_path); - chunk_notify_.notify_all(); -} - -void ring_file_base::update_position(std::size_t count, bool is_forward) { - mutex_lock chunk_lock(chunk_mtx_); - - if (is_forward) { - if ((ring_pos_ + count) > (total_chunks_ - 1U)) { - count = (total_chunks_ - 1U) - ring_pos_; - } - } else { - count = std::min(ring_pos_, count); - } - - if (is_forward ? (ring_pos_ + count) <= ring_end_ - : (ring_pos_ - count) >= ring_begin_) { - ring_pos_ += is_forward ? count : -count; - } else { - auto delta = is_forward ? count - (ring_end_ - ring_pos_) - : count - (ring_pos_ - ring_begin_); - - if (delta >= ring_state_.size()) { - ring_state_.set(0U, ring_state_.size(), false); - ring_pos_ += is_forward ? count : -count; - ring_begin_ += is_forward ? delta : -delta; - } else { - for (std::size_t idx = 0U; idx < delta; ++idx) { - if (is_forward) { - ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; - } else { - ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; - } - } - - ring_begin_ += is_forward ? delta : -delta; - ring_pos_ += is_forward ? count : -count; - } - - ring_end_ = - std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); - } - - chunk_notify_.notify_all(); -} -} // namespace repertory