match client threads

This commit is contained in:
2025-10-04 12:57:43 -05:00
parent de2ca33e83
commit 445f0481b9
4 changed files with 119 additions and 118 deletions

View File

@@ -46,13 +46,29 @@ private:
};
struct work_queue final {
work_queue(std::function<void(work_queue &)> &&run)
: thread([this, run]() { run(*this); }) {}
~work_queue() {
if (thread.joinable()) {
thread.join();
}
}
work_queue(const work_queue &) = delete;
work_queue(work_queue &&) = delete;
std::mutex mutex;
std::condition_variable notify;
std::deque<std::shared_ptr<work_item>> queue;
std::deque<std::shared_ptr<work_item>> actions;
std::thread thread;
auto operator=(const work_queue &) -> work_queue & = delete;
auto operator=(work_queue &&) -> work_queue & = delete;
};
public:
explicit pool(std::uint8_t pool_size);
pool() noexcept = default;
~pool() { shutdown(); }
@@ -63,21 +79,19 @@ private:
auto operator=(pool &&) -> pool & = delete;
private:
std::vector<std::unique_ptr<work_queue>> pool_queues_;
std::vector<std::thread> pool_threads_;
bool shutdown_{false};
std::atomic<std::uint8_t> thread_index_{};
std::mutex pool_mtx_;
std::unordered_map<std::uint64_t, std::shared_ptr<work_queue>> pool_queues_;
stop_type shutdown_{false};
public:
void execute(std::uint64_t thread_id, const worker_callback &worker,
const worker_complete_callback &worker_complete);
void execute(std::uint64_t thread_id, worker_callback worker,
worker_complete_callback worker_complete);
void shutdown();
};
public:
explicit client_pool(std::uint8_t pool_size = min_pool_size)
: pool_size_(pool_size == 0U ? min_pool_size : pool_size) {}
client_pool() noexcept;
~client_pool() { shutdown(); }
@@ -88,18 +102,14 @@ public:
auto operator=(client_pool &&) -> client_pool & = delete;
private:
std::uint8_t pool_size_;
std::unordered_map<std::string, std::shared_ptr<pool>> pool_lookup_;
std::unordered_map<std::string, std::unique_ptr<pool>> pool_lookup_;
std::mutex pool_mutex_;
bool shutdown_ = false;
private:
static constexpr auto min_pool_size = 10U;
stop_type shutdown_{false};
public:
void execute(std::string client_id, std::uint64_t thread_id,
const worker_callback &worker,
const worker_complete_callback &worker_complete);
worker_callback worker,
worker_complete_callback worker_complete);
void remove_client(std::string client_id);

View File

@@ -60,8 +60,7 @@ public:
std::string_view mount_location)
: config_(config),
drive_(drv),
mount_location_(std::string(mount_location)),
client_pool_(config.get_remote_mount().client_pool_size) {
mount_location_(std::string(mount_location)) {
REPERTORY_USES_FUNCTION_NAME();
event_system::instance().raise<service_start_begin>(function_name,
@@ -1633,7 +1632,7 @@ private:
return this->handler_lookup_.at(lookup_method_name)(
service_flags, client_id, thread_id, method, request, response);
},
message_complete);
std::move(message_complete));
}
protected:

View File

