remove expired client threads

This commit is contained in:
2025-10-04 13:36:25 -05:00
parent 445f0481b9
commit 05b12ece89
3 changed files with 125 additions and 58 deletions

View File

@@ -23,7 +23,6 @@
#define REPERTORY_INCLUDE_COMM_PACKET_CLIENT_POOL_HPP_
#include "comm/packet/packet.hpp"
#include "types/repertory.hpp"
namespace repertory {
class client_pool final {
@@ -46,25 +45,26 @@ private:
};
struct work_queue final {
work_queue(std::function<void(work_queue &)> &&run)
: thread([this, run]() { run(*this); }) {}
~work_queue() {
if (thread.joinable()) {
thread.join();
}
}
work_queue();
~work_queue();
work_queue(const work_queue &) = delete;
work_queue(work_queue &&) = delete;
std::deque<std::shared_ptr<work_item>> actions;
std::chrono::steady_clock::time_point modified{
std::chrono::steady_clock::now(),
};
std::mutex mutex;
std::condition_variable notify;
std::deque<std::shared_ptr<work_item>> actions;
std::thread thread;
stop_type shutdown{false};
std::unique_ptr<std::thread> thread;
auto operator=(const work_queue &) -> work_queue & = delete;
auto operator=(work_queue &&) -> work_queue & = delete;
private:
void work_thread();
};
public:
@@ -81,12 +81,13 @@ private:
private:
std::mutex pool_mtx_;
std::unordered_map<std::uint64_t, std::shared_ptr<work_queue>> pool_queues_;
stop_type shutdown_{false};
public:
void execute(std::uint64_t thread_id, worker_callback worker,
worker_complete_callback worker_complete);
void remove_expired();
void shutdown();
};
@@ -113,6 +114,8 @@ public:
void remove_client(std::string client_id);
void remove_expired();
void shutdown();
};
} // namespace repertory

View File

@@ -38,6 +38,7 @@
#include "types/repertory.hpp"
#include "utils/base64.hpp"
#include "utils/path.hpp"
#include "utils/polling.hpp"
namespace repertory {
template <typename drive>
@@ -77,6 +78,13 @@ public:
method, request, response,
message_complete);
});
polling::instance().set_callback({
"remote_server_expired",
polling::frequency::high,
[this](auto && /* stop */) { client_pool_.remove_expired(); },
});
event_system::instance().raise<service_start_end>(function_name,
"remote_server_base");
}
@@ -86,7 +94,10 @@ public:
event_system::instance().raise<service_stop_begin>(function_name,
"remote_server_base");
polling::instance().remove_callback("remote_server_expired");
client_pool_.shutdown();
packet_server_.reset();
event_system::instance().raise<service_stop_end>(function_name,
"remote_server_base");

View File

@@ -27,58 +27,75 @@
#include "events/types/service_stop_begin.hpp"
#include "events/types/service_stop_end.hpp"
#include "utils/error.hpp"
#include <chrono>
#include <thread>
namespace repertory {
client_pool::pool::work_queue::work_queue() {
thread = std::make_unique<std::thread>([this]() { work_thread(); });
}
client_pool::pool::work_queue::~work_queue() {
if (thread->joinable()) {
thread->join();
}
thread.reset();
}
void client_pool::pool::work_queue::work_thread() {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock lock(mutex);
const auto unlock_and_notify = [this, &lock]() {
notify.notify_all();
lock.unlock();
};
unlock_and_notify();
const auto process_next_item = [this, &unlock_and_notify]() {
if (actions.empty()) {
unlock_and_notify();
return;
}
auto item = actions.front();
actions.pop_front();
unlock_and_notify();
try {
item->work_complete(item->work());
} catch (const std::exception &e) {
utils::error::handle_exception(function_name, e);
} catch (...) {
utils::error::handle_exception(function_name);
}
};
while (not shutdown) {
lock.lock();
if (actions.empty()) {
notify.wait(lock);
unlock_and_notify();
continue;
}
process_next_item();
}
while (not actions.empty()) {
lock.lock();
process_next_item();
}
}
void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker,
worker_complete_callback worker_complete) {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock pool_lock(pool_mtx_);
if (pool_queues_[thread_id] == nullptr) {
pool_queues_[thread_id] =
std::make_shared<work_queue>([this](work_queue &queue) {
unique_mutex_lock lock(queue.mutex);
const auto unlock_and_notify = [&lock, &queue]() {
queue.notify.notify_all();
lock.unlock();
};
unlock_and_notify();
const auto process_next_item = [&unlock_and_notify, &queue]() {
if (queue.actions.empty()) {
unlock_and_notify();
return;
}
auto item = queue.actions.front();
queue.actions.pop_front();
unlock_and_notify();
try {
item->work_complete(item->work());
} catch (const std::exception &e) {
utils::error::handle_exception(function_name, e);
} catch (...) {
utils::error::handle_exception(function_name);
}
};
while (not shutdown_) {
lock.lock();
if (queue.actions.empty()) {
queue.notify.wait(lock);
unlock_and_notify();
continue;
}
process_next_item();
}
while (not queue.actions.empty()) {
lock.lock();
process_next_item();
}
});
pool_queues_[thread_id] = std::make_shared<work_queue>();
}
auto pool_queue = pool_queues_[thread_id];
@@ -89,13 +106,41 @@ void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker,
mutex_lock queue_lock(pool_queue->mutex);
pool_queue->actions.emplace_back(std::move(job));
pool_queue->modified = std::chrono::steady_clock::now();
pool_queue->notify.notify_all();
}
void client_pool::pool::shutdown() {
shutdown_ = true;
void client_pool::pool::remove_expired() {
auto now = std::chrono::steady_clock::now();
unique_mutex_lock pool_lock(pool_mtx_);
auto results = std::accumulate(
pool_queues_.begin(), pool_queues_.end(),
std::unordered_map<std::uint64_t, std::shared_ptr<work_queue>>(),
[&now](auto &&res, auto &&entry) -> auto {
auto duration = now - entry.second->modified;
if (std::chrono::duration_cast<std::chrono::minutes>(duration) >=
std::chrono::minutes(2U)) {
res[entry.first] = entry.second;
}
return res;
});
pool_lock.unlock();
for (const auto &entry : results) {
pool_lock.lock();
pool_queues_.erase(entry.first);
pool_lock.unlock();
}
results.clear();
}
void client_pool::pool::shutdown() {
for (auto &entry : pool_queues_) {
entry.second->shutdown = true;
mutex_lock lock(entry.second->mutex);
entry.second->notify.notify_all();
}
@@ -136,6 +181,14 @@ void client_pool::remove_client(std::string client_id) {
pool_lookup_.erase(client_id);
}
void client_pool::remove_expired() {
mutex_lock pool_lock(pool_mutex_);
for (auto &entry : pool_lookup_) {
entry.second->remove_expired();
}
}
void client_pool::shutdown() {
REPERTORY_USES_FUNCTION_NAME();