diff --git a/repertory/librepertory/include/comm/packet/client_pool.hpp b/repertory/librepertory/include/comm/packet/client_pool.hpp index a9786f06..3cae2b8a 100644 --- a/repertory/librepertory/include/comm/packet/client_pool.hpp +++ b/repertory/librepertory/include/comm/packet/client_pool.hpp @@ -23,7 +23,6 @@ #define REPERTORY_INCLUDE_COMM_PACKET_CLIENT_POOL_HPP_ #include "comm/packet/packet.hpp" -#include "types/repertory.hpp" namespace repertory { class client_pool final { @@ -46,25 +45,26 @@ private: }; struct work_queue final { - work_queue(std::function &&run) - : thread([this, run]() { run(*this); }) {} - - ~work_queue() { - if (thread.joinable()) { - thread.join(); - } - } + work_queue(); + ~work_queue(); work_queue(const work_queue &) = delete; work_queue(work_queue &&) = delete; + std::deque> actions; + std::chrono::steady_clock::time_point modified{ + std::chrono::steady_clock::now(), + }; std::mutex mutex; std::condition_variable notify; - std::deque> actions; - std::thread thread; + stop_type shutdown{false}; + std::unique_ptr thread; auto operator=(const work_queue &) -> work_queue & = delete; auto operator=(work_queue &&) -> work_queue & = delete; + + private: + void work_thread(); }; public: @@ -81,12 +81,13 @@ private: private: std::mutex pool_mtx_; std::unordered_map> pool_queues_; - stop_type shutdown_{false}; public: void execute(std::uint64_t thread_id, worker_callback worker, worker_complete_callback worker_complete); + void remove_expired(); + void shutdown(); }; @@ -113,6 +114,8 @@ public: void remove_client(std::string client_id); + void remove_expired(); + void shutdown(); }; } // namespace repertory diff --git a/repertory/librepertory/include/drives/remote/remote_server_base.hpp b/repertory/librepertory/include/drives/remote/remote_server_base.hpp index e8c17f92..804b3ec5 100644 --- a/repertory/librepertory/include/drives/remote/remote_server_base.hpp +++ b/repertory/librepertory/include/drives/remote/remote_server_base.hpp @@ -38,6 +38,7 @@ #include "types/repertory.hpp" #include "utils/base64.hpp" #include "utils/path.hpp" +#include "utils/polling.hpp" namespace repertory { template @@ -77,6 +78,13 @@ public: method, request, response, message_complete); }); + + polling::instance().set_callback({ + "remote_server_expired", + polling::frequency::high, + [this](auto && /* stop */) { client_pool_.remove_expired(); }, + }); + event_system::instance().raise(function_name, "remote_server_base"); } @@ -86,7 +94,10 @@ public: event_system::instance().raise(function_name, "remote_server_base"); + polling::instance().remove_callback("remote_server_expired"); + client_pool_.shutdown(); + packet_server_.reset(); event_system::instance().raise(function_name, "remote_server_base"); diff --git a/repertory/librepertory/src/comm/packet/client_pool.cpp b/repertory/librepertory/src/comm/packet/client_pool.cpp index 7c27cd04..3fc568b1 100644 --- a/repertory/librepertory/src/comm/packet/client_pool.cpp +++ b/repertory/librepertory/src/comm/packet/client_pool.cpp @@ -27,58 +27,75 @@ #include "events/types/service_stop_begin.hpp" #include "events/types/service_stop_end.hpp" #include "utils/error.hpp" +#include +#include namespace repertory { +client_pool::pool::work_queue::work_queue() { + thread = std::make_unique([this]() { work_thread(); }); +} + +client_pool::pool::work_queue::~work_queue() { + if (thread->joinable()) { + thread->join(); + } + + thread.reset(); +} + +void client_pool::pool::work_queue::work_thread() { + REPERTORY_USES_FUNCTION_NAME(); + + unique_mutex_lock lock(mutex); + const auto unlock_and_notify = [this, &lock]() { + notify.notify_all(); + lock.unlock(); + }; + unlock_and_notify(); + + const auto process_next_item = [this, &unlock_and_notify]() { + if (actions.empty()) { + unlock_and_notify(); + return; + } + + auto item = actions.front(); + actions.pop_front(); + unlock_and_notify(); + + try { + item->work_complete(item->work()); + } catch (const std::exception &e) { + utils::error::handle_exception(function_name, e); + } catch (...) { + utils::error::handle_exception(function_name); + } + }; + + while (not shutdown) { + lock.lock(); + if (actions.empty()) { + notify.wait(lock); + unlock_and_notify(); + continue; + } + + process_next_item(); + } + + while (not actions.empty()) { + lock.lock(); + process_next_item(); + } +} + void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker, worker_complete_callback worker_complete) { REPERTORY_USES_FUNCTION_NAME(); unique_mutex_lock pool_lock(pool_mtx_); if (pool_queues_[thread_id] == nullptr) { - pool_queues_[thread_id] = - std::make_shared([this](work_queue &queue) { - unique_mutex_lock lock(queue.mutex); - const auto unlock_and_notify = [&lock, &queue]() { - queue.notify.notify_all(); - lock.unlock(); - }; - unlock_and_notify(); - - const auto process_next_item = [&unlock_and_notify, &queue]() { - if (queue.actions.empty()) { - unlock_and_notify(); - return; - } - - auto item = queue.actions.front(); - queue.actions.pop_front(); - unlock_and_notify(); - - try { - item->work_complete(item->work()); - } catch (const std::exception &e) { - utils::error::handle_exception(function_name, e); - } catch (...) { - utils::error::handle_exception(function_name); - } - }; - - while (not shutdown_) { - lock.lock(); - if (queue.actions.empty()) { - queue.notify.wait(lock); - unlock_and_notify(); - continue; - } - - process_next_item(); - } - - while (not queue.actions.empty()) { - lock.lock(); - process_next_item(); - } - }); + pool_queues_[thread_id] = std::make_shared(); } auto pool_queue = pool_queues_[thread_id]; @@ -89,13 +106,41 @@ void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker, mutex_lock queue_lock(pool_queue->mutex); pool_queue->actions.emplace_back(std::move(job)); + pool_queue->modified = std::chrono::steady_clock::now(); pool_queue->notify.notify_all(); } -void client_pool::pool::shutdown() { - shutdown_ = true; +void client_pool::pool::remove_expired() { + auto now = std::chrono::steady_clock::now(); + unique_mutex_lock pool_lock(pool_mtx_); + auto results = std::accumulate( + pool_queues_.begin(), pool_queues_.end(), + std::unordered_map>(), + [&now](auto &&res, auto &&entry) -> auto { + auto duration = now - entry.second->modified; + if (std::chrono::duration_cast(duration) >= + std::chrono::minutes(2U)) { + res[entry.first] = entry.second; + } + + return res; + }); + pool_lock.unlock(); + + for (const auto &entry : results) { + pool_lock.lock(); + pool_queues_.erase(entry.first); + pool_lock.unlock(); + } + + results.clear(); +} + +void client_pool::pool::shutdown() { for (auto &entry : pool_queues_) { + entry.second->shutdown = true; + mutex_lock lock(entry.second->mutex); entry.second->notify.notify_all(); } @@ -136,6 +181,14 @@ void client_pool::remove_client(std::string client_id) { pool_lookup_.erase(client_id); } +void client_pool::remove_expired() { + mutex_lock pool_lock(pool_mutex_); + + for (auto &entry : pool_lookup_) { + entry.second->remove_expired(); + } +} + void client_pool::shutdown() { REPERTORY_USES_FUNCTION_NAME();