@@ -26,124 +26,108 @@
#include "events/types/service_start_end.hpp"
#include "events/types/service_stop_begin.hpp"
#include "events/types/service_stop_end.hpp"
#include "platform/platform.hpp"
#include "utils/error_utils.hpp"
#include "utils/error.hpp"
namespace repertory {
void client_pool::pool::execute(
std::uint64_t thread_id, const worker_callback &worker,
const worker_complete_callback &worker_complete) {
auto index = thread_id % pool_queues_.size();
auto job = std::make_shared<work_item>(worker, worker_complete);
auto &pool_queue = pool_queues_[index];
unique_mutex_lock queue_lock(pool_queue->mutex);
pool_queue->queue.emplace_back(job);
pool_queue->notify.notify_all();
queue_lock.unlock();
}
client_pool::pool::pool(std::uint8_t pool_size) {
void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker,
worker_complete_callback worker_complete) {
REPERTORY_USES_FUNCTION_NAME();
event_system::instance().raise<service_start_begin>(function_name,
"client_pool");
unique_mutex_lock pool_lock(pool_mtx_);
if (pool_queues_[thread_id] == nullptr) {
pool_queues_[thread_id] =
std::make_shared<work_queue>([this](work_queue &queue) {
unique_mutex_lock lock(queue.mutex);
const auto unlock_and_notify = [&lock, &queue]() {
queue.notify.notify_all();
lock.unlock();
};
unlock_and_notify();
for (std::uint8_t i = 0U; i < pool_size; i++) {
pool_queues_.emplace_back(std::make_unique<work_queue>());
}
const auto process_next_item = [&unlock_and_notify, &queue]() {
if (queue.actions.empty()) {
unlock_and_notify();
return;
}
for (std::size_t i = 0U; i < pool_queues_.size(); i++) {
pool_threads_.emplace_back([this]() {
auto thread_index = thread_index_++;
auto item = queue.actions.front();
queue.actions.pop_front();
unlock_and_notify();
auto &pool_queue = pool_queues_[thread_index];
auto &queue = pool_queue->queue;
auto &queue_mutex = pool_queue->mutex;
auto &queue_notify = pool_queue->notify;
try {
item->work_complete(item->work());
} catch (const std::exception &e) {
utils::error::handle_exception(function_name, e);
} catch (...) {
utils::error::handle_exception(function_name);
}
};
unique_mutex_lock queue_lock(queue_mutex);
queue_notify.notify_all();
queue_lock.unlock();
while (not shutdown_) {
queue_lock.lock();
if (queue.empty()) {
queue_notify.wait(queue_lock);
}
while (not shutdown_) {
lock.lock();
if (queue.actions.empty()) {
queue.notify.wait(lock);
unlock_and_notify();
continue;
}
while (not queue.empty()) {
auto item = queue.front();
queue.pop_front();
queue_notify.notify_all();
queue_lock.unlock();
try {
auto result = item->work();
item->work_complete(result);
} catch (const std::exception &e) {
item->work_complete(utils::from_api_error(api_error::error));
utils::error::raise_error(function_name, e,
"exception occurred in work item");
process_next_item();
}
queue_lock.lock();
}
queue_notify.notify_all();
queue_lock.unlock();
}
queue_lock.lock();
while (not queue.empty()) {
auto job = queue.front();
queue.pop_front();
queue_notify.notify_all();
queue_lock.unlock();
job->work_complete(utils::from_api_error(api_error::download_stopped));
queue_lock.lock();
}
queue_notify.notify_all();
queue_lock.unlock();
});
while (not queue.actions.empty()) {
lock.lock();
process_next_item();
}
});
}
event_system::instance().raise<service_start_end>(function_name,
"client_pool");
auto pool_queue = pool_queues_[thread_id];
pool_lock.unlock();
auto job = std::make_shared<work_item>(std::move(worker),
std::move(worker_complete));
mutex_lock queue_lock(pool_queue->mutex);
pool_queue->actions.emplace_back(std::move(job));
pool_queue->notify.notify_all();
}
void client_pool::pool::shutdown() {
shutdown_ = true;
for (auto &pool_queue : pool_queues_) {
mutex_lock lock(pool_queue->mutex);
pool_queue->notify.notify_all();
}
for (auto &thread : pool_threads_) {
thread.join();
for (auto &entry : pool_queues_) {
mutex_lock lock(entry.second->mutex);
entry.second->notify.notify_all();
}
pool_queues_.clear();
pool_threads_.clear();
}
client_pool::client_pool() noexcept {
REPERTORY_USES_FUNCTION_NAME();
event_system::instance().raise<service_start_begin>(function_name,
"client_pool");
event_system::instance().raise<service_start_end>(function_name,
"client_pool");
}
void client_pool::execute(std::string client_id, std::uint64_t thread_id,
const worker_callback &worker,
const worker_complete_callback &worker_complete) {
worker_callback worker,
worker_complete_callback worker_complete) {
unique_mutex_lock pool_lock(pool_mutex_);
if (shutdown_) {
pool_lock.unlock();
throw std::runtime_error("Client pool is shutdown");
throw std::runtime_error("client pool is shutdown");
}
if (not pool_lookup_[client_id]) {
pool_lookup_[client_id] = std::make_shared<pool>(pool_size_);
pool_lookup_[client_id] = std::make_unique<pool>();
}
pool_lookup_[client_id]->execute(thread_id, worker, worker_complete);
pool_lookup_[client_id]->execute(thread_id, std::move(worker),
std::move(worker_complete));
pool_lock.unlock();
}
@@ -159,17 +143,23 @@ void client_pool::shutdown() {
return;
}
shutdown_ = true;
event_system::instance().raise<service_stop_begin>(function_name,
"client_pool");
unique_mutex_lock pool_lock(pool_mutex_);
if (not shutdown_) {
shutdown_ = true;
for (auto &pool_entry : pool_lookup_) {
std::unordered_map<std::string, std::unique_ptr<pool>> pool_lookup;
std::swap(pool_lookup, pool_lookup_);
pool_lock.unlock();
if (not pool_lookup.empty()) {
for (auto &pool_entry : pool_lookup) {
pool_entry.second->shutdown();
}
pool_lookup_.clear();
pool_lookup.clear();
}
pool_lock.unlock();
event_system::instance().raise<service_stop_end>(function_name,
"client_pool");
}

View File

@@ -364,12 +364,14 @@ void packet_server::read_packet(std::shared_ptr<connection> conn,
}
void packet_server::remove_client(connection &conn) {
if (not conn.client_id.empty()) {
recur_mutex_lock connection_lock(connection_mutex_);
if (--connection_lookup_[conn.client_id] == 0U) {
connection_lookup_.erase(conn.client_id);
closed_(conn.client_id);
}
recur_mutex_lock connection_lock(connection_mutex_);
if (conn.client_id.empty()) {
return;
}
if (--connection_lookup_[conn.client_id] == 0U) {
connection_lookup_.erase(conn.client_id);
closed_(conn.client_id);
}
}