diff --git a/repertory/librepertory/include/file_manager/cache_size_mgr.hpp b/repertory/librepertory/include/file_manager/cache_size_mgr.hpp index c1395404..89bbf957 100644 --- a/repertory/librepertory/include/file_manager/cache_size_mgr.hpp +++ b/repertory/librepertory/include/file_manager/cache_size_mgr.hpp @@ -50,7 +50,7 @@ private: stop_type stop_requested_{false}; public: - [[nodiscard]] auto expand(std::uint64_t size) -> api_error; + [[nodiscard]] auto expand(std::uint64_t size, bool should_wait) -> api_error; void initialize(app_config *cfg); diff --git a/repertory/librepertory/include/file_manager/open_file.hpp b/repertory/librepertory/include/file_manager/open_file.hpp index b02de7ad..67f59254 100644 --- a/repertory/librepertory/include/file_manager/open_file.hpp +++ b/repertory/librepertory/include/file_manager/open_file.hpp @@ -72,6 +72,7 @@ private: boost::dynamic_bitset<> read_state_; std::unique_ptr reader_thread_; std::unique_ptr download_thread_; + mutable std::recursive_mutex rw_mtx_; stop_type stop_requested_ = false; private: @@ -102,12 +103,12 @@ public: return true; } - [[nodiscard]] auto - native_operation(native_operation_callback callback) -> api_error override; + [[nodiscard]] auto native_operation(native_operation_callback callback) + -> api_error override; - [[nodiscard]] auto - native_operation(std::uint64_t new_file_size, - native_operation_callback callback) -> api_error override; + [[nodiscard]] auto native_operation(std::uint64_t new_file_size, + native_operation_callback callback) + -> api_error override; void remove(std::uint64_t handle) override; diff --git a/repertory/librepertory/src/file_manager/cache_size_mgr.cpp b/repertory/librepertory/src/file_manager/cache_size_mgr.cpp index ec4cde4b..16ab568a 100644 --- a/repertory/librepertory/src/file_manager/cache_size_mgr.cpp +++ b/repertory/librepertory/src/file_manager/cache_size_mgr.cpp @@ -42,7 +42,7 @@ E_SIMPLE2(max_cache_size_reached, warn, true, cache_size_mgr cache_size_mgr::instance_{}; -auto cache_size_mgr::expand(std::uint64_t size) -> api_error { +auto cache_size_mgr::expand(std::uint64_t size, bool should_wait) -> api_error { if (size == 0U) { return api_error::success; } @@ -59,13 +59,16 @@ auto cache_size_mgr::expand(std::uint64_t size) -> api_error { while (not stop_requested_ && cache_size_ > max_cache_size) { event_system::instance().raise(cache_size_, max_cache_size); - notify_.notify_all(); + if (not should_wait) { + break; + } + notify_.wait(lock); } notify_.notify_all(); - return stop_requested_ ? api_error::error : api_error::success; + return api_error::success; } void cache_size_mgr::initialize(app_config *cfg) { @@ -104,7 +107,7 @@ auto cache_size_mgr::shrink(std::uint64_t size) -> api_error { notify_.notify_all(); - return stop_requested_ ? api_error::error : api_error::success; + return api_error::success; } auto cache_size_mgr::size() const -> std::uint64_t { diff --git a/repertory/librepertory/src/file_manager/open_file.cpp b/repertory/librepertory/src/file_manager/open_file.cpp index 65a5acde..4297c13f 100644 --- a/repertory/librepertory/src/file_manager/open_file.cpp +++ b/repertory/librepertory/src/file_manager/open_file.cpp @@ -64,12 +64,15 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, i_upload_manager &mgr) : open_file_base(chunk_size, chunk_timeout, fsi, open_data, provider), mgr_(mgr) { - if (fsi_.directory && read_state.has_value()) { - throw startup_exception( - fmt::format("cannot resume a directory|sp|", fsi.api_path)); - } + REPERTORY_USES_FUNCTION_NAME(); + + if (fsi_.directory) { + if (read_state.has_value()) { + utils::error::raise_api_path_error( + function_name, fsi.api_path, fsi.source_path, + fmt::format("cannot resume a directory|sp|", fsi.api_path)); + } - if (fsi.directory) { return; } @@ -88,16 +91,15 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, utils::divide_with_ceiling(fsi_.size, chunk_size)), false); - auto file_size = nf_->size(); - if (provider_.is_read_only() || file_size.value() == fsi.size) { + auto file_size = nf_->size().value_or(0U); + if (provider_.is_read_only() || file_size == fsi.size) { read_state_.set(0U, read_state_.size(), true); } else if (nf_->truncate(fsi.size)) { - if (file_size.value() > fsi.size) { - set_api_error( - cache_size_mgr::instance().shrink(file_size.value() - fsi.size)); + if (file_size > fsi.size) { + set_api_error(cache_size_mgr::instance().shrink(file_size - fsi.size)); } else { set_api_error( - cache_size_mgr::instance().expand(fsi.size - file_size.value())); + cache_size_mgr::instance().expand(fsi.size - file_size, false)); } } else { set_api_error(api_error::os_error); @@ -202,7 +204,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, reset_timeout(); } - unique_recur_mutex_lock download_lock(file_mtx_); + unique_recur_mutex_lock file_lock(rw_mtx_); if ((get_api_error() == api_error::success) && (chunk < read_state_.size()) && not read_state_[chunk]) { if (active_downloads_.find(chunk) != active_downloads_.end()) { @@ -211,7 +213,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, } auto active_download = active_downloads_.at(chunk); - download_lock.unlock(); + file_lock.unlock(); active_download->wait(); return; @@ -229,7 +231,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, read_state_.count()); active_downloads_[chunk] = std::make_shared(); - download_lock.unlock(); + file_lock.unlock(); if (should_reset) { reset_timeout(); @@ -238,7 +240,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, std::async(std::launch::async, [this, chunk, data_size, data_offset, should_reset]() { const auto notify_complete = [this, chunk, should_reset]() { - unique_recur_mutex_lock file_lock(file_mtx_); + unique_recur_mutex_lock lock(rw_mtx_); auto active_download = active_downloads_.at(chunk); active_downloads_.erase(chunk); event_system::instance().raise( @@ -260,7 +262,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, event_system::instance().raise( fsi_.api_path, fsi_.source_path, get_api_error()); } - file_lock.unlock(); + lock.unlock(); active_download->notify(get_api_error()); @@ -299,9 +301,9 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active, return; } - unique_recur_mutex_lock file_lock(file_mtx_); + unique_recur_mutex_lock lock(rw_mtx_); read_state_.set(chunk); - file_lock.unlock(); + lock.unlock(); notify_complete(); }).wait(); @@ -318,28 +320,26 @@ void open_file::download_range(std::size_t start_chunk, std::size_t end_chunk, } auto open_file::get_read_state() const -> boost::dynamic_bitset<> { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(rw_mtx_); return read_state_; } auto open_file::get_read_state(std::size_t chunk) const -> bool { - recur_mutex_lock file_lock(file_mtx_); - return read_state_[chunk]; + return get_read_state()[chunk]; } auto open_file::is_complete() const -> bool { - recur_mutex_lock file_lock(file_mtx_); + recur_mutex_lock file_lock(rw_mtx_); return read_state_.all(); } auto open_file::native_operation( i_open_file::native_operation_callback callback) -> api_error { - unique_recur_mutex_lock file_lock(file_mtx_); if (stop_requested_) { return api_error::download_stopped; } - file_lock.unlock(); + unique_recur_mutex_lock file_lock(rw_mtx_); return do_io([&]() -> api_error { return callback(nf_->get_handle()); }); } @@ -352,11 +352,9 @@ auto open_file::native_operation( return api_error::invalid_operation; } - unique_recur_mutex_lock file_lock(file_mtx_); if (stop_requested_) { return api_error::download_stopped; } - file_lock.unlock(); auto is_empty_file = new_file_size == 0U; auto last_chunk = is_empty_file @@ -365,7 +363,7 @@ auto open_file::native_operation( new_file_size, chunk_size_)) - 1U; - file_lock.lock(); + unique_recur_mutex_lock file_lock(rw_mtx_); if (not is_empty_file && (last_chunk < read_state_.size())) { file_lock.unlock(); update_background_reader(0U); @@ -465,12 +463,10 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset, }); }; - unique_recur_mutex_lock file_lock(file_mtx_); if (read_state_.all()) { reset_timeout(); return read_from_source(); } - file_lock.unlock(); auto start_chunk = static_cast(read_offset / chunk_size_); auto end_chunk = @@ -483,14 +479,15 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset, return get_api_error(); } - file_lock.lock(); + unique_recur_mutex_lock file_lock(rw_mtx_); return get_api_error() == api_error::success ? read_from_source() : get_api_error(); } void open_file::remove(std::uint64_t handle) { - recur_mutex_lock file_lock(file_mtx_); open_file_base::remove(handle); + + recur_mutex_lock file_lock(rw_mtx_); if (modified_ && read_state_.all() && (get_api_error() == api_error::success)) { mgr_.queue_upload(*this); @@ -503,9 +500,9 @@ void open_file::remove(std::uint64_t handle) { } void open_file::remove_all() { - recur_mutex_lock file_lock(file_mtx_); open_file_base::remove_all(); + recur_mutex_lock file_lock(rw_mtx_); modified_ = false; removed_ = true; @@ -524,7 +521,8 @@ auto open_file::resize(std::uint64_t new_file_size) -> api_error { } if (new_file_size > fsi_.size) { - auto res = cache_size_mgr::instance().expand(new_file_size - fsi_.size); + auto res = + cache_size_mgr::instance().expand(new_file_size - fsi_.size, true); if (res != api_error::success) { return res; } @@ -555,7 +553,7 @@ void open_file::set_modified() { } void open_file::update_background_reader(std::size_t read_chunk) { - recur_mutex_lock reader_lock(file_mtx_); + recur_mutex_lock file_lock(rw_mtx_); read_chunk_ = read_chunk; if (reader_thread_ || stop_requested_) { @@ -565,9 +563,9 @@ void open_file::update_background_reader(std::size_t read_chunk) { reader_thread_ = std::make_unique([this]() { std::size_t next_chunk{}; while (not stop_requested_) { - unique_recur_mutex_lock file_lock(file_mtx_); + unique_recur_mutex_lock lock(rw_mtx_); if ((fsi_.size == 0U) || read_state_.all()) { - file_lock.unlock(); + lock.unlock(); unique_mutex_lock io_lock(io_thread_mtx_); if (not stop_requested_ && io_thread_queue_.empty()) { @@ -584,7 +582,7 @@ void open_file::update_background_reader(std::size_t read_chunk) { } while ((next_chunk != 0U) && (active_downloads_.find(next_chunk) != active_downloads_.end())); - file_lock.unlock(); + lock.unlock(); download_chunk(next_chunk, true, false); } }); @@ -604,11 +602,9 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data, return api_error::success; } - unique_recur_mutex_lock write_lock(file_mtx_); if (stop_requested_) { return api_error::download_stopped; } - write_lock.unlock(); auto start_chunk = static_cast(write_offset / chunk_size_); auto end_chunk = @@ -622,7 +618,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data, return get_api_error(); } - write_lock.lock(); + unique_recur_mutex_lock file_lock(rw_mtx_); if ((write_offset + data.size()) > fsi_.size) { auto res = resize(write_offset + data.size()); if (res != api_error::success) { diff --git a/repertory/repertory_test/src/open_file_test.cpp b/repertory/repertory_test/src/open_file_test.cpp index 5d2d9e42..f99a5efc 100644 --- a/repertory/repertory_test/src/open_file_test.cpp +++ b/repertory/repertory_test/src/open_file_test.cpp @@ -494,7 +494,8 @@ TEST_F(open_file_test, resize_file_to_0_bytes) { fsi.size = test_chunk_size * 4U; fsi.source_path = source_path; - EXPECT_EQ(api_error::success, cache_size_mgr::instance().expand(fsi.size)); + EXPECT_EQ(api_error::success, + cache_size_mgr::instance().expand(fsi.size, false)); open_file file(test_chunk_size, 0U, fsi, provider, upload_mgr); test_closeable_open_file(file, false, api_error::success, fsi.size, @@ -547,7 +548,8 @@ TEST_F(open_file_test, resize_file_by_full_chunk) { fsi.size = test_chunk_size * 4U; fsi.source_path = source_path; - EXPECT_EQ(api_error::success, cache_size_mgr::instance().expand(fsi.size)); + EXPECT_EQ(api_error::success, + cache_size_mgr::instance().expand(fsi.size, false)); EXPECT_CALL(upload_mgr, store_resume) .WillOnce([&fsi](const i_open_file &file) {