diff --git a/repertory/librepertory/include/providers/base_provider.hpp b/repertory/librepertory/include/providers/base_provider.hpp index 6a00603b..63995ebf 100644 --- a/repertory/librepertory/include/providers/base_provider.hpp +++ b/repertory/librepertory/include/providers/base_provider.hpp @@ -55,16 +55,14 @@ private: private: void add_all_items(const stop_type &stop_requested); - void get_removed_items(std::deque &directories, - std::deque &files, - const stop_type &stop_requested) const; - - void process_removed_directories(std::deque &removed_list, + void process_removed_directories(std::deque removed_list, const stop_type &stop_requested); - void process_removed_files(std::deque &removed_list, + void process_removed_files(std::deque removed_list, const stop_type &stop_requested); + void process_removed_items(const stop_type &stop_requested); + void remove_deleted_items(const stop_type &stop_requested); void remove_unmatched_source_files(const stop_type &stop_requested); diff --git a/repertory/librepertory/include/utils/polling.hpp b/repertory/librepertory/include/utils/polling.hpp index 7e582fc0..82504c39 100644 --- a/repertory/librepertory/include/utils/polling.hpp +++ b/repertory/librepertory/include/utils/polling.hpp @@ -35,7 +35,7 @@ public: second, }; - struct polling_item { + struct polling_item final { std::string name; frequency freq; std::function action; diff --git a/repertory/librepertory/include/utils/tasks.hpp b/repertory/librepertory/include/utils/tasks.hpp new file mode 100644 index 00000000..47dea02d --- /dev/null +++ b/repertory/librepertory/include/utils/tasks.hpp @@ -0,0 +1,74 @@ +/* + Copyright <2018-2024> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#ifndef REPERTORY_INCLUDE_UTILS_TASKS_HPP_ +#define REPERTORY_INCLUDE_UTILS_TASKS_HPP_ + +#include "types/repertory.hpp" + +namespace repertory { +class app_config; + +class tasks final { +public: + struct task_item final { + std::function action; + }; + +public: + tasks(const tasks &) = delete; + tasks(tasks &&) = delete; + auto operator=(const tasks &) -> tasks & = delete; + auto operator=(tasks &&) -> tasks & = delete; + +private: + tasks() = default; + + ~tasks() { stop(); } + +private: + static tasks instance_; + +public: + static auto instance() -> tasks & { return instance_; } + +private: + app_config *config_{nullptr}; + std::mutex mutex_; + std::condition_variable notify_; + std::mutex start_stop_mutex_; + stop_type stop_requested_{false}; + std::vector> task_threads_; + std::deque tasks_; + +private: + void task_thread(); + +public: + void schedule(task_item task); + + void start(app_config *config); + + void stop(); +}; +} // namespace repertory + +#endif // REPERTORY_INCLUDE_UTILS_TASKS_HPP_ diff --git a/repertory/librepertory/src/drives/fuse/fuse_drive.cpp b/repertory/librepertory/src/drives/fuse/fuse_drive.cpp index dcd12d99..cc98161b 100644 --- a/repertory/librepertory/src/drives/fuse/fuse_drive.cpp +++ b/repertory/librepertory/src/drives/fuse/fuse_drive.cpp @@ -42,6 +42,7 @@ #include "utils/common.hpp" #include "utils/error_utils.hpp" #include "utils/polling.hpp" +#include "utils/tasks.hpp" #include "utils/time.hpp" #include "utils/utils.hpp" @@ -258,6 +259,7 @@ void fuse_drive::destroy_impl(void *ptr) { } polling::instance().stop(); + tasks::instance().stop(); if (eviction_) { eviction_->stop(); @@ -622,6 +624,7 @@ void *fuse_drive::init_impl(struct fuse_conn_info *conn) { } polling::instance().start(&config_); + tasks::instance().start(&config_); event_system::instance().raise(get_mount_location()); } catch (const std::exception &e) { diff --git a/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp b/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp index e5f6e4bc..5b6e2266 100644 --- a/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp +++ b/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp @@ -38,6 +38,7 @@ #include "utils/file_utils.hpp" #include "utils/polling.hpp" #include "utils/string.hpp" +#include "utils/tasks.hpp" #include "utils/time.hpp" #include "utils/utils.hpp" @@ -652,6 +653,8 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS { } polling::instance().start(&config_); + tasks::instance().start(&config_); + event_system::instance().raise(mount_location); } catch (const std::exception &e) { utils::error::raise_error(function_name, e, "exception occurred"); @@ -660,6 +663,7 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS { } server_->stop(); polling::instance().stop(); + tasks::instance().stop(); if (eviction_) { eviction_->stop(); } @@ -1172,6 +1176,7 @@ VOID winfsp_drive::Unmounted(PVOID host) { } server_->stop(); polling::instance().stop(); + tasks::instance().stop(); if (eviction_) { eviction_->stop(); } diff --git a/repertory/librepertory/src/providers/base_provider.cpp b/repertory/librepertory/src/providers/base_provider.cpp index 0b21c01b..8649d572 100644 --- a/repertory/librepertory/src/providers/base_provider.cpp +++ b/repertory/librepertory/src/providers/base_provider.cpp @@ -19,8 +19,6 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#include - #include "providers/base_provider.hpp" #include "app_config.hpp" @@ -32,6 +30,7 @@ #include "utils/file_utils.hpp" #include "utils/path.hpp" #include "utils/polling.hpp" +#include "utils/tasks.hpp" #include "utils/time.hpp" namespace repertory { @@ -50,8 +49,8 @@ void base_provider::add_all_items(const stop_type &stop_requested) { } auto base_provider::create_api_file(std::string path, std::string key, - std::uint64_t size, - std::uint64_t file_time) -> api_file { + std::uint64_t size, std::uint64_t file_time) + -> api_file { api_file file{}; file.api_path = utils::path::create_api_path(path); file.api_parent = utils::path::get_parent_api_path(file.api_path); @@ -83,8 +82,8 @@ auto base_provider::create_api_file(std::string path, std::uint64_t size, } auto base_provider::create_directory_clone_source_meta( - const std::string &source_api_path, - const std::string &api_path) -> api_error { + const std::string &source_api_path, const std::string &api_path) + -> api_error { REPERTORY_USES_FUNCTION_NAME(); bool exists{}; @@ -181,8 +180,8 @@ auto base_provider::create_directory(const std::string &api_path, return set_item_meta(api_path, meta); } -auto base_provider::create_file(const std::string &api_path, - api_meta_map &meta) -> api_error { +auto base_provider::create_file(const std::string &api_path, api_meta_map &meta) + -> api_error { REPERTORY_USES_FUNCTION_NAME(); bool exists{}; @@ -239,8 +238,9 @@ auto base_provider::create_file(const std::string &api_path, return api_error::error; } -auto base_provider::get_api_path_from_source( - const std::string &source_path, std::string &api_path) const -> api_error { +auto base_provider::get_api_path_from_source(const std::string &source_path, + std::string &api_path) const + -> api_error { REPERTORY_USES_FUNCTION_NAME(); if (source_path.empty()) { @@ -253,8 +253,9 @@ auto base_provider::get_api_path_from_source( return db3_->get_api_path(source_path, api_path); } -auto base_provider::get_directory_items( - const std::string &api_path, directory_item_list &list) const -> api_error { +auto base_provider::get_directory_items(const std::string &api_path, + directory_item_list &list) const + -> api_error { REPERTORY_USES_FUNCTION_NAME(); bool exists{}; @@ -318,9 +319,10 @@ auto base_provider::get_file_size(const std::string &api_path, return api_error::success; } -auto base_provider::get_filesystem_item( - const std::string &api_path, bool directory, - filesystem_item &fsi) const -> api_error { +auto base_provider::get_filesystem_item(const std::string &api_path, + bool directory, + filesystem_item &fsi) const + -> api_error { bool exists{}; auto res = is_directory(api_path, exists); if (res != api_error::success) { @@ -353,9 +355,10 @@ auto base_provider::get_filesystem_item( return api_error::success; } -auto base_provider::get_filesystem_item_and_file( - const std::string &api_path, api_file &file, - filesystem_item &fsi) const -> api_error { +auto base_provider::get_filesystem_item_and_file(const std::string &api_path, + api_file &file, + filesystem_item &fsi) const + -> api_error { auto res = get_file(api_path, file); if (res != api_error::success) { return res; @@ -411,49 +414,6 @@ auto base_provider::get_pinned_files() const -> std::vector { return db3_->get_pinned_files(); } -void base_provider::get_removed_items(std::deque &directories, - std::deque &files, - const stop_type &stop_requested) const { - auto list = db3_->get_api_path_list(); - std::all_of(std::execution::par, list.begin(), list.end(), - [&](auto &&api_path) -> bool { - if (stop_requested) { - return false; - } - - api_meta_map meta{}; - if (get_item_meta(api_path, meta) != api_error::success) { - return true; - } - - if (utils::string::to_bool(meta[META_DIRECTORY])) { - bool exists{}; - if (is_directory(api_path, exists) != api_error::success) { - return true; - } - - if (not exists) { - directories.emplace_back(removed_item{api_path, true, ""}); - } - - return true; - } - - bool exists{}; - if (is_file(api_path, exists) != api_error::success) { - return true; - } - - if (exists) { - return true; - } - - files.emplace_back( - removed_item{api_path, false, meta[META_SOURCE]}); - return true; - }); -} - auto base_provider::get_total_item_count() const -> std::uint64_t { return db3_->get_total_item_count(); } @@ -474,7 +434,7 @@ auto base_provider::is_file_writeable(const std::string &api_path) const } void base_provider::process_removed_directories( - std::deque &removed_list, const stop_type &stop_requested) { + std::deque removed_list, const stop_type &stop_requested) { for (auto &&item : removed_list) { if (stop_requested) { return; @@ -490,8 +450,8 @@ void base_provider::process_removed_directories( } } -void base_provider::process_removed_files( - std::deque &removed_list, const stop_type &stop_requested) { +void base_provider::process_removed_files(std::deque removed_list, + const stop_type &stop_requested) { REPERTORY_USES_FUNCTION_NAME(); auto orphaned_directory = @@ -550,6 +510,59 @@ void base_provider::process_removed_files( } } +void base_provider::process_removed_items(const stop_type &stop_requested) { + auto list = db3_->get_api_path_list(); + [[maybe_unused]] auto res = + std::all_of(list.begin(), list.end(), [&](auto &&api_path) -> bool { + if (stop_requested) { + return false; + } + + tasks::instance().schedule({ + [this, api_path](auto &&stop_requested2) { + api_meta_map meta{}; + if (get_item_meta(api_path, meta) != api_error::success) { + return; + } + + if (utils::string::to_bool(meta[META_DIRECTORY])) { + bool exists{}; + if (is_directory(api_path, exists) != api_error::success) { + return; + } + + if (exists) { + return; + } + + // process_removed_directories({ + // removed_item{api_path, true, ""}, + // }, stop_requested2); + + return; + } + + bool exists{}; + if (is_file(api_path, exists) != api_error::success) { + return; + } + + if (exists) { + return; + } + + process_removed_files( + { + removed_item{api_path, false, meta[META_SOURCE]}, + }, + stop_requested2); + }, + }); + + return true; + }); +} + void base_provider::remove_deleted_items(const stop_type &stop_requested) { add_all_items(stop_requested); if (stop_requested) { @@ -561,19 +574,7 @@ void base_provider::remove_deleted_items(const stop_type &stop_requested) { return; } - std::deque directories; - std::deque files; - get_removed_items(directories, files, stop_requested); - if (stop_requested) { - return; - } - - process_removed_files(files, stop_requested); - if (stop_requested) { - return; - } - - process_removed_directories(directories, stop_requested); + process_removed_items(stop_requested); } auto base_provider::remove_file(const std::string &api_path) -> api_error { diff --git a/repertory/librepertory/src/utils/tasks.cpp b/repertory/librepertory/src/utils/tasks.cpp new file mode 100644 index 00000000..7e564e52 --- /dev/null +++ b/repertory/librepertory/src/utils/tasks.cpp @@ -0,0 +1,118 @@ +/* + Copyright <2018-2024> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#include "utils/tasks.hpp" + +#include "app_config.hpp" + +namespace repertory { +tasks tasks::instance_; + +void tasks::schedule(task_item task) { + unique_mutex_lock lock(mutex_); + while (not stop_requested_ && tasks_.size() > (task_threads_.size() * 4U)) { + notify_.wait(lock); + } + + if (stop_requested_) { + notify_.notify_all(); + return; + } + + tasks_.push_back(std::move(task)); + notify_.notify_all(); +} + +void tasks::start(app_config *config) { + mutex_lock start_stop_lock(start_stop_mutex_); + if (not task_threads_.empty()) { + return; + } + + config_ = config; + stop_requested_ = false; + for (std::uint32_t idx = 0U; idx < std::thread::hardware_concurrency(); + ++idx) { + task_threads_.emplace_back( + std::make_unique([this]() { task_thread(); })); + } +} + +void tasks::stop() { + mutex_lock start_stop_lock(start_stop_mutex_); + if (task_threads_.empty()) { + return; + } + + stop_requested_ = true; + + unique_mutex_lock lock(mutex_); + notify_.notify_all(); + lock.unlock(); + + task_threads_.clear(); +} + +void tasks::task_thread() { + REPERTORY_USES_FUNCTION_NAME(); + + unique_mutex_lock lock(mutex_); + + const auto release = [&]() { + notify_.notify_all(); + lock.unlock(); + }; + + release(); + + while (not stop_requested_) { + lock.lock(); + + if (stop_requested_) { + release(); + return; + } + + if (tasks_.empty()) { + notify_.wait(lock); + if (stop_requested_) { + release(); + return; + } + } + + if (tasks_.empty()) { + release(); + continue; + } + + auto task = tasks_.front(); + tasks_.pop_front(); + release(); + + try { + task.action(stop_requested_); + } catch (const std::exception &e) { + utils::error::raise_error(function_name, e, "failed to execute task"); + } + } +} +} // namespace repertory diff --git a/repertory/repertory_test/src/file_manager_test.cpp b/repertory/repertory_test/src/file_manager_test.cpp index 3a8328d2..5e384240 100644 --- a/repertory/repertory_test/src/file_manager_test.cpp +++ b/repertory/repertory_test/src/file_manager_test.cpp @@ -37,6 +37,7 @@ #include "utils/path.hpp" #include "utils/polling.hpp" #include "utils/string.hpp" +#include "utils/tasks.hpp" #include "utils/time.hpp" #include "utils/utils.hpp" @@ -108,6 +109,7 @@ TEST_F(file_manager_test, can_create_and_close_file) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); + tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -209,6 +211,7 @@ TEST_F(file_manager_test, can_create_and_close_file) { mgr.stop(); polling::instance().stop(); + tasks::instance().stop(); } TEST_F(file_manager_test, can_open_and_close_file) { @@ -217,6 +220,8 @@ TEST_F(file_manager_test, can_open_and_close_file) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); + tasks::instance().start(cfg.get()); + file_manager mgr(*cfg, mp); mgr.start(); @@ -316,12 +321,15 @@ TEST_F(file_manager_test, can_open_and_close_file) { mgr.stop(); polling::instance().stop(); + tasks::instance().stop(); } TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); + tasks::instance().start(cfg.get()); + file_manager mgr(*cfg, mp); mgr.start(); @@ -377,6 +385,7 @@ TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { EXPECT_EQ(std::size_t(0U), mgr.get_open_handle_count()); polling::instance().stop(); + tasks::instance().stop(); } TEST_F(file_manager_test, @@ -535,6 +544,8 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); + tasks::instance().start(cfg.get()); + file_manager mgr(*cfg, mp); mgr.start(); @@ -638,6 +649,7 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) { file.close(); polling::instance().stop(); + tasks::instance().stop(); } TEST_F(file_manager_test, can_evict_file) { @@ -1419,6 +1431,7 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) { cfg->set_chunk_downloader_timeout_secs(3U); polling::instance().start(cfg.get()); + tasks::instance().start(cfg.get()); EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); @@ -1501,6 +1514,7 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) { mgr.stop(); polling::instance().stop(); + tasks::instance().stop(); } TEST_F(file_manager_test, remove_file_fails_if_file_does_not_exist) { @@ -1548,6 +1562,7 @@ TEST_F(file_manager_test, EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); + tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -1620,5 +1635,6 @@ TEST_F(file_manager_test, mgr.stop(); polling::instance().stop(); + tasks::instance().stop(); } } // namespace repertory