diff --git a/repertory/librepertory/include/utils/tasks.hpp b/repertory/librepertory/include/utils/tasks.hpp index 51de1636..5edbcf1b 100644 --- a/repertory/librepertory/include/utils/tasks.hpp +++ b/repertory/librepertory/include/utils/tasks.hpp @@ -22,7 +22,9 @@ #ifndef REPERTORY_INCLUDE_UTILS_TASKS_HPP_ #define REPERTORY_INCLUDE_UTILS_TASKS_HPP_ +#include "common.hpp" #include "types/repertory.hpp" +#include namespace repertory { class app_config; @@ -31,9 +33,43 @@ class tasks final { public: static constexpr const auto default_delay_ms{10U}; - struct task_item final { - std::function action; - std::uint16_t delay_ms{default_delay_ms}; + struct task final { + std::function action; + }; + + class i_task { + INTERFACE_SETUP(i_task); + + public: + virtual auto wait() const -> bool = 0; + }; + + using task_ptr = std::shared_ptr; + +private: + class task_wait : public i_task { + private: + bool complete{false}; + mutable std::mutex mtx; + mutable std::condition_variable notify; + bool success{false}; + + public: + void set_result(bool result); + + auto wait() const -> bool override; + }; + + struct scheduled_task final { + task item; + + std::uint16_t delay_ms{ + default_delay_ms, + }; + + std::shared_ptr wait{ + std::make_shared(), + }; }; public: @@ -61,13 +97,13 @@ private: std::mutex start_stop_mutex_; stop_type stop_requested_{false}; std::vector> task_threads_; - std::deque tasks_; + std::deque tasks_; private: void task_thread(); public: - void schedule(task_item task); + auto schedule(task item) -> task_ptr; void start(app_config *config); diff --git a/repertory/librepertory/src/drives/fuse/fuse_drive.cpp b/repertory/librepertory/src/drives/fuse/fuse_drive.cpp index cc98161b..5ebf021e 100644 --- a/repertory/librepertory/src/drives/fuse/fuse_drive.cpp +++ b/repertory/librepertory/src/drives/fuse/fuse_drive.cpp @@ -42,7 +42,6 @@ #include "utils/common.hpp" #include "utils/error_utils.hpp" #include "utils/polling.hpp" -#include "utils/tasks.hpp" #include "utils/time.hpp" #include "utils/utils.hpp" @@ -82,8 +81,8 @@ auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid, struct fuse_file_info * /*file_info*/) -> api_error { #else -auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid) - -> api_error { +auto fuse_drive::chown_impl(std::string api_path, uid_t uid, + gid_t gid) -> api_error { #endif return check_and_perform( api_path, X_OK, [&](api_meta_map &meta) -> api_error { @@ -259,7 +258,6 @@ void fuse_drive::destroy_impl(void *ptr) { } polling::instance().stop(); - tasks::instance().stop(); if (eviction_) { eviction_->stop(); @@ -483,8 +481,8 @@ auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st, struct fuse_file_info * /*file_info*/) -> api_error { #else -auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st) - -> api_error { +auto fuse_drive::getattr_impl(std::string api_path, + struct stat *unix_st) -> api_error { #endif auto parent = utils::path::get_parent_api_path(api_path); @@ -567,8 +565,8 @@ auto fuse_drive::getxtimes_impl(std::string api_path, struct timespec *bkuptime, #endif // __APPLE__ #if FUSE_USE_VERSION >= 30 -auto fuse_drive::init_impl(struct fuse_conn_info *conn, struct fuse_config *cfg) - -> void * { +auto fuse_drive::init_impl(struct fuse_conn_info *conn, + struct fuse_config *cfg) -> void * { #else void *fuse_drive::init_impl(struct fuse_conn_info *conn) { #endif @@ -624,7 +622,6 @@ void *fuse_drive::init_impl(struct fuse_conn_info *conn) { } polling::instance().start(&config_); - tasks::instance().start(&config_); event_system::instance().raise(get_mount_location()); } catch (const std::exception &e) { @@ -803,9 +800,8 @@ auto fuse_drive::release_impl(std::string /*api_path*/, return api_error::success; } -auto fuse_drive::releasedir_impl(std::string /*api_path*/, - struct fuse_file_info *file_info) - -> api_error { +auto fuse_drive::releasedir_impl( + std::string /*api_path*/, struct fuse_file_info *file_info) -> api_error { auto iter = directory_cache_->get_directory(file_info->fh); if (iter == nullptr) { return api_error::invalid_handle; @@ -823,8 +819,8 @@ auto fuse_drive::rename_directory(const std::string &from_api_path, } auto fuse_drive::rename_file(const std::string &from_api_path, - const std::string &to_api_path, bool overwrite) - -> int { + const std::string &to_api_path, + bool overwrite) -> int { auto res = fm_->rename_file(from_api_path, to_api_path, overwrite); errno = std::abs(utils::from_api_error(res)); return (res == api_error::success) ? 0 : -1; @@ -834,8 +830,8 @@ auto fuse_drive::rename_file(const std::string &from_api_path, auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path, unsigned int /*flags*/) -> api_error { #else -auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path) - -> api_error { +auto fuse_drive::rename_impl(std::string from_api_path, + std::string to_api_path) -> api_error { #endif auto res = check_parent_access(to_api_path, W_OK | X_OK); if (res != api_error::success) { @@ -949,15 +945,15 @@ auto fuse_drive::getxattr_impl(std::string api_path, const char *name, } #else // __APPLE__ auto fuse_drive::getxattr_impl(std::string api_path, const char *name, - char *value, size_t size, int &attribute_size) - -> api_error { + char *value, size_t size, + int &attribute_size) -> api_error { return getxattr_common(api_path, name, value, size, attribute_size, nullptr); } #endif // __APPLE__ auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size, - int &required_size, bool &return_size) - -> api_error { + int &required_size, + bool &return_size) -> api_error { auto check_size = (size == 0); auto res = check_parent_access(api_path, X_OK); @@ -997,8 +993,8 @@ auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size, return res; } -auto fuse_drive::removexattr_impl(std::string api_path, const char *name) - -> api_error { +auto fuse_drive::removexattr_impl(std::string api_path, + const char *name) -> api_error { std::string attribute_name; #if defined(__APPLE__) auto res = parse_xattr_parameters(name, 0, attribute_name, api_path); @@ -1026,8 +1022,8 @@ auto fuse_drive::setxattr_impl(std::string api_path, const char *name, uint32_t position) -> api_error { #else // __APPLE__ auto fuse_drive::setxattr_impl(std::string api_path, const char *name, - const char *value, size_t size, int flags) - -> api_error { + const char *value, size_t size, + int flags) -> api_error { #endif std::string attribute_name; #if defined(__APPLE__) @@ -1105,8 +1101,8 @@ void fuse_drive::set_item_meta(const std::string &api_path, } #if defined(__APPLE__) -auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr) - -> api_error { +auto fuse_drive::setattr_x_impl(std::string api_path, + struct setattr_x *attr) -> api_error { bool exists{}; auto res = provider_.is_file(api_path, exists); if (res != api_error::success) { @@ -1160,7 +1156,7 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr) ts[0].tv_sec = attr->acctime.tv_sec; ts[0].tv_nsec = attr->acctime.tv_nsec; } else { - struct timeval tv{}; + struct timeval tv {}; gettimeofday(&tv, NULL); ts[0].tv_sec = tv.tv_sec; ts[0].tv_nsec = tv.tv_usec * 1000; @@ -1205,9 +1201,8 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr) return api_error::success; } -auto fuse_drive::setbkuptime_impl(std::string api_path, - const struct timespec *bkuptime) - -> api_error { +auto fuse_drive::setbkuptime_impl( + std::string api_path, const struct timespec *bkuptime) -> api_error { return check_and_perform( api_path, X_OK, [&](api_meta_map &meta) -> api_error { auto nanos = bkuptime->tv_nsec + @@ -1243,8 +1238,8 @@ auto fuse_drive::setvolname_impl(const char * /*volname*/) -> api_error { return api_error::success; } -auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf) - -> api_error { +auto fuse_drive::statfs_x_impl(std::string /*api_path*/, + struct statfs *stbuf) -> api_error { if (statfs(&config_.get_cache_directory()[0], stbuf) != 0) { return api_error::os_error; } @@ -1269,8 +1264,8 @@ auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf) return api_error::success; } #else // __APPLE__ -auto fuse_drive::statfs_impl(std::string /*api_path*/, struct statvfs *stbuf) - -> api_error { +auto fuse_drive::statfs_impl(std::string /*api_path*/, + struct statvfs *stbuf) -> api_error { if (statvfs(config_.get_cache_directory().data(), stbuf) != 0) { return api_error::os_error; } @@ -1350,8 +1345,8 @@ auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2], struct fuse_file_info * /*file_info*/) -> api_error { #else -auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2]) - -> api_error { +auto fuse_drive::utimens_impl(std::string api_path, + const struct timespec tv[2]) -> api_error { #endif api_meta_map meta; auto res = provider_.get_item_meta(api_path, meta); diff --git a/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp b/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp index 5b6e2266..80e9694e 100644 --- a/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp +++ b/repertory/librepertory/src/drives/winfsp/winfsp_drive.cpp @@ -38,7 +38,6 @@ #include "utils/file_utils.hpp" #include "utils/polling.hpp" #include "utils/string.hpp" -#include "utils/tasks.hpp" #include "utils/time.hpp" #include "utils/utils.hpp" @@ -90,8 +89,8 @@ auto winfsp_drive::handle_error(std::string_view function_name, return ret; } -auto winfsp_drive::winfsp_service::OnStart(ULONG /*Argc*/, PWSTR * /*Argv*/) - -> NTSTATUS { +auto winfsp_drive::winfsp_service::OnStart(ULONG /*Argc*/, + PWSTR * /*Argv*/) -> NTSTATUS { REPERTORY_USES_FUNCTION_NAME(); auto mount_location = utils::string::to_lower( @@ -458,10 +457,9 @@ auto winfsp_drive::get_item_meta(const std::string &api_path, return ret; } -auto winfsp_drive::get_security_by_name(PWSTR file_name, PUINT32 attributes, - PSECURITY_DESCRIPTOR descriptor, - std::uint64_t *descriptor_size) - -> NTSTATUS { +auto winfsp_drive::get_security_by_name( + PWSTR file_name, PUINT32 attributes, PSECURITY_DESCRIPTOR descriptor, + std::uint64_t *descriptor_size) -> NTSTATUS { auto api_path = utils::path::create_api_path(utils::string::to_utf8(file_name)); @@ -653,7 +651,6 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS { } polling::instance().start(&config_); - tasks::instance().start(&config_); event_system::instance().raise(mount_location); } catch (const std::exception &e) { @@ -663,7 +660,6 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS { } server_->stop(); polling::instance().stop(); - tasks::instance().stop(); if (eviction_) { eviction_->stop(); } @@ -724,8 +720,8 @@ auto winfsp_drive::Open(PWSTR file_name, UINT32 create_options, auto winfsp_drive::Overwrite(PVOID /*file_node*/, PVOID file_desc, UINT32 attributes, BOOLEAN replace_attributes, - UINT64 /*allocation_size*/, FileInfo *file_info) - -> NTSTATUS { + UINT64 /*allocation_size*/, + FileInfo *file_info) -> NTSTATUS { REPERTORY_USES_FUNCTION_NAME(); std::string api_path; @@ -831,8 +827,8 @@ void winfsp_drive::populate_file_info(std::uint64_t file_size, } auto winfsp_drive::Read(PVOID /*file_node*/, PVOID file_desc, PVOID buffer, - UINT64 offset, ULONG length, PULONG bytes_transferred) - -> NTSTATUS { + UINT64 offset, ULONG length, + PULONG bytes_transferred) -> NTSTATUS { REPERTORY_USES_FUNCTION_NAME(); *bytes_transferred = 0U; @@ -887,8 +883,8 @@ auto winfsp_drive::Read(PVOID /*file_node*/, PVOID file_desc, PVOID buffer, auto winfsp_drive::ReadDirectory(PVOID /*file_node*/, PVOID file_desc, PWSTR /*pattern*/, PWSTR marker, PVOID buffer, - ULONG buffer_length, PULONG bytes_transferred) - -> NTSTATUS { + ULONG buffer_length, + PULONG bytes_transferred) -> NTSTATUS { REPERTORY_USES_FUNCTION_NAME(); std::string api_path; @@ -1050,8 +1046,8 @@ auto winfsp_drive::Rename(PVOID /*file_node*/, PVOID /*file_desc*/, auto winfsp_drive::SetBasicInfo(PVOID /*file_node*/, PVOID file_desc, UINT32 attributes, UINT64 creation_time, UINT64 last_access_time, UINT64 last_write_time, - UINT64 change_time, FileInfo *file_info) - -> NTSTATUS { + UINT64 change_time, + FileInfo *file_info) -> NTSTATUS { REPERTORY_USES_FUNCTION_NAME(); std::string api_path; @@ -1176,7 +1172,6 @@ VOID winfsp_drive::Unmounted(PVOID host) { } server_->stop(); polling::instance().stop(); - tasks::instance().stop(); if (eviction_) { eviction_->stop(); } diff --git a/repertory/librepertory/src/utils/polling.cpp b/repertory/librepertory/src/utils/polling.cpp index ca8ecc79..1d857dfb 100644 --- a/repertory/librepertory/src/utils/polling.cpp +++ b/repertory/librepertory/src/utils/polling.cpp @@ -22,6 +22,7 @@ #include "utils/polling.hpp" #include "app_config.hpp" +#include "utils/tasks.hpp" namespace repertory { polling polling::instance_; @@ -31,31 +32,34 @@ void polling::frequency_thread( while (not stop_requested_) { unique_mutex_lock lock(mutex_); auto futures = std::accumulate( - items_.begin(), items_.end(), std::deque>{}, + items_.begin(), items_.end(), std::deque{}, [this, &freq](auto &&list, auto &&item) { if (item.second.freq != freq) { return list; } - list.emplace_back( - std::async(std::launch::async, [this, &freq, item]() -> void { + auto future = tasks::instance().schedule({ + [this, &freq, item](auto &&task_stopped) { if (config_->get_event_level() == event_level::trace || freq != frequency::second) { event_system::instance().raise( item.first); } - item.second.action(stop_requested_); + item.second.action(task_stopped); if (config_->get_event_level() == event_level::trace || freq != frequency::second) { event_system::instance().raise(item.first); } - })); + }, + }); + + list.emplace_back(future); return list; }); lock.unlock(); while (not futures.empty()) { - futures.front().wait(); + futures.front()->wait(); futures.pop_front(); } @@ -97,6 +101,7 @@ void polling::start(app_config *config) { }, frequency::high); }); + frequency_threads_.at(idx++) = std::make_unique([this]() -> void { this->frequency_thread( @@ -105,11 +110,14 @@ void polling::start(app_config *config) { }, frequency::low); }); + frequency_threads_.at(idx++) = std::make_unique([this]() -> void { this->frequency_thread([]() -> std::uint32_t { return 1U; }, frequency::second); }); + + tasks::instance().start(config); } void polling::stop() { @@ -121,6 +129,8 @@ void polling::stop() { event_system::instance().raise("polling"); stop_requested_ = true; + tasks::instance().stop(); + unique_mutex_lock thread_lock(mutex_); notify_.notify_all(); thread_lock.unlock(); diff --git a/repertory/librepertory/src/utils/tasks.cpp b/repertory/librepertory/src/utils/tasks.cpp index 0ff6166f..a962bc43 100644 --- a/repertory/librepertory/src/utils/tasks.cpp +++ b/repertory/librepertory/src/utils/tasks.cpp @@ -26,10 +26,33 @@ namespace repertory { tasks tasks::instance_; -void tasks::schedule(task_item task) { +void tasks::task_wait::set_result(bool result) { + unique_mutex_lock lock(mtx); + if (complete) { + notify.notify_all(); + return; + } + + complete = true; + success = result; + notify.notify_all(); +} + +auto tasks::task_wait::wait() const -> bool { + unique_mutex_lock lock(mtx); + while (not complete) { + notify.wait(lock); + } + + return success; +} + +auto tasks::schedule(task item) -> task_ptr { + scheduled_task runnable{item}; + unique_mutex_lock lock(mutex_); ++count_; - tasks_.push_back(std::move(task)); + tasks_.push_back(runnable); notify_.notify_all(); lock.unlock(); @@ -39,6 +62,8 @@ void tasks::schedule(task_item task) { notify_.notify_all(); lock.unlock(); } + + return runnable.wait; } void tasks::start(app_config *config) { @@ -101,19 +126,22 @@ void tasks::task_thread() { continue; } - auto task = tasks_.front(); + auto runnable = tasks_.front(); tasks_.pop_front(); release(); try { - task.action(stop_requested_); + runnable.item.action(stop_requested_); + runnable.wait->set_result(true); + if (stop_requested_) { return; } - std::this_thread::sleep_for(std::chrono::milliseconds(task.delay_ms)); + std::this_thread::sleep_for(std::chrono::milliseconds(runnable.delay_ms)); --count_; } catch (const std::exception &e) { + runnable.wait->set_result(false); utils::error::raise_error(function_name, e, "failed to execute task"); } diff --git a/repertory/repertory_test/src/file_manager_test.cpp b/repertory/repertory_test/src/file_manager_test.cpp index 5e384240..2c28c6f9 100644 --- a/repertory/repertory_test/src/file_manager_test.cpp +++ b/repertory/repertory_test/src/file_manager_test.cpp @@ -109,7 +109,6 @@ TEST_F(file_manager_test, can_create_and_close_file) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); - tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -211,7 +210,6 @@ TEST_F(file_manager_test, can_create_and_close_file) { mgr.stop(); polling::instance().stop(); - tasks::instance().stop(); } TEST_F(file_manager_test, can_open_and_close_file) { @@ -220,7 +218,6 @@ TEST_F(file_manager_test, can_open_and_close_file) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); - tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -321,14 +318,12 @@ TEST_F(file_manager_test, can_open_and_close_file) { mgr.stop(); polling::instance().stop(); - tasks::instance().stop(); } TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); - tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -385,7 +380,6 @@ TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) { EXPECT_EQ(std::size_t(0U), mgr.get_open_handle_count()); polling::instance().stop(); - tasks::instance().stop(); } TEST_F(file_manager_test, @@ -544,7 +538,6 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) { EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); - tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -649,7 +642,6 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) { file.close(); polling::instance().stop(); - tasks::instance().stop(); } TEST_F(file_manager_test, can_evict_file) { @@ -1431,7 +1423,6 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) { cfg->set_chunk_downloader_timeout_secs(3U); polling::instance().start(cfg.get()); - tasks::instance().start(cfg.get()); EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); @@ -1514,7 +1505,6 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) { mgr.stop(); polling::instance().stop(); - tasks::instance().stop(); } TEST_F(file_manager_test, remove_file_fails_if_file_does_not_exist) { @@ -1562,7 +1552,6 @@ TEST_F(file_manager_test, EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false)); polling::instance().start(cfg.get()); - tasks::instance().start(cfg.get()); file_manager mgr(*cfg, mp); mgr.start(); @@ -1635,6 +1624,5 @@ TEST_F(file_manager_test, mgr.stop(); polling::instance().stop(); - tasks::instance().stop(); } } // namespace repertory