fix intermintent hang on remote server disconnect
All checks were successful
Blockstorage/repertory/pipeline/head This commit looks good
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
2025-09-20 20:03:56 -05:00
parent d4aba93051
commit 75093c2b81
7 changed files with 258 additions and 84 deletions

View File

@@ -47,7 +47,7 @@ public:
auto operator=(packet_client &&) -> packet_client & = delete; auto operator=(packet_client &&) -> packet_client & = delete;
private: private:
boost::asio::io_context io_context_; mutable boost::asio::io_context io_context_;
remote::remote_config cfg_; remote::remote_config cfg_;
std::string unique_id_; std::string unique_id_;
@@ -69,21 +69,21 @@ private:
void put_client(std::shared_ptr<client> &cli); void put_client(std::shared_ptr<client> &cli);
[[nodiscard]] auto read_packet(client &cli, [[nodiscard]] auto read_packet(client &cli, packet &response) const
packet &response) const -> packet::error_type; -> packet::error_type;
void resolve(); void resolve();
public: public:
[[nodiscard]] auto send(std::string_view method, [[nodiscard]] auto send(std::string_view method, std::uint32_t &service_flags)
std::uint32_t &service_flags) -> packet::error_type; -> packet::error_type;
[[nodiscard]] auto send(std::string_view method, packet &request, [[nodiscard]] auto send(std::string_view method, packet &request,
std::uint32_t &service_flags) -> packet::error_type; std::uint32_t &service_flags) -> packet::error_type;
[[nodiscard]] auto send(std::string_view method, packet &request, [[nodiscard]] auto send(std::string_view method, packet &request,
packet &response, packet &response, std::uint32_t &service_flags)
std::uint32_t &service_flags) -> packet::error_type; -> packet::error_type;
}; };
} // namespace repertory } // namespace repertory

View File

@@ -58,7 +58,7 @@ inline constexpr std::string_view REPERTORY{"repertory"};
inline constexpr std::string_view REPERTORY_DATA_NAME{"repertory2"}; inline constexpr std::string_view REPERTORY_DATA_NAME{"repertory2"};
inline constexpr std::wstring_view REPERTORY_W{L"repertory"}; inline constexpr std::wstring_view REPERTORY_W{L"repertory"};
inline constexpr std::uint64_t REPERTORY_CONFIG_VERSION{2ULL}; inline constexpr std::uint64_t REPERTORY_CONFIG_VERSION{3ULL};
inline constexpr std::string_view REPERTORY_MIN_REMOTE_VERSION{"2.1.0"}; inline constexpr std::string_view REPERTORY_MIN_REMOTE_VERSION{"2.1.0"};
inline constexpr std::string_view RENTERD_MIN_VERSION{"2.0.0"}; inline constexpr std::string_view RENTERD_MIN_VERSION{"2.0.0"};

View File

@@ -28,11 +28,10 @@
namespace repertory { namespace repertory {
struct packet_client_timeout final : public i_event { struct packet_client_timeout final : public i_event {
packet_client_timeout() = default; packet_client_timeout() = default;
packet_client_timeout(std::string event_name_, packet_client_timeout(std::string_view event_name_,
std::string_view function_name_, std::string msg_) std::string_view function_name_)
: event_name(std::move(event_name_)), : event_name(std::string(event_name_)),
function_name(std::string(function_name_)), function_name(std::string(function_name_)) {}
msg(std::move(msg_)) {}
static constexpr event_level level{event_level::warn}; static constexpr event_level level{event_level::warn};
static constexpr std::string_view name{"packet_client_timeout"}; static constexpr std::string_view name{"packet_client_timeout"};
@@ -50,8 +49,7 @@ struct packet_client_timeout final : public i_event {
} }
[[nodiscard]] auto get_single_line() const -> std::string override { [[nodiscard]] auto get_single_line() const -> std::string override {
return fmt::format("{}|func|{}|event|{}|msg|{}", name, function_name, return fmt::format("{}|func|{}|event|{}", name, function_name, event_name);
event_name, msg);
} }
}; };
} // namespace repertory } // namespace repertory
@@ -62,14 +60,12 @@ template <> struct adl_serializer<repertory::packet_client_timeout> {
const repertory::packet_client_timeout &value) { const repertory::packet_client_timeout &value) {
data["event_name"] = value.event_name; data["event_name"] = value.event_name;
data["function_name"] = value.function_name; data["function_name"] = value.function_name;
data["msg"] = value.msg;
} }
static void from_json(const json &data, static void from_json(const json &data,
repertory::packet_client_timeout &value) { repertory::packet_client_timeout &value) {
data.at("event_name").get_to<std::string>(value.event_name); data.at("event_name").get_to<std::string>(value.event_name);
data.at("function_name").get_to<std::string>(value.function_name); data.at("function_name").get_to<std::string>(value.function_name);
data.at("msg").get_to<std::string>(value.msg);
} }
}; };
NLOHMANN_JSON_NAMESPACE_END NLOHMANN_JSON_NAMESPACE_END

