From 46ab10b92f06f36d435e59cdbee86d9e6f824d44 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sat, 4 Oct 2025 16:26:34 -0500 Subject: [PATCH] client pool tests --- .../include/comm/packet/client_pool.hpp | 11 +- .../src/comm/packet/client_pool.cpp | 18 +- .../repertory_test/src/client_pool_test.cpp | 225 ++++++++++++++++++ 3 files changed, 248 insertions(+), 6 deletions(-) create mode 100644 repertory/repertory_test/src/client_pool_test.cpp diff --git a/repertory/librepertory/include/comm/packet/client_pool.hpp b/repertory/librepertory/include/comm/packet/client_pool.hpp index d62c1abc..bd4f0272 100644 --- a/repertory/librepertory/include/comm/packet/client_pool.hpp +++ b/repertory/librepertory/include/comm/packet/client_pool.hpp @@ -26,6 +26,10 @@ namespace repertory { class client_pool final { +public: + static constexpr const std::uint16_t default_expired_seconds{120U}; + static constexpr const std::uint16_t min_expired_seconds{30U}; + public: using worker_callback = std::function; using worker_complete_callback = @@ -86,7 +90,7 @@ private: void execute(std::uint64_t thread_id, worker_callback worker, worker_complete_callback worker_complete); - void remove_expired(); + void remove_expired(std::uint16_t seconds); void shutdown(); }; @@ -106,8 +110,11 @@ private: std::unordered_map> pool_lookup_; std::mutex pool_mutex_; stop_type shutdown_{false}; + std::atomic expired_seconds_{default_expired_seconds}; public: + [[nodiscard]] auto get_expired_seconds() const -> std::uint16_t; + void execute(std::string client_id, std::uint64_t thread_id, worker_callback worker, worker_complete_callback worker_complete); @@ -116,6 +123,8 @@ public: void remove_expired(); + void set_expired_seconds(std::uint16_t seconds); + void shutdown(); }; } // namespace repertory diff --git a/repertory/librepertory/src/comm/packet/client_pool.cpp b/repertory/librepertory/src/comm/packet/client_pool.cpp index 388bba70..b51bc3be 100644 --- a/repertory/librepertory/src/comm/packet/client_pool.cpp +++ b/repertory/librepertory/src/comm/packet/client_pool.cpp @@ -115,17 +115,17 @@ void client_pool::pool::execute(std::uint64_t thread_id, worker_callback worker, pool_queue->notify.notify_all(); } -void client_pool::pool::remove_expired() { +void client_pool::pool::remove_expired(std::uint16_t seconds) { auto now = std::chrono::steady_clock::now(); unique_mutex_lock pool_lock(pool_mtx_); auto results = std::accumulate( pool_queues_.begin(), pool_queues_.end(), std::unordered_map>(), - [&now](auto &&res, auto &&entry) -> auto { + [&now, seconds](auto &&res, auto &&entry) -> auto { auto duration = now - entry.second->modified.load(); - if (std::chrono::duration_cast(duration) >= - std::chrono::minutes(2U)) { + if (std::chrono::duration_cast(duration) >= + std::chrono::seconds(seconds)) { res[entry.first] = entry.second; } @@ -162,6 +162,10 @@ client_pool::client_pool() noexcept { "client_pool"); } +auto client_pool::get_expired_seconds() const -> std::uint16_t { + return expired_seconds_.load(); +} + void client_pool::execute(std::string client_id, std::uint64_t thread_id, worker_callback worker, worker_complete_callback worker_complete) { @@ -188,10 +192,14 @@ void client_pool::remove_client(std::string client_id) { void client_pool::remove_expired() { mutex_lock pool_lock(pool_mutex_); for (auto &entry : pool_lookup_) { - entry.second->remove_expired(); + entry.second->remove_expired(expired_seconds_); } } +void client_pool::set_expired_seconds(std::uint16_t seconds) { + expired_seconds_ = std::max(std::uint16_t{min_expired_seconds}, seconds); +} + void client_pool::shutdown() { REPERTORY_USES_FUNCTION_NAME(); diff --git a/repertory/repertory_test/src/client_pool_test.cpp b/repertory/repertory_test/src/client_pool_test.cpp new file mode 100644 index 00000000..64855a41 --- /dev/null +++ b/repertory/repertory_test/src/client_pool_test.cpp @@ -0,0 +1,225 @@ +/* + Copyright <2018-2025> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +#include "test_common.hpp" + +#include "comm/packet/client_pool.hpp" +#include "events/consumers/console_consumer.hpp" +#include "events/event_system.hpp" + +namespace { +class client_pool_test : public ::testing::Test { +public: + repertory::console_consumer con_consumer; + +protected: + void SetUp() override { repertory::event_system::instance().start(); } + void TearDown() override { repertory::event_system::instance().stop(); } +}; + +[[nodiscard]] inline auto wait_for_bool(std::atomic &flag, + std::chrono::milliseconds timeout) + -> bool { + const auto start{std::chrono::steady_clock::now()}; + while (not flag.load(std::memory_order_acquire)) { + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + if (std::chrono::steady_clock::now() - start >= timeout) { + return false; + } + } + return true; +} + +[[nodiscard]] inline auto wait_for_count(std::atomic &count, + std::uint32_t target, + std::chrono::milliseconds timeout) + -> bool { + const auto start{std::chrono::steady_clock::now()}; + while (count.load(std::memory_order_acquire) < target) { + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + if (std::chrono::steady_clock::now() - start >= timeout) { + return false; + } + } + return true; +} +} // namespace + +namespace repertory { +TEST(client_pool_test, execute_invokes_completion) { + client_pool pool; + + std::atomic done{false}; + + pool.execute( + "alpha", 1U, []() -> packet::error_type { return packet::error_type{0}; }, + [&](packet::error_type err) -> void { + EXPECT_EQ(err, packet::error_type{0}); + done.store(true, std::memory_order_release); + }); + + ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500})); +} + +TEST(client_pool_test, fifo_on_same_thread_id) { + client_pool pool; + + std::mutex vec_mutex; + std::vector order; + std::atomic completed{0U}; + + constexpr std::uint32_t count{10U}; + for (std::uint32_t idx = 0U; idx < count; ++idx) { + pool.execute( + "alpha", 42U, + [idx]() -> packet::error_type { return static_cast(idx); }, + [&](auto && /*err*/) -> void { + std::lock_guard lock(vec_mutex); + order.emplace_back(static_cast(order.size())); + completed.fetch_add(1U, std::memory_order_acq_rel); + }); + } + + ASSERT_TRUE( + wait_for_count(completed, count, std::chrono::milliseconds{2000})); + + ASSERT_EQ(order.size(), static_cast(count)); + for (int idx = 0; idx < static_cast(count); ++idx) { + EXPECT_EQ(order[static_cast(idx)], idx); + } +} + +TEST(client_pool_test, parallel_on_different_thread_ids) { + client_pool pool; + + std::atomic started{0U}; + std::atomic completed{0U}; + + const auto barrier = [&started]() -> void { + started.fetch_add(1U, std::memory_order_acq_rel); + auto start{std::chrono::steady_clock::now()}; + while (started.load(std::memory_order_acquire) < 2U) { + std::this_thread::sleep_for(std::chrono::milliseconds{1}); + if (std::chrono::steady_clock::now() - start >= + std::chrono::milliseconds{500}) { + break; + } + } + }; + + pool.execute( + "alpha", 1U, + [&]() -> packet::error_type { + barrier(); + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + return packet::error_type{0}; + }, + [&](packet::error_type err) -> void { + EXPECT_EQ(err, packet::error_type{0}); + completed.fetch_add(1U, std::memory_order_acq_rel); + }); + + pool.execute( + "alpha", 2U, + [&]() -> packet::error_type { + barrier(); + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + return packet::error_type{0}; + }, + [&](packet::error_type err) -> void { + EXPECT_EQ(err, packet::error_type{0}); + completed.fetch_add(1U, std::memory_order_acq_rel); + }); + + ASSERT_TRUE(wait_for_count(completed, 2U, std::chrono::milliseconds{1000})); +} + +TEST(client_pool_test, remove_client_then_recreate_pool) { + client_pool pool; + + std::atomic first_done{false}; + std::atomic second_done{false}; + + pool.execute( + "bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; }, + [&](packet::error_type err) -> void { + EXPECT_EQ(err, packet::error_type{0}); + first_done.store(true, std::memory_order_release); + }); + + ASSERT_TRUE(wait_for_bool(first_done, std::chrono::milliseconds{500})); + + pool.remove_client("bravo"); + + pool.execute( + "bravo", 7U, []() -> packet::error_type { return packet::error_type{0}; }, + [&](packet::error_type err) -> void { + EXPECT_EQ(err, packet::error_type{0}); + second_done.store(true, std::memory_order_release); + }); + + ASSERT_TRUE(wait_for_bool(second_done, std::chrono::milliseconds{500})); +} + +TEST(client_pool_test, shutdown_prevents_future_execute) { + client_pool pool; + + std::atomic done{false}; + pool.execute( + "charlie", 3U, + []() -> packet::error_type { return packet::error_type{0}; }, + [&](packet::error_type) -> void { + done.store(true, std::memory_order_release); + }); + ASSERT_TRUE(wait_for_bool(done, std::chrono::milliseconds{500})); + + pool.shutdown(); + + EXPECT_THROW(pool.execute( + "charlie", 3U, + []() -> packet::error_type { return packet::error_type{0}; }, + [](packet::error_type) -> void {}), + std::runtime_error); +} + +TEST(client_pool_test, worker_exception_is_contained_and_no_completion) { + client_pool pool; + + std::atomic completion_called{false}; + + pool.execute( + "delta", 1U, + []() -> packet::error_type { throw std::runtime_error("boom"); }, + [&](packet::error_type) -> void { + completion_called.store(true, std::memory_order_release); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds{150}); + + EXPECT_FALSE(completion_called.load(std::memory_order_acquire)); +} + +TEST(client_pool_test, remove_expired_is_safe_to_call) { + client_pool pool; + EXPECT_NO_THROW(pool.remove_expired()); +} +} // namespace repertory