From 75093c2b815c3da991ddee8f50ed74e10aff5b34 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sat, 20 Sep 2025 20:03:56 -0500 Subject: [PATCH] fix intermittent hang on remote server disconnect --- .../include/comm/packet/packet_client.hpp | 14 +- repertory/librepertory/include/common.hpp | 2 +- .../events/types/packet_client_timeout.hpp | 14 +- .../librepertory/include/types/remote.hpp | 15 +- .../librepertory/include/types/repertory.hpp | 1 + repertory/librepertory/src/app_config.cpp | 10 + .../src/comm/packet/packet_client.cpp | 286 ++++++++++++++---- 7 files changed, 258 insertions(+), 84 deletions(-) diff --git a/repertory/librepertory/include/comm/packet/packet_client.hpp b/repertory/librepertory/include/comm/packet/packet_client.hpp index ff47896d..8099794e 100644 --- a/repertory/librepertory/include/comm/packet/packet_client.hpp +++ b/repertory/librepertory/include/comm/packet/packet_client.hpp @@ -47,7 +47,7 @@ public: auto operator=(packet_client &&) -> packet_client & = delete; private: - boost::asio::io_context io_context_; + mutable boost::asio::io_context io_context_; remote::remote_config cfg_; std::string unique_id_; @@ -69,21 +69,21 @@ private: void put_client(std::shared_ptr &cli); - [[nodiscard]] auto read_packet(client &cli, - packet &response) const -> packet::error_type; + [[nodiscard]] auto read_packet(client &cli, packet &response) const + -> packet::error_type; void resolve(); public: - [[nodiscard]] auto send(std::string_view method, - std::uint32_t &service_flags) -> packet::error_type; + [[nodiscard]] auto send(std::string_view method, std::uint32_t &service_flags) + -> packet::error_type; [[nodiscard]] auto send(std::string_view method, packet &request, std::uint32_t &service_flags) -> packet::error_type; [[nodiscard]] auto send(std::string_view method, packet &request, - packet &response, - std::uint32_t &service_flags) -> packet::error_type; + packet &response, std::uint32_t &service_flags) + -> packet::error_type; }; } // 
namespace repertory diff --git a/repertory/librepertory/include/common.hpp b/repertory/librepertory/include/common.hpp index 8c7549da..5991e7ac 100644 --- a/repertory/librepertory/include/common.hpp +++ b/repertory/librepertory/include/common.hpp @@ -58,7 +58,7 @@ inline constexpr std::string_view REPERTORY{"repertory"}; inline constexpr std::string_view REPERTORY_DATA_NAME{"repertory2"}; inline constexpr std::wstring_view REPERTORY_W{L"repertory"}; -inline constexpr std::uint64_t REPERTORY_CONFIG_VERSION{2ULL}; +inline constexpr std::uint64_t REPERTORY_CONFIG_VERSION{3ULL}; inline constexpr std::string_view REPERTORY_MIN_REMOTE_VERSION{"2.1.0"}; inline constexpr std::string_view RENTERD_MIN_VERSION{"2.0.0"}; diff --git a/repertory/librepertory/include/events/types/packet_client_timeout.hpp b/repertory/librepertory/include/events/types/packet_client_timeout.hpp index 1c36b242..ff58753b 100644 --- a/repertory/librepertory/include/events/types/packet_client_timeout.hpp +++ b/repertory/librepertory/include/events/types/packet_client_timeout.hpp @@ -28,11 +28,10 @@ namespace repertory { struct packet_client_timeout final : public i_event { packet_client_timeout() = default; - packet_client_timeout(std::string event_name_, - std::string_view function_name_, std::string msg_) - : event_name(std::move(event_name_)), - function_name(std::string(function_name_)), - msg(std::move(msg_)) {} + packet_client_timeout(std::string_view event_name_, + std::string_view function_name_) + : event_name(std::string(event_name_)), + function_name(std::string(function_name_)) {} static constexpr event_level level{event_level::warn}; static constexpr std::string_view name{"packet_client_timeout"}; @@ -50,8 +49,7 @@ struct packet_client_timeout final : public i_event { } [[nodiscard]] auto get_single_line() const -> std::string override { - return fmt::format("{}|func|{}|event|{}|msg|{}", name, function_name, - event_name, msg); + return fmt::format("{}|func|{}|event|{}", name, 
function_name, event_name); } }; } // namespace repertory @@ -62,14 +60,12 @@ template <> struct adl_serializer { const repertory::packet_client_timeout &value) { data["event_name"] = value.event_name; data["function_name"] = value.function_name; - data["msg"] = value.msg; } static void from_json(const json &data, repertory::packet_client_timeout &value) { data.at("event_name").get_to(value.event_name); data.at("function_name").get_to(value.function_name); - data.at("msg").get_to(value.msg); } }; NLOHMANN_JSON_NAMESPACE_END diff --git a/repertory/librepertory/include/types/remote.hpp b/repertory/librepertory/include/types/remote.hpp index 0bc92b40..9cbd3fd8 100644 --- a/repertory/librepertory/include/types/remote.hpp +++ b/repertory/librepertory/include/types/remote.hpp @@ -33,24 +33,27 @@ inline constexpr auto PACKET_SERVICE_FLAGS{PACKET_SERVICE_WINFSP}; inline constexpr auto PACKET_SERVICE_FLAGS{PACKET_SERVICE_FUSE}; #endif // defined(_WIN32) -inline constexpr auto default_remote_directory_page_size{std::size_t(100U)}; inline constexpr auto default_remote_client_pool_size{20U}; +inline constexpr auto default_remote_conn_timeout_ms{500U}; +inline constexpr auto default_remote_directory_page_size{std::size_t(100U)}; inline constexpr auto default_remote_max_connections{20U}; -inline constexpr auto default_remote_receive_timeout_ms{120U * 1000U}; -inline constexpr auto default_remote_send_timeout_ms{30U * 1000U}; +inline constexpr auto default_remote_recv_timeout_ms{500U}; +inline constexpr auto default_remote_send_timeout_ms{250U}; namespace repertory::remote { struct remote_config final { std::uint16_t api_port{}; + std::uint32_t conn_timeout_ms{default_remote_conn_timeout_ms}; std::string encryption_token; std::string host_name_or_ip; std::uint8_t max_connections{default_remote_max_connections}; - std::uint32_t recv_timeout_ms{default_remote_receive_timeout_ms}; + std::uint32_t recv_timeout_ms{default_remote_recv_timeout_ms}; std::uint32_t 
send_timeout_ms{default_remote_send_timeout_ms}; auto operator==(const remote_config &cfg) const noexcept -> bool { if (&cfg != this) { return api_port == cfg.api_port && + conn_timeout_ms == cfg.conn_timeout_ms && encryption_token == cfg.encryption_token && host_name_or_ip == cfg.host_name_or_ip && max_connections == cfg.max_connections && @@ -228,6 +231,7 @@ template <> struct adl_serializer { static void to_json(json &data, const repertory::remote::remote_config &value) { data[repertory::JSON_API_PORT] = value.api_port; + data[repertory::JSON_CONNECT_TIMEOUT_MS] = value.conn_timeout_ms; data[repertory::JSON_ENCRYPTION_TOKEN] = value.encryption_token; data[repertory::JSON_HOST_NAME_OR_IP] = value.host_name_or_ip; data[repertory::JSON_MAX_CONNECTIONS] = value.max_connections; @@ -238,6 +242,9 @@ template <> struct adl_serializer { static void from_json(const json &data, repertory::remote::remote_config &value) { data.at(repertory::JSON_API_PORT).get_to(value.api_port); + if (data.contains(repertory::JSON_CONNECT_TIMEOUT_MS)) { + data.at(repertory::JSON_CONNECT_TIMEOUT_MS).get_to(value.conn_timeout_ms); + } data.at(repertory::JSON_ENCRYPTION_TOKEN).get_to(value.encryption_token); data.at(repertory::JSON_HOST_NAME_OR_IP).get_to(value.host_name_or_ip); data.at(repertory::JSON_MAX_CONNECTIONS).get_to(value.max_connections); diff --git a/repertory/librepertory/include/types/repertory.hpp b/repertory/librepertory/include/types/repertory.hpp index d2f37890..26cf8d54 100644 --- a/repertory/librepertory/include/types/repertory.hpp +++ b/repertory/librepertory/include/types/repertory.hpp @@ -404,6 +404,7 @@ inline constexpr auto JSON_API_USER{"ApiUser"}; inline constexpr auto JSON_AUTO_START{"AutoStart"}; inline constexpr auto JSON_BUCKET{"Bucket"}; inline constexpr auto JSON_CLIENT_POOL_SIZE{"ClientPoolSize"}; +inline constexpr auto JSON_CONNECT_TIMEOUT_MS{"ConnectTimeoutMs"}; inline constexpr auto JSON_DATABASE_TYPE{"DatabaseType"}; inline constexpr auto 
JSON_DIRECTORY{"Directory"}; inline constexpr auto JSON_DOWNLOAD_TIMEOUT_SECS{"DownloadTimeoutSeconds"}; diff --git a/repertory/librepertory/src/app_config.cpp b/repertory/librepertory/src/app_config.cpp index 01a7e1da..04e4de2e 100644 --- a/repertory/librepertory/src/app_config.cpp +++ b/repertory/librepertory/src/app_config.cpp @@ -1097,6 +1097,16 @@ auto app_config::load() -> bool { } } + if (version_ == 3U) { + if (json_document.contains(JSON_REMOTE_CONFIG)) { + auto cfg = get_remote_config(); + cfg.conn_timeout_ms = default_remote_conn_timeout_ms; + cfg.recv_timeout_ms = default_remote_recv_timeout_ms; + cfg.send_timeout_ms = default_remote_send_timeout_ms; + set_remote_config(cfg); + } + } + found = false; } diff --git a/repertory/librepertory/src/comm/packet/packet_client.cpp b/repertory/librepertory/src/comm/packet/packet_client.cpp index b71ba98e..bb24cc31 100644 --- a/repertory/librepertory/src/comm/packet/packet_client.cpp +++ b/repertory/librepertory/src/comm/packet/packet_client.cpp @@ -28,9 +28,196 @@ #include "utils/collection.hpp" #include "utils/common.hpp" #include "utils/error_utils.hpp" -#include "utils/timeout.hpp" #include "version.hpp" +namespace { +namespace net = boost::asio; + +constexpr std::uint8_t max_attempts{3U}; + +struct non_blocking_guard final { + net::ip::tcp::socket &sock; + bool non_blocking{}; + + non_blocking_guard(const non_blocking_guard &) = delete; + non_blocking_guard(non_blocking_guard &&) = delete; + + auto operator=(const non_blocking_guard &) -> non_blocking_guard & = delete; + auto operator=(non_blocking_guard &&) -> non_blocking_guard & = delete; + + explicit non_blocking_guard(net::ip::tcp::socket &sock_) + : sock(sock_), non_blocking(sock_.non_blocking()) { + boost::system::error_code err; + [[maybe_unused]] auto ret = sock_.non_blocking(true, err); + } + + ~non_blocking_guard() { + if (not sock.is_open()) { + return; + } + + boost::system::error_code err; + [[maybe_unused]] auto ret = 
sock.non_blocking(non_blocking, err); + } +}; + +[[nodiscard]] auto is_socket_still_alive(boost::asio::ip::tcp::socket &sock) + -> bool { + if (not sock.is_open()) { + return false; + } + + non_blocking_guard guard{sock}; + + boost::system::error_code err{}; + std::array tmp{}; + auto available = sock.receive(boost::asio::buffer(tmp), + boost::asio::socket_base::message_peek, err); + + if (err == boost::asio::error::would_block || + err == boost::asio::error::try_again) { + return true; + } + + if (err == boost::asio::error::eof || + err == boost::asio::error::connection_reset || + err == boost::asio::error::operation_aborted || + err == boost::asio::error::not_connected || + err == boost::asio::error::bad_descriptor || + err == boost::asio::error::network_down) { + return false; + } + + if (not err && available == 0) { + return false; + } + + if (not err && available > 0) { + return true; + } + + return false; +} + +template +void run_with_deadline(net::io_context &io_ctx, op_t &&operation, + cancel_t &&cancel_op, std::chrono::milliseconds deadline, + std::string_view timeout_event_tag, + std::string_view op_name, + std::string_view function_name) { + deadline = std::max(deadline / max_attempts, 250ms); + + boost::system::error_code err{}; + bool done = false; + bool timed_out = false; + net::steady_timer timer{io_ctx}; + timer.expires_after(deadline); + timer.async_wait([&](const boost::system::error_code &err_) { + if (not err_) { + timed_out = true; + std::forward(cancel_op)(); + } + }); + + std::forward(operation)([&](const boost::system::error_code &err_) { + err = err_; + done = true; + }); + + io_ctx.restart(); + while (not done && not timed_out) { + io_ctx.run_one(); + } + timer.cancel(); + + if (timed_out) { + repertory::event_system::instance().raise( + timeout_event_tag, function_name); + throw std::runtime_error(std::string(op_name) + " timed-out"); + } + + if (err) { + throw std::runtime_error(std::string(op_name) + " failed|" + err.message()); + } +} 
+ +void connect_with_deadline(net::io_context &io_ctx, + boost::asio::ip::tcp::socket &sock, + const auto &endpoints, + std::chrono::milliseconds deadline) { + REPERTORY_USES_FUNCTION_NAME(); + + run_with_deadline( + io_ctx, + [&](auto &&handler) { + net::async_connect(sock, endpoints, + [handler = std::forward(handler)]( + const boost::system::error_code &err, + const auto &) { handler(err); }); + }, + [&]() { sock.cancel(); }, deadline, "connect", "connect", function_name); +} + +void read_exact_with_deadline(net::io_context &io_ctx, + boost::asio::ip::tcp::socket &sock, auto buf, + std::chrono::milliseconds deadline) { + REPERTORY_USES_FUNCTION_NAME(); + + auto *base = static_cast(const_cast(buf.data())); + std::size_t total = buf.size(); + std::size_t offset = 0U; + + while (offset < total) { + std::size_t bytes_read = 0U; + + run_with_deadline( + io_ctx, + [&](auto &&handler) { + sock.async_read_some( + net::buffer(base + offset, total - offset), + [&, handler = std::forward(handler)]( + const boost::system::error_code &err, std::size_t count) { + bytes_read = count; + handler(err); + }); + }, + [&]() { sock.cancel(); }, deadline, "response", "read", function_name); + + offset += bytes_read; + } +} + +void write_all_with_deadline(net::io_context &io_ctx, + boost::asio::ip::tcp::socket &sock, auto buf, + std::chrono::milliseconds deadline) { + REPERTORY_USES_FUNCTION_NAME(); + + auto *base = static_cast(const_cast(buf.data())); + std::size_t total = buf.size(); + std::size_t offset = 0U; + + while (offset < total) { + std::size_t bytes_written = 0U; + + run_with_deadline( + io_ctx, + // start one chunk write + [&](auto &&handler) { + sock.async_write_some( + net::buffer(base + offset, total - offset), + [&, handler = std::forward(handler)]( + const boost::system::error_code &err, std::size_t count) { + bytes_written = count; + handler(err); + }); + }, + [&]() { sock.cancel(); }, deadline, "request", "write", function_name); + + offset += bytes_written; + } +} 
+} // namespace + namespace repertory { packet_client::packet_client(remote::remote_config cfg) : cfg_(std::move(cfg)), unique_id_(utils::create_uuid_string()) {} @@ -64,11 +251,12 @@ void packet_client::close_all() { } void packet_client::connect(client &cli) { - REPERTORY_USES_FUNCTION_NAME(); - try { resolve(); - boost::asio::connect(cli.socket, resolve_results_); + + connect_with_deadline(io_context_, cli.socket, resolve_results_, + std::chrono::milliseconds(cfg_.conn_timeout_ms)); + cli.socket.set_option(boost::asio::ip::tcp::no_delay(true)); cli.socket.set_option(boost::asio::socket_base::linger(false, 0)); cli.socket.set_option(boost::asio::socket_base::keep_alive(true)); @@ -78,33 +266,42 @@ void packet_client::connect(client &cli) { if (res != 0) { throw std::runtime_error(std::to_string(res)); } - } catch (const std::exception &e) { - utils::error::raise_error(function_name, e, "connection handshake failed"); + } catch (...) { + close(cli); resolve_results_ = {}; + throw; } } auto packet_client::get_client() -> std::shared_ptr { + REPERTORY_USES_FUNCTION_NAME(); + unique_mutex_lock clients_lock(clients_mutex_); if (not allow_connections_) { return nullptr; } - if (clients_.empty()) { - clients_lock.unlock(); + try { + if (clients_.empty()) { + clients_lock.unlock(); - auto cli = std::make_shared(io_context_); - connect(*cli); + auto cli = std::make_shared(io_context_); + connect(*cli); + return cli; + } + + auto cli = clients_.at(0U); + utils::collection::remove_element(clients_, cli); return cli; + } catch (const std::exception &e) { + utils::error::raise_error(function_name, e, "connection handshake failed"); } - auto cli = clients_.at(0U); - utils::collection::remove_element(clients_, cli); - return cli; + return nullptr; } void packet_client::put_client(std::shared_ptr &cli) { - if (not cli->socket.is_open()) { + if (not cli || not is_socket_still_alive(cli->socket)) { return; } @@ -117,32 +314,21 @@ void packet_client::put_client(std::shared_ptr 
&cli) { auto packet_client::read_packet(client &cli, packet &response) const -> packet::error_type { data_buffer buffer(sizeof(std::uint32_t)); - const auto read_buffer = [&]() { - std::uint32_t offset{}; - while (offset < buffer.size()) { - auto bytes_read = boost::asio::read( - cli.socket, - boost::asio::buffer(&buffer.at(offset), buffer.size() - offset)); - if (bytes_read <= 0) { - throw std::runtime_error("read failed|" + std::to_string(bytes_read)); - } - offset += static_cast(bytes_read); - } - }; - read_buffer(); + read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer), + std::chrono::milliseconds(cfg_.recv_timeout_ms)); auto size = boost::endian::big_to_native( *reinterpret_cast(buffer.data())); buffer.resize(size); - read_buffer(); + read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer), + std::chrono::milliseconds(cfg_.recv_timeout_ms)); response = std::move(buffer); auto ret = response.decrypt(cfg_.encryption_token); if (ret == 0) { ret = response.decode(cli.nonce); } - return ret; } @@ -181,47 +367,20 @@ auto packet_client::send(std::string_view method, packet &request, request.encode_top(PACKET_SERVICE_FLAGS); request.encode_top(std::string{project_get_version()}); - static constexpr std::uint8_t max_attempts{5U}; - for (std::uint8_t i = 1U; - allow_connections_ && not success && (i <= max_attempts); i++) { + for (std::uint8_t retry = 1U; + allow_connections_ && not success && (retry <= max_attempts); ++retry) { auto current_client = get_client(); if (current_client) { try { request.encode_top(current_client->nonce); request.encrypt(cfg_.encryption_token); - timeout request_timeout( - [method, current_client]() { - event_system::instance().raise( - "request", function_name, std::string{method}); - packet_client::close(*current_client); - }, + write_all_with_deadline( + io_context_, current_client->socket, + boost::asio::buffer(&request[0], request.get_size()), 
std::chrono::milliseconds(cfg_.send_timeout_ms)); - std::uint32_t offset{}; - while (offset < request.get_size()) { - auto bytes_written = boost::asio::write( - current_client->socket, - boost::asio::buffer(&request[offset], - request.get_size() - offset)); - if (bytes_written <= 0) { - throw std::runtime_error("write failed|" + - std::to_string(bytes_written)); - } - offset += static_cast(bytes_written); - } - request_timeout.disable(); - - timeout response_timeout( - [method, current_client]() { - event_system::instance().raise( - "response", function_name, std::string{method}); - packet_client::close(*current_client); - }, - std::chrono::milliseconds(cfg_.recv_timeout_ms)); - ret = read_packet(*current_client, response); - response_timeout.disable(); if (ret == 0) { ret = response.decode(service_flags); if (ret == 0) { @@ -236,8 +395,9 @@ auto packet_client::send(std::string_view method, packet &request, } } catch (const std::exception &e) { utils::error::raise_error(function_name, e, "send failed"); - close_all(); - if (allow_connections_ && (i < max_attempts)) { + + close(*current_client); + if (allow_connections_ && (retry < max_attempts)) { std::this_thread::sleep_for(1s); } }