View File

@@ -33,24 +33,27 @@ inline constexpr auto PACKET_SERVICE_FLAGS{PACKET_SERVICE_WINFSP};
inline constexpr auto PACKET_SERVICE_FLAGS{PACKET_SERVICE_FUSE}; inline constexpr auto PACKET_SERVICE_FLAGS{PACKET_SERVICE_FUSE};
#endif // defined(_WIN32) #endif // defined(_WIN32)
inline constexpr auto default_remote_directory_page_size{std::size_t(100U)};
inline constexpr auto default_remote_client_pool_size{20U}; inline constexpr auto default_remote_client_pool_size{20U};
inline constexpr auto default_remote_conn_timeout_ms{500U};
inline constexpr auto default_remote_directory_page_size{std::size_t(100U)};
inline constexpr auto default_remote_max_connections{20U}; inline constexpr auto default_remote_max_connections{20U};
inline constexpr auto default_remote_receive_timeout_ms{120U * 1000U}; inline constexpr auto default_remote_recv_timeout_ms{500U};
inline constexpr auto default_remote_send_timeout_ms{30U * 1000U}; inline constexpr auto default_remote_send_timeout_ms{250U};
namespace repertory::remote { namespace repertory::remote {
struct remote_config final { struct remote_config final {
std::uint16_t api_port{}; std::uint16_t api_port{};
std::uint32_t conn_timeout_ms{default_remote_conn_timeout_ms};
std::string encryption_token; std::string encryption_token;
std::string host_name_or_ip; std::string host_name_or_ip;
std::uint8_t max_connections{default_remote_max_connections}; std::uint8_t max_connections{default_remote_max_connections};
std::uint32_t recv_timeout_ms{default_remote_receive_timeout_ms}; std::uint32_t recv_timeout_ms{default_remote_recv_timeout_ms};
std::uint32_t send_timeout_ms{default_remote_send_timeout_ms}; std::uint32_t send_timeout_ms{default_remote_send_timeout_ms};
auto operator==(const remote_config &cfg) const noexcept -> bool { auto operator==(const remote_config &cfg) const noexcept -> bool {
if (&cfg != this) { if (&cfg != this) {
return api_port == cfg.api_port && return api_port == cfg.api_port &&
conn_timeout_ms == cfg.conn_timeout_ms &&
encryption_token == cfg.encryption_token && encryption_token == cfg.encryption_token &&
host_name_or_ip == cfg.host_name_or_ip && host_name_or_ip == cfg.host_name_or_ip &&
max_connections == cfg.max_connections && max_connections == cfg.max_connections &&
@@ -228,6 +231,7 @@ template <> struct adl_serializer<repertory::remote::remote_config> {
static void to_json(json &data, static void to_json(json &data,
const repertory::remote::remote_config &value) { const repertory::remote::remote_config &value) {
data[repertory::JSON_API_PORT] = value.api_port; data[repertory::JSON_API_PORT] = value.api_port;
data[repertory::JSON_CONNECT_TIMEOUT_MS] = value.api_port;
data[repertory::JSON_ENCRYPTION_TOKEN] = value.encryption_token; data[repertory::JSON_ENCRYPTION_TOKEN] = value.encryption_token;
data[repertory::JSON_HOST_NAME_OR_IP] = value.host_name_or_ip; data[repertory::JSON_HOST_NAME_OR_IP] = value.host_name_or_ip;
data[repertory::JSON_MAX_CONNECTIONS] = value.max_connections; data[repertory::JSON_MAX_CONNECTIONS] = value.max_connections;
@@ -238,6 +242,9 @@ template <> struct adl_serializer<repertory::remote::remote_config> {
static void from_json(const json &data, static void from_json(const json &data,
repertory::remote::remote_config &value) { repertory::remote::remote_config &value) {
data.at(repertory::JSON_API_PORT).get_to(value.api_port); data.at(repertory::JSON_API_PORT).get_to(value.api_port);
if (data.contains(repertory::JSON_CONNECT_TIMEOUT_MS)) {
data.at(repertory::JSON_CONNECT_TIMEOUT_MS).get_to(value.conn_timeout_ms);
}
data.at(repertory::JSON_ENCRYPTION_TOKEN).get_to(value.encryption_token); data.at(repertory::JSON_ENCRYPTION_TOKEN).get_to(value.encryption_token);
data.at(repertory::JSON_HOST_NAME_OR_IP).get_to(value.host_name_or_ip); data.at(repertory::JSON_HOST_NAME_OR_IP).get_to(value.host_name_or_ip);
data.at(repertory::JSON_MAX_CONNECTIONS).get_to(value.max_connections); data.at(repertory::JSON_MAX_CONNECTIONS).get_to(value.max_connections);

View File

@@ -404,6 +404,7 @@ inline constexpr auto JSON_API_USER{"ApiUser"};
inline constexpr auto JSON_AUTO_START{"AutoStart"}; inline constexpr auto JSON_AUTO_START{"AutoStart"};
inline constexpr auto JSON_BUCKET{"Bucket"}; inline constexpr auto JSON_BUCKET{"Bucket"};
inline constexpr auto JSON_CLIENT_POOL_SIZE{"ClientPoolSize"}; inline constexpr auto JSON_CLIENT_POOL_SIZE{"ClientPoolSize"};
inline constexpr auto JSON_CONNECT_TIMEOUT_MS{"ConnectTimeoutMs"};
inline constexpr auto JSON_DATABASE_TYPE{"DatabaseType"}; inline constexpr auto JSON_DATABASE_TYPE{"DatabaseType"};
inline constexpr auto JSON_DIRECTORY{"Directory"}; inline constexpr auto JSON_DIRECTORY{"Directory"};
inline constexpr auto JSON_DOWNLOAD_TIMEOUT_SECS{"DownloadTimeoutSeconds"}; inline constexpr auto JSON_DOWNLOAD_TIMEOUT_SECS{"DownloadTimeoutSeconds"};

View File

@@ -1097,6 +1097,16 @@ auto app_config::load() -> bool {
} }
} }
if (version_ == 3U) {
if (json_document.contains(JSON_REMOTE_CONFIG)) {
auto cfg = get_remote_config();
cfg.conn_timeout_ms = default_remote_conn_timeout_ms;
cfg.recv_timeout_ms = default_remote_recv_timeout_ms;
cfg.send_timeout_ms = default_remote_send_timeout_ms;
set_remote_config(cfg);
}
}
found = false; found = false;
} }

View File

@@ -28,9 +28,196 @@
#include "utils/collection.hpp" #include "utils/collection.hpp"
#include "utils/common.hpp" #include "utils/common.hpp"
#include "utils/error_utils.hpp" #include "utils/error_utils.hpp"
#include "utils/timeout.hpp"
#include "version.hpp" #include "version.hpp"
namespace {
namespace net = boost::asio;
constexpr std::uint8_t max_attempts{3U};
struct non_blocking_guard final {
net::ip::tcp::socket &sock;
bool non_blocking{};
non_blocking_guard(const non_blocking_guard &) = delete;
non_blocking_guard(non_blocking_guard &&) = delete;
auto operator=(const non_blocking_guard &) -> non_blocking_guard & = delete;
auto operator=(non_blocking_guard &&) -> non_blocking_guard & = delete;
explicit non_blocking_guard(net::ip::tcp::socket &sock_)
: sock(sock_), non_blocking(sock_.non_blocking()) {
boost::system::error_code err;
[[maybe_unused]] auto ret = sock_.non_blocking(true, err);
}
~non_blocking_guard() {
if (not sock.is_open()) {
return;
}
boost::system::error_code err;
[[maybe_unused]] auto ret = sock.non_blocking(non_blocking, err);
}
};
[[nodiscard]] auto is_socket_still_alive(boost::asio::ip::tcp::socket &sock)
-> bool {
if (not sock.is_open()) {
return false;
}
non_blocking_guard guard{sock};
boost::system::error_code err{};
std::array<std::uint8_t, 1> tmp{};
auto available = sock.receive(boost::asio::buffer(tmp),
boost::asio::socket_base::message_peek, err);
if (err == boost::asio::error::would_block ||
err == boost::asio::error::try_again) {
return true;
}
if (err == boost::asio::error::eof ||
err == boost::asio::error::connection_reset ||
err == boost::asio::error::operation_aborted ||
err == boost::asio::error::not_connected ||
err == boost::asio::error::bad_descriptor ||
err == boost::asio::error::network_down) {
return false;
}
if (not err && available == 0) {
return false;
}
if (not err && available > 0) {
return true;
}
return false;
}
template <class op_t, class cancel_t>
void run_with_deadline(net::io_context &io_ctx, op_t &&operation,
cancel_t &&cancel_op, std::chrono::milliseconds deadline,
std::string_view timeout_event_tag,
std::string_view op_name,
std::string_view function_name) {
deadline = std::max(deadline / max_attempts, 250ms);
boost::system::error_code err{};
bool done = false;
bool timed_out = false;
net::steady_timer timer{io_ctx};
timer.expires_after(deadline);
timer.async_wait([&](const boost::system::error_code &err_) {
if (not err_) {
timed_out = true;
std::forward<cancel_t>(cancel_op)();
}
});
std::forward<op_t>(operation)([&](const boost::system::error_code &err_) {
err = err_;
done = true;
});
io_ctx.restart();
while (not done && not timed_out) {
io_ctx.run_one();
}
timer.cancel();
if (timed_out) {
repertory::event_system::instance().raise<repertory::packet_client_timeout>(
timeout_event_tag, function_name);
throw std::runtime_error(std::string(op_name) + " timed-out");
}
if (err) {
throw std::runtime_error(std::string(op_name) + " failed|" + err.message());
}
}
void connect_with_deadline(net::io_context &io_ctx,
boost::asio::ip::tcp::socket &sock,
const auto &endpoints,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
run_with_deadline(
io_ctx,
[&](auto &&handler) {
net::async_connect(sock, endpoints,
[handler = std::forward<decltype(handler)>(handler)](
const boost::system::error_code &err,
const auto &) { handler(err); });
},
[&]() { sock.cancel(); }, deadline, "connect", "connect", function_name);
}
void read_exact_with_deadline(net::io_context &io_ctx,
boost::asio::ip::tcp::socket &sock, auto buf,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
auto *base = static_cast<std::uint8_t *>(const_cast<void *>(buf.data()));
std::size_t total = buf.size();
std::size_t offset = 0U;
while (offset < total) {
std::size_t bytes_read = 0U;
run_with_deadline(
io_ctx,
[&](auto &&handler) {
sock.async_read_some(
net::buffer(base + offset, total - offset),
[&, handler = std::forward<decltype(handler)>(handler)](
const boost::system::error_code &err, std::size_t count) {
bytes_read = count;
handler(err);
});
},
[&]() { sock.cancel(); }, deadline, "response", "read", function_name);
offset += bytes_read;
}
}
void write_all_with_deadline(net::io_context &io_ctx,
boost::asio::ip::tcp::socket &sock, auto buf,
std::chrono::milliseconds deadline) {
REPERTORY_USES_FUNCTION_NAME();
auto *base = static_cast<std::uint8_t *>(const_cast<void *>(buf.data()));
std::size_t total = buf.size();
std::size_t offset = 0U;
while (offset < total) {
std::size_t bytes_written = 0U;
run_with_deadline(
io_ctx,
// start one chunk write
[&](auto &&handler) {
sock.async_write_some(
net::buffer(base + offset, total - offset),
[&, handler = std::forward<decltype(handler)>(handler)](
const boost::system::error_code &err, std::size_t count) {
bytes_written = count;
handler(err);
});
},
[&]() { sock.cancel(); }, deadline, "request", "write", function_name);
offset += bytes_written;
}
}
} // namespace
namespace repertory { namespace repertory {
packet_client::packet_client(remote::remote_config cfg) packet_client::packet_client(remote::remote_config cfg)
: cfg_(std::move(cfg)), unique_id_(utils::create_uuid_string()) {} : cfg_(std::move(cfg)), unique_id_(utils::create_uuid_string()) {}
@@ -64,11 +251,12 @@ void packet_client::close_all() {
} }
void packet_client::connect(client &cli) { void packet_client::connect(client &cli) {
REPERTORY_USES_FUNCTION_NAME();
try { try {
resolve(); resolve();
boost::asio::connect(cli.socket, resolve_results_);
connect_with_deadline(io_context_, cli.socket, resolve_results_,
std::chrono::milliseconds(cfg_.send_timeout_ms));
cli.socket.set_option(boost::asio::ip::tcp::no_delay(true)); cli.socket.set_option(boost::asio::ip::tcp::no_delay(true));
cli.socket.set_option(boost::asio::socket_base::linger(false, 0)); cli.socket.set_option(boost::asio::socket_base::linger(false, 0));
cli.socket.set_option(boost::asio::socket_base::keep_alive(true)); cli.socket.set_option(boost::asio::socket_base::keep_alive(true));
@@ -78,33 +266,42 @@ void packet_client::connect(client &cli) {
if (res != 0) { if (res != 0) {
throw std::runtime_error(std::to_string(res)); throw std::runtime_error(std::to_string(res));
} }
} catch (const std::exception &e) { } catch (...) {
utils::error::raise_error(function_name, e, "connection handshake failed"); close(cli);
resolve_results_ = {}; resolve_results_ = {};
throw;
} }
} }
auto packet_client::get_client() -> std::shared_ptr<packet_client::client> { auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock clients_lock(clients_mutex_); unique_mutex_lock clients_lock(clients_mutex_);
if (not allow_connections_) { if (not allow_connections_) {
return nullptr; return nullptr;
} }
if (clients_.empty()) { try {
clients_lock.unlock(); if (clients_.empty()) {
clients_lock.unlock();
auto cli = std::make_shared<client>(io_context_); auto cli = std::make_shared<client>(io_context_);
connect(*cli); connect(*cli);
return cli;
}
auto cli = clients_.at(0U);
utils::collection::remove_element(clients_, cli);
return cli; return cli;
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "connection handshake failed");
} }
auto cli = clients_.at(0U); return nullptr;
utils::collection::remove_element(clients_, cli);
return cli;
} }
void packet_client::put_client(std::shared_ptr<client> &cli) { void packet_client::put_client(std::shared_ptr<client> &cli) {
if (not cli->socket.is_open()) { if (not cli || not is_socket_still_alive(cli->socket)) {
return; return;
} }
@@ -117,32 +314,21 @@ void packet_client::put_client(std::shared_ptr<client> &cli) {
auto packet_client::read_packet(client &cli, packet &response) const auto packet_client::read_packet(client &cli, packet &response) const
-> packet::error_type { -> packet::error_type {
data_buffer buffer(sizeof(std::uint32_t)); data_buffer buffer(sizeof(std::uint32_t));
const auto read_buffer = [&]() { read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer),
std::uint32_t offset{}; std::chrono::milliseconds(cfg_.recv_timeout_ms));
while (offset < buffer.size()) {
auto bytes_read = boost::asio::read(
cli.socket,
boost::asio::buffer(&buffer.at(offset), buffer.size() - offset));
if (bytes_read <= 0) {
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
}
offset += static_cast<std::uint32_t>(bytes_read);
}
};
read_buffer();
auto size = boost::endian::big_to_native( auto size = boost::endian::big_to_native(
*reinterpret_cast<std::uint32_t *>(buffer.data())); *reinterpret_cast<std::uint32_t *>(buffer.data()));
buffer.resize(size); buffer.resize(size);
read_buffer(); read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer),
std::chrono::milliseconds(cfg_.recv_timeout_ms));
response = std::move(buffer); response = std::move(buffer);
auto ret = response.decrypt(cfg_.encryption_token); auto ret = response.decrypt(cfg_.encryption_token);
if (ret == 0) { if (ret == 0) {
ret = response.decode(cli.nonce); ret = response.decode(cli.nonce);
} }
return ret; return ret;
} }
@@ -181,47 +367,20 @@ auto packet_client::send(std::string_view method, packet &request,
request.encode_top(PACKET_SERVICE_FLAGS); request.encode_top(PACKET_SERVICE_FLAGS);
request.encode_top(std::string{project_get_version()}); request.encode_top(std::string{project_get_version()});
static constexpr std::uint8_t max_attempts{5U}; for (std::uint8_t retry = 1U;
for (std::uint8_t i = 1U; allow_connections_ && not success && (retry <= max_attempts); ++retry) {
allow_connections_ && not success && (i <= max_attempts); i++) {
auto current_client = get_client(); auto current_client = get_client();
if (current_client) { if (current_client) {
try { try {
request.encode_top(current_client->nonce); request.encode_top(current_client->nonce);
request.encrypt(cfg_.encryption_token); request.encrypt(cfg_.encryption_token);
timeout request_timeout( write_all_with_deadline(
[method, current_client]() { io_context_, current_client->socket,
event_system::instance().raise<packet_client_timeout>( boost::asio::buffer(&request[0], request.get_size()),
"request", function_name, std::string{method});
packet_client::close(*current_client);
},
std::chrono::milliseconds(cfg_.send_timeout_ms)); std::chrono::milliseconds(cfg_.send_timeout_ms));
std::uint32_t offset{};
while (offset < request.get_size()) {
auto bytes_written = boost::asio::write(
current_client->socket,
boost::asio::buffer(&request[offset],
request.get_size() - offset));
if (bytes_written <= 0) {
throw std::runtime_error("write failed|" +
std::to_string(bytes_written));
}
offset += static_cast<std::uint32_t>(bytes_written);
}
request_timeout.disable();
timeout response_timeout(
[method, current_client]() {
event_system::instance().raise<packet_client_timeout>(
"response", function_name, std::string{method});
packet_client::close(*current_client);
},
std::chrono::milliseconds(cfg_.recv_timeout_ms));
ret = read_packet(*current_client, response); ret = read_packet(*current_client, response);
response_timeout.disable();
if (ret == 0) { if (ret == 0) {
ret = response.decode(service_flags); ret = response.decode(service_flags);
if (ret == 0) { if (ret == 0) {
@@ -236,8 +395,9 @@ auto packet_client::send(std::string_view method, packet &request,
} }
} catch (const std::exception &e) { } catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "send failed"); utils::error::raise_error(function_name, e, "send failed");
close_all();
if (allow_connections_ && (i < max_attempts)) { close(*current_client);
if (allow_connections_ && (retry < max_attempts)) {
std::this_thread::sleep_for(1s); std::this_thread::sleep_for(1s);
} }
} }