refactor packet client

This commit is contained in:
2025-10-04 10:00:30 -05:00
parent c338d35c4f
commit 02ec672a6d
2 changed files with 41 additions and 25 deletions

View File

@@ -24,8 +24,9 @@
namespace repertory::comm { namespace repertory::comm {
inline static constexpr std::uint32_t max_packet_bytes{32U * 1024U * 1024U}; inline static constexpr std::uint32_t max_packet_bytes{32U * 1024U * 1024U};
inline constexpr const std::uint8_t max_read_attempts{5U}; inline constexpr const std::uint8_t max_read_attempts{2U};
inline constexpr const std::uint16_t packet_nonce_size{256U}; inline constexpr const std::uint16_t packet_nonce_size{256U};
inline constexpr const std::size_t read_write_size{131072U};
inline constexpr const std::uint16_t server_handshake_timeout_ms{3000U}; inline constexpr const std::uint16_t server_handshake_timeout_ms{3000U};
struct non_blocking_guard final { struct non_blocking_guard final {

View File

@@ -45,8 +45,6 @@ packet_client::packet_client(remote::remote_config cfg)
} }
packet_client::~packet_client() { packet_client::~packet_client() {
REPERTORY_USES_FUNCTION_NAME();
allow_connections_ = false; allow_connections_ = false;
try { try {
@@ -54,22 +52,31 @@ packet_client::~packet_client() {
} catch (...) { } catch (...) {
} }
io_context_.stop(); try {
io_context_.stop();
} catch (...) {
}
for (auto &thread : service_threads_) { for (auto &thread : service_threads_) {
if (thread.joinable()) { try {
thread.join(); if (thread.joinable()) {
thread.join();
}
} catch (...) {
} }
} }
} }
void packet_client::close(client &cli) noexcept { void packet_client::close(client &cli) noexcept {
boost::system::error_code err; boost::system::error_code err;
[[maybe_unused]] auto res = [[maybe_unused]] auto res = cli.socket.cancel(err);
cli.socket.shutdown(boost::asio::socket_base::shutdown_both, err);
boost::system::error_code err2; boost::system::error_code err2;
[[maybe_unused]] auto res2 = cli.socket.close(err2); [[maybe_unused]] auto res2 =
cli.socket.shutdown(boost::asio::socket_base::shutdown_both, err2);
boost::system::error_code err3;
[[maybe_unused]] auto res3 = cli.socket.close(err3);
} }
void packet_client::close_all() { void packet_client::close_all() {
@@ -93,7 +100,7 @@ auto packet_client::check_version(std::uint32_t client_version,
boost::asio::io_context ctx{}; boost::asio::io_context ctx{};
client cli(ctx); client cli(ctx);
utils::timeout connect_timeout( utils::timeout timeout(
[&cli]() { [&cli]() {
event_system::instance().raise<packet_client_timeout>("connect", event_system::instance().raise<packet_client_timeout>("connect",
function_name); function_name);
@@ -104,7 +111,7 @@ auto packet_client::check_version(std::uint32_t client_version,
boost::asio::connect( boost::asio::connect(
cli.socket, tcp::resolver(ctx).resolve(cfg_.host_name_or_ip, cli.socket, tcp::resolver(ctx).resolve(cfg_.host_name_or_ip,
std::to_string(cfg_.api_port))); std::to_string(cfg_.api_port)));
connect_timeout.disable(); timeout.disable();
if (not is_socket_still_alive(cli.socket)) { if (not is_socket_still_alive(cli.socket)) {
throw std::runtime_error("failed to connect"); throw std::runtime_error("failed to connect");
} }
@@ -131,7 +138,7 @@ auto packet_client::connect(client &cli) -> bool {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
try { try {
utils::timeout connect_timeout( utils::timeout timeout(
[&cli]() { [&cli]() {
event_system::instance().raise<packet_client_timeout>("connect", event_system::instance().raise<packet_client_timeout>("connect",
function_name); function_name);
@@ -142,7 +149,7 @@ auto packet_client::connect(client &cli) -> bool {
resolve(); resolve();
boost::asio::connect(cli.socket, resolve_results_.load()); boost::asio::connect(cli.socket, resolve_results_.load());
connect_timeout.disable(); timeout.disable();
if (not is_socket_still_alive(cli.socket)) { if (not is_socket_still_alive(cli.socket)) {
throw std::runtime_error("failed to connect"); throw std::runtime_error("failed to connect");
@@ -182,15 +189,22 @@ auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
clients_lock.unlock(); clients_lock.unlock();
auto cli = std::make_shared<client>(io_context_); auto cli = std::make_shared<client>(io_context_);
if (not connect(*cli)) { if (connect(*cli)) {
return nullptr; return cli;
} }
return cli;
return nullptr;
} }
auto cli = clients_.at(0U); auto cli = clients_.at(0U);
utils::collection::remove_element(clients_, cli); utils::collection::remove_element(clients_, cli);
return cli;
if (is_socket_still_alive(cli->socket)) {
return cli;
}
clients_lock.unlock();
return get_client();
} catch (const std::exception &e) { } catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "get_client failed"); utils::error::raise_error(function_name, e, "get_client failed");
} }
@@ -257,7 +271,7 @@ void packet_client::put_client(std::shared_ptr<client> &cli) {
void packet_client::read_data(client &cli, data_buffer &buffer) const { void packet_client::read_data(client &cli, data_buffer &buffer) const {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
utils::timeout read_timeout( utils::timeout timeout(
[&cli]() { [&cli]() {
event_system::instance().raise<packet_client_timeout>("response", event_system::instance().raise<packet_client_timeout>("response",
function_name); function_name);
@@ -267,14 +281,14 @@ void packet_client::read_data(client &cli, data_buffer &buffer) const {
std::uint32_t offset{}; std::uint32_t offset{};
while (offset < buffer.size()) { while (offset < buffer.size()) {
auto to_read = std::min(read_write_size, buffer.size() - offset);
auto bytes_read = boost::asio::read( auto bytes_read = boost::asio::read(
cli.socket, cli.socket, boost::asio::buffer(&buffer[offset], to_read));
boost::asio::buffer(&buffer[offset], buffer.size() - offset));
if (bytes_read <= 0) { if (bytes_read <= 0) {
throw std::runtime_error("read failed|" + std::to_string(bytes_read)); throw std::runtime_error("read failed|" + std::to_string(bytes_read));
} }
offset += static_cast<std::uint32_t>(bytes_read); offset += static_cast<std::uint32_t>(bytes_read);
read_timeout.reset(); timeout.reset();
} }
} }
@@ -401,7 +415,7 @@ auto packet_client::send(std::string_view method, packet &request,
void packet_client::write_data(client &cli, const packet &request) const { void packet_client::write_data(client &cli, const packet &request) const {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
utils::timeout write_timeout( utils::timeout timeout(
[&cli]() { [&cli]() {
event_system::instance().raise<packet_client_timeout>("request", event_system::instance().raise<packet_client_timeout>("request",
function_name); function_name);
@@ -411,14 +425,15 @@ void packet_client::write_data(client &cli, const packet &request) const {
std::uint32_t offset{}; std::uint32_t offset{};
while (offset < request.get_size()) { while (offset < request.get_size()) {
auto to_write =
std::min(read_write_size, std::size_t{request.get_size() - offset});
auto bytes_written = boost::asio::write( auto bytes_written = boost::asio::write(
cli.socket, cli.socket, boost::asio::buffer(&request[offset], to_write));
boost::asio::buffer(&request[offset], request.get_size() - offset));
if (bytes_written <= 0) { if (bytes_written <= 0) {
throw std::runtime_error("write failed|" + std::to_string(bytes_written)); throw std::runtime_error("write failed|" + std::to_string(bytes_written));
} }
offset += static_cast<std::uint32_t>(bytes_written); offset += static_cast<std::uint32_t>(bytes_written);
write_timeout.reset(); timeout.reset();
} }
} }
} // namespace repertory } // namespace repertory