reset timeout on data recv/sent

This commit is contained in:
2025-10-02 11:03:14 -05:00
parent 1a3b969713
commit 04307e0424
4 changed files with 88 additions and 25 deletions

View File

@@ -4,6 +4,8 @@
### BREAKING CHANGES
* Mount state has been moved into the configuration directory
* Unmount all active mounts prior to upgrade
* Remote mounts must be upgraded to v2.1.0+ to support new authentication scheme
* Protocol handshake added for DoS protection

View File

@@ -24,6 +24,9 @@
namespace repertory {
class timeout final {
public:
using callback_t = std::function<void()>;
public:
timeout(const timeout &) noexcept = delete;
timeout(timeout &&) noexcept = delete;
@@ -31,19 +34,23 @@ public:
auto operator=(timeout &&) noexcept -> timeout & = delete;
public:
timeout(std::function<void()> timeout_callback,
timeout(callback_t timeout_callback,
std::chrono::system_clock::duration duration);
~timeout() { disable(); }
~timeout();
private:
std::atomic<bool> timeout_killed_;
std::chrono::system_clock::duration duration_;
callback_t timeout_callback_;
std::atomic<bool> timeout_killed_{false};
std::unique_ptr<std::thread> timeout_thread_{nullptr};
std::mutex timeout_mutex_;
std::condition_variable timeout_notify_;
public:
void disable();
void reset();
};
} // namespace repertory

View File

@@ -73,11 +73,12 @@ void packet_client::close(client &cli) noexcept {
}
void packet_client::close_all() {
unique_mutex_lock clients_lock(clients_mutex_);
mutex_lock clients_lock(clients_mutex_);
for (auto &cli : clients_) {
close(*cli);
}
clients_.clear();
resolve_results_.store({});
unique_id_ = utils::create_uuid_string();
}
@@ -90,11 +91,23 @@ auto packet_client::check_version(std::uint32_t client_version,
min_version = 0U;
boost::asio::io_context ctx{};
auto resolve_results = tcp::resolver(ctx).resolve(
cfg_.host_name_or_ip, std::to_string(cfg_.api_port));
client cli(ctx);
boost::asio::connect(cli.socket, resolve_results);
timeout connect_timeout(
[&cli]() {
event_system::instance().raise<packet_client_timeout>("connect",
function_name);
packet_client::close(cli);
},
std::chrono::milliseconds(cfg_.conn_timeout_ms));
boost::asio::connect(
cli.socket, tcp::resolver(ctx).resolve(cfg_.host_name_or_ip,
std::to_string(cfg_.api_port)));
connect_timeout.disable();
if (not is_socket_still_alive(cli.socket)) {
utils::error::raise_error(function_name, e, "failed to connect");
}
comm::apply_common_socket_properties(cli.socket);
@@ -116,9 +129,23 @@ auto packet_client::check_version(std::uint32_t client_version,
auto packet_client::connect(client &cli) -> bool {
try {
timeout connect_timeout(
[&cli]() {
event_system::instance().raise<packet_client_timeout>("connect",
function_name);
packet_client::close(cli);
},
std::chrono::milliseconds(cfg_.conn_timeout_ms));
resolve();
boost::asio::connect(cli.socket, resolve_results_.load());
connect_timeout.disable();
if (not is_socket_still_alive(cli.socket)) {
utils::error::raise_error(function_name, e, "failed to connect");
}
comm::apply_common_socket_properties(cli.socket);
@@ -246,6 +273,7 @@ void packet_client::read_data(client &cli, data_buffer &buffer) const {
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
}
offset += static_cast<std::uint32_t>(bytes_read);
read_timeout.reset();
}
}
@@ -338,7 +366,7 @@ auto packet_client::send(std::string_view method, packet &request,
current_request.encode_top(current_client->nonce);
request = current_request;
current_request.encrypt(cfg_.encryption_token);
current_request.encrypt(cfg_.encryption_token, true);
write_data(*current_client, current_request);
ret = read_packet(*current_client, response);
@@ -389,6 +417,7 @@ void packet_client::write_data(client &cli, const packet &request) const {
throw std::runtime_error("write failed|" + std::to_string(bytes_written));
}
offset += static_cast<std::uint32_t>(bytes_written);
read_timeout.reset();
}
}
} // namespace repertory

View File

@@ -22,36 +22,61 @@
#include "utils/timeout.hpp"
namespace repertory {
timeout::timeout(std::function<void()> timeout_callback,
timeout::timeout(callback_t timeout_callback,
std::chrono::system_clock::duration duration)
: timeout_killed_(duration == 0s) {
: duration_(duration),
timeout_callback_(std::move(timeout_callback)),
timeout_killed_(duration <= std::chrono::system_clock::duration::zero()) {
if (timeout_killed_) {
return;
}
timeout_thread_ =
std::make_unique<std::thread>([this, duration, timeout_callback]() {
unique_mutex_lock lock(timeout_mutex_);
if (not timeout_killed_) {
timeout_notify_.wait_for(lock, duration);
if (not timeout_killed_) {
timeout_callback();
}
}
});
timeout_thread_ = std::make_unique<std::thread>([this]() {
std::unique_lock<std::mutex> loc_lock(timeout_mutex_);
while (not timeout_killed_) {
auto res = timeout_notify_.wait_for(loc_lock, duration_);
if (res != std::cv_status::timeout) {
continue;
}
if (timeout_killed_) {
return;
}
timeout_killed_ = true;
loc_lock.unlock();
try {
timeout_callback_();
} catch (...) {
}
return;
}
});
}
timeout::~timeout() { disable(); }
void timeout::disable() {
if (timeout_killed_) {
unique_mutex_lock lock(timeout_mutex_);
std::unique_ptr<std::thread> timeout_thread{nullptr};
std::swap(timeout_thread, timeout_thread_);
if (not timeout_thread) {
timeout_notify_.notify_all();
return;
}
timeout_killed_ = true;
unique_mutex_lock lock(timeout_mutex_);
timeout_notify_.notify_all();
lock.unlock();
timeout_thread_->join();
timeout_thread_.reset();
timeout_thread->join();
}
void timeout::reset() {
mutex_lock lock(timeout_mutex_);
timeout_notify_.notify_all();
}
} // namespace repertory