diff --git a/repertory/librepertory/include/file_manager/open_file_base.hpp b/repertory/librepertory/include/file_manager/open_file_base.hpp index 57c9a12c..e075ec98 100644 --- a/repertory/librepertory/include/file_manager/open_file_base.hpp +++ b/repertory/librepertory/include/file_manager/open_file_base.hpp @@ -98,7 +98,7 @@ public: [[nodiscard]] auto get_result() -> api_error; }; -protected: +private: std::uint64_t chunk_size_; std::uint8_t chunk_timeout_; filesystem_item fsi_; @@ -109,30 +109,57 @@ protected: private: api_error error_{api_error::success}; mutable std::mutex error_mtx_; + mutable std::recursive_mutex file_mtx_; stop_type io_stop_requested_{false}; std::unique_ptr io_thread_; - -protected: - std::unordered_map> active_downloads_; - mutable std::recursive_mutex file_mtx_; - std::atomic last_access_{ - std::chrono::system_clock::now()}; - bool modified_{false}; - std::unique_ptr nf_; mutable std::mutex io_thread_mtx_; std::condition_variable io_thread_notify_; std::deque> io_thread_queue_; + std::atomic last_access_{ + std::chrono::system_clock::now(), + }; + bool modified_{false}; bool removed_{false}; +protected: + std::unordered_map> active_downloads_; + std::unique_ptr nf_; + private: void file_io_thread(); protected: [[nodiscard]] auto do_io(std::function action) -> api_error; + [[nodiscard]] auto get_mutex() const -> std::recursive_mutex &; + + [[nodiscard]] auto get_last_chunk_size() const -> std::size_t; + + [[nodiscard]] auto get_provider() -> i_provider & { return provider_; } + + [[nodiscard]] auto get_provider() const -> const i_provider & { + return provider_; + } + + [[nodiscard]] auto is_removed() const -> bool; + + void notify_io(); + void reset_timeout(); - auto set_api_error(const api_error &e) -> api_error; + auto set_api_error(const api_error &err) -> api_error; + + void set_file_size(std::uint64_t size); + + void set_last_chunk_size(std::size_t size); + + void set_modified(bool modified); + + void set_removed(bool removed); + + void set_source_path(std::string source_path); + + void wait_for_io(stop_type &stop_requested); public: void add(std::uint64_t handle, open_file_data ofd) override; @@ -171,9 +198,7 @@ public: [[nodiscard]] auto get_open_file_count() const -> std::size_t override; - [[nodiscard]] auto get_source_path() const -> std::string override { - return fsi_.source_path; - } + [[nodiscard]] auto get_source_path() const -> std::string override; [[nodiscard]] auto has_handle(std::uint64_t handle) const -> bool override; diff --git a/repertory/librepertory/src/file_manager/direct_open_file.cpp b/repertory/librepertory/src/file_manager/direct_open_file.cpp index 4df6758e..0d981712 100644 --- a/repertory/librepertory/src/file_manager/direct_open_file.cpp +++ b/repertory/librepertory/src/file_manager/direct_open_file.cpp @@ -34,8 +34,8 @@ direct_open_file::direct_open_file(std::uint64_t chunk_size, filesystem_item fsi, i_provider &provider) : open_file_base(chunk_size, chunk_timeout, fsi, provider, true), total_chunks_(static_cast( - utils::divide_with_ceiling(fsi_.size, chunk_size))) { - if (fsi_.size > 0U) { + utils::divide_with_ceiling(fsi.size, chunk_size))) { + if (fsi.size > 0U) { ring_state_.resize(std::min(total_chunks_, ring_state_.size())); ring_end_ = @@ -56,11 +56,11 @@ direct_open_file::~direct_open_file() { } auto direct_open_file::check_start() -> api_error { - if (fsi_.size == 0U || reader_thread_) { + if (get_file_size() == 0U || reader_thread_) { return api_error::success; } - event_system::instance().raise(fsi_.api_path, "direct"); + event_system::instance().raise(get_api_path(), "direct"); reader_thread_ = std::make_unique([this]() { reader_thread(); }); return api_error::success; } @@ -113,15 +113,15 @@ auto direct_open_file::download_chunk(std::size_t chunk, ring_state_[chunk % ring_state_.size()] = false; auto &buffer = ring_data_.at(chunk % ring_state_.size()); - auto data_offset{chunk * chunk_size_}; + auto data_offset{chunk * get_chunk_size()}; auto data_size{ - chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_, + chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), }; unlock_and_notify(); auto res{ - provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer, - stop_requested_), + get_provider().read_file_bytes(get_api_path(), data_size, data_offset, + buffer, stop_requested_), }; chunk_lock.lock(); @@ -129,7 +129,7 @@ auto direct_open_file::download_chunk(std::size_t chunk, auto progress = (static_cast(chunk + 1U) / static_cast(total_chunks_)) * 100.0; - event_system::instance().raise(fsi_.api_path, "direct", + event_system::instance().raise(get_api_path(), "direct", progress); res = (chunk >= ring_begin_ && chunk <= ring_end_) ? res @@ -173,13 +173,13 @@ void direct_open_file::forward(std::size_t count) { } auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); auto read_state = ring_state_; return read_state.flip(); } auto direct_open_file::get_read_state(std::size_t chunk) const -> bool { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); return not ring_state_[chunk % ring_state_.size()]; } @@ -212,19 +212,20 @@ void direct_open_file::reverse(std::size_t count) { auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, data_buffer &data) -> api_error { - if (fsi_.directory) { + if (is_directory()) { return api_error::invalid_operation; } reset_timeout(); - read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset); + read_size = + utils::calculate_read_size(get_file_size(), read_size, read_offset); if (read_size == 0U) { return api_error::success; } - auto begin_chunk{static_cast(read_offset / chunk_size_)}; - read_offset = read_offset - (begin_chunk * chunk_size_); + auto begin_chunk{static_cast(read_offset / get_chunk_size())}; + read_offset = read_offset - (begin_chunk * get_chunk_size()); unique_mutex_lock read_lock(read_mtx_); auto res = check_start(); @@ -259,7 +260,7 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, reset_timeout(); auto to_read{ - std::min(static_cast(chunk_size_ - read_offset), + std::min(static_cast(get_chunk_size() - read_offset), read_size), }; @@ -316,7 +317,7 @@ void direct_open_file::reader_thread() { check_and_wait(); } - event_system::instance().raise(fsi_.api_path, "direct", + event_system::instance().raise(get_api_path(), "direct", api_error::download_stopped); } diff --git a/repertory/librepertory/src/file_manager/open_file.cpp b/repertory/librepertory/src/file_manager/open_file.cpp index bd23dd4d..982fc9b5 100644 --- a/repertory/librepertory/src/file_manager/open_file.cpp +++ b/repertory/librepertory/src/file_manager/open_file.cpp @@ -68,7 +68,7 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, mgr_(mgr) { REPERTORY_USES_FUNCTION_NAME(); - if (fsi_.directory) { + if (fsi.directory) { if (read_state.has_value()) { utils::error::raise_api_path_error( function_name, fsi.api_path, fsi.source_path, @@ -79,7 +79,7 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, } nf_ = utils::file::file::open_or_create_file(fsi.source_path, - provider_.is_read_only()); + get_provider().is_read_only()); set_api_error(*nf_ ? api_error::success : api_error::os_error); if (get_api_error() != api_error::success) { return; @@ -92,12 +92,12 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, return; } - if (fsi_.size == 0U) { + if (fsi.size == 0U) { return; } read_state_.resize(static_cast( - utils::divide_with_ceiling(fsi_.size, chunk_size)), + utils::divide_with_ceiling(fsi.size, chunk_size)), false); auto file_size = nf_->size(); @@ -109,7 +109,7 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, return; } - if (provider_.is_read_only() || file_size.value() == fsi.size) { + if (get_provider().is_read_only() || file_size.value() == fsi.size) { read_state_.set(0U, read_state_.size(), true); allocated = true; } @@ -125,12 +125,12 @@ auto open_file::adjust_cache_size(std::uint64_t file_size, bool shrink) -> api_error { REPERTORY_USES_FUNCTION_NAME(); - if (file_size == fsi_.size) { + if (file_size == get_file_size()) { return api_error::success; } - if (file_size > fsi_.size) { - auto size = file_size - fsi_.size; + if (file_size > get_file_size()) { + auto size = file_size - get_file_size(); auto res = shrink ? cache_size_mgr::instance().shrink(size) : cache_size_mgr::instance().expand(size); if (res == api_error::success) { @@ -138,13 +138,13 @@ auto open_file::adjust_cache_size(std::uint64_t file_size, } utils::error::raise_api_path_error( - function_name, fsi_.api_path, fsi_.source_path, res, + function_name, get_api_path(), get_source_path(), res, fmt::format("failed to {} cache|size|{}", (shrink ? "shrink" : "expand"), size)); return set_api_error(res); } - auto size = fsi_.size - file_size; + auto size = get_file_size() - file_size; auto res = shrink ? cache_size_mgr::instance().expand(size) : cache_size_mgr::instance().shrink(size); if (res == api_error::success) { @@ -152,7 +152,7 @@ auto open_file::adjust_cache_size(std::uint64_t file_size, } utils::error::raise_api_path_error( - function_name, fsi_.api_path, fsi_.source_path, res, + function_name, get_api_path(), get_source_path(), res, fmt::format("failed to {} cache|size|{}", (shrink ? "expand" : "shrink"), size)); return set_api_error(res); @@ -161,7 +161,7 @@ auto open_file::adjust_cache_size(std::uint64_t file_size, auto open_file::check_start() -> api_error { REPERTORY_USES_FUNCTION_NAME(); - unique_recur_mutex_lock file_lock(file_mtx_); + unique_recur_mutex_lock file_lock(get_mutex()); if (allocated) { return api_error::success; } @@ -169,12 +169,12 @@ auto open_file::check_start() -> api_error { auto file_size = nf_->size(); if (not file_size.has_value()) { utils::error::raise_api_path_error( - function_name, fsi_.api_path, fsi_.source_path, + function_name, get_api_path(), get_source_path(), utils::get_last_error_code(), "failed to get file size"); return set_api_error(api_error::os_error); } - if (file_size.value() == fsi_.size) { + if (file_size.value() == get_file_size()) { allocated = true; return api_error::success; } @@ -186,11 +186,11 @@ auto open_file::check_start() -> api_error { } file_lock.lock(); - if (not nf_->truncate(fsi_.size)) { + if (not nf_->truncate(get_file_size())) { utils::error::raise_api_path_error( - function_name, fsi_.api_path, fsi_.source_path, + function_name, get_api_path(), get_source_path(), utils::get_last_error_code(), - fmt::format("failed to truncate file|size|{}", fsi_.size)); + fmt::format("failed to truncate file|size|{}", get_file_size())); return set_api_error(res); } @@ -201,15 +201,13 @@ auto open_file::check_start() -> api_error { auto open_file::close() -> bool { REPERTORY_USES_FUNCTION_NAME(); - if (fsi_.directory || stop_requested_) { + if (is_directory() || stop_requested_) { return false; } stop_requested_ = true; - unique_mutex_lock reader_lock(io_thread_mtx_); - io_thread_notify_.notify_all(); - reader_lock.unlock(); + notify_io(); if (reader_thread_) { reader_thread_->join(); @@ -224,9 +222,10 @@ auto open_file::close() -> bool { auto err = get_api_error(); if (err == api_error::success || err == api_error::download_incomplete || err == api_error::download_stopped) { - if (modified_ && not read_state.all()) { + if (is_modified() && not read_state.all()) { set_api_error(api_error::download_incomplete); - } else if (not modified_ && (fsi_.size > 0U) && not read_state.all()) { + } else if (not is_modified() && (get_file_size() > 0U) && + not read_state.all()) { set_api_error(api_error::download_stopped); } @@ -235,7 +234,7 @@ auto open_file::close() -> bool { nf_->close(); - if (modified_) { + if (is_modified()) { if (err == api_error::success) { mgr_.queue_upload(*this); return true; @@ -248,24 +247,24 @@ auto open_file::close() -> bool { } if (err != api_error::success || read_state.all()) { - mgr_.remove_resume(fsi_.api_path, get_source_path()); + mgr_.remove_resume(get_api_path(), get_source_path()); } if (err == api_error::success) { return true; } - file_manager::remove_source_and_shrink_cache(fsi_.api_path, fsi_.source_path, - fsi_.size, allocated); + file_manager::remove_source_and_shrink_cache( + get_api_path(), get_source_path(), get_file_size(), allocated); - auto parent = utils::path::get_parent_path(fsi_.source_path); - fsi_.source_path = - utils::path::combine(parent, {utils::create_uuid_string()}); - auto res = - provider_.set_item_meta(fsi_.api_path, META_SOURCE, fsi_.source_path); + auto parent = utils::path::get_parent_path(get_source_path()); + set_source_path(utils::path::combine(parent, {utils::create_uuid_string()})); + + auto res = get_provider().set_item_meta(get_api_path(), META_SOURCE, + get_source_path()); if (res != api_error::success) { - utils::error::raise_api_path_error(function_name, fsi_.api_path, - fsi_.source_path, res, + utils::error::raise_api_path_error(function_name, get_api_path(), + get_source_path(), res, "failed to set new source path"); } @@ -294,12 +293,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, return; } - auto data_offset = chunk * chunk_size_; - auto data_size = - (chunk == read_state.size() - 1U) ? last_chunk_size_ : chunk_size_; + auto data_offset = chunk * get_chunk_size(); + auto data_size = (chunk == read_state.size() - 1U) ? get_last_chunk_size() + : get_chunk_size(); if (active_downloads_.empty() && (read_state.count() == 0U)) { - event_system::instance().raise(fsi_.api_path, - fsi_.source_path); + event_system::instance().raise(get_api_path(), + get_source_path()); } active_downloads_[chunk] = std::make_shared(); @@ -322,16 +321,16 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, static_cast(state.size())) * 100.0; event_system::instance().raise( - fsi_.api_path, fsi_.source_path, progress); + get_api_path(), get_source_path(), progress); if (state.all() && not notified_) { notified_ = true; event_system::instance().raise( - fsi_.api_path, fsi_.source_path, get_api_error()); + get_api_path(), get_source_path(), get_api_error()); } } else if (not notified_) { notified_ = true; event_system::instance().raise( - fsi_.api_path, fsi_.source_path, get_api_error()); + get_api_path(), get_source_path(), get_api_error()); } lock.unlock(); @@ -343,7 +342,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, }; data_buffer buffer; - auto res = provider_.read_file_bytes( + auto res = get_provider().read_file_bytes( get_api_path(), data_size, data_offset, buffer, stop_requested_); if (res != api_error::success) { set_api_error(res); @@ -389,12 +388,12 @@ void open_file::download_range(std::size_t begin_chunk, std::size_t end_chunk, } auto open_file::get_allocated() const -> bool { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); return allocated; } auto open_file::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); return read_state_; } @@ -424,7 +423,7 @@ auto open_file::native_operation( i_open_file::native_operation_callback callback) -> api_error { REPERTORY_USES_FUNCTION_NAME(); - if (fsi_.directory) { + if (is_directory()) { return set_api_error(api_error::invalid_operation); } @@ -446,7 +445,7 @@ auto open_file::native_operation( auto last_chunk = is_empty_file ? std::size_t(0U) : static_cast(utils::divide_with_ceiling( - new_file_size, chunk_size_)) - + new_file_size, get_chunk_size())) - 1U; unique_recur_mutex_lock rw_lock(rw_mtx_); @@ -503,10 +502,11 @@ auto open_file::native_operation( } set_read_state(read_state); - last_chunk_size_ = static_cast( - new_file_size <= chunk_size_ ? new_file_size - : (new_file_size % chunk_size_) == 0U ? chunk_size_ - : new_file_size % chunk_size_); + set_last_chunk_size(static_cast( + new_file_size <= get_chunk_size() ? new_file_size + : (new_file_size % get_chunk_size()) == 0U + ? get_chunk_size() + : new_file_size % get_chunk_size())); } if (original_file_size == new_file_size) { @@ -514,15 +514,15 @@ auto open_file::native_operation( } set_modified(); - fsi_.size = new_file_size; + set_file_size(new_file_size); auto now = std::to_string(utils::time::get_time_now()); - res = provider_.set_item_meta(fsi_.api_path, - { - {META_CHANGED, now}, - {META_MODIFIED, now}, - {META_SIZE, std::to_string(new_file_size)}, - {META_WRITTEN, now}, - }); + res = get_provider().set_item_meta( + get_api_path(), { + {META_CHANGED, now}, + {META_MODIFIED, now}, + {META_SIZE, std::to_string(new_file_size)}, + {META_WRITTEN, now}, + }); if (res == api_error::success) { return res; } @@ -534,7 +534,7 @@ auto open_file::native_operation( auto open_file::read(std::size_t read_size, std::uint64_t read_offset, data_buffer &data) -> api_error { - if (fsi_.directory) { + if (is_directory()) { return set_api_error(api_error::invalid_operation); } @@ -556,9 +556,9 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset, const auto read_from_source = [this, &data, &read_offset, &read_size]() -> api_error { return do_io([this, &data, &read_offset, &read_size]() -> api_error { - if (provider_.is_read_only()) { - return provider_.read_file_bytes(fsi_.api_path, read_size, read_offset, - data, stop_requested_); + if (get_provider().is_read_only()) { + return get_provider().read_file_bytes( + get_api_path(), read_size, read_offset, data, stop_requested_); } data.resize(read_size); @@ -574,9 +574,9 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset, return read_from_source(); } - auto begin_chunk = static_cast(read_offset / chunk_size_); + auto begin_chunk = static_cast(read_offset / get_chunk_size()); auto end_chunk = - static_cast((read_size + read_offset) / chunk_size_); + static_cast((read_size + read_offset) / get_chunk_size()); update_background_reader(begin_chunk); @@ -594,14 +594,14 @@ void open_file::remove(std::uint64_t handle) { open_file_base::remove(handle); recur_mutex_lock rw_lock(rw_mtx_); - if (modified_ && get_read_state().all() && + if (is_modified() && get_read_state().all() && (get_api_error() == api_error::success)) { mgr_.queue_upload(*this); - modified_ = false; + open_file_base::set_modified(false); } - if (removed_ && (get_open_file_count() == 0U)) { - removed_ = false; + if (is_removed() && (get_open_file_count() == 0U)) { + open_file_base::set_removed(false); } } @@ -609,8 +609,8 @@ void open_file::remove_all() { open_file_base::remove_all(); recur_mutex_lock rw_lock(rw_mtx_); - modified_ = false; - removed_ = true; + open_file_base::set_modified(false); + open_file_base::set_removed(true); mgr_.remove_upload(get_api_path()); @@ -618,11 +618,11 @@ void open_file::remove_all() { } auto open_file::resize(std::uint64_t new_file_size) -> api_error { - if (fsi_.directory) { + if (is_directory()) { return set_api_error(api_error::invalid_operation); } - if (new_file_size == fsi_.size) { + if (new_file_size == get_file_size()) { return api_error::success; } @@ -634,24 +634,24 @@ auto open_file::resize(std::uint64_t new_file_size) -> api_error { } void open_file::set_modified() { - if (not modified_) { - modified_ = true; + if (not is_modified()) { + open_file_base::set_modified(true); mgr_.store_resume(*this); } - if (not removed_) { - removed_ = true; + if (not is_removed()) { + open_file_base::set_removed(true); mgr_.remove_upload(get_api_path()); } } void open_file::set_read_state(std::size_t chunk) { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); read_state_.set(chunk); } void open_file::set_read_state(boost::dynamic_bitset<> read_state) { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); read_state_ = std::move(read_state); } @@ -668,15 +668,9 @@ void open_file::update_background_reader(std::size_t read_chunk) { while (not stop_requested_) { unique_recur_mutex_lock lock(rw_mtx_); auto read_state = get_read_state(); - if ((fsi_.size == 0U) || read_state.all()) { + if ((get_file_size() == 0U) || read_state.all()) { lock.unlock(); - - unique_mutex_lock io_lock(io_thread_mtx_); - if (not stop_requested_ && io_thread_queue_.empty()) { - io_thread_notify_.wait(io_lock); - } - io_thread_notify_.notify_all(); - io_lock.unlock(); + wait_for_io(stop_requested_); continue; } @@ -699,7 +693,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data, bytes_written = 0U; - if (fsi_.directory || provider_.is_read_only()) { + if (is_directory() || get_provider().is_read_only()) { return set_api_error(api_error::invalid_operation); } @@ -716,9 +710,9 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data, return res; } - auto begin_chunk = static_cast(write_offset / chunk_size_); + auto begin_chunk = static_cast(write_offset / get_chunk_size()); auto end_chunk = - static_cast((write_offset + data.size()) / chunk_size_); + static_cast((write_offset + data.size()) / get_chunk_size()); update_background_reader(begin_chunk); @@ -729,7 +723,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data, } unique_recur_mutex_lock rw_lock(rw_mtx_); - if ((write_offset + data.size()) > fsi_.size) { + if ((write_offset + data.size()) > get_file_size()) { res = resize(write_offset + data.size()); if (res != api_error::success) { return res; @@ -749,11 +743,11 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data, } auto now = std::to_string(utils::time::get_time_now()); - res = provider_.set_item_meta(fsi_.api_path, { - {META_CHANGED, now}, - {META_MODIFIED, now}, - {META_WRITTEN, now}, - }); + res = get_provider().set_item_meta(get_api_path(), { + {META_CHANGED, now}, + {META_MODIFIED, now}, + {META_WRITTEN, now}, + }); if (res != api_error::success) { utils::error::raise_api_path_error(function_name, get_api_path(), res, "failed to set file meta"); diff --git a/repertory/librepertory/src/file_manager/open_file_base.cpp b/repertory/librepertory/src/file_manager/open_file_base.cpp index c1bc906c..83a5a36a 100644 --- a/repertory/librepertory/src/file_manager/open_file_base.cpp +++ b/repertory/librepertory/src/file_manager/open_file_base.cpp @@ -133,6 +133,24 @@ auto open_file_base::can_close() const -> bool { return (duration.count() >= chunk_timeout_); } +auto open_file_base::close() -> bool { + unique_mutex_lock io_lock(io_thread_mtx_); + if (io_stop_requested_ || not io_thread_) { + io_thread_notify_.notify_all(); + io_lock.unlock(); + return false; + } + + io_stop_requested_ = true; + io_thread_notify_.notify_all(); + io_lock.unlock(); + + io_thread_->join(); + io_thread_.reset(); + + return true; +} + auto open_file_base::do_io(std::function action) -> api_error { unique_mutex_lock io_lock(io_thread_mtx_); auto item = std::make_shared(action); @@ -191,6 +209,36 @@ auto open_file_base::get_file_size() const -> std::uint64_t { return fsi_.size; } +[[nodiscard]] auto open_file_base::get_last_chunk_size() const -> std::size_t { + recur_mutex_lock file_lock(file_mtx_); + return last_chunk_size_; +} + +void open_file_base::set_file_size(std::uint64_t size) { + recur_mutex_lock file_lock(file_mtx_); + fsi_.size = size; +} + +void open_file_base::set_last_chunk_size(std::size_t size) { + recur_mutex_lock file_lock(file_mtx_); + last_chunk_size_ = size; +} + +void open_file_base::set_modified(bool modified) { + recur_mutex_lock file_lock(file_mtx_); + modified_ = modified; +} + +void open_file_base::set_removed(bool removed) { + recur_mutex_lock file_lock(file_mtx_); + removed_ = removed; +} + +void open_file_base::set_source_path(std::string source_path) { + recur_mutex_lock file_lock(file_mtx_); + fsi_.source_path = std::move(source_path); +} + auto open_file_base::get_filesystem_item() const -> filesystem_item { recur_mutex_lock file_lock(file_mtx_); return fsi_; @@ -235,6 +283,11 @@ auto open_file_base::get_open_file_count() const -> std::size_t { return open_data_.size(); } +auto open_file_base::get_source_path() const -> std::string { + recur_mutex_lock file_lock(file_mtx_); + return fsi_.source_path; +} + auto open_file_base::has_handle(std::uint64_t handle) const -> bool { recur_mutex_lock file_lock(file_mtx_); return open_data_.find(handle) != open_data_.end(); @@ -245,6 +298,16 @@ auto open_file_base::is_modified() const -> bool { return modified_; } +auto open_file_base::is_removed() const -> bool { + recur_mutex_lock file_lock(file_mtx_); + return removed_; +} + +void open_file_base::notify_io() { + mutex_lock io_lock(io_thread_mtx_); + io_thread_notify_.notify_all(); +} + void open_file_base::remove(std::uint64_t handle) { recur_mutex_lock file_lock(file_mtx_); if (open_data_.find(handle) == open_data_.end()) { @@ -303,21 +366,12 @@ void open_file_base::set_api_path(const std::string &api_path) { fsi_.api_parent = utils::path::get_parent_api_path(api_path); } -auto open_file_base::close() -> bool { +void open_file_base::wait_for_io(stop_type &stop_requested) { unique_mutex_lock io_lock(io_thread_mtx_); - if (io_stop_requested_ || not io_thread_) { - io_thread_notify_.notify_all(); - io_lock.unlock(); - return false; + if (not stop_requested && io_thread_queue_.empty()) { + io_thread_notify_.wait(io_lock); } - - io_stop_requested_ = true; io_thread_notify_.notify_all(); io_lock.unlock(); - - io_thread_->join(); - io_thread_.reset(); - - return true; } } // namespace repertory diff --git a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp index 13c24936..db78a1a7 100644 --- a/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp +++ b/repertory/librepertory/src/file_manager/ring_buffer_open_file.cpp @@ -45,12 +45,12 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, source_path_(utils::path::combine(buffer_directory, {utils::create_uuid_string()})), total_chunks_(static_cast( - utils::divide_with_ceiling(fsi_.size, chunk_size))) { + utils::divide_with_ceiling(fsi.size, chunk_size))) { if (ring_size < 5U) { throw std::runtime_error("ring size must be greater than or equal to 5"); } - if (not can_handle_file(fsi_.size, chunk_size, ring_size)) { + if (not can_handle_file(fsi.size, chunk_size, ring_size)) { throw std::runtime_error("file size is less than ring buffer size"); } @@ -69,7 +69,7 @@ ring_buffer_open_file::~ring_buffer_open_file() { if (not utils::file::file(source_path_).remove()) { utils::error::raise_api_path_error( - function_name, fsi_.api_path, source_path_, + function_name, get_api_path(), source_path_, utils::get_last_error_code(), "failed to delete file"); } } @@ -96,7 +96,7 @@ auto ring_buffer_open_file::check_start() -> api_error { auto buffer_directory{utils::path::get_parent_path(source_path_)}; if (not utils::file::directory(buffer_directory).create_directory()) { utils::error::raise_api_path_error( - function_name, fsi_.api_path, source_path_, + function_name, get_api_path(), source_path_, fmt::format("failed to create buffer directory|path|{}|err|{}", buffer_directory, utils::get_last_error_code())); return api_error::os_error; @@ -105,24 +105,24 @@ auto ring_buffer_open_file::check_start() -> api_error { nf_ = utils::file::file::open_or_create_file(source_path_); if (not nf_ || not *nf_) { utils::error::raise_api_path_error( - function_name, fsi_.api_path, source_path_, + function_name, get_api_path(), source_path_, fmt::format("failed to create buffer file|err|{}", utils::get_last_error_code())); return api_error::os_error; } - if (not nf_->truncate(ring_state_.size() * chunk_size_)) { + if (not nf_->truncate(ring_state_.size() * get_chunk_size())) { nf_->close(); nf_.reset(); utils::error::raise_api_path_error( - function_name, fsi_.api_path, source_path_, + function_name, get_api_path(), source_path_, fmt::format("failed to resize buffer file|err|{}", utils::get_last_error_code())); return api_error::os_error; } - event_system::instance().raise(fsi_.api_path, source_path_); + event_system::instance().raise(get_api_path(), source_path_); reader_thread_ = std::make_unique([this]() { reader_thread(); }); return api_error::success; } @@ -176,14 +176,14 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk, unlock_and_notify(); data_buffer buffer; - auto data_offset{chunk * chunk_size_}; + auto data_offset{chunk * get_chunk_size()}; auto data_size{ - chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_, + chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(), }; auto res{ - provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer, - stop_requested_), + get_provider().read_file_bytes(get_api_path(), data_size, data_offset, + buffer, stop_requested_), }; chunk_lock.lock(); @@ -191,20 +191,21 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk, auto progress = (static_cast(chunk + 1U) / static_cast(total_chunks_)) * 100.0; - event_system::instance().raise(fsi_.api_path, + event_system::instance().raise(get_api_path(), source_path_, progress); - res = (chunk >= ring_begin_ && chunk <= ring_end_) - ? do_io([&]() -> api_error { - std::size_t bytes_written{}; - if (nf_->write(buffer, - (chunk % ring_state_.size()) * chunk_size_, - &bytes_written)) { - return api_error::success; - } + res = + (chunk >= ring_begin_ && chunk <= ring_end_) + ? do_io([&]() -> api_error { + std::size_t bytes_written{}; + if (nf_->write(buffer, + (chunk % ring_state_.size()) * get_chunk_size(), + &bytes_written)) { + return api_error::success; + } - return api_error::os_error; - }) - : api_error::invalid_ring_buffer_position; + return api_error::os_error; + }) + : api_error::invalid_ring_buffer_position; } active_downloads_.erase(chunk); @@ -244,13 +245,13 @@ void ring_buffer_open_file::forward(std::size_t count) { } auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); auto read_state = ring_state_; return read_state.flip(); } auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(get_mutex()); return not ring_state_[chunk % ring_state_.size()]; } @@ -289,19 +290,20 @@ void ring_buffer_open_file::reverse(std::size_t count) { auto ring_buffer_open_file::read(std::size_t read_size, std::uint64_t read_offset, data_buffer &data) -> api_error { - if (fsi_.directory) { + if (is_directory()) { return api_error::invalid_operation; } reset_timeout(); - read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset); + read_size = + utils::calculate_read_size(get_file_size(), read_size, read_offset); if (read_size == 0U) { return api_error::success; } - auto begin_chunk{static_cast(read_offset / chunk_size_)}; - read_offset = read_offset - (begin_chunk * chunk_size_); + auto begin_chunk{static_cast(read_offset / get_chunk_size())}; + read_offset = read_offset - (begin_chunk * get_chunk_size()); unique_mutex_lock read_lock(read_mtx_); auto res = check_start(); @@ -336,7 +338,7 @@ auto ring_buffer_open_file::read(std::size_t read_size, reset_timeout(); auto to_read{ - std::min(static_cast(chunk_size_ - read_offset), + std::min(static_cast(get_chunk_size() - read_offset), read_size), }; @@ -344,12 +346,13 @@ auto ring_buffer_open_file::read(std::size_t read_size, data_buffer buffer(to_read); std::size_t bytes_read{}; - auto result = nf_->read(buffer, - (((chunk % ring_state_.size()) * chunk_size_) + - read_offset), - &bytes_read) - ? api_error::success - : api_error::os_error; + auto result = + nf_->read( + buffer, + (((chunk % ring_state_.size()) * get_chunk_size()) + read_offset), + &bytes_read) + ? api_error::success + : api_error::os_error; if (result != api_error::success) { return result; @@ -409,7 +412,7 @@ void ring_buffer_open_file::reader_thread() { check_and_wait(); } - event_system::instance().raise(fsi_.api_path, source_path_, + event_system::instance().raise(get_api_path(), source_path_, api_error::download_stopped); }