client pool tests

This commit is contained in:
2025-10-04 16:52:47 -05:00
parent 46ab10b92f
commit 163c443d8e
2 changed files with 217 additions and 60 deletions

View File

@@ -28,7 +28,7 @@ namespace repertory {
class client_pool final {
public:
static constexpr const std::uint16_t default_expired_seconds{120U};
static constexpr const std::uint16_t min_expired_seconds{30U};
static constexpr const std::uint16_t min_expired_seconds{5U};
public:
using worker_callback = std::function<packet::error_type()>;

View File

@@ -29,34 +29,20 @@
namespace {
class client_pool_test : public ::testing::Test {
public:
repertory::console_consumer con_consumer;
repertory::console_consumer consumer;
protected:
void SetUp() override { repertory::event_system::instance().start(); }
void TearDown() override { repertory::event_system::instance().stop(); }
};
[[nodiscard]] inline auto wait_for_bool(std::atomic<bool> &flag,
std::chrono::milliseconds timeout)
-> bool {
const auto start{std::chrono::steady_clock::now()};
while (not flag.load(std::memory_order_acquire)) {
template <class callback_t>
[[nodiscard]] auto wait_until(callback_t callback,
std::chrono::milliseconds timeout) -> bool {
const auto began{std::chrono::steady_clock::now()};
while (not callback()) {
std::this_thread::sleep_for(std::chrono::milliseconds{1});
if (std::chrono::steady_clock::now() - start >= timeout) {
return false;
}
}
return true;
}
[[nodiscard]] inline auto wait_for_count(std::atomic<std::uint32_t> &count,
std::uint32_t target,
std::chrono::milliseconds timeout)
-> bool {
const auto start{std::chrono::steady_clock::now()};
while (count.load(std::memory_order_acquire) < target) {
std::this_thread::sleep_for(std::chrono::milliseconds{1});
if (std::chrono::steady_clock::now() - start >= timeout) {
if (std::chrono::steady_clock::now() - began >= timeout) {
return false;
}
}
@@ -65,7 +51,8 @@ protected:
} // namespace
namespace repertory {
TEST(client_pool_test, execute_invokes_completion) {
TEST_F(client_pool_test, execute_invokes_completion) {
client_pool pool;
std::atomic<bool> done{false};
@@ -74,13 +61,14 @@ TEST(client_pool_test, execute_invokes_completion) {
"alpha", 1U, []() -> packet::error_type { return packet::error_type{0}; },
[&](packet::error_type err) -> void {
EXPECT_EQ(err, packet::error_type{0});
done.store(true, std::memory_order_release);
done = true;
});
ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500}));
ASSERT_TRUE(wait_until([&]() -> bool { return done.load(); },
std::chrono::milliseconds{500}));
}
TEST(client_pool_test, fifo_on_same_thread_id) {
TEST_F(client_pool_test, fifo_on_same_thread_id) {
client_pool pool;
std::mutex vec_mutex;
@@ -99,8 +87,8 @@ TEST(client_pool_test, fifo_on_same_thread_id) {
});
}
ASSERT_TRUE(
wait_for_count(completed, count, std::chrono::milliseconds{2000}));
ASSERT_TRUE(wait_until([&]() -> bool { return completed.load() >= count; },
std::chrono::milliseconds{2000}));
ASSERT_EQ(order.size(), static_cast<std::size_t>(count));
for (int idx = 0; idx < static_cast<int>(count); ++idx) {
@@ -108,7 +96,7 @@ TEST(client_pool_test, fifo_on_same_thread_id) {
}
}
TEST(client_pool_test, parallel_on_different_thread_ids) {
TEST_F(client_pool_test, parallel_on_different_thread_ids) {
client_pool pool;
std::atomic<std::uint32_t> started{0U};
@@ -116,14 +104,8 @@ TEST(client_pool_test, parallel_on_different_thread_ids) {
const auto barrier = [&started]() -> void {
started.fetch_add(1U, std::memory_order_acq_rel);
auto start{std::chrono::steady_clock::now()};
while (started.load(std::memory_order_acquire) < 2U) {
std::this_thread::sleep_for(std::chrono::milliseconds{1});
if (std::chrono::steady_clock::now() - start >=
std::chrono::milliseconds{500}) {
break;
}
}
(void)wait_until([&]() -> bool { return started.load() >= 2U; },
std::chrono::milliseconds{500});
};
pool.execute(
@@ -150,58 +132,59 @@ TEST(client_pool_test, parallel_on_different_thread_ids) {
completed.fetch_add(1U, std::memory_order_acq_rel);
});
ASSERT_TRUE(wait_for_count(completed, 2U, std::chrono::milliseconds{1000}));
ASSERT_TRUE(wait_until([&]() -> bool { return completed.load() >= 2U; },
std::chrono::milliseconds{1000}));
}
TEST(client_pool_test, remove_client_then_recreate_pool) {
TEST_F(client_pool_test, remove_client_then_recreate_pool) {
client_pool pool;
std::atomic<bool> first_done{false};
std::atomic<bool> second_done{false};
pool.execute(
"bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; },
"moose", 7U, []() -> packet::error_type { return packet::error_type{0}; },
[&](packet::error_type err) -> void {
EXPECT_EQ(err, packet::error_type{0});
first_done.store(true, std::memory_order_release);
first_done = true;
});
ASSERT_TRUE(wait_for_bool(first_done, std::chrono::milliseconds{500}));
ASSERT_TRUE(wait_until([&]() -> bool { return first_done.load(); },
std::chrono::milliseconds{500}));
pool.remove_client("bravo");
pool.remove_client("moose");
pool.execute(
"bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; },
"moose", 7U, []() -> packet::error_type { return packet::error_type{0}; },
[&](packet::error_type err) -> void {
EXPECT_EQ(err, packet::error_type{0});
second_done.store(true, std::memory_order_release);
second_done = true;
});
ASSERT_TRUE(wait_for_bool(second_done, std::chrono::milliseconds{500}));
ASSERT_TRUE(wait_until([&]() -> bool { return second_done.load(); },
std::chrono::milliseconds{500}));
}
TEST(client_pool_test, shutdown_prevents_future_execute) {
TEST_F(client_pool_test, shutdown_prevents_future_execute) {
client_pool pool;
std::atomic<bool> done{false};
pool.execute(
"charlie", 3U,
[]() -> packet::error_type { return packet::error_type{0}; },
[&](packet::error_type) -> void {
done.store(true, std::memory_order_release);
});
ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500}));
"cmdc", 3U, []() -> packet::error_type { return packet::error_type{0}; },
[&](packet::error_type) -> void { done = true; });
ASSERT_TRUE(wait_until([&]() -> bool { return done.load(); },
std::chrono::milliseconds{500}));
pool.shutdown();
EXPECT_THROW(pool.execute(
"charlie", 3U,
"cmdc", 3U,
[]() -> packet::error_type { return packet::error_type{0}; },
[](packet::error_type) -> void {}),
std::runtime_error);
}
TEST(client_pool_test, worker_exception_is_contained_and_no_completion) {
TEST_F(client_pool_test, worker_exception_is_contained_and_no_completion) {
client_pool pool;
std::atomic<bool> completion_called{false};
@@ -209,17 +192,191 @@ TEST(client_pool_test, worker_exception_is_contained_and_no_completion) {
pool.execute(
"delta", 1U,
[]() -> packet::error_type { throw std::runtime_error("boom"); },
[&](packet::error_type) -> void {
completion_called.store(true, std::memory_order_release);
});
[&](packet::error_type) -> void { completion_called = true; });
std::this_thread::sleep_for(std::chrono::milliseconds{150});
EXPECT_FALSE(completion_called.load(std::memory_order_acquire));
EXPECT_FALSE(completion_called.load());
}
TEST(client_pool_test, remove_expired_is_safe_to_call) {
TEST_F(client_pool_test, remove_expired_is_safe_to_call) {
client_pool pool;
EXPECT_NO_THROW(pool.remove_expired());
}
TEST_F(client_pool_test, defaults_and_minimum_constants) {
client_pool pool;
EXPECT_EQ(pool.get_expired_seconds(), client_pool::default_expired_seconds);
EXPECT_EQ(client_pool::min_expired_seconds, static_cast<std::uint16_t>(5U));
}
TEST_F(client_pool_test, setter_clamps_below_minimum_to_minimum) {
client_pool pool;
pool.set_expired_seconds(0U);
EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds);
pool.set_expired_seconds(1U);
EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds);
pool.set_expired_seconds(
static_cast<std::uint16_t>(client_pool::min_expired_seconds - 1U));
EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds);
pool.set_expired_seconds(client_pool::min_expired_seconds);
EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds);
pool.set_expired_seconds(300U);
EXPECT_EQ(pool.get_expired_seconds(), static_cast<std::uint16_t>(300));
}
TEST_F(client_pool_test, does_not_remove_queues_before_minimum_threshold) {
client_pool pool;
pool.set_expired_seconds(1U);
EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds);
std::string client_id{"alpha"};
std::thread::id tid_one_initial{};
std::thread::id tid_two_initial{};
std::atomic<bool> one_done{false};
std::atomic<bool> two_done{false};
pool.execute(
client_id, 1U,
[&]() -> packet::error_type {
tid_one_initial = std::this_thread::get_id();
return packet::error_type{0};
},
[&](const packet::error_type &) -> void { one_done = true; });
pool.execute(
client_id, 2U,
[&]() -> packet::error_type {
tid_two_initial = std::this_thread::get_id();
return packet::error_type{0};
},
[&](const packet::error_type &) -> void { two_done = true; });
ASSERT_TRUE(wait_until([&]() -> bool { return one_done.load(); },
std::chrono::milliseconds{500}));
ASSERT_TRUE(wait_until([&]() -> bool { return two_done.load(); },
std::chrono::milliseconds{500}));
std::this_thread::sleep_for(std::chrono::milliseconds{1100});
pool.remove_expired();
std::thread::id tid_one_after{};
std::thread::id tid_two_after{};
std::atomic<bool> one_again{false};
std::atomic<bool> two_again{false};
pool.execute(
client_id, 1U,
[&]() -> packet::error_type {
tid_one_after = std::this_thread::get_id();
return packet::error_type{0};
},
[&](const packet::error_type &) -> void { one_again = true; });
pool.execute(
client_id, 2U,
[&]() -> packet::error_type {
tid_two_after = std::this_thread::get_id();
return packet::error_type{0};
},
[&](const packet::error_type &) -> void { two_again = true; });
ASSERT_TRUE(wait_until([&]() -> bool { return one_again.load(); },
std::chrono::milliseconds{500}));
ASSERT_TRUE(wait_until([&]() -> bool { return two_again.load(); },
std::chrono::milliseconds{500}));
EXPECT_EQ(tid_one_after, tid_one_initial);
EXPECT_EQ(tid_two_after, tid_two_initial);
}
TEST_F(client_pool_test,
remove_expired_returns_quickly_when_no_queues_eligible) {
client_pool pool;
pool.set_expired_seconds(client_pool::min_expired_seconds);
EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds);
std::string client_id{"moose"};
std::atomic<bool> started{false};
constexpr auto job_ms = std::chrono::milliseconds{150};
pool.execute(
client_id, 1U,
[&]() -> packet::error_type {
started = true;
std::this_thread::sleep_for(job_ms);
return packet::error_type{0};
},
[&](const packet::error_type &) -> void {});
ASSERT_TRUE(wait_until([&]() -> bool { return started.load(); },
std::chrono::milliseconds{200}));
auto start_time{std::chrono::steady_clock::now()};
pool.remove_expired();
const auto elapsed_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_time);
EXPECT_LT(elapsed_ms.count(), 50);
}
TEST_F(client_pool_test, removes_after_minimum_threshold) {
client_pool pool;
pool.set_expired_seconds(client_pool::min_expired_seconds);
const auto threshold_secs = static_cast<int>(pool.get_expired_seconds());
EXPECT_GE(threshold_secs, static_cast<int>(client_pool::min_expired_seconds));
std::string client_id{"cmdc"};
static thread_local std::int32_t tls_counter{0};
std::atomic<std::int32_t> first_gen{-1};
std::atomic<bool> first_done{false};
pool.execute(
client_id, 1U,
[&]() -> packet::error_type {
first_gen = tls_counter;
tls_counter += 1;
return packet::error_type{0};
},
[&](const packet::error_type &) -> void { first_done = true; });
ASSERT_TRUE(wait_until([&]() -> bool { return first_done.load(); },
std::chrono::milliseconds{500}));
std::this_thread::sleep_for(std::chrono::seconds{threshold_secs} +
std::chrono::milliseconds{200});
pool.remove_expired();
std::atomic<std::int32_t> second_gen{-1};
std::atomic<bool> second_done{false};
pool.execute(
client_id, 1U,
[&]() -> packet::error_type {
second_gen = tls_counter;
tls_counter += 1;
return packet::error_type{0};
},
[&](const packet::error_type &) -> void { second_done = true; });
ASSERT_TRUE(wait_until([&]() -> bool { return second_done.load(); },
std::chrono::milliseconds{500}));
EXPECT_EQ(first_gen.load(), 0);
EXPECT_EQ(second_gen.load(), 0);
}
} // namespace repertory