fix intermintent hang on remote server disconnect
All checks were successful
Blockstorage/repertory/pipeline/head This commit looks good
All checks were successful
Blockstorage/repertory/pipeline/head This commit looks good
This commit is contained in:
@@ -58,7 +58,7 @@ inline constexpr std::string_view REPERTORY{"repertory"};
|
|||||||
inline constexpr std::string_view REPERTORY_DATA_NAME{"repertory2"};
|
inline constexpr std::string_view REPERTORY_DATA_NAME{"repertory2"};
|
||||||
inline constexpr std::wstring_view REPERTORY_W{L"repertory"};
|
inline constexpr std::wstring_view REPERTORY_W{L"repertory"};
|
||||||
|
|
||||||
inline constexpr std::uint64_t REPERTORY_CONFIG_VERSION{3ULL};
|
inline constexpr std::uint64_t REPERTORY_CONFIG_VERSION{4ULL};
|
||||||
inline constexpr std::string_view REPERTORY_MIN_REMOTE_VERSION{"2.1.0"};
|
inline constexpr std::string_view REPERTORY_MIN_REMOTE_VERSION{"2.1.0"};
|
||||||
inline constexpr std::string_view RENTERD_MIN_VERSION{"2.0.0"};
|
inline constexpr std::string_view RENTERD_MIN_VERSION{"2.0.0"};
|
||||||
|
|
||||||
|
|||||||
@@ -34,11 +34,11 @@ inline constexpr auto PACKET_SERVICE_FLAGS{PACKET_SERVICE_FUSE};
|
|||||||
#endif // defined(_WIN32)
|
#endif // defined(_WIN32)
|
||||||
|
|
||||||
inline constexpr auto default_remote_client_pool_size{20U};
|
inline constexpr auto default_remote_client_pool_size{20U};
|
||||||
inline constexpr auto default_remote_conn_timeout_ms{500U};
|
inline constexpr auto default_remote_conn_timeout_ms{3000U};
|
||||||
inline constexpr auto default_remote_directory_page_size{std::size_t(100U)};
|
inline constexpr auto default_remote_directory_page_size{std::size_t(100U)};
|
||||||
inline constexpr auto default_remote_max_connections{20U};
|
inline constexpr auto default_remote_max_connections{20U};
|
||||||
inline constexpr auto default_remote_recv_timeout_ms{500U};
|
inline constexpr auto default_remote_recv_timeout_ms{3000U};
|
||||||
inline constexpr auto default_remote_send_timeout_ms{250U};
|
inline constexpr auto default_remote_send_timeout_ms{1500U};
|
||||||
|
|
||||||
namespace repertory::remote {
|
namespace repertory::remote {
|
||||||
struct remote_config final {
|
struct remote_config final {
|
||||||
|
|||||||
@@ -1097,7 +1097,7 @@ auto app_config::load() -> bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (version_ == 3U) {
|
if (version_ == 3U || version_ == 4U) {
|
||||||
if (json_document.contains(JSON_REMOTE_CONFIG)) {
|
if (json_document.contains(JSON_REMOTE_CONFIG)) {
|
||||||
auto cfg = get_remote_config();
|
auto cfg = get_remote_config();
|
||||||
cfg.conn_timeout_ms = default_remote_conn_timeout_ms;
|
cfg.conn_timeout_ms = default_remote_conn_timeout_ms;
|
||||||
|
|||||||
@@ -105,15 +105,16 @@ void run_with_deadline(net::io_context &io_ctx, op_t &&operation,
|
|||||||
std::string_view timeout_event_tag,
|
std::string_view timeout_event_tag,
|
||||||
std::string_view op_name,
|
std::string_view op_name,
|
||||||
std::string_view function_name) {
|
std::string_view function_name) {
|
||||||
deadline = std::max(deadline / max_attempts, 250ms);
|
deadline = std::max(deadline, std::chrono::milliseconds{250});
|
||||||
|
|
||||||
boost::system::error_code err{};
|
boost::system::error_code err{};
|
||||||
bool done = false;
|
bool done = false;
|
||||||
bool timed_out = false;
|
bool timed_out = false;
|
||||||
|
|
||||||
net::steady_timer timer{io_ctx};
|
net::steady_timer timer{io_ctx};
|
||||||
timer.expires_after(deadline);
|
timer.expires_after(deadline);
|
||||||
timer.async_wait([&](const boost::system::error_code &err_) {
|
timer.async_wait([&](const boost::system::error_code &err_) {
|
||||||
if (not err_) {
|
if (not err_ && not done) {
|
||||||
timed_out = true;
|
timed_out = true;
|
||||||
std::forward<cancel_t>(cancel_op)();
|
std::forward<cancel_t>(cancel_op)();
|
||||||
}
|
}
|
||||||
@@ -125,14 +126,14 @@ void run_with_deadline(net::io_context &io_ctx, op_t &&operation,
|
|||||||
});
|
});
|
||||||
|
|
||||||
io_ctx.restart();
|
io_ctx.restart();
|
||||||
while (not done && not timed_out) {
|
while (not done) {
|
||||||
io_ctx.run_one();
|
io_ctx.run_one();
|
||||||
}
|
}
|
||||||
timer.cancel();
|
timer.cancel();
|
||||||
|
|
||||||
if (timed_out) {
|
if (timed_out) {
|
||||||
repertory::event_system::instance().raise<repertory::packet_client_timeout>(
|
repertory::event_system::instance().raise<repertory::packet_client_timeout>(
|
||||||
timeout_event_tag, function_name);
|
std::string(timeout_event_tag), std::string(function_name));
|
||||||
throw std::runtime_error(std::string(op_name) + " timed-out");
|
throw std::runtime_error(std::string(op_name) + " timed-out");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -183,6 +184,10 @@ void read_exact_with_deadline(net::io_context &io_ctx,
|
|||||||
},
|
},
|
||||||
[&]() { sock.cancel(); }, deadline, "response", "read", function_name);
|
[&]() { sock.cancel(); }, deadline, "response", "read", function_name);
|
||||||
|
|
||||||
|
if (bytes_read == 0U) {
|
||||||
|
throw std::runtime_error("0 bytes read");
|
||||||
|
}
|
||||||
|
|
||||||
offset += bytes_read;
|
offset += bytes_read;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -201,7 +206,6 @@ void write_all_with_deadline(net::io_context &io_ctx,
|
|||||||
|
|
||||||
run_with_deadline(
|
run_with_deadline(
|
||||||
io_ctx,
|
io_ctx,
|
||||||
// start one chunk write
|
|
||||||
[&](auto &&handler) {
|
[&](auto &&handler) {
|
||||||
sock.async_write_some(
|
sock.async_write_some(
|
||||||
net::buffer(base + offset, total - offset),
|
net::buffer(base + offset, total - offset),
|
||||||
@@ -213,6 +217,10 @@ void write_all_with_deadline(net::io_context &io_ctx,
|
|||||||
},
|
},
|
||||||
[&]() { sock.cancel(); }, deadline, "request", "write", function_name);
|
[&]() { sock.cancel(); }, deadline, "request", "write", function_name);
|
||||||
|
|
||||||
|
if (bytes_written == 0U) {
|
||||||
|
throw std::runtime_error("0 bytes written");
|
||||||
|
}
|
||||||
|
|
||||||
offset += bytes_written;
|
offset += bytes_written;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -255,7 +263,7 @@ void packet_client::connect(client &cli) {
|
|||||||
resolve();
|
resolve();
|
||||||
|
|
||||||
connect_with_deadline(io_context_, cli.socket, resolve_results_,
|
connect_with_deadline(io_context_, cli.socket, resolve_results_,
|
||||||
std::chrono::milliseconds(cfg_.send_timeout_ms));
|
std::chrono::milliseconds(cfg_.conn_timeout_ms));
|
||||||
|
|
||||||
cli.socket.set_option(boost::asio::ip::tcp::no_delay(true));
|
cli.socket.set_option(boost::asio::ip::tcp::no_delay(true));
|
||||||
cli.socket.set_option(boost::asio::socket_base::linger(false, 0));
|
cli.socket.set_option(boost::asio::socket_base::linger(false, 0));
|
||||||
@@ -317,8 +325,10 @@ auto packet_client::read_packet(client &cli, packet &response) const
|
|||||||
read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer),
|
read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer),
|
||||||
std::chrono::milliseconds(cfg_.recv_timeout_ms));
|
std::chrono::milliseconds(cfg_.recv_timeout_ms));
|
||||||
|
|
||||||
auto size = boost::endian::big_to_native(
|
std::uint32_t size_be = 0U;
|
||||||
*reinterpret_cast<std::uint32_t *>(buffer.data()));
|
std::memcpy(&size_be, buffer.data(), sizeof(size_be));
|
||||||
|
const std::uint32_t size = boost::endian::big_to_native(size_be);
|
||||||
|
|
||||||
buffer.resize(size);
|
buffer.resize(size);
|
||||||
|
|
||||||
read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer),
|
read_exact_with_deadline(io_context_, cli.socket, boost::asio::buffer(buffer),
|
||||||
|
|||||||
Reference in New Issue
Block a user