revert timer implementation

2025-10-01 23:07:53 -05:00
parent 04518d7b8c
commit 71bf6e107b
5 changed files with 71 additions and 520 deletions

View File

@@ -22,10 +22,6 @@
#ifndef REPERTORY_INCLUDE_COMM_PACKET_COMMON_HPP_
#define REPERTORY_INCLUDE_COMM_PACKET_COMMON_HPP_
#include "events/event_system.hpp"
#include "events/types/packet_client_timeout.hpp"
#include "utils/common.hpp"
namespace repertory::comm {
inline static constexpr std::uint32_t max_packet_bytes{32U * 1024U * 1024U};
inline constexpr const std::uint8_t max_read_attempts{5U};
@@ -52,91 +48,6 @@ void apply_common_socket_properties(boost::asio::ip::tcp::socket &sock);
[[nodiscard]] auto is_socket_still_alive(boost::asio::ip::tcp::socket &sock)
-> bool;
void connect_with_deadline(
boost::asio::io_context &ctx, boost::asio::ip::tcp::socket &sock,
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type
&endpoints,
std::chrono::milliseconds deadline);
void read_exact_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline);
template <class op_t>
void run_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock, op_t &&operation,
std::chrono::milliseconds deadline,
std::string event_name, std::string function_name);
void write_all_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline);
template <class op_t>
void run_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock, op_t &&operation,
std::chrono::milliseconds deadline,
std::string event_name, std::string function_name) {
deadline = std::max(deadline, std::chrono::milliseconds{250});
struct request_state final {
request_state(boost::asio::io_context &ctx_,
boost::asio::ip::tcp::socket &sock_)
: sock(sock_),
timer(ctx_),
work_guard(boost::asio::make_work_guard(ctx_.get_executor())) {}
boost::asio::ip::tcp::socket &sock;
boost::asio::steady_timer timer;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
work_guard;
std::atomic<bool> done{false};
boost::system::error_code err;
std::atomic<bool> timed_out{false};
};
auto state_ptr = std::make_shared<request_state>(ctx, sock);
state_ptr->timer.expires_after(deadline);
state_ptr->timer.async_wait([state_ptr](auto &&err) {
if (not err and not state_ptr->done) {
state_ptr->timed_out = true;
}
});
operation([state_ptr](auto &&err) {
state_ptr->err = err;
state_ptr->done = true;
});
ctx.restart();
while (not state_ptr->done and not state_ptr->timed_out) {
ctx.run_one();
}
auto timed_out = state_ptr->timed_out.load();
if (timed_out) {
boost::system::error_code err;
[[maybe_unused]] auto res = state_ptr->sock.cancel(err);
}
state_ptr->timer.cancel();
state_ptr->work_guard.reset();
if (timed_out) {
repertory::event_system::instance().raise<repertory::packet_client_timeout>(
event_name, function_name);
throw std::runtime_error(fmt::format("{} timed-out", event_name));
}
if (state_ptr->err) {
throw std::runtime_error(
fmt::format("{} failed|err|{}", event_name, state_ptr->err.message()));
}
}
} // namespace repertory::comm
#endif // REPERTORY_INCLUDE_COMM_PACKET_COMMON_HPP_
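
For reference, a minimal usage sketch of the run_with_deadline helper declared above, modeled on the unit tests later in this commit; the wrapper name demo_read_with_deadline, the buffer size, and the 500 ms deadline are illustrative assumptions, not part of the change.

#include <chrono>
#include <cstdint>
#include <vector>

#include <boost/asio.hpp>

#include "comm/packet/common.hpp"

// Blocks until the async_read completes or the deadline elapses; on timeout
// the helper raises a packet_client_timeout event and throws
// std::runtime_error, as shown in the template body above.
void demo_read_with_deadline(boost::asio::io_context &ctx,
                             boost::asio::ip::tcp::socket &sock) {
  std::vector<std::uint8_t> recv_buf(1024U, 0x00);
  repertory::comm::run_with_deadline(
      ctx, sock,
      [&](auto complete) {
        boost::asio::async_read(sock, boost::asio::buffer(recv_buf),
                                [complete](auto &&err, auto &&) {
                                  // Forward only the error code; the byte
                                  // count is not needed by the helper.
                                  complete(err);
                                });
      },
      std::chrono::milliseconds(500), "read", "demo_read_with_deadline");
}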

View File

@@ -69,19 +69,17 @@ private:
[[nodiscard]] auto handshake(client &cli, std::uint32_t &min_version) const
-> bool;
[[nodiscard]] auto handshake(client &cli, boost::asio::io_context &ctx,
std::uint32_t &min_version) const -> bool;
void put_client(std::shared_ptr<client> &cli);
void read_data(client &cli, data_buffer &buffer) const;
[[nodiscard]] auto read_packet(client &cli, packet &response) const
-> packet::error_type;
[[nodiscard]] auto read_packet(client &cli, boost::asio::io_context &ctx,
packet &response) const -> packet::error_type;
void resolve();
void write_data(client &cli, const packet &request) const;
public:
[[nodiscard]] auto check_version(std::uint32_t client_version,
std::uint32_t &min_version) -> api_error;

View File

@@ -71,87 +71,4 @@ void apply_common_socket_properties(boost::asio::ip::tcp::socket &sock) {
sock.set_option(boost::asio::socket_base::linger(false, 0));
sock.set_option(boost::asio::socket_base::keep_alive(true));
}
void connect_with_deadline(
boost::asio::io_context &ctx, boost::asio::ip::tcp::socket &sock,
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type
&endpoints,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
run_with_deadline(
ctx, sock,
[&sock, &endpoints](auto &&handler) {
boost::asio::async_connect(
sock, endpoints, [handler](auto &&err, auto &&) { handler(err); });
},
deadline, "connect", std::string{function_name});
}
void read_exact_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
auto *base = static_cast<std::uint8_t *>(buf.data());
std::size_t total = buf.size();
std::size_t offset = 0U;
while (offset < total) {
std::size_t bytes_read = 0U;
run_with_deadline(
ctx, sock,
[&](auto &&handler) {
sock.async_read_some(
boost::asio::buffer(base + offset, total - offset),
[&bytes_read, handler](auto &&err, auto &&count) {
bytes_read = count;
handler(err);
});
},
deadline, "read", std::string{function_name});
if (bytes_read == 0U) {
throw std::runtime_error("0 bytes read");
}
offset += bytes_read;
}
}
void write_all_with_deadline(boost::asio::io_context &ctx,
boost::asio::ip::tcp::socket &sock,
boost::asio::mutable_buffer buf,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
auto *base = static_cast<std::uint8_t *>(buf.data());
std::size_t total = buf.size();
std::size_t offset = 0U;
while (offset < total) {
std::size_t bytes_written = 0U;
run_with_deadline(
ctx, sock,
[&](auto &&handler) {
sock.async_write_some(
boost::asio::buffer(base + offset, total - offset),
[&bytes_written, handler](auto &&err, auto &&count) {
bytes_written = count;
handler(err);
});
},
deadline, "write", std::string{function_name});
if (bytes_written == 0U) {
throw std::runtime_error("0 bytes written");
}
offset += bytes_written;
}
}
} // namespace repertory::comm
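
Similarly, a hedged sketch of the connect path using the helpers defined above; the loopback host, the port string, and the 1500 ms deadline are placeholders only, not values taken from this commit.

#include <chrono>

#include <boost/asio.hpp>

#include "comm/packet/common.hpp"

// Resolve the peer, connect under a deadline, then apply the shared socket
// options shown above.
void demo_connect_with_deadline(boost::asio::io_context &ctx,
                                boost::asio::ip::tcp::socket &sock) {
  boost::asio::ip::tcp::resolver resolver(ctx);
  auto endpoints = resolver.resolve("127.0.0.1", "20000");
  repertory::comm::connect_with_deadline(ctx, sock, endpoints,
                                         std::chrono::milliseconds(1500));
  repertory::comm::apply_common_socket_properties(sock);
}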

View File

@@ -18,12 +18,15 @@
#include "comm/packet/packet_client.hpp"
#include "comm/packet/common.hpp"
#include "events/event_system.hpp"
#include "events/types/packet_client_timeout.hpp"
#include "platform/platform.hpp"
#include "types/repertory.hpp"
#include "utils/collection.hpp"
#include "utils/common.hpp"
#include "utils/config.hpp"
#include "utils/error_utils.hpp"
#include "utils/timeout.hpp"
#include "utils/utils.hpp"
#include "version.hpp"
@@ -87,12 +90,11 @@ auto packet_client::check_version(std::uint32_t client_version,
cfg_.host_name_or_ip, std::to_string(cfg_.api_port));
client cli(ctx);
connect_with_deadline(ctx, cli.socket, resolve_results,
std::chrono::milliseconds(cfg_.conn_timeout_ms));
boost::asio::connect(cli.socket, resolve_results);
comm::apply_common_socket_properties(cli.socket);
if (not handshake(cli, ctx, min_version)) {
if (not handshake(cli, min_version)) {
return api_error::comm_error;
}
@@ -112,14 +114,12 @@ auto packet_client::connect(client &cli) -> bool {
try {
resolve();
auto cached = resolve_results_.load();
connect_with_deadline(io_context_, cli.socket, cached,
std::chrono::milliseconds(cfg_.conn_timeout_ms));
boost::asio::connect(cli.socket, resolve_results_.load());
comm::apply_common_socket_properties(cli.socket);
std::uint32_t min_version{};
if (not handshake(cli, io_context_, min_version)) {
if (not handshake(cli, min_version)) {
close(cli);
return false;
}
@@ -168,11 +168,6 @@ auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
auto packet_client::handshake(client &cli, std::uint32_t &min_version) const
-> bool {
return handshake(cli, io_context_, min_version);
}
auto packet_client::handshake(client &cli, boost::asio::io_context &ctx,
std::uint32_t &min_version) const -> bool {
REPERTORY_USES_FUNCTION_NAME();
try {
@@ -187,8 +182,7 @@ auto packet_client::handshake(client &cli, boost::asio::io_context &ctx,
tmp.to_buffer(buffer);
}
read_exact_with_deadline(ctx, cli.socket, boost::asio::buffer(buffer),
std::chrono::milliseconds(cfg_.recv_timeout_ms));
read_data(cli, buffer);
packet response(buffer);
auto res = response.decode(min_version);
@@ -207,10 +201,8 @@ auto packet_client::handshake(client &cli, boost::asio::io_context &ctx,
}
response.encrypt(cfg_.encryption_token, false);
response.to_buffer(buffer);
write_data(cli, response);
write_all_with_deadline(ctx, cli.socket, boost::asio::buffer(buffer),
std::chrono::milliseconds(cfg_.send_timeout_ms));
return true;
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "handshake failed");
@@ -230,39 +222,56 @@ void packet_client::put_client(std::shared_ptr<client> &cli) {
}
}
auto packet_client::read_packet(client &cli, packet &response) const
-> packet::error_type {
return read_packet(cli, io_context_, response);
void packet_client::read_data(client &cli, data_buffer &buffer) const {
REPERTORY_USES_FUNCTION_NAME();
timeout read_timeout(
[&cli]() {
event_system::instance().raise<packet_client_timeout>("response",
function_name);
packet_client::close(cli);
},
std::chrono::milliseconds(cfg_.recv_timeout_ms));
std::uint32_t offset{};
while (offset < buffer.size()) {
auto bytes_read = boost::asio::read(
cli.socket,
boost::asio::buffer(&buffer[offset], buffer.size() - offset));
if (bytes_read <= 0) {
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
}
offset += static_cast<std::uint32_t>(bytes_read);
}
}
auto packet_client::read_packet(client &cli, boost::asio::io_context &ctx,
packet &response) const -> packet::error_type {
auto packet_client::read_packet(client &cli, packet &response) const
-> packet::error_type {
data_buffer buffer(sizeof(std::uint32_t));
read_exact_with_deadline(ctx, cli.socket, boost::asio::buffer(buffer),
std::chrono::milliseconds(cfg_.recv_timeout_ms));
read_data(cli, buffer);
std::uint32_t to_read{};
std::memcpy(&to_read, buffer.data(), sizeof(to_read));
boost::endian::big_to_native_inplace(to_read);
std::uint32_t size{};
std::memcpy(&size, buffer.data(), buffer.size());
boost::endian::big_to_native_inplace(size);
if (to_read > comm::max_packet_bytes) {
throw std::runtime_error(fmt::format("packet too large|size|{}", to_read));
if (size > comm::max_packet_bytes) {
throw std::runtime_error(fmt::format("packet too large|size|{}", size));
}
if (to_read < utils::encryption::encryption_header_size) {
throw std::runtime_error(fmt::format("packet too small|size|{}", to_read));
if (size < utils::encryption::encryption_header_size) {
throw std::runtime_error(fmt::format("packet too small|size|{}", size));
}
buffer.resize(to_read);
buffer.resize(size);
read_data(cli, buffer);
read_exact_with_deadline(ctx, cli.socket, boost::asio::buffer(buffer),
std::chrono::milliseconds(cfg_.recv_timeout_ms));
response = std::move(buffer);
auto ret = response.decrypt(cfg_.encryption_token);
if (ret == 0) {
ret = response.decode(cli.nonce);
}
return ret;
}
@@ -326,11 +335,7 @@ auto packet_client::send(std::string_view method, packet &request,
request = current_request;
current_request.encrypt(cfg_.encryption_token);
write_all_with_deadline(
io_context_, current_client->socket,
boost::asio::buffer(&current_request[0], current_request.get_size()),
std::chrono::milliseconds(cfg_.send_timeout_ms));
write_data(*current_client, current_request);
ret = read_packet(*current_client, response);
if (ret == 0) {
@@ -359,4 +364,27 @@ auto packet_client::send(std::string_view method, packet &request,
return CONVERT_STATUS_NOT_IMPLEMENTED(ret);
}
void packet_client::write_data(client &cli, const packet &request) const {
REPERTORY_USES_FUNCTION_NAME();
timeout write_timeout(
[&cli]() {
event_system::instance().raise<packet_client_timeout>("request",
function_name);
packet_client::close(cli);
},
std::chrono::milliseconds(cfg_.send_timeout_ms));
std::uint32_t offset{};
while (offset < request.get_size()) {
auto bytes_written = boost::asio::write(
cli.socket,
boost::asio::buffer(&request[offset], request.get_size() - offset));
if (bytes_written <= 0) {
throw std::runtime_error("write failed|" + std::to_string(bytes_written));
}
offset += static_cast<std::uint32_t>(bytes_written);
}
}
} // namespace repertory
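
The restored read_data/write_data path above guards each blocking boost::asio read/write with a timeout watchdog from utils/timeout.hpp, whose source is not part of this diff. Below is a minimal, assumed-equivalent sketch of such a watchdog; the class name demo_timeout and its internals are hypothetical, not the repository's implementation. The idea is that a callback fires on a background thread if the object is not destroyed before the duration elapses; in read_data the callback closes the socket, so a stalled blocking read fails instead of hanging past the configured receive timeout.

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical watchdog with the same constructor shape used by
// read_data/write_data above: a callback plus a duration.
class demo_timeout final {
public:
  demo_timeout(std::function<void()> on_timeout,
               std::chrono::milliseconds duration)
      : worker_([this, on_timeout = std::move(on_timeout), duration]() {
          std::unique_lock<std::mutex> lock(mtx_);
          // Wake up early if the owner is destroyed; otherwise run the
          // callback once the deadline elapses.
          if (not notify_.wait_for(lock, duration,
                                   [this]() { return disposed_; })) {
            on_timeout();
          }
        }) {}

  ~demo_timeout() {
    {
      std::lock_guard<std::mutex> lock(mtx_);
      disposed_ = true;
    }
    notify_.notify_all();
    worker_.join();
  }

private:
  std::mutex mtx_;
  std::condition_variable notify_;
  bool disposed_{false};
  std::thread worker_;
};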

View File

@@ -23,310 +23,7 @@
#include "comm/packet/common.hpp"
namespace {
using namespace std::chrono_literals;
class packet_comm_common_test2 : public ::testing::Test {
protected:
void SetUp() override { repertory::event_system::instance().start(); }
void TearDown() override { repertory::event_system::instance().stop(); }
};
class loopback_server final {
public:
loopback_server(std::size_t send_bytes,
std::chrono::milliseconds delay_before_send, bool never_send)
: acceptor_{io_ctx_, {boost::asio::ip::tcp::v4(), 0}},
send_bytes_{send_bytes},
delay_before_send_{delay_before_send},
never_send_{never_send} {
acceptor_.set_option(boost::asio::socket_base::reuse_address(true));
start_accept();
io_thr_ = std::thread([this]() { io_ctx_.run(); });
}
~loopback_server() {
boost::system::error_code ign{};
[[maybe_unused]] auto res = acceptor_.close(ign);
io_ctx_.stop();
if (io_thr_.joinable()) {
io_thr_.join();
}
}
[[nodiscard]] auto port() const -> std::uint16_t {
return acceptor_.local_endpoint().port();
}
private:
auto start_accept() -> void {
auto sock_ptr = std::make_shared<boost::asio::ip::tcp::socket>(io_ctx_);
acceptor_.async_accept(*sock_ptr, [this, sock_ptr](auto &&err) {
if (err) {
return;
}
if (never_send_) {
return;
}
auto tmr_ptr = std::make_shared<boost::asio::steady_timer>(io_ctx_);
tmr_ptr->expires_after(delay_before_send_);
tmr_ptr->async_wait([this, sock_ptr, tmr_ptr](auto &&err2) {
if (err2) {
return;
}
auto buf_ptr =
std::make_shared<std::vector<std::uint8_t>>(send_bytes_, 0xAB);
boost::asio::async_write(
*sock_ptr, boost::asio::buffer(*buf_ptr),
[buf_ptr, sock_ptr](auto &&, auto &&) {
boost::system::error_code ign{};
[[maybe_unused]] auto res = sock_ptr->shutdown(
boost::asio::ip::tcp::socket::shutdown_both, ign);
[[maybe_unused]] auto res2 = sock_ptr->close(ign);
});
});
});
}
boost::asio::io_context io_ctx_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread io_thr_;
std::size_t send_bytes_;
std::chrono::milliseconds delay_before_send_;
bool never_send_;
};
auto connect_client(boost::asio::ip::tcp::socket &sock, std::uint16_t srv_port)
-> void {
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::address_v4::loopback(),
srv_port,
};
boost::system::error_code err;
[[maybe_unused]] auto res = sock.connect(endpoint, err);
ASSERT_FALSE(err) << "connect failed: " << err.message();
}
} // namespace
namespace repertory {
TEST_F(packet_comm_common_test2, run_with_deadline_op_finishes_first) {
constexpr std::size_t test_bytes{16U * 1024U};
const auto deadline = 750ms;
const auto server_delay = 50ms;
loopback_server server{test_bytes, server_delay, false};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
connect_client(cli_sock, server.port());
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
EXPECT_NO_THROW({
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err, auto &&) { complete(err); });
},
deadline, "read_exact", "op_finishes_first");
});
for (auto val : recv_buf) {
ASSERT_EQ(val, 0xAB);
}
}
TEST_F(packet_comm_common_test2, run_with_deadline_timeout_fires_first) {
constexpr std::size_t test_bytes{8U * 1024U};
const auto deadline = 100ms;
const auto server_delay = 1500ms;
loopback_server server{test_bytes, server_delay, false};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
connect_client(cli_sock, server.port());
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
EXPECT_THROW(
{
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err, auto &&) { complete(err); });
},
deadline, "read_exact", "timeout_fires_first");
},
std::runtime_error);
}
TEST_F(packet_comm_common_test2, run_with_deadline_threaded_boundary_stress) {
using namespace std::chrono_literals;
constexpr int thread_count{6};
constexpr int iter_count{16};
constexpr std::size_t test_bytes{4U * 1024U};
const auto deadline = 250ms;
std::atomic<int> seen_timeouts{0};
std::atomic<int> seen_success{0};
std::vector<std::thread> thr_pool;
thr_pool.reserve(thread_count);
for (int tid = 0; tid < thread_count; ++tid) {
thr_pool.emplace_back([tid, deadline, &seen_timeouts, &seen_success]() {
std::mt19937 rng(static_cast<std::mt19937::result_type>(
std::chrono::high_resolution_clock::now().time_since_epoch().count() +
tid));
std::uniform_int_distribution<int> tiny_jitter{-10, 10};
for (int iter = 0; iter < iter_count; ++iter) {
const bool make_timeout = (iter % 2) == 1;
auto delay = make_timeout
? (deadline + 200ms +
std::chrono::milliseconds{tiny_jitter(rng)})
: (deadline - 100ms +
std::chrono::milliseconds{tiny_jitter(rng)});
if (delay < 0ms) {
delay = 0ms;
}
loopback_server server{test_bytes, delay, false};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::address_v4::loopback(),
server.port(),
};
boost::system::error_code err{};
[[maybe_unused]] auto res = cli_sock.connect(endpoint, err);
if (err) {
ADD_FAILURE() << "connect failed: " << err.message();
continue;
}
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
try {
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err2, auto &&) { complete(err2); });
},
deadline, "read_exact", "threaded_boundary_stress");
seen_success.fetch_add(1);
} catch (const std::runtime_error &ex) {
std::string msg{ex.what()};
if (msg.find("timed-out") != std::string::npos) {
seen_timeouts.fetch_add(1);
} else {
ADD_FAILURE() << "unexpected runtime_error: " << msg;
}
}
}
});
}
for (auto &thr : thr_pool) {
thr.join();
}
EXPECT_GT(seen_timeouts.load(), 0);
EXPECT_GT(seen_success.load(), 0);
}
TEST_F(packet_comm_common_test2, run_with_deadline_server_never_sends) {
constexpr std::size_t test_bytes{2U * 1024U};
const auto deadline = 120ms;
loopback_server server{test_bytes, 0ms, true};
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket cli_sock{io_ctx};
connect_client(cli_sock, server.port());
std::vector<std::uint8_t> recv_buf(test_bytes, 0x00);
EXPECT_THROW(
{
repertory::comm::run_with_deadline(
io_ctx, cli_sock,
[&](auto complete) {
boost::asio::async_read(
cli_sock, boost::asio::buffer(recv_buf),
[complete](auto &&err, auto &&) { complete(err); });
},
deadline, "read_exact", "server_never_sends");
},
std::runtime_error);
}
TEST(packet_comm_common_test, operation_completes_prior_to_timeout) {
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket sock(io_ctx);
std::atomic<bool> completed{false};
auto operation = [&](auto &&handler) {
boost::asio::post(io_ctx, [&completed, handler]() {
completed = true;
handler(boost::system::error_code{});
});
};
EXPECT_NO_THROW(comm::run_with_deadline(io_ctx, sock, operation,
std::chrono::milliseconds(300),
"read", "packet_deadline_test"));
EXPECT_TRUE(completed);
io_ctx.poll();
}
TEST(packet_comm_common_test, timeout_completes_prior_to_operation) {
boost::asio::io_context io_ctx;
boost::asio::ip::tcp::socket sock(io_ctx);
std::atomic<bool> completed{false};
auto operation = [&](auto &&handler) {
auto delayed = std::make_shared<boost::asio::steady_timer>(io_ctx);
delayed->expires_after(std::chrono::milliseconds(500));
delayed->async_wait([&completed, delayed, handler](auto &&) {
completed = true;
handler(boost::system::error_code{});
});
};
EXPECT_THROW(comm::run_with_deadline(io_ctx, sock, operation,
std::chrono::milliseconds(300), "read",
"packet_deadline_test"),
std::runtime_error);
for (std::uint8_t idx = 0; idx < 80U && not completed; ++idx) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
io_ctx.poll();
}
EXPECT_TRUE(completed);
}
TEST(packet_comm_common_test, idle_socket_considered_alive) {
using boost::asio::ip::tcp;