use new tasks interface
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
Scott E. Graves 2024-12-06 17:40:26 -06:00
parent 660bc28f0c
commit 18c5948e3f
6 changed files with 134 additions and 82 deletions

View File

@ -22,7 +22,9 @@
#ifndef REPERTORY_INCLUDE_UTILS_TASKS_HPP_
#define REPERTORY_INCLUDE_UTILS_TASKS_HPP_
#include "common.hpp"
#include "types/repertory.hpp"
#include <memory>
namespace repertory {
class app_config;
@ -31,9 +33,43 @@ class tasks final {
public:
static constexpr const auto default_delay_ms{10U};
struct task_item final {
std::function<void(const stop_type &stop_requested)> action;
std::uint16_t delay_ms{default_delay_ms};
struct task final {
std::function<void(const stop_type &task_stopped)> action;
};
class i_task {
INTERFACE_SETUP(i_task);
public:
virtual auto wait() const -> bool = 0;
};
using task_ptr = std::shared_ptr<i_task>;
private:
class task_wait : public i_task {
private:
bool complete{false};
mutable std::mutex mtx;
mutable std::condition_variable notify;
bool success{false};
public:
void set_result(bool result);
auto wait() const -> bool override;
};
struct scheduled_task final {
task item;
std::uint16_t delay_ms{
default_delay_ms,
};
std::shared_ptr<task_wait> wait{
std::make_shared<task_wait>(),
};
};
public:
@ -61,13 +97,13 @@ private:
std::mutex start_stop_mutex_;
stop_type stop_requested_{false};
std::vector<std::unique_ptr<std::jthread>> task_threads_;
std::deque<task_item> tasks_;
std::deque<scheduled_task> tasks_;
private:
void task_thread();
public:
void schedule(task_item task);
auto schedule(task item) -> task_ptr;
void start(app_config *config);

View File

@ -42,7 +42,6 @@
#include "utils/common.hpp"
#include "utils/error_utils.hpp"
#include "utils/polling.hpp"
#include "utils/tasks.hpp"
#include "utils/time.hpp"
#include "utils/utils.hpp"
@ -82,8 +81,8 @@ auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid,
struct fuse_file_info * /*file_info*/)
-> api_error {
#else
auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid)
-> api_error {
auto fuse_drive::chown_impl(std::string api_path, uid_t uid,
gid_t gid) -> api_error {
#endif
return check_and_perform(
api_path, X_OK, [&](api_meta_map &meta) -> api_error {
@ -259,7 +258,6 @@ void fuse_drive::destroy_impl(void *ptr) {
}
polling::instance().stop();
tasks::instance().stop();
if (eviction_) {
eviction_->stop();
@ -483,8 +481,8 @@ auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st,
struct fuse_file_info * /*file_info*/)
-> api_error {
#else
auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st)
-> api_error {
auto fuse_drive::getattr_impl(std::string api_path,
struct stat *unix_st) -> api_error {
#endif
auto parent = utils::path::get_parent_api_path(api_path);
@ -567,8 +565,8 @@ auto fuse_drive::getxtimes_impl(std::string api_path, struct timespec *bkuptime,
#endif // __APPLE__
#if FUSE_USE_VERSION >= 30
auto fuse_drive::init_impl(struct fuse_conn_info *conn, struct fuse_config *cfg)
-> void * {
auto fuse_drive::init_impl(struct fuse_conn_info *conn,
struct fuse_config *cfg) -> void * {
#else
void *fuse_drive::init_impl(struct fuse_conn_info *conn) {
#endif
@ -624,7 +622,6 @@ void *fuse_drive::init_impl(struct fuse_conn_info *conn) {
}
polling::instance().start(&config_);
tasks::instance().start(&config_);
event_system::instance().raise<drive_mounted>(get_mount_location());
} catch (const std::exception &e) {
@ -803,9 +800,8 @@ auto fuse_drive::release_impl(std::string /*api_path*/,
return api_error::success;
}
auto fuse_drive::releasedir_impl(std::string /*api_path*/,
struct fuse_file_info *file_info)
-> api_error {
auto fuse_drive::releasedir_impl(
std::string /*api_path*/, struct fuse_file_info *file_info) -> api_error {
auto iter = directory_cache_->get_directory(file_info->fh);
if (iter == nullptr) {
return api_error::invalid_handle;
@ -823,8 +819,8 @@ auto fuse_drive::rename_directory(const std::string &from_api_path,
}
auto fuse_drive::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite)
-> int {
const std::string &to_api_path,
bool overwrite) -> int {
auto res = fm_->rename_file(from_api_path, to_api_path, overwrite);
errno = std::abs(utils::from_api_error(res));
return (res == api_error::success) ? 0 : -1;
@ -834,8 +830,8 @@ auto fuse_drive::rename_file(const std::string &from_api_path,
auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path,
unsigned int /*flags*/) -> api_error {
#else
auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path)
-> api_error {
auto fuse_drive::rename_impl(std::string from_api_path,
std::string to_api_path) -> api_error {
#endif
auto res = check_parent_access(to_api_path, W_OK | X_OK);
if (res != api_error::success) {
@ -949,15 +945,15 @@ auto fuse_drive::getxattr_impl(std::string api_path, const char *name,
}
#else // __APPLE__
auto fuse_drive::getxattr_impl(std::string api_path, const char *name,
char *value, size_t size, int &attribute_size)
-> api_error {
char *value, size_t size,
int &attribute_size) -> api_error {
return getxattr_common(api_path, name, value, size, attribute_size, nullptr);
}
#endif // __APPLE__
auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size,
int &required_size, bool &return_size)
-> api_error {
int &required_size,
bool &return_size) -> api_error {
auto check_size = (size == 0);
auto res = check_parent_access(api_path, X_OK);
@ -997,8 +993,8 @@ auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size,
return res;
}
auto fuse_drive::removexattr_impl(std::string api_path, const char *name)
-> api_error {
auto fuse_drive::removexattr_impl(std::string api_path,
const char *name) -> api_error {
std::string attribute_name;
#if defined(__APPLE__)
auto res = parse_xattr_parameters(name, 0, attribute_name, api_path);
@ -1026,8 +1022,8 @@ auto fuse_drive::setxattr_impl(std::string api_path, const char *name,
uint32_t position) -> api_error {
#else // __APPLE__
auto fuse_drive::setxattr_impl(std::string api_path, const char *name,
const char *value, size_t size, int flags)
-> api_error {
const char *value, size_t size,
int flags) -> api_error {
#endif
std::string attribute_name;
#if defined(__APPLE__)
@ -1105,8 +1101,8 @@ void fuse_drive::set_item_meta(const std::string &api_path,
}
#if defined(__APPLE__)
auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
-> api_error {
auto fuse_drive::setattr_x_impl(std::string api_path,
struct setattr_x *attr) -> api_error {
bool exists{};
auto res = provider_.is_file(api_path, exists);
if (res != api_error::success) {
@ -1205,9 +1201,8 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
return api_error::success;
}
auto fuse_drive::setbkuptime_impl(std::string api_path,
const struct timespec *bkuptime)
-> api_error {
auto fuse_drive::setbkuptime_impl(
std::string api_path, const struct timespec *bkuptime) -> api_error {
return check_and_perform(
api_path, X_OK, [&](api_meta_map &meta) -> api_error {
auto nanos = bkuptime->tv_nsec +
@ -1243,8 +1238,8 @@ auto fuse_drive::setvolname_impl(const char * /*volname*/) -> api_error {
return api_error::success;
}
auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf)
-> api_error {
auto fuse_drive::statfs_x_impl(std::string /*api_path*/,
struct statfs *stbuf) -> api_error {
if (statfs(&config_.get_cache_directory()[0], stbuf) != 0) {
return api_error::os_error;
}
@ -1269,8 +1264,8 @@ auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf)
return api_error::success;
}
#else // __APPLE__
auto fuse_drive::statfs_impl(std::string /*api_path*/, struct statvfs *stbuf)
-> api_error {
auto fuse_drive::statfs_impl(std::string /*api_path*/,
struct statvfs *stbuf) -> api_error {
if (statvfs(config_.get_cache_directory().data(), stbuf) != 0) {
return api_error::os_error;
}
@ -1350,8 +1345,8 @@ auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2],
struct fuse_file_info * /*file_info*/)
-> api_error {
#else
auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2])
-> api_error {
auto fuse_drive::utimens_impl(std::string api_path,
const struct timespec tv[2]) -> api_error {
#endif
api_meta_map meta;
auto res = provider_.get_item_meta(api_path, meta);

View File

@ -38,7 +38,6 @@
#include "utils/file_utils.hpp"
#include "utils/polling.hpp"
#include "utils/string.hpp"
#include "utils/tasks.hpp"
#include "utils/time.hpp"
#include "utils/utils.hpp"
@ -90,8 +89,8 @@ auto winfsp_drive::handle_error(std::string_view function_name,
return ret;
}
auto winfsp_drive::winfsp_service::OnStart(ULONG /*Argc*/, PWSTR * /*Argv*/)
-> NTSTATUS {
auto winfsp_drive::winfsp_service::OnStart(ULONG /*Argc*/,
PWSTR * /*Argv*/) -> NTSTATUS {
REPERTORY_USES_FUNCTION_NAME();
auto mount_location = utils::string::to_lower(
@ -458,10 +457,9 @@ auto winfsp_drive::get_item_meta(const std::string &api_path,
return ret;
}
auto winfsp_drive::get_security_by_name(PWSTR file_name, PUINT32 attributes,
PSECURITY_DESCRIPTOR descriptor,
std::uint64_t *descriptor_size)
-> NTSTATUS {
auto winfsp_drive::get_security_by_name(
PWSTR file_name, PUINT32 attributes, PSECURITY_DESCRIPTOR descriptor,
std::uint64_t *descriptor_size) -> NTSTATUS {
auto api_path =
utils::path::create_api_path(utils::string::to_utf8(file_name));
@ -653,7 +651,6 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS {
}
polling::instance().start(&config_);
tasks::instance().start(&config_);
event_system::instance().raise<drive_mounted>(mount_location);
} catch (const std::exception &e) {
@ -663,7 +660,6 @@ auto winfsp_drive::Mounted(PVOID host) -> NTSTATUS {
}
server_->stop();
polling::instance().stop();
tasks::instance().stop();
if (eviction_) {
eviction_->stop();
}
@ -724,8 +720,8 @@ auto winfsp_drive::Open(PWSTR file_name, UINT32 create_options,
auto winfsp_drive::Overwrite(PVOID /*file_node*/, PVOID file_desc,
UINT32 attributes, BOOLEAN replace_attributes,
UINT64 /*allocation_size*/, FileInfo *file_info)
-> NTSTATUS {
UINT64 /*allocation_size*/,
FileInfo *file_info) -> NTSTATUS {
REPERTORY_USES_FUNCTION_NAME();
std::string api_path;
@ -831,8 +827,8 @@ void winfsp_drive::populate_file_info(std::uint64_t file_size,
}
auto winfsp_drive::Read(PVOID /*file_node*/, PVOID file_desc, PVOID buffer,
UINT64 offset, ULONG length, PULONG bytes_transferred)
-> NTSTATUS {
UINT64 offset, ULONG length,
PULONG bytes_transferred) -> NTSTATUS {
REPERTORY_USES_FUNCTION_NAME();
*bytes_transferred = 0U;
@ -887,8 +883,8 @@ auto winfsp_drive::Read(PVOID /*file_node*/, PVOID file_desc, PVOID buffer,
auto winfsp_drive::ReadDirectory(PVOID /*file_node*/, PVOID file_desc,
PWSTR /*pattern*/, PWSTR marker, PVOID buffer,
ULONG buffer_length, PULONG bytes_transferred)
-> NTSTATUS {
ULONG buffer_length,
PULONG bytes_transferred) -> NTSTATUS {
REPERTORY_USES_FUNCTION_NAME();
std::string api_path;
@ -1050,8 +1046,8 @@ auto winfsp_drive::Rename(PVOID /*file_node*/, PVOID /*file_desc*/,
auto winfsp_drive::SetBasicInfo(PVOID /*file_node*/, PVOID file_desc,
UINT32 attributes, UINT64 creation_time,
UINT64 last_access_time, UINT64 last_write_time,
UINT64 change_time, FileInfo *file_info)
-> NTSTATUS {
UINT64 change_time,
FileInfo *file_info) -> NTSTATUS {
REPERTORY_USES_FUNCTION_NAME();
std::string api_path;
@ -1176,7 +1172,6 @@ VOID winfsp_drive::Unmounted(PVOID host) {
}
server_->stop();
polling::instance().stop();
tasks::instance().stop();
if (eviction_) {
eviction_->stop();
}

View File

@ -22,6 +22,7 @@
#include "utils/polling.hpp"
#include "app_config.hpp"
#include "utils/tasks.hpp"
namespace repertory {
polling polling::instance_;
@ -31,31 +32,34 @@ void polling::frequency_thread(
while (not stop_requested_) {
unique_mutex_lock lock(mutex_);
auto futures = std::accumulate(
items_.begin(), items_.end(), std::deque<std::future<void>>{},
items_.begin(), items_.end(), std::deque<tasks::task_ptr>{},
[this, &freq](auto &&list, auto &&item) {
if (item.second.freq != freq) {
return list;
}
list.emplace_back(
std::async(std::launch::async, [this, &freq, item]() -> void {
auto future = tasks::instance().schedule({
[this, &freq, item](auto &&task_stopped) {
if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) {
event_system::instance().raise<polling_item_begin>(
item.first);
}
item.second.action(stop_requested_);
item.second.action(task_stopped);
if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) {
event_system::instance().raise<polling_item_end>(item.first);
}
}));
},
});
list.emplace_back(future);
return list;
});
lock.unlock();
while (not futures.empty()) {
futures.front().wait();
futures.front()->wait();
futures.pop_front();
}
@ -97,6 +101,7 @@ void polling::start(app_config *config) {
},
frequency::high);
});
frequency_threads_.at(idx++) =
std::make_unique<std::thread>([this]() -> void {
this->frequency_thread(
@ -105,11 +110,14 @@ void polling::start(app_config *config) {
},
frequency::low);
});
frequency_threads_.at(idx++) =
std::make_unique<std::thread>([this]() -> void {
this->frequency_thread([]() -> std::uint32_t { return 1U; },
frequency::second);
});
tasks::instance().start(config);
}
void polling::stop() {
@ -121,6 +129,8 @@ void polling::stop() {
event_system::instance().raise<service_shutdown_begin>("polling");
stop_requested_ = true;
tasks::instance().stop();
unique_mutex_lock thread_lock(mutex_);
notify_.notify_all();
thread_lock.unlock();

View File

@ -26,10 +26,33 @@
namespace repertory {
tasks tasks::instance_;
void tasks::schedule(task_item task) {
void tasks::task_wait::set_result(bool result) {
unique_mutex_lock lock(mtx);
if (complete) {
notify.notify_all();
return;
}
complete = true;
success = result;
notify.notify_all();
}
auto tasks::task_wait::wait() const -> bool {
unique_mutex_lock lock(mtx);
while (not complete) {
notify.wait(lock);
}
return success;
}
auto tasks::schedule(task item) -> task_ptr {
scheduled_task runnable{item};
unique_mutex_lock lock(mutex_);
++count_;
tasks_.push_back(std::move(task));
tasks_.push_back(runnable);
notify_.notify_all();
lock.unlock();
@ -39,6 +62,8 @@ void tasks::schedule(task_item task) {
notify_.notify_all();
lock.unlock();
}
return runnable.wait;
}
void tasks::start(app_config *config) {
@ -101,19 +126,22 @@ void tasks::task_thread() {
continue;
}
auto task = tasks_.front();
auto runnable = tasks_.front();
tasks_.pop_front();
release();
try {
task.action(stop_requested_);
runnable.item.action(stop_requested_);
runnable.wait->set_result(true);
if (stop_requested_) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(task.delay_ms));
std::this_thread::sleep_for(std::chrono::milliseconds(runnable.delay_ms));
--count_;
} catch (const std::exception &e) {
runnable.wait->set_result(false);
utils::error::raise_error(function_name, e, "failed to execute task");
}

View File

@ -109,7 +109,6 @@ TEST_F(file_manager_test, can_create_and_close_file) {
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
polling::instance().start(cfg.get());
tasks::instance().start(cfg.get());
file_manager mgr(*cfg, mp);
mgr.start();
@ -211,7 +210,6 @@ TEST_F(file_manager_test, can_create_and_close_file) {
mgr.stop();
polling::instance().stop();
tasks::instance().stop();
}
TEST_F(file_manager_test, can_open_and_close_file) {
@ -220,7 +218,6 @@ TEST_F(file_manager_test, can_open_and_close_file) {
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
polling::instance().start(cfg.get());
tasks::instance().start(cfg.get());
file_manager mgr(*cfg, mp);
mgr.start();
@ -321,14 +318,12 @@ TEST_F(file_manager_test, can_open_and_close_file) {
mgr.stop();
polling::instance().stop();
tasks::instance().stop();
}
TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) {
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
polling::instance().start(cfg.get());
tasks::instance().start(cfg.get());
file_manager mgr(*cfg, mp);
mgr.start();
@ -385,7 +380,6 @@ TEST_F(file_manager_test, can_open_and_close_multiple_handles_for_same_file) {
EXPECT_EQ(std::size_t(0U), mgr.get_open_handle_count());
polling::instance().stop();
tasks::instance().stop();
}
TEST_F(file_manager_test,
@ -544,7 +538,6 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) {
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
polling::instance().start(cfg.get());
tasks::instance().start(cfg.get());
file_manager mgr(*cfg, mp);
mgr.start();
@ -649,7 +642,6 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) {
file.close();
polling::instance().stop();
tasks::instance().stop();
}
TEST_F(file_manager_test, can_evict_file) {
@ -1431,7 +1423,6 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) {
cfg->set_chunk_downloader_timeout_secs(3U);
polling::instance().start(cfg.get());
tasks::instance().start(cfg.get());
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
@ -1514,7 +1505,6 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) {
mgr.stop();
polling::instance().stop();
tasks::instance().stop();
}
TEST_F(file_manager_test, remove_file_fails_if_file_does_not_exist) {
@ -1562,7 +1552,6 @@ TEST_F(file_manager_test,
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
polling::instance().start(cfg.get());
tasks::instance().start(cfg.get());
file_manager mgr(*cfg, mp);
mgr.start();
@ -1635,6 +1624,5 @@ TEST_F(file_manager_test,
mgr.stop();
polling::instance().stop();
tasks::instance().stop();
}
} // namespace repertory