added task scheduler and refactored remove deleted items
This commit is contained in:
		| @@ -55,16 +55,14 @@ private: | |||||||
| private: | private: | ||||||
|   void add_all_items(const stop_type &stop_requested); |   void add_all_items(const stop_type &stop_requested); | ||||||
|  |  | ||||||
|   void get_removed_items(std::deque<removed_item> &directories, |   void process_removed_directories(std::deque<removed_item> removed_list, | ||||||
|                          std::deque<removed_item> &files, |  | ||||||
|                          const stop_type &stop_requested) const; |  | ||||||
|  |  | ||||||
|   void process_removed_directories(std::deque<removed_item> &removed_list, |  | ||||||
|                                    const stop_type &stop_requested); |                                    const stop_type &stop_requested); | ||||||
|  |  | ||||||
|   void process_removed_files(std::deque<removed_item> &removed_list, |   void process_removed_files(std::deque<removed_item> removed_list, | ||||||
|                              const stop_type &stop_requested); |                              const stop_type &stop_requested); | ||||||
|  |  | ||||||
|  |   void process_removed_items(const stop_type &stop_requested); | ||||||
|  |  | ||||||
|   void remove_deleted_items(const stop_type &stop_requested); |   void remove_deleted_items(const stop_type &stop_requested); | ||||||
|  |  | ||||||
|   void remove_unmatched_source_files(const stop_type &stop_requested); |   void remove_unmatched_source_files(const stop_type &stop_requested); | ||||||
|   | |||||||
| @@ -35,7 +35,7 @@ public: | |||||||
|     second, |     second, | ||||||
|   }; |   }; | ||||||
|  |  | ||||||
|   struct polling_item { |   struct polling_item final { | ||||||
|     std::string name; |     std::string name; | ||||||
|     frequency freq; |     frequency freq; | ||||||
|     std::function<void(const stop_type &stop_requested)> action; |     std::function<void(const stop_type &stop_requested)> action; | ||||||
|   | |||||||
							
								
								
									
										74
									
								
								repertory/librepertory/include/utils/tasks.hpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										74
									
								
								repertory/librepertory/include/utils/tasks.hpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,74 @@ | |||||||
|  | /* | ||||||
|  |   Copyright <2018-2024> <scott.e.graves@protonmail.com> | ||||||
|  |  | ||||||
|  |   Permission is hereby granted, free of charge, to any person obtaining a copy | ||||||
|  |   of this software and associated documentation files (the "Software"), to deal | ||||||
|  |   in the Software without restriction, including without limitation the rights | ||||||
|  |   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||||||
|  |   copies of the Software, and to permit persons to whom the Software is | ||||||
|  |   furnished to do so, subject to the following conditions: | ||||||
|  |  | ||||||
|  |   The above copyright notice and this permission notice shall be included in all | ||||||
|  |   copies or substantial portions of the Software. | ||||||
|  |  | ||||||
|  |   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||||||
|  |   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||||||
|  |   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||||||
|  |   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||||||
|  |   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||||||
|  |   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||||||
|  |   SOFTWARE. | ||||||
|  | */ | ||||||
|  | #ifndef REPERTORY_INCLUDE_UTILS_TASKS_HPP_ | ||||||
|  | #define REPERTORY_INCLUDE_UTILS_TASKS_HPP_ | ||||||
|  |  | ||||||
|  | #include "types/repertory.hpp" | ||||||
|  |  | ||||||
|  | namespace repertory { | ||||||
|  | class app_config; | ||||||
|  |  | ||||||
|  | class tasks final { | ||||||
|  | public: | ||||||
|  |   struct task_item final { | ||||||
|  |     std::function<void(const stop_type &stop_requested)> action; | ||||||
|  |   }; | ||||||
|  |  | ||||||
|  | public: | ||||||
|  |   tasks(const tasks &) = delete; | ||||||
|  |   tasks(tasks &&) = delete; | ||||||
|  |   auto operator=(const tasks &) -> tasks & = delete; | ||||||
|  |   auto operator=(tasks &&) -> tasks & = delete; | ||||||
|  |  | ||||||
|  | private: | ||||||
|  |   tasks() = default; | ||||||
|  |  | ||||||
|  |   ~tasks() { stop(); } | ||||||
|  |  | ||||||
|  | private: | ||||||
|  |   static tasks instance_; | ||||||
|  |  | ||||||
|  | public: | ||||||
|  |   static auto instance() -> tasks & { return instance_; } | ||||||
|  |  | ||||||
|  | private: | ||||||
|  |   app_config *config_{nullptr}; | ||||||
|  |   std::mutex mutex_; | ||||||
|  |   std::condition_variable notify_; | ||||||
|  |   std::mutex start_stop_mutex_; | ||||||
|  |   stop_type stop_requested_{false}; | ||||||
|  |   std::vector<std::unique_ptr<std::jthread>> task_threads_; | ||||||
|  |   std::deque<task_item> tasks_; | ||||||
|  |  | ||||||
|  | private: | ||||||
|  |   void task_thread(); | ||||||
|  |  | ||||||
|  | public: | ||||||
|  |   void schedule(task_item task); | ||||||
|  |  | ||||||
|  |   void start(app_config *config); | ||||||
|  |  | ||||||
|  |   void stop(); | ||||||
|  | }; | ||||||
|  | } // namespace repertory | ||||||
|  |  | ||||||
|  | #endif // REPERTORY_INCLUDE_UTILS_TASKS_HPP_ | ||||||
| @@ -42,6 +42,7 @@ | |||||||
| #include "utils/common.hpp" | #include "utils/common.hpp" | ||||||
| #include "utils/error_utils.hpp" | #include "utils/error_utils.hpp" | ||||||
| #include "utils/polling.hpp" | #include "utils/polling.hpp" | ||||||
|  | #include "utils/tasks.hpp" | ||||||
| #include "utils/time.hpp" | #include "utils/time.hpp" | ||||||
| #include "utils/utils.hpp" | #include "utils/utils.hpp" | ||||||
|  |  | ||||||
| @@ -258,6 +259,7 @@ void fuse_drive::destroy_impl(void *ptr) { | |||||||
|   } |   } | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
|  |  | ||||||
|   if (eviction_) { |   if (eviction_) { | ||||||
|     eviction_->stop(); |     eviction_->stop(); | ||||||
| @@ -622,6 +624,7 @@ void *fuse_drive::init_impl(struct fuse_conn_info *conn) { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     polling::instance().start(&config_); |     polling::instance().start(&config_); | ||||||
|  |     tasks::instance().start(&config_); | ||||||
|  |  | ||||||
|     event_system::instance().raise<drive_mounted>(get_mount_location()); |     event_system::instance().raise<drive_mounted>(get_mount_location()); | ||||||
|   } catch (const std::exception &e) { |   } catch (const std::exception &e) { | ||||||
|   | |||||||
| @@ -38,6 +38,7 @@ | |||||||
| #include "utils/file_utils.hpp" | #include "utils/file_utils.hpp" | ||||||
| #include "utils/polling.hpp" | #include "utils/polling.hpp" | ||||||
| #include "utils/string.hpp" | #include "utils/string.hpp" | ||||||
|  | #include "utils/tasks.hpp" | ||||||
| #include "utils/time.hpp" | #include "utils/time.hpp" | ||||||
| #include "utils/utils.hpp" | #include "utils/utils.hpp" | ||||||
|  |  | ||||||
| @@ -652,6 +653,8 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     polling::instance().start(&config_); |     polling::instance().start(&config_); | ||||||
|  |     tasks::instance().start(&config_); | ||||||
|  |  | ||||||
|     event_system::instance().raise<drive_mounted>(mount_location); |     event_system::instance().raise<drive_mounted>(mount_location); | ||||||
|   } catch (const std::exception &e) { |   } catch (const std::exception &e) { | ||||||
|     utils::error::raise_error(function_name, e, "exception occurred"); |     utils::error::raise_error(function_name, e, "exception occurred"); | ||||||
| @@ -660,6 +663,7 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS { | |||||||
|     } |     } | ||||||
|     server_->stop(); |     server_->stop(); | ||||||
|     polling::instance().stop(); |     polling::instance().stop(); | ||||||
|  |     tasks::instance().stop(); | ||||||
|     if (eviction_) { |     if (eviction_) { | ||||||
|       eviction_->stop(); |       eviction_->stop(); | ||||||
|     } |     } | ||||||
| @@ -1172,6 +1176,7 @@ VOID winfsp_drive::Unmounted(PVOID host) { | |||||||
|   } |   } | ||||||
|   server_->stop(); |   server_->stop(); | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
|   if (eviction_) { |   if (eviction_) { | ||||||
|     eviction_->stop(); |     eviction_->stop(); | ||||||
|   } |   } | ||||||
|   | |||||||
| @@ -19,8 +19,6 @@ | |||||||
|   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||||||
|   SOFTWARE. |   SOFTWARE. | ||||||
| */ | */ | ||||||
| #include <execution> |  | ||||||
|  |  | ||||||
| #include "providers/base_provider.hpp" | #include "providers/base_provider.hpp" | ||||||
|  |  | ||||||
| #include "app_config.hpp" | #include "app_config.hpp" | ||||||
| @@ -32,6 +30,7 @@ | |||||||
| #include "utils/file_utils.hpp" | #include "utils/file_utils.hpp" | ||||||
| #include "utils/path.hpp" | #include "utils/path.hpp" | ||||||
| #include "utils/polling.hpp" | #include "utils/polling.hpp" | ||||||
|  | #include "utils/tasks.hpp" | ||||||
| #include "utils/time.hpp" | #include "utils/time.hpp" | ||||||
|  |  | ||||||
| namespace repertory { | namespace repertory { | ||||||
| @@ -50,8 +49,8 @@ void base_provider::add_all_items(const stop_type &stop_requested) { | |||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::create_api_file(std::string path, std::string key, | auto base_provider::create_api_file(std::string path, std::string key, | ||||||
|                                     std::uint64_t size, |                                     std::uint64_t size, std::uint64_t file_time) | ||||||
|                                     std::uint64_t file_time) -> api_file { |     -> api_file { | ||||||
|   api_file file{}; |   api_file file{}; | ||||||
|   file.api_path = utils::path::create_api_path(path); |   file.api_path = utils::path::create_api_path(path); | ||||||
|   file.api_parent = utils::path::get_parent_api_path(file.api_path); |   file.api_parent = utils::path::get_parent_api_path(file.api_path); | ||||||
| @@ -83,8 +82,8 @@ auto base_provider::create_api_file(std::string path, std::uint64_t size, | |||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::create_directory_clone_source_meta( | auto base_provider::create_directory_clone_source_meta( | ||||||
|     const std::string &source_api_path, |     const std::string &source_api_path, const std::string &api_path) | ||||||
|     const std::string &api_path) -> api_error { |     -> api_error { | ||||||
|   REPERTORY_USES_FUNCTION_NAME(); |   REPERTORY_USES_FUNCTION_NAME(); | ||||||
|  |  | ||||||
|   bool exists{}; |   bool exists{}; | ||||||
| @@ -181,8 +180,8 @@ auto base_provider::create_directory(const std::string &api_path, | |||||||
|   return set_item_meta(api_path, meta); |   return set_item_meta(api_path, meta); | ||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::create_file(const std::string &api_path, | auto base_provider::create_file(const std::string &api_path, api_meta_map &meta) | ||||||
|                                 api_meta_map &meta) -> api_error { |     -> api_error { | ||||||
|   REPERTORY_USES_FUNCTION_NAME(); |   REPERTORY_USES_FUNCTION_NAME(); | ||||||
|  |  | ||||||
|   bool exists{}; |   bool exists{}; | ||||||
| @@ -239,8 +238,9 @@ auto base_provider::create_file(const std::string &api_path, | |||||||
|   return api_error::error; |   return api_error::error; | ||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::get_api_path_from_source( | auto base_provider::get_api_path_from_source(const std::string &source_path, | ||||||
|     const std::string &source_path, std::string &api_path) const -> api_error { |                                              std::string &api_path) const | ||||||
|  |     -> api_error { | ||||||
|   REPERTORY_USES_FUNCTION_NAME(); |   REPERTORY_USES_FUNCTION_NAME(); | ||||||
|  |  | ||||||
|   if (source_path.empty()) { |   if (source_path.empty()) { | ||||||
| @@ -253,8 +253,9 @@ auto base_provider::get_api_path_from_source( | |||||||
|   return db3_->get_api_path(source_path, api_path); |   return db3_->get_api_path(source_path, api_path); | ||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::get_directory_items( | auto base_provider::get_directory_items(const std::string &api_path, | ||||||
|     const std::string &api_path, directory_item_list &list) const -> api_error { |                                         directory_item_list &list) const | ||||||
|  |     -> api_error { | ||||||
|   REPERTORY_USES_FUNCTION_NAME(); |   REPERTORY_USES_FUNCTION_NAME(); | ||||||
|  |  | ||||||
|   bool exists{}; |   bool exists{}; | ||||||
| @@ -318,9 +319,10 @@ auto base_provider::get_file_size(const std::string &api_path, | |||||||
|   return api_error::success; |   return api_error::success; | ||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::get_filesystem_item( | auto base_provider::get_filesystem_item(const std::string &api_path, | ||||||
|     const std::string &api_path, bool directory, |                                         bool directory, | ||||||
|     filesystem_item &fsi) const -> api_error { |                                         filesystem_item &fsi) const | ||||||
|  |     -> api_error { | ||||||
|   bool exists{}; |   bool exists{}; | ||||||
|   auto res = is_directory(api_path, exists); |   auto res = is_directory(api_path, exists); | ||||||
|   if (res != api_error::success) { |   if (res != api_error::success) { | ||||||
| @@ -353,9 +355,10 @@ auto base_provider::get_filesystem_item( | |||||||
|   return api_error::success; |   return api_error::success; | ||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::get_filesystem_item_and_file( | auto base_provider::get_filesystem_item_and_file(const std::string &api_path, | ||||||
|     const std::string &api_path, api_file &file, |                                                  api_file &file, | ||||||
|     filesystem_item &fsi) const -> api_error { |                                                  filesystem_item &fsi) const | ||||||
|  |     -> api_error { | ||||||
|   auto res = get_file(api_path, file); |   auto res = get_file(api_path, file); | ||||||
|   if (res != api_error::success) { |   if (res != api_error::success) { | ||||||
|     return res; |     return res; | ||||||
| @@ -411,49 +414,6 @@ auto base_provider::get_pinned_files() const -> std::vector<std::string> { | |||||||
|   return db3_->get_pinned_files(); |   return db3_->get_pinned_files(); | ||||||
| } | } | ||||||
|  |  | ||||||
| void base_provider::get_removed_items(std::deque<removed_item> &directories, |  | ||||||
|                                       std::deque<removed_item> &files, |  | ||||||
|                                       const stop_type &stop_requested) const { |  | ||||||
|   auto list = db3_->get_api_path_list(); |  | ||||||
|   std::all_of(std::execution::par, list.begin(), list.end(), |  | ||||||
|               [&](auto &&api_path) -> bool { |  | ||||||
|                 if (stop_requested) { |  | ||||||
|                   return false; |  | ||||||
|                 } |  | ||||||
|  |  | ||||||
|                 api_meta_map meta{}; |  | ||||||
|                 if (get_item_meta(api_path, meta) != api_error::success) { |  | ||||||
|                   return true; |  | ||||||
|                 } |  | ||||||
|  |  | ||||||
|                 if (utils::string::to_bool(meta[META_DIRECTORY])) { |  | ||||||
|                   bool exists{}; |  | ||||||
|                   if (is_directory(api_path, exists) != api_error::success) { |  | ||||||
|                     return true; |  | ||||||
|                   } |  | ||||||
|  |  | ||||||
|                   if (not exists) { |  | ||||||
|                     directories.emplace_back(removed_item{api_path, true, ""}); |  | ||||||
|                   } |  | ||||||
|  |  | ||||||
|                   return true; |  | ||||||
|                 } |  | ||||||
|  |  | ||||||
|                 bool exists{}; |  | ||||||
|                 if (is_file(api_path, exists) != api_error::success) { |  | ||||||
|                   return true; |  | ||||||
|                 } |  | ||||||
|  |  | ||||||
|                 if (exists) { |  | ||||||
|                   return true; |  | ||||||
|                 } |  | ||||||
|  |  | ||||||
|                 files.emplace_back( |  | ||||||
|                     removed_item{api_path, false, meta[META_SOURCE]}); |  | ||||||
|                 return true; |  | ||||||
|               }); |  | ||||||
| } |  | ||||||
|  |  | ||||||
| auto base_provider::get_total_item_count() const -> std::uint64_t { | auto base_provider::get_total_item_count() const -> std::uint64_t { | ||||||
|   return db3_->get_total_item_count(); |   return db3_->get_total_item_count(); | ||||||
| } | } | ||||||
| @@ -474,7 +434,7 @@ auto base_provider::is_file_writeable(const std::string &api_path) const | |||||||
| } | } | ||||||
|  |  | ||||||
| void base_provider::process_removed_directories( | void base_provider::process_removed_directories( | ||||||
|     std::deque<removed_item> &removed_list, const stop_type &stop_requested) { |     std::deque<removed_item> removed_list, const stop_type &stop_requested) { | ||||||
|   for (auto &&item : removed_list) { |   for (auto &&item : removed_list) { | ||||||
|     if (stop_requested) { |     if (stop_requested) { | ||||||
|       return; |       return; | ||||||
| @@ -490,8 +450,8 @@ void base_provider::process_removed_directories( | |||||||
|   } |   } | ||||||
| } | } | ||||||
|  |  | ||||||
| void base_provider::process_removed_files( | void base_provider::process_removed_files(std::deque<removed_item> removed_list, | ||||||
|     std::deque<removed_item> &removed_list, const stop_type &stop_requested) { |                                           const stop_type &stop_requested) { | ||||||
|   REPERTORY_USES_FUNCTION_NAME(); |   REPERTORY_USES_FUNCTION_NAME(); | ||||||
|  |  | ||||||
|   auto orphaned_directory = |   auto orphaned_directory = | ||||||
| @@ -550,6 +510,59 @@ void base_provider::process_removed_files( | |||||||
|   } |   } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | void base_provider::process_removed_items(const stop_type &stop_requested) { | ||||||
|  |   auto list = db3_->get_api_path_list(); | ||||||
|  |   [[maybe_unused]] auto res = | ||||||
|  |       std::all_of(list.begin(), list.end(), [&](auto &&api_path) -> bool { | ||||||
|  |         if (stop_requested) { | ||||||
|  |           return false; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         tasks::instance().schedule({ | ||||||
|  |             [this, api_path](auto &&stop_requested2) { | ||||||
|  |               api_meta_map meta{}; | ||||||
|  |               if (get_item_meta(api_path, meta) != api_error::success) { | ||||||
|  |                 return; | ||||||
|  |               } | ||||||
|  |  | ||||||
|  |               if (utils::string::to_bool(meta[META_DIRECTORY])) { | ||||||
|  |                 bool exists{}; | ||||||
|  |                 if (is_directory(api_path, exists) != api_error::success) { | ||||||
|  |                   return; | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 if (exists) { | ||||||
|  |                   return; | ||||||
|  |                 } | ||||||
|  |  | ||||||
|  |                 // process_removed_directories({ | ||||||
|  |                 //     removed_item{api_path, true, ""}, | ||||||
|  |                 // }, stop_requested2); | ||||||
|  |  | ||||||
|  |                 return; | ||||||
|  |               } | ||||||
|  |  | ||||||
|  |               bool exists{}; | ||||||
|  |               if (is_file(api_path, exists) != api_error::success) { | ||||||
|  |                 return; | ||||||
|  |               } | ||||||
|  |  | ||||||
|  |               if (exists) { | ||||||
|  |                 return; | ||||||
|  |               } | ||||||
|  |  | ||||||
|  |               process_removed_files( | ||||||
|  |                   { | ||||||
|  |                       removed_item{api_path, false, meta[META_SOURCE]}, | ||||||
|  |                   }, | ||||||
|  |                   stop_requested2); | ||||||
|  |             }, | ||||||
|  |         }); | ||||||
|  |  | ||||||
|  |         return true; | ||||||
|  |       }); | ||||||
|  | } | ||||||
|  |  | ||||||
| void base_provider::remove_deleted_items(const stop_type &stop_requested) { | void base_provider::remove_deleted_items(const stop_type &stop_requested) { | ||||||
|   add_all_items(stop_requested); |   add_all_items(stop_requested); | ||||||
|   if (stop_requested) { |   if (stop_requested) { | ||||||
| @@ -561,19 +574,7 @@ void base_provider::remove_deleted_items(const stop_type &stop_requested) { | |||||||
|     return; |     return; | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   std::deque<removed_item> directories; |   process_removed_items(stop_requested); | ||||||
|   std::deque<removed_item> files; |  | ||||||
|   get_removed_items(directories, files, stop_requested); |  | ||||||
|   if (stop_requested) { |  | ||||||
|     return; |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   process_removed_files(files, stop_requested); |  | ||||||
|   if (stop_requested) { |  | ||||||
|     return; |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   process_removed_directories(directories, stop_requested); |  | ||||||
| } | } | ||||||
|  |  | ||||||
| auto base_provider::remove_file(const std::string &api_path) -> api_error { | auto base_provider::remove_file(const std::string &api_path) -> api_error { | ||||||
|   | |||||||
							
								
								
									
										118
									
								
								repertory/librepertory/src/utils/tasks.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										118
									
								
								repertory/librepertory/src/utils/tasks.cpp
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,118 @@ | |||||||
|  | /* | ||||||
|  |   Copyright <2018-2024> <scott.e.graves@protonmail.com> | ||||||
|  |  | ||||||
|  |   Permission is hereby granted, free of charge, to any person obtaining a copy | ||||||
|  |   of this software and associated documentation files (the "Software"), to deal | ||||||
|  |   in the Software without restriction, including without limitation the rights | ||||||
|  |   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||||||
|  |   copies of the Software, and to permit persons to whom the Software is | ||||||
|  |   furnished to do so, subject to the following conditions: | ||||||
|  |  | ||||||
|  |   The above copyright notice and this permission notice shall be included in all | ||||||
|  |   copies or substantial portions of the Software. | ||||||
|  |  | ||||||
|  |   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||||||
|  |   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||||||
|  |   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||||||
|  |   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||||||
|  |   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||||||
|  |   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||||||
|  |   SOFTWARE. | ||||||
|  | */ | ||||||
|  | #include "utils/tasks.hpp" | ||||||
|  |  | ||||||
|  | #include "app_config.hpp" | ||||||
|  |  | ||||||
|  | namespace repertory { | ||||||
|  | tasks tasks::instance_; | ||||||
|  |  | ||||||
|  | void tasks::schedule(task_item task) { | ||||||
|  |   unique_mutex_lock lock(mutex_); | ||||||
|  |   while (not stop_requested_ && tasks_.size() > (task_threads_.size() * 4U)) { | ||||||
|  |     notify_.wait(lock); | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   if (stop_requested_) { | ||||||
|  |     notify_.notify_all(); | ||||||
|  |     return; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   tasks_.push_back(std::move(task)); | ||||||
|  |   notify_.notify_all(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void tasks::start(app_config *config) { | ||||||
|  |   mutex_lock start_stop_lock(start_stop_mutex_); | ||||||
|  |   if (not task_threads_.empty()) { | ||||||
|  |     return; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   config_ = config; | ||||||
|  |   stop_requested_ = false; | ||||||
|  |   for (std::uint32_t idx = 0U; idx < std::thread::hardware_concurrency(); | ||||||
|  |        ++idx) { | ||||||
|  |     task_threads_.emplace_back( | ||||||
|  |         std::make_unique<std::jthread>([this]() { task_thread(); })); | ||||||
|  |   } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void tasks::stop() { | ||||||
|  |   mutex_lock start_stop_lock(start_stop_mutex_); | ||||||
|  |   if (task_threads_.empty()) { | ||||||
|  |     return; | ||||||
|  |   } | ||||||
|  |  | ||||||
|  |   stop_requested_ = true; | ||||||
|  |  | ||||||
|  |   unique_mutex_lock lock(mutex_); | ||||||
|  |   notify_.notify_all(); | ||||||
|  |   lock.unlock(); | ||||||
|  |  | ||||||
|  |   task_threads_.clear(); | ||||||
|  | } | ||||||
|  |  | ||||||
|  | void tasks::task_thread() { | ||||||
|  |   REPERTORY_USES_FUNCTION_NAME(); | ||||||
|  |  | ||||||
|  |   unique_mutex_lock lock(mutex_); | ||||||
|  |  | ||||||
|  |   const auto release = [&]() { | ||||||
|  |     notify_.notify_all(); | ||||||
|  |     lock.unlock(); | ||||||
|  |   }; | ||||||
|  |  | ||||||
|  |   release(); | ||||||
|  |  | ||||||
|  |   while (not stop_requested_) { | ||||||
|  |     lock.lock(); | ||||||
|  |  | ||||||
|  |     if (stop_requested_) { | ||||||
|  |       release(); | ||||||
|  |       return; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     if (tasks_.empty()) { | ||||||
|  |       notify_.wait(lock); | ||||||
|  |       if (stop_requested_) { | ||||||
|  |         release(); | ||||||
|  |         return; | ||||||
|  |       } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     if (tasks_.empty()) { | ||||||
|  |       release(); | ||||||
|  |       continue; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     auto task = tasks_.front(); | ||||||
|  |     tasks_.pop_front(); | ||||||
|  |     release(); | ||||||
|  |  | ||||||
|  |     try { | ||||||
|  |       task.action(stop_requested_); | ||||||
|  |     } catch (const std::exception &e) { | ||||||
|  |       utils::error::raise_error(function_name, e, "failed to execute task"); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | } // namespace repertory | ||||||
| @@ -37,6 +37,7 @@ | |||||||
| #include "utils/path.hpp" | #include "utils/path.hpp" | ||||||
| #include "utils/polling.hpp" | #include "utils/polling.hpp" | ||||||
| #include "utils/string.hpp" | #include "utils/string.hpp" | ||||||
|  | #include "utils/tasks.hpp" | ||||||
| #include "utils/time.hpp" | #include "utils/time.hpp" | ||||||
| #include "utils/utils.hpp" | #include "utils/utils.hpp" | ||||||
|  |  | ||||||
| @@ -108,6 +109,7 @@ TEST_F(file_manager_test, can_create_and_close_file) { | |||||||
|   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); |   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); | ||||||
|  |  | ||||||
|   polling::instance().start(cfg.get()); |   polling::instance().start(cfg.get()); | ||||||
|  |   tasks::instance().start(cfg.get()); | ||||||
|  |  | ||||||
|   file_manager mgr(*cfg, mp); |   file_manager mgr(*cfg, mp); | ||||||
|   mgr.start(); |   mgr.start(); | ||||||
| @@ -209,6 +211,7 @@ TEST_F(file_manager_test, can_create_and_close_file) { | |||||||
|   mgr.stop(); |   mgr.stop(); | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
| } | } | ||||||
|  |  | ||||||
| TEST_F(file_manager_test, can_open_and_close_file) { | TEST_F(file_manager_test, can_open_and_close_file) { | ||||||
| @@ -217,6 +220,8 @@ TEST_F(file_manager_test, can_open_and_close_file) { | |||||||
|   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); |   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); | ||||||
|  |  | ||||||
|   polling::instance().start(cfg.get()); |   polling::instance().start(cfg.get()); | ||||||
|  |   tasks::instance().start(cfg.get()); | ||||||
|  |  | ||||||
|   file_manager mgr(*cfg, mp); |   file_manager mgr(*cfg, mp); | ||||||
|   mgr.start(); |   mgr.start(); | ||||||
|  |  | ||||||
| @@ -316,12 +321,15 @@ TEST_F(file_manager_test, can_open_and_close_file) { | |||||||
|   mgr.stop(); |   mgr.stop(); | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
| } | } | ||||||
|  |  | ||||||
| TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { | TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { | ||||||
|   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); |   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); | ||||||
|  |  | ||||||
|   polling::instance().start(cfg.get()); |   polling::instance().start(cfg.get()); | ||||||
|  |   tasks::instance().start(cfg.get()); | ||||||
|  |  | ||||||
|   file_manager mgr(*cfg, mp); |   file_manager mgr(*cfg, mp); | ||||||
|   mgr.start(); |   mgr.start(); | ||||||
|  |  | ||||||
| @@ -377,6 +385,7 @@ TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { | |||||||
|   EXPECT_EQ(std::size_t(0U), mgr.get_open_handle_count()); |   EXPECT_EQ(std::size_t(0U), mgr.get_open_handle_count()); | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
| } | } | ||||||
|  |  | ||||||
| TEST_F(file_manager_test, | TEST_F(file_manager_test, | ||||||
| @@ -535,6 +544,8 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) { | |||||||
|   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); |   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); | ||||||
|  |  | ||||||
|   polling::instance().start(cfg.get()); |   polling::instance().start(cfg.get()); | ||||||
|  |   tasks::instance().start(cfg.get()); | ||||||
|  |  | ||||||
|   file_manager mgr(*cfg, mp); |   file_manager mgr(*cfg, mp); | ||||||
|   mgr.start(); |   mgr.start(); | ||||||
|  |  | ||||||
| @@ -638,6 +649,7 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) { | |||||||
|   file.close(); |   file.close(); | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
| } | } | ||||||
|  |  | ||||||
| TEST_F(file_manager_test, can_evict_file) { | TEST_F(file_manager_test, can_evict_file) { | ||||||
| @@ -1419,6 +1431,7 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) { | |||||||
|   cfg->set_chunk_downloader_timeout_secs(3U); |   cfg->set_chunk_downloader_timeout_secs(3U); | ||||||
|  |  | ||||||
|   polling::instance().start(cfg.get()); |   polling::instance().start(cfg.get()); | ||||||
|  |   tasks::instance().start(cfg.get()); | ||||||
|  |  | ||||||
|   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); |   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); | ||||||
|  |  | ||||||
| @@ -1501,6 +1514,7 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) { | |||||||
|   mgr.stop(); |   mgr.stop(); | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
| } | } | ||||||
|  |  | ||||||
| TEST_F(file_manager_test, remove_file_fails_if_file_does_not_exist) { | TEST_F(file_manager_test, remove_file_fails_if_file_does_not_exist) { | ||||||
| @@ -1548,6 +1562,7 @@ TEST_F(file_manager_test, | |||||||
|   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); |   EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); | ||||||
|  |  | ||||||
|   polling::instance().start(cfg.get()); |   polling::instance().start(cfg.get()); | ||||||
|  |   tasks::instance().start(cfg.get()); | ||||||
|  |  | ||||||
|   file_manager mgr(*cfg, mp); |   file_manager mgr(*cfg, mp); | ||||||
|   mgr.start(); |   mgr.start(); | ||||||
| @@ -1620,5 +1635,6 @@ TEST_F(file_manager_test, | |||||||
|   mgr.stop(); |   mgr.stop(); | ||||||
|  |  | ||||||
|   polling::instance().stop(); |   polling::instance().stop(); | ||||||
|  |   tasks::instance().stop(); | ||||||
| } | } | ||||||
| } // namespace repertory | } // namespace repertory | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user