This commit is contained in:
2025-10-01 16:01:34 -05:00
parent 4a360fa1e0
commit f83c4954af
5 changed files with 331 additions and 61 deletions

View File

@@ -48,81 +48,94 @@ private:
boost::asio::ip::tcp::socket &sock;
};
template <class op_t>
inline void
run_with_deadline(boost::asio::io_context &io_ctx,
boost::asio::ip::tcp::socket &sock, op_t operation,
std::chrono::milliseconds deadline,
std::string_view event_name, std::string_view function_name);
void apply_common_socket_properties(boost::asio::ip::tcp::socket &sock);
[[nodiscard]] auto is_socket_still_alive(boost::asio::ip::tcp::socket &sock)
-> bool;
void connect_with_deadline(
boost::asio::io_context &io_ctx, boost::asio::ip::tcp::socket &sock,
boost::asio::io_context &ctx, boost::asio::ip::tcp::socket &sock,
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type
&endpoints,
std::chrono::milliseconds deadline);
void read_exact_with_deadline(boost::asio::io_context &io_ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline);
void write_all_with_deadline(boost::asio::io_context &io_ctx,
void read_exact_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline);
template <class op_t>
inline void
run_with_deadline(boost::asio::io_context &io_ctx,
boost::asio::ip::tcp::socket &sock, op_t operation,
auto run_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock, op_t &&operation,
std::chrono::milliseconds deadline,
std::string_view event_name, std::string_view function_name) {
std::string event_name, std::string function_name)
-> void;
void write_all_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline);
template <class op_t>
auto run_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock, op_t &&operation,
std::chrono::milliseconds deadline,
std::string event_name, std::string function_name)
-> void {
deadline = std::max(deadline, std::chrono::milliseconds{250});
struct request_state final {
request_state(boost::asio::io_context &ctx_,
boost::asio::ip::tcp::socket &sock_)
: sock(sock_),
timer(ctx_),
work_guard(boost::asio::make_work_guard(ctx_.get_executor())) {}
boost::asio::ip::tcp::socket &sock;
boost::asio::steady_timer timer;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
work_guard;
std::atomic<bool> done{false};
std::atomic<bool> timed_out{false};
boost::system::error_code err;
std::atomic<bool> timed_out{false};
};
auto state = std::make_shared<request_state>();
boost::asio::steady_timer timer{io_ctx};
timer.expires_after(deadline);
timer.async_wait([state, &sock](auto &&err) {
if (not err && not state->done) {
state->timed_out = true;
boost::system::error_code ignored_ec;
[[maybe_unused]] auto res = sock.cancel(ignored_ec);
auto state_ptr = std::make_shared<request_state>(ctx, sock);
state_ptr->timer.expires_after(deadline);
state_ptr->timer.async_wait([state_ptr](auto &&err) {
if (not err and not state_ptr->done) {
state_ptr->timed_out = true;
boost::system::error_code err2;
[[maybe_unused]] auto res = state_ptr->sock.cancel(err2);
}
});
operation([state](auto &&err) {
state->err = err;
state->done = true;
operation([state_ptr](auto &&err) {
state_ptr->err = err;
state_ptr->done = true;
});
io_ctx.restart();
while (not state->done && not state->timed_out) {
io_ctx.run_one();
ctx.restart();
while (not state_ptr->done and not state_ptr->timed_out) {
ctx.run_one();
}
timer.cancel();
state_ptr->timer.cancel();
while (ctx.poll_one() > 0) {
}
if (state->timed_out) {
state_ptr->work_guard.reset();
if (state_ptr->timed_out) {
repertory::event_system::instance().raise<repertory::packet_client_timeout>(
event_name, function_name);
throw std::runtime_error(std::string{event_name} + " timed-out");
throw std::runtime_error(fmt::format("{} timed-out", event_name));
}
if (state->err) {
throw std::runtime_error(std::string{event_name} + " failed|err|" +
state->err.message());
if (state_ptr->err) {
throw std::runtime_error(
fmt::format("{} failed|err|{}", event_name, state_ptr->err.message()));
}
}
} // namespace repertory::comm

View File

@@ -73,14 +73,14 @@ void apply_common_socket_properties(boost::asio::ip::tcp::socket &sock) {
}
void connect_with_deadline(
boost::asio::io_context &io_ctx, boost::asio::ip::tcp::socket &sock,
boost::asio::io_context &ctx, boost::asio::ip::tcp::socket &sock,
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type
&endpoints,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
run_with_deadline(
io_ctx, sock,
ctx, sock,
[&sock, &endpoints](auto &&handler) {
boost::asio::async_connect(
sock, endpoints, [handler](auto &&err, auto &&) { handler(err); });
@@ -88,7 +88,7 @@ void connect_with_deadline(
deadline, "connect", std::string{function_name});
}
void read_exact_with_deadline(boost::asio::io_context &io_ctx,
void read_exact_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline) {
@@ -103,7 +103,7 @@ void read_exact_with_deadline(boost::asio::io_context &io_ctx,
std::size_t bytes_read = 0U;
run_with_deadline(
io_ctx, sock,
ctx, sock,
[&](auto &&handler) {
sock.async_read_some(
boost::asio::buffer(base + offset, total - offset),
@@ -122,7 +122,7 @@ void read_exact_with_deadline(boost::asio::io_context &io_ctx,
}
}
void write_all_with_deadline(boost::asio::io_context &io_ctx,
void write_all_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline) {
@@ -136,7 +136,7 @@ void write_all_with_deadline(boost::asio::io_context &io_ctx,
std::size_t bytes_written = 0U;
run_with_deadline(
io_ctx, sock,
ctx, sock,
[&](auto &&handler) {
sock.async_write_some(
boost::asio::buffer(base + offset, total - offset),

View File

@@ -57,9 +57,9 @@ packet_client::~packet_client() {
}
void packet_client::close(client &cli) noexcept {
boost::system::error_code err1;
boost::system::error_code err;
[[maybe_unused]] auto res =
cli.socket.shutdown(boost::asio::socket_base::shutdown_both, err1);
cli.socket.shutdown(boost::asio::socket_base::shutdown_both, err);
boost::system::error_code err2;
[[maybe_unused]] auto res2 = cli.socket.close(err2);

View File

@@ -1,12 +1,16 @@
/*
Copyright <2018-2025> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to do so, subject to the
following conditions: The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -17,7 +21,6 @@
*/
#include "test_common.hpp"
#include "comm/packet/common.hpp"
#include "comm/packet/packet.hpp"
#include "comm/packet/packet_client.hpp"
#include "comm/packet/packet_server.hpp"

View File

@@ -1,30 +1,284 @@
/*
Copyright <2018-2025> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: The above copyright
notice and this permission notice shall be included in all copies or
substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS",
WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "test_common.hpp"
#include "comm/packet/common.hpp"
using boost::asio::ip::tcp;
namespace {
using namespace std::chrono_literals;
class packet_comm_common_test2 : public ::testing::Test {
protected:
void SetUp() override { repertory::event_system::instance().start(); }
void TearDown() override { repertory::event_system::instance().stop(); }
};
class loopback_server final {
public:
loopback_server(std::size_t send_bytes,
std::chrono::milliseconds delay_before_send, bool never_send)
: acceptor_{io_ctx_, {boost::asio::ip::tcp::v4(), 0}},
send_bytes_{send_bytes},
delay_before_send_{delay_before_send},
never_send_{never_send} {
start_accept();
io_thr_ = std::thread([this]() { io_ctx_.run(); });
}
~loopback_server() {
boost::system::error_code ign{};
[[maybe_unused]] auto res = acceptor_.close(ign);
io_ctx_.stop();
if (io_thr_.joinable()) {
io_thr_.join();
}
}
[[nodiscard]] auto port() const -> std::uint16_t {
return acceptor_.local_endpoint().port();
}
private:
auto start_accept() -> void {
auto sock_ptr = std::make_shared<boost::asio::ip::tcp::socket>(io_ctx_);
acceptor_.async_accept(*sock_ptr, [this, sock_ptr](auto &&err) {
if (err) {
return;
}
if (never_send_) {
return;
}
auto tmr_ptr = std::make_shared<boost::asio::steady_timer>(io_ctx_);
tmr_ptr->expires_after(delay_before_send_);
tmr_ptr->async_wait([this, sock_ptr, tmr_ptr](auto &&err2) {
if (err2) {
return;
}
auto buf_ptr =
std::make_shared<std::vector<std::uint8_t>>(send_bytes_, 0xAB);
boost::asio::async_write(
*sock_ptr, boost::asio::buffer(*buf_ptr),
[buf_ptr, sock_ptr](auto &&, auto &&) {
boost::system::error_code ign{};
[[maybe_unused]] auto res = sock_ptr->shutdown(
boost::asio::ip::tcp::socket::shutdown_both, ign);
[[maybe_unused]] auto res2 = sock_ptr->close(ign);
});
});
});
}
boost::asio::io_context io_ctx_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread io_thr_;
std::size_t send_bytes_;
std::chrono::milliseconds delay_before_send_;
bool never_send_;
};
auto connect_client(boost::asio::ip::tcp::socket &sock, std::uint16_t srv_port)
-> void {
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::address_v4::loopback(),
srv_port,
};
boost::system::error_code err;
[[maybe_unused]] auto res = sock.connect(endpoint, err);
ASSERT_FALSE(err) << "connect failed: " << err.message();
}
} // namespace
namespace repertory {
TEST_F(packet_comm_common_test2, run_with_deadline_op_finishes_first) {
constexpr std::size_t test_bytes{16U * 1024U};
const auto deadline = 750ms;
const auto server_delay = 50ms;
loopback_server server{test_bytes, server_delay, false};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
connect_client(cli_sock, server.port());
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
EXPECT_NO_THROW({
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err, auto &&) { complete(err); });
},
deadline, "read_exact", "op_finishes_first");
});
for (auto val : recv_buf) {
ASSERT_EQ(val, 0xAB);
}
}
TEST_F(packet_comm_common_test2, run_with_deadline_timeout_fires_first) {
constexpr std::size_t test_bytes{8U * 1024U};
const auto deadline = 100ms;
const auto server_delay = 1500ms;
loopback_server server{test_bytes, server_delay, false};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
connect_client(cli_sock, server.port());
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
EXPECT_THROW(
{
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err, auto &&) { complete(err); });
},
deadline, "read_exact", "timeout_fires_first");
},
std::runtime_error);
}
TEST_F(packet_comm_common_test2, run_with_deadline_threaded_boundary_stress) {
using namespace std::chrono_literals;
constexpr int thread_count{6};
constexpr int iter_count{16};
constexpr std::size_t test_bytes{4U * 1024U};
const auto deadline = 250ms;
std::atomic<int> seen_timeouts{0};
std::atomic<int> seen_success{0};
std::vector<std::thread> thr_pool;
thr_pool.reserve(thread_count);
for (int tid = 0; tid < thread_count; ++tid) {
thr_pool.emplace_back([tid, deadline, &seen_timeouts, &seen_success]() {
std::mt19937 rng(static_cast<std::mt19937::result_type>(
std::chrono::high_resolution_clock::now().time_since_epoch().count() +
tid));
std::uniform_int_distribution<int> tiny_jitter{-10, 10};
for (int iter = 0; iter < iter_count; ++iter) {
const bool make_timeout = (iter % 2) == 1;
auto delay = make_timeout
? (deadline + 200ms +
std::chrono::milliseconds{tiny_jitter(rng)})
: (deadline - 100ms +
std::chrono::milliseconds{tiny_jitter(rng)});
if (delay < 0ms) {
delay = 0ms;
}
loopback_server server{test_bytes, delay, false};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::address_v4::loopback(),
server.port(),
};
boost::system::error_code err{};
[[maybe_unused]] auto res = cli_sock.connect(endpoint, err);
if (err) {
ADD_FAILURE() << "connect failed: " << err.message();
continue;
}
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
try {
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err2, auto &&) { complete(err2); });
},
deadline, "read_exact", "threaded_boundary_stress");
seen_success.fetch_add(1);
} catch (const std::runtime_error &ex) {
std::string msg{ex.what()};
if (msg.find("timed-out") != std::string::npos) {
seen_timeouts.fetch_add(1);
} else {
ADD_FAILURE() << "unexpected runtime_error: " << msg;
}
}
}
});
}
for (auto &thr : thr_pool) {
thr.join();
}
EXPECT_GT(seen_timeouts.load(), 0);
EXPECT_GT(seen_success.load(), 0);
}
TEST_F(packet_comm_common_test2, run_with_deadline_server_never_sends) {
constexpr std::size_t test_bytes{2U * 1024U};
const auto deadline = 120ms;
loopback_server server{test_bytes, 0ms, true};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
connect_client(cli_sock, server.port());
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
EXPECT_THROW(
{
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err, auto &&) { complete(err); });
},
deadline, "read_exact", "server_never_sends");
},
std::runtime_error);
}
TEST(packet_comm_common_test, operation_completes_prior_to_timeout) {
boost::asio::io_context io_ctx;
tcp::socket sock(io_ctx);
boost::asio::ip::tcp::socket sock(io_ctx);
std::atomic<bool> completed{false};
@@ -46,7 +300,7 @@ TEST(packet_comm_common_test, operation_completes_prior_to_timeout) {
TEST(packet_comm_common_test, timeout_completes_prior_to_operation) {
boost::asio::io_context io_ctx;
tcp::socket sock(io_ctx);
boost::asio::ip::tcp::socket sock(io_ctx);
std::atomic<bool> completed{false};