client pool tests
This commit is contained in:
@@ -26,6 +26,10 @@
|
||||
|
||||
namespace repertory {
|
||||
class client_pool final {
|
||||
public:
|
||||
static constexpr const std::uint16_t default_expired_seconds{120U};
|
||||
static constexpr const std::uint16_t min_expired_seconds{30U};
|
||||
|
||||
public:
|
||||
using worker_callback = std::function<packet::error_type()>;
|
||||
using worker_complete_callback =
|
||||
@@ -86,7 +90,7 @@ private:
|
||||
void execute(std::uint64_t thread_id, worker_callback worker,
|
||||
worker_complete_callback worker_complete);
|
||||
|
||||
void remove_expired();
|
||||
void remove_expired(std::uint16_t seconds);
|
||||
|
||||
void shutdown();
|
||||
};
|
||||
@@ -106,8 +110,11 @@ private:
|
||||
std::unordered_map<std::string, std::unique_ptr<pool>> pool_lookup_;
|
||||
std::mutex pool_mutex_;
|
||||
stop_type shutdown_{false};
|
||||
std::atomic<std::uint16_t> expired_seconds_{default_expired_seconds};
|
||||
|
||||
public:
|
||||
[[nodiscard]] auto get_expired_seconds() const -> std::uint16_t;
|
||||
|
||||
void execute(std::string client_id, std::uint64_t thread_id,
|
||||
worker_callback worker,
|
||||
worker_complete_callback worker_complete);
|
||||
@@ -116,6 +123,8 @@ public:
|
||||
|
||||
void remove_expired();
|
||||
|
||||
void set_expired_seconds(std::uint16_t seconds);
|
||||
|
||||
void shutdown();
|
||||
};
|
||||
} // namespace repertory
|
||||
|
@@ -115,17 +115,17 @@ void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker,
|
||||
pool_queue->notify.notify_all();
|
||||
}
|
||||
|
||||
void client_pool::pool::remove_expired() {
|
||||
void client_pool::pool::remove_expired(std::uint16_t seconds) {
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
unique_mutex_lock pool_lock(pool_mtx_);
|
||||
auto results = std::accumulate(
|
||||
pool_queues_.begin(), pool_queues_.end(),
|
||||
std::unordered_map<std::uint64_t, std::shared_ptr<work_queue>>(),
|
||||
[&now](auto &&res, auto &&entry) -> auto {
|
||||
[&now, seconds](auto &&res, auto &&entry) -> auto {
|
||||
auto duration = now - entry.second->modified.load();
|
||||
if (std::chrono::duration_cast<std::chrono::minutes>(duration) >=
|
||||
std::chrono::minutes(2U)) {
|
||||
if (std::chrono::duration_cast<std::chrono::seconds>(duration) >=
|
||||
std::chrono::seconds(seconds)) {
|
||||
res[entry.first] = entry.second;
|
||||
}
|
||||
|
||||
@@ -162,6 +162,10 @@ client_pool::client_pool() noexcept {
|
||||
"client_pool");
|
||||
}
|
||||
|
||||
auto client_pool::get_expired_seconds() const -> std::uint16_t {
|
||||
return expired_seconds_.load();
|
||||
}
|
||||
|
||||
void client_pool::execute(std::string client_id, std::uint64_t thread_id,
|
||||
worker_callback worker,
|
||||
worker_complete_callback worker_complete) {
|
||||
@@ -188,10 +192,14 @@ void client_pool::remove_client(std::string client_id) {
|
||||
void client_pool::remove_expired() {
|
||||
mutex_lock pool_lock(pool_mutex_);
|
||||
for (auto &entry : pool_lookup_) {
|
||||
entry.second->remove_expired();
|
||||
entry.second->remove_expired(expired_seconds_);
|
||||
}
|
||||
}
|
||||
|
||||
void client_pool::set_expired_seconds(std::uint16_t seconds) {
|
||||
expired_seconds_ = std::max(std::uint16_t{min_expired_seconds}, seconds);
|
||||
}
|
||||
|
||||
void client_pool::shutdown() {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
|
225
repertory/repertory_test/src/client_pool_test.cpp
Normal file
225
repertory/repertory_test/src/client_pool_test.cpp
Normal file
@@ -0,0 +1,225 @@
|
||||
/*
|
||||
Copyright <2018-2025> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "test_common.hpp"
|
||||
|
||||
#include "comm/packet/client_pool.hpp"
|
||||
#include "events/consumers/console_consumer.hpp"
|
||||
#include "events/event_system.hpp"
|
||||
|
||||
namespace {
|
||||
class client_pool_test : public ::testing::Test {
|
||||
public:
|
||||
repertory::console_consumer con_consumer;
|
||||
|
||||
protected:
|
||||
void SetUp() override { repertory::event_system::instance().start(); }
|
||||
void TearDown() override { repertory::event_system::instance().stop(); }
|
||||
};
|
||||
|
||||
[[nodiscard]] inline auto wait_for_bool(std::atomic<bool> &flag,
|
||||
std::chrono::milliseconds timeout)
|
||||
-> bool {
|
||||
const auto start{std::chrono::steady_clock::now()};
|
||||
while (not flag.load(std::memory_order_acquire)) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1});
|
||||
if (std::chrono::steady_clock::now() - start >= timeout) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
[[nodiscard]] inline auto wait_for_count(std::atomic<std::uint32_t> &count,
|
||||
std::uint32_t target,
|
||||
std::chrono::milliseconds timeout)
|
||||
-> bool {
|
||||
const auto start{std::chrono::steady_clock::now()};
|
||||
while (count.load(std::memory_order_acquire) < target) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1});
|
||||
if (std::chrono::steady_clock::now() - start >= timeout) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} // namespace
|
||||
|
||||
namespace repertory {
|
||||
TEST(client_pool_test, execute_invokes_completion) {
|
||||
client_pool pool;
|
||||
|
||||
std::atomic<bool> done{false};
|
||||
|
||||
pool.execute(
|
||||
"alpha", 1U, []() -> packet::error_type { return packet::error_type{0}; },
|
||||
[&](packet::error_type err) -> void {
|
||||
EXPECT_EQ(err, packet::error_type{0});
|
||||
done.store(true, std::memory_order_release);
|
||||
});
|
||||
|
||||
ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500}));
|
||||
}
|
||||
|
||||
TEST(client_pool_test, fifo_on_same_thread_id) {
|
||||
client_pool pool;
|
||||
|
||||
std::mutex vec_mutex;
|
||||
std::vector<int> order;
|
||||
std::atomic<std::uint32_t> completed{0U};
|
||||
|
||||
constexpr std::uint32_t count{10U};
|
||||
for (std::uint32_t idx = 0U; idx < count; ++idx) {
|
||||
pool.execute(
|
||||
"alpha", 42U,
|
||||
[idx]() -> packet::error_type { return static_cast<int>(idx); },
|
||||
[&](auto && /*err*/) -> void {
|
||||
std::lock_guard<std::mutex> lock(vec_mutex);
|
||||
order.emplace_back(static_cast<int>(order.size()));
|
||||
completed.fetch_add(1U, std::memory_order_acq_rel);
|
||||
});
|
||||
}
|
||||
|
||||
ASSERT_TRUE(
|
||||
wait_for_count(completed, count, std::chrono::milliseconds{2000}));
|
||||
|
||||
ASSERT_EQ(order.size(), static_cast<std::size_t>(count));
|
||||
for (int idx = 0; idx < static_cast<int>(count); ++idx) {
|
||||
EXPECT_EQ(order[static_cast<std::size_t>(idx)], idx);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(client_pool_test, parallel_on_different_thread_ids) {
|
||||
client_pool pool;
|
||||
|
||||
std::atomic<std::uint32_t> started{0U};
|
||||
std::atomic<std::uint32_t> completed{0U};
|
||||
|
||||
const auto barrier = [&started]() -> void {
|
||||
started.fetch_add(1U, std::memory_order_acq_rel);
|
||||
auto start{std::chrono::steady_clock::now()};
|
||||
while (started.load(std::memory_order_acquire) < 2U) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{1});
|
||||
if (std::chrono::steady_clock::now() - start >=
|
||||
std::chrono::milliseconds{500}) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pool.execute(
|
||||
"alpha", 1U,
|
||||
[&]() -> packet::error_type {
|
||||
barrier();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{50});
|
||||
return packet::error_type{0};
|
||||
},
|
||||
[&](packet::error_type err) -> void {
|
||||
EXPECT_EQ(err, packet::error_type{0});
|
||||
completed.fetch_add(1U, std::memory_order_acq_rel);
|
||||
});
|
||||
|
||||
pool.execute(
|
||||
"alpha", 2U,
|
||||
[&]() -> packet::error_type {
|
||||
barrier();
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{50});
|
||||
return packet::error_type{0};
|
||||
},
|
||||
[&](packet::error_type err) -> void {
|
||||
EXPECT_EQ(err, packet::error_type{0});
|
||||
completed.fetch_add(1U, std::memory_order_acq_rel);
|
||||
});
|
||||
|
||||
ASSERT_TRUE(wait_for_count(completed, 2U, std::chrono::milliseconds{1000}));
|
||||
}
|
||||
|
||||
TEST(client_pool_test, remove_client_then_recreate_pool) {
|
||||
client_pool pool;
|
||||
|
||||
std::atomic<bool> first_done{false};
|
||||
std::atomic<bool> second_done{false};
|
||||
|
||||
pool.execute(
|
||||
"bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; },
|
||||
[&](packet::error_type err) -> void {
|
||||
EXPECT_EQ(err, packet::error_type{0});
|
||||
first_done.store(true, std::memory_order_release);
|
||||
});
|
||||
|
||||
ASSERT_TRUE(wait_for_bool(first_done, std::chrono::milliseconds{500}));
|
||||
|
||||
pool.remove_client("bravo");
|
||||
|
||||
pool.execute(
|
||||
"bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; },
|
||||
[&](packet::error_type err) -> void {
|
||||
EXPECT_EQ(err, packet::error_type{0});
|
||||
second_done.store(true, std::memory_order_release);
|
||||
});
|
||||
|
||||
ASSERT_TRUE(wait_for_bool(second_done, std::chrono::milliseconds{500}));
|
||||
}
|
||||
|
||||
TEST(client_pool_test, shutdown_prevents_future_execute) {
|
||||
client_pool pool;
|
||||
|
||||
std::atomic<bool> done{false};
|
||||
pool.execute(
|
||||
"charlie", 3U,
|
||||
[]() -> packet::error_type { return packet::error_type{0}; },
|
||||
[&](packet::error_type) -> void {
|
||||
done.store(true, std::memory_order_release);
|
||||
});
|
||||
ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500}));
|
||||
|
||||
pool.shutdown();
|
||||
|
||||
EXPECT_THROW(pool.execute(
|
||||
"charlie", 3U,
|
||||
[]() -> packet::error_type { return packet::error_type{0}; },
|
||||
[](packet::error_type) -> void {}),
|
||||
std::runtime_error);
|
||||
}
|
||||
|
||||
TEST(client_pool_test, worker_exception_is_contained_and_no_completion) {
|
||||
client_pool pool;
|
||||
|
||||
std::atomic<bool> completion_called{false};
|
||||
|
||||
pool.execute(
|
||||
"delta", 1U,
|
||||
[]() -> packet::error_type { throw std::runtime_error("boom"); },
|
||||
[&](packet::error_type) -> void {
|
||||
completion_called.store(true, std::memory_order_release);
|
||||
});
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds{150});
|
||||
|
||||
EXPECT_FALSE(completion_called.load(std::memory_order_acquire));
|
||||
}
|
||||
|
||||
TEST(client_pool_test, remove_expired_is_safe_to_call) {
|
||||
client_pool pool;
|
||||
EXPECT_NO_THROW(pool.remove_expired());
|
||||
}
|
||||
} // namespace repertory
|
Reference in New Issue
Block a user