From 163c443d8e9bf953cbc0eb70d26a876ec6ecba16 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sat, 4 Oct 2025 16:52:47 -0500 Subject: [PATCH] client pool tests --- .../include/comm/packet/client_pool.hpp | 2 +- .../repertory_test/src/client_pool_test.cpp | 275 ++++++++++++++---- 2 files changed, 217 insertions(+), 60 deletions(-) diff --git a/repertory/librepertory/include/comm/packet/client_pool.hpp b/repertory/librepertory/include/comm/packet/client_pool.hpp index bd4f0272..5a5a6837 100644 --- a/repertory/librepertory/include/comm/packet/client_pool.hpp +++ b/repertory/librepertory/include/comm/packet/client_pool.hpp @@ -28,7 +28,7 @@ namespace repertory { class client_pool final { public: static constexpr const std::uint16_t default_expired_seconds{120U}; - static constexpr const std::uint16_t min_expired_seconds{30U}; + static constexpr const std::uint16_t min_expired_seconds{5U}; public: using worker_callback = std::function; diff --git a/repertory/repertory_test/src/client_pool_test.cpp b/repertory/repertory_test/src/client_pool_test.cpp index 64855a41..59d217c5 100644 --- a/repertory/repertory_test/src/client_pool_test.cpp +++ b/repertory/repertory_test/src/client_pool_test.cpp @@ -29,34 +29,20 @@ namespace { class client_pool_test : public ::testing::Test { public: - repertory::console_consumer con_consumer; + repertory::console_consumer consumer; protected: void SetUp() override { repertory::event_system::instance().start(); } void TearDown() override { repertory::event_system::instance().stop(); } }; -[[nodiscard]] inline auto wait_for_bool(std::atomic &flag, - std::chrono::milliseconds timeout) - -> bool { - const auto start{std::chrono::steady_clock::now()}; - while (not flag.load(std::memory_order_acquire)) { +template +[[nodiscard]] auto wait_until(callback_t callback, + std::chrono::milliseconds timeout) -> bool { + const auto began{std::chrono::steady_clock::now()}; + while (not callback()) { std::this_thread::sleep_for(std::chrono::milliseconds{1}); - if (std::chrono::steady_clock::now() - start >= timeout) { - return false; - } - } - return true; -} - -[[nodiscard]] inline auto wait_for_count(std::atomic &count, - std::uint32_t target, - std::chrono::milliseconds timeout) - -> bool { - const auto start{std::chrono::steady_clock::now()}; - while (count.load(std::memory_order_acquire) < target) { - std::this_thread::sleep_for(std::chrono::milliseconds{1}); - if (std::chrono::steady_clock::now() - start >= timeout) { + if (std::chrono::steady_clock::now() - began >= timeout) { return false; } } @@ -65,7 +51,8 @@ protected: } // namespace namespace repertory { -TEST(client_pool_test, execute_invokes_completion) { + +TEST_F(client_pool_test, execute_invokes_completion) { client_pool pool; std::atomic done{false}; @@ -74,13 +61,14 @@ TEST(client_pool_test, execute_invokes_completion) { "alpha", 1U, []() -> packet::error_type { return packet::error_type{0}; }, [&](packet::error_type err) -> void { EXPECT_EQ(err, packet::error_type{0}); - done.store(true, std::memory_order_release); + done = true; }); - ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500})); + ASSERT_TRUE(wait_until([&]() -> bool { return done.load(); }, + std::chrono::milliseconds{500})); } -TEST(client_pool_test, fifo_on_same_thread_id) { +TEST_F(client_pool_test, fifo_on_same_thread_id) { client_pool pool; std::mutex vec_mutex; @@ -99,8 +87,8 @@ TEST(client_pool_test, fifo_on_same_thread_id) { }); } - ASSERT_TRUE( - wait_for_count(completed, count, std::chrono::milliseconds{2000})); + ASSERT_TRUE(wait_until([&]() -> bool { return completed.load() >= count; }, + std::chrono::milliseconds{2000})); ASSERT_EQ(order.size(), static_cast(count)); for (int idx = 0; idx < static_cast(count); ++idx) { @@ -108,7 +96,7 @@ TEST(client_pool_test, fifo_on_same_thread_id) { } } -TEST(client_pool_test, parallel_on_different_thread_ids) { +TEST_F(client_pool_test, parallel_on_different_thread_ids) { client_pool pool; std::atomic started{0U}; @@ -116,14 +104,8 @@ TEST(client_pool_test, parallel_on_different_thread_ids) { const auto barrier = [&started]() -> void { started.fetch_add(1U, std::memory_order_acq_rel); - auto start{std::chrono::steady_clock::now()}; - while (started.load(std::memory_order_acquire) < 2U) { - std::this_thread::sleep_for(std::chrono::milliseconds{1}); - if (std::chrono::steady_clock::now() - start >= - std::chrono::milliseconds{500}) { - break; - } - } + (void)wait_until([&]() -> bool { return started.load() >= 2U; }, + std::chrono::milliseconds{500}); }; pool.execute( @@ -150,58 +132,59 @@ TEST(client_pool_test, parallel_on_different_thread_ids) { completed.fetch_add(1U, std::memory_order_acq_rel); }); - ASSERT_TRUE(wait_for_count(completed, 2U, std::chrono::milliseconds{1000})); + ASSERT_TRUE(wait_until([&]() -> bool { return completed.load() >= 2U; }, + std::chrono::milliseconds{1000})); } -TEST(client_pool_test, remove_client_then_recreate_pool) { +TEST_F(client_pool_test, remove_client_then_recreate_pool) { client_pool pool; std::atomic first_done{false}; std::atomic second_done{false}; pool.execute( - "bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; }, + "moose", 7U, []() -> packet::error_type { return packet::error_type{0}; }, [&](packet::error_type err) -> void { EXPECT_EQ(err, packet::error_type{0}); - first_done.store(true, std::memory_order_release); + first_done = true; }); - ASSERT_TRUE(wait_for_bool(first_done, std::chrono::milliseconds{500})); + ASSERT_TRUE(wait_until([&]() -> bool { return first_done.load(); }, + std::chrono::milliseconds{500})); - pool.remove_client("bravo"); + pool.remove_client("moose"); pool.execute( - "bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; }, + "moose", 7U, []() -> packet::error_type { return packet::error_type{0}; }, [&](packet::error_type err) -> void { EXPECT_EQ(err, packet::error_type{0}); - second_done.store(true, std::memory_order_release); + second_done = true; }); - ASSERT_TRUE(wait_for_bool(second_done, std::chrono::milliseconds{500})); + ASSERT_TRUE(wait_until([&]() -> bool { return second_done.load(); }, + std::chrono::milliseconds{500})); } -TEST(client_pool_test, shutdown_prevents_future_execute) { +TEST_F(client_pool_test, shutdown_prevents_future_execute) { client_pool pool; std::atomic done{false}; pool.execute( - "charlie", 3U, - []() -> packet::error_type { return packet::error_type{0}; }, - [&](packet::error_type) -> void { - done.store(true, std::memory_order_release); - }); - ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500})); + "cmdc", 3U, []() -> packet::error_type { return packet::error_type{0}; }, + [&](packet::error_type) -> void { done = true; }); + ASSERT_TRUE(wait_until([&]() -> bool { return done.load(); }, + std::chrono::milliseconds{500})); pool.shutdown(); EXPECT_THROW(pool.execute( - "charlie", 3U, + "cmdc", 3U, []() -> packet::error_type { return packet::error_type{0}; }, [](packet::error_type) -> void {}), std::runtime_error); } -TEST(client_pool_test, worker_exception_is_contained_and_no_completion) { +TEST_F(client_pool_test, worker_exception_is_contained_and_no_completion) { client_pool pool; std::atomic completion_called{false}; @@ -209,17 +192,191 @@ TEST(client_pool_test, worker_exception_is_contained_and_no_completion) { pool.execute( "delta", 1U, []() -> packet::error_type { throw std::runtime_error("boom"); }, - [&](packet::error_type) -> void { - completion_called.store(true, std::memory_order_release); - }); + [&](packet::error_type) -> void { completion_called = true; }); std::this_thread::sleep_for(std::chrono::milliseconds{150}); - EXPECT_FALSE(completion_called.load(std::memory_order_acquire)); + EXPECT_FALSE(completion_called.load()); } -TEST(client_pool_test, remove_expired_is_safe_to_call) { +TEST_F(client_pool_test, remove_expired_is_safe_to_call) { client_pool pool; EXPECT_NO_THROW(pool.remove_expired()); } + +TEST_F(client_pool_test, defaults_and_minimum_constants) { + client_pool pool; + + EXPECT_EQ(pool.get_expired_seconds(), client_pool::default_expired_seconds); + EXPECT_EQ(client_pool::min_expired_seconds, static_cast(5U)); +} + +TEST_F(client_pool_test, setter_clamps_below_minimum_to_minimum) { + client_pool pool; + + pool.set_expired_seconds(0U); + EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds); + + pool.set_expired_seconds(1U); + EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds); + + pool.set_expired_seconds( + static_cast(client_pool::min_expired_seconds - 1U)); + EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds); + + pool.set_expired_seconds(client_pool::min_expired_seconds); + EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds); + + pool.set_expired_seconds(300U); + EXPECT_EQ(pool.get_expired_seconds(), static_cast(300)); +} + +TEST_F(client_pool_test, does_not_remove_queues_before_minimum_threshold) { + client_pool pool; + + pool.set_expired_seconds(1U); + EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds); + + std::string client_id{"alpha"}; + + std::thread::id tid_one_initial{}; + std::thread::id tid_two_initial{}; + std::atomic one_done{false}; + std::atomic two_done{false}; + + pool.execute( + client_id, 1U, + [&]() -> packet::error_type { + tid_one_initial = std::this_thread::get_id(); + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void { one_done = true; }); + + pool.execute( + client_id, 2U, + [&]() -> packet::error_type { + tid_two_initial = std::this_thread::get_id(); + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void { two_done = true; }); + + ASSERT_TRUE(wait_until([&]() -> bool { return one_done.load(); }, + std::chrono::milliseconds{500})); + ASSERT_TRUE(wait_until([&]() -> bool { return two_done.load(); }, + std::chrono::milliseconds{500})); + + std::this_thread::sleep_for(std::chrono::milliseconds{1100}); + pool.remove_expired(); + + std::thread::id tid_one_after{}; + std::thread::id tid_two_after{}; + std::atomic one_again{false}; + std::atomic two_again{false}; + + pool.execute( + client_id, 1U, + [&]() -> packet::error_type { + tid_one_after = std::this_thread::get_id(); + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void { one_again = true; }); + + pool.execute( + client_id, 2U, + [&]() -> packet::error_type { + tid_two_after = std::this_thread::get_id(); + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void { two_again = true; }); + + ASSERT_TRUE(wait_until([&]() -> bool { return one_again.load(); }, + std::chrono::milliseconds{500})); + ASSERT_TRUE(wait_until([&]() -> bool { return two_again.load(); }, + std::chrono::milliseconds{500})); + + EXPECT_EQ(tid_one_after, tid_one_initial); + EXPECT_EQ(tid_two_after, tid_two_initial); +} + +TEST_F(client_pool_test, + remove_expired_returns_quickly_when_no_queues_eligible) { + client_pool pool; + + pool.set_expired_seconds(client_pool::min_expired_seconds); + EXPECT_EQ(pool.get_expired_seconds(), client_pool::min_expired_seconds); + + std::string client_id{"moose"}; + + std::atomic started{false}; + constexpr auto job_ms = std::chrono::milliseconds{150}; + + pool.execute( + client_id, 1U, + [&]() -> packet::error_type { + started = true; + std::this_thread::sleep_for(job_ms); + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void {}); + + ASSERT_TRUE(wait_until([&]() -> bool { return started.load(); }, + std::chrono::milliseconds{200})); + + auto start_time{std::chrono::steady_clock::now()}; + pool.remove_expired(); + const auto elapsed_ms = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time); + + EXPECT_LT(elapsed_ms.count(), 50); +} + +TEST_F(client_pool_test, removes_after_minimum_threshold) { + client_pool pool; + + pool.set_expired_seconds(client_pool::min_expired_seconds); + const auto threshold_secs = static_cast(pool.get_expired_seconds()); + EXPECT_GE(threshold_secs, static_cast(client_pool::min_expired_seconds)); + + std::string client_id{"cmdc"}; + + static thread_local std::int32_t tls_counter{0}; + + std::atomic first_gen{-1}; + std::atomic first_done{false}; + + pool.execute( + client_id, 1U, + [&]() -> packet::error_type { + first_gen = tls_counter; + tls_counter += 1; + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void { first_done = true; }); + + ASSERT_TRUE(wait_until([&]() -> bool { return first_done.load(); }, + std::chrono::milliseconds{500})); + + std::this_thread::sleep_for(std::chrono::seconds{threshold_secs} + + std::chrono::milliseconds{200}); + + pool.remove_expired(); + + std::atomic second_gen{-1}; + std::atomic second_done{false}; + + pool.execute( + client_id, 1U, + [&]() -> packet::error_type { + second_gen = tls_counter; + tls_counter += 1; + return packet::error_type{0}; + }, + [&](const packet::error_type &) -> void { second_done = true; }); + + ASSERT_TRUE(wait_until([&]() -> bool { return second_done.load(); }, + std::chrono::milliseconds{500})); + + EXPECT_EQ(first_gen.load(), 0); + EXPECT_EQ(second_gen.load(), 0); +} } // namespace repertory