This commit is contained in:
2025-10-04 14:43:39 -05:00
parent b5116732d4
commit 72a9005aeb
2 changed files with 10 additions and 11 deletions

View File

@@ -52,7 +52,7 @@ private:
work_queue(work_queue &&) = delete; work_queue(work_queue &&) = delete;
std::deque<std::shared_ptr<work_item>> actions; std::deque<std::shared_ptr<work_item>> actions;
std::chrono::steady_clock::time_point modified{ std::atomic<std::chrono::steady_clock::time_point> modified{
std::chrono::steady_clock::now(), std::chrono::steady_clock::now(),
}; };
std::mutex mutex; std::mutex mutex;

View File

@@ -50,8 +50,8 @@ void client_pool::pool::work_queue::work_thread() {
unique_mutex_lock lock(mutex); unique_mutex_lock lock(mutex);
const auto unlock_and_notify = [this, &lock]() { const auto unlock_and_notify = [this, &lock]() {
notify.notify_all();
lock.unlock(); lock.unlock();
notify.notify_all();
}; };
unlock_and_notify(); unlock_and_notify();
@@ -77,9 +77,7 @@ void client_pool::pool::work_queue::work_thread() {
while (not shutdown) { while (not shutdown) {
lock.lock(); lock.lock();
if (actions.empty()) { if (actions.empty()) {
if (not shutdown) { notify.wait(lock, [this]() { return shutdown || not actions.empty(); });
notify.wait_for(lock, std::chrono::seconds(5U));
}
unlock_and_notify(); unlock_and_notify();
continue; continue;
} }
@@ -99,6 +97,9 @@ void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker,
worker_complete_callback worker_complete) { worker_complete_callback worker_complete) {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
auto job = std::make_shared<work_item>(std::move(worker),
std::move(worker_complete));
unique_mutex_lock pool_lock(pool_mtx_); unique_mutex_lock pool_lock(pool_mtx_);
if (pool_queues_[thread_id] == nullptr) { if (pool_queues_[thread_id] == nullptr) {
pool_queues_[thread_id] = std::make_shared<work_queue>(); pool_queues_[thread_id] = std::make_shared<work_queue>();
@@ -107,12 +108,10 @@ void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker,
auto pool_queue = pool_queues_[thread_id]; auto pool_queue = pool_queues_[thread_id];
pool_lock.unlock(); pool_lock.unlock();
auto job = std::make_shared<work_item>(std::move(worker),
std::move(worker_complete));
mutex_lock queue_lock(pool_queue->mutex);
pool_queue->actions.emplace_back(std::move(job));
pool_queue->modified = std::chrono::steady_clock::now(); pool_queue->modified = std::chrono::steady_clock::now();
unique_mutex_lock queue_lock(pool_queue->mutex);
pool_queue->actions.emplace_back(std::move(job));
pool_queue->notify.notify_all(); pool_queue->notify.notify_all();
} }
@@ -124,7 +123,7 @@ void client_pool::pool::remove_expired() {
pool_queues_.begin(), pool_queues_.end(), pool_queues_.begin(), pool_queues_.end(),
std::unordered_map<std::uint64_t, std::shared_ptr<work_queue>>(), std::unordered_map<std::uint64_t, std::shared_ptr<work_queue>>(),
[&now](auto &&res, auto &&entry) -> auto { [&now](auto &&res, auto &&entry) -> auto {
auto duration = now - entry.second->modified; auto duration = now - entry.second->modified.load();
if (std::chrono::duration_cast<std::chrono::minutes>(duration) >= if (std::chrono::duration_cast<std::chrono::minutes>(duration) >=
std::chrono::minutes(2U)) { std::chrono::minutes(2U)) {
res[entry.first] = entry.second; res[entry.first] = entry.second;