packet comm refactor and fixes

This commit is contained in:
2025-09-26 08:04:18 -05:00
parent 3a3e73f776
commit ce13149776
4 changed files with 94 additions and 61 deletions

View File

@@ -25,6 +25,7 @@
#include "utils/common.hpp"
namespace repertory::comm {
inline static constexpr std::uint32_t max_packet_bytes{32U * 1024U * 1024U};
inline constexpr const std::uint8_t max_read_attempts{5U};
inline constexpr const std::uint16_t packet_nonce_size{256U};
inline constexpr const std::uint16_t server_handshake_timeout_ms{3000U};

View File

@@ -1,17 +1,13 @@
/*
Copyright <2018-2025> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
copies of the Software, and to permit persons to do so, subject to the
following conditions: The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software. THE
SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
@@ -24,6 +20,7 @@
#include "comm/packet/packet.hpp"
#include "types/remote.hpp"
#include "utils/atomic.hpp"
using boost::asio::ip::tcp;
@@ -47,24 +44,26 @@ public:
auto operator=(packet_client &&) -> packet_client & = delete;
private:
mutable boost::asio::io_context io_context_;
remote::remote_config cfg_;
std::string unique_id_;
mutable boost::asio::io_context io_context_;
atomic<std::string> unique_id_;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type>
work_guard_;
private:
bool allow_connections_{true};
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type
std::atomic<bool> allow_connections_{true};
atomic<boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type>
resolve_results_;
std::mutex clients_mutex_;
std::vector<std::shared_ptr<client>> clients_;
std::vector<std::thread> service_threads_;
private:
static void close(client &cli);
static void close(client &cli) noexcept;
void close_all();
void connect(client &cli);
[[nodiscard]] auto connect(client &cli) -> bool;
[[nodiscard]] auto get_client() -> std::shared_ptr<client>;

View File

@@ -1,17 +1,13 @@
/*
Copyright <2018-2025> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
copies of the Software, and to permit persons to do so, subject to the
following conditions: The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software. THE
SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
@@ -26,45 +22,49 @@
#include "types/repertory.hpp"
#include "utils/collection.hpp"
#include "utils/common.hpp"
#include "utils/config.hpp"
#include "utils/error_utils.hpp"
#include "utils/utils.hpp"
#include "version.hpp"
#include <utils/config.hpp>
using namespace repertory::comm;
namespace repertory {
packet_client::packet_client(remote::remote_config cfg)
: cfg_(std::move(cfg)), unique_id_(utils::create_uuid_string()) {
: cfg_(std::move(cfg)),
io_context_(),
unique_id_(utils::create_uuid_string()),
work_guard_(boost::asio::make_work_guard(io_context_)) {
for (std::uint8_t idx = 0U; idx < cfg.max_connections; ++idx) {
service_threads_.emplace_back([this]() { io_context_.run(); });
}
}
packet_client::~packet_client() {
allow_connections_ = false;
close_all();
REPERTORY_USES_FUNCTION_NAME();
for (std::size_t idx = 0U; idx < service_threads_.size(); ++idx) {
io_context_.stop();
allow_connections_ = false;
try {
close_all();
} catch (...) {
}
work_guard_.reset();
io_context_.stop();
for (auto &thread : service_threads_) {
thread.join();
if (thread.joinable()) {
thread.join();
}
}
}
void packet_client::close(client &cli) {
REPERTORY_USES_FUNCTION_NAME();
try {
cli.socket.shutdown(boost::asio::socket_base::shutdown_both);
boost::system::error_code err;
[[maybe_unused]] auto res = cli.socket.close(err);
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "connection handshake failed");
}
void packet_client::close(client &cli) noexcept {
boost::system::error_code err1;
cli.socket.shutdown(boost::asio::socket_base::shutdown_both, err1);
boost::system::error_code err2;
cli.socket.close(err2);
}
void packet_client::close_all() {
@@ -72,9 +72,7 @@ void packet_client::close_all() {
for (auto &cli : clients_) {
close(*cli);
}
clients_.clear();
io_context_.restart();
resolve_results_ = {};
unique_id_ = utils::create_uuid_string();
}
@@ -119,11 +117,13 @@ auto packet_client::check_version(std::uint32_t client_version,
return api_error::error;
}
void packet_client::connect(client &cli) {
auto packet_client::connect(client &cli) -> bool {
try {
resolve();
connect_with_deadline(io_context_, cli.socket, resolve_results_,
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type cached =
resolve_results_;
connect_with_deadline(io_context_, cli.socket, cached,
std::chrono::milliseconds(cfg_.conn_timeout_ms));
cli.socket.set_option(boost::asio::ip::tcp::no_delay(true));
@@ -133,7 +133,7 @@ void packet_client::connect(client &cli) {
std::uint32_t min_version{};
if (not handshake(cli, io_context_, min_version)) {
close(cli);
return;
return false;
}
packet response;
@@ -141,6 +141,7 @@ void packet_client::connect(client &cli) {
if (res != 0) {
throw std::runtime_error(fmt::format("read packet failed|err|{}", res));
}
return true;
} catch (...) {
close(cli);
resolve_results_ = {};
@@ -151,17 +152,19 @@ void packet_client::connect(client &cli) {
auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
REPERTORY_USES_FUNCTION_NAME();
unique_mutex_lock clients_lock(clients_mutex_);
if (not allow_connections_) {
return nullptr;
}
try {
unique_mutex_lock clients_lock(clients_mutex_);
if (clients_.empty()) {
clients_lock.unlock();
auto cli = std::make_shared<client>(io_context_);
connect(*cli);
if (!connect(*cli)) {
return nullptr;
}
return cli;
}
@@ -169,7 +172,7 @@ auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
utils::collection::remove_element(clients_, cli);
return cli;
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "connection handshake failed");
utils::error::raise_error(function_name, e, "get_client failed");
}
return nullptr;
@@ -190,7 +193,8 @@ auto packet_client::handshake(client &cli, boost::asio::io_context &ctx,
data_buffer buffer;
{
packet tmp;
tmp.encode(utils::get_version_number(REPERTORY_MIN_REMOTE_VERSION));
tmp.encode(utils::get_version_number(project_get_version()));
tmp.encode(~utils::get_version_number(project_get_version()));
tmp.encode(utils::generate_random_string(packet_nonce_size));
tmp.to_buffer(buffer);
}
@@ -198,11 +202,22 @@ auto packet_client::handshake(client &cli, boost::asio::io_context &ctx,
read_exact_with_deadline(ctx, cli.socket, boost::asio::buffer(buffer),
std::chrono::milliseconds(cfg_.recv_timeout_ms));
packet response(buffer);
auto res = response.decode(min_version);
if (res != 0) {
throw std::runtime_error("failed to decode server version");
}
std::uint32_t min_version_check{};
res = response.decode(min_version_check);
if (res != 0) {
throw std::runtime_error("failed to decode server version");
}
if (min_version_check != ~min_version) {
throw std::runtime_error("failed to decode server version");
}
response.encrypt(cfg_.encryption_token, false);
response.to_buffer(buffer);
@@ -210,7 +225,7 @@ auto packet_client::handshake(client &cli, boost::asio::io_context &ctx,
std::chrono::milliseconds(cfg_.send_timeout_ms));
return true;
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "handlshake failed");
utils::error::raise_error(function_name, e, "handshake failed");
}
return false;
@@ -241,6 +256,11 @@ auto packet_client::read_packet(client &cli, boost::asio::io_context &ctx,
std::uint32_t to_read{};
std::memcpy(&to_read, buffer.data(), sizeof(to_read));
boost::endian::big_to_native_inplace(to_read);
if (to_read == 0U || to_read > comm::max_packet_bytes) {
return utils::from_api_error(api_error::comm_error);
}
buffer.resize(to_read);
read_exact_with_deadline(ctx, cli.socket, boost::asio::buffer(buffer),
@@ -255,7 +275,9 @@ auto packet_client::read_packet(client &cli, boost::asio::io_context &ctx,
}
void packet_client::resolve() {
if (not resolve_results_.empty()) {
boost::asio::ip::basic_resolver<boost::asio::ip::tcp>::results_type cached =
resolve_results_;
if (not cached.empty()) {
return;
}
@@ -282,12 +304,14 @@ auto packet_client::send(std::string_view method, packet &request,
REPERTORY_USES_FUNCTION_NAME();
auto success = false;
packet::error_type ret = utils::from_api_error(api_error::error);
request.encode_top(method);
request.encode_top(utils::get_thread_id());
request.encode_top(unique_id_);
request.encode_top(PACKET_SERVICE_FLAGS);
request.encode_top(std::string{project_get_version()});
auto ret = utils::from_api_error(api_error::error);
auto base_request = request;
base_request.encode_top(method);
base_request.encode_top(utils::get_thread_id());
base_request.encode_top(unique_id_.load());
base_request.encode_top(PACKET_SERVICE_FLAGS);
base_request.encode_top(std::string{project_get_version()});
for (std::uint8_t retry = 1U;
allow_connections_ && not success && (retry <= max_read_attempts);
@@ -295,12 +319,16 @@ auto packet_client::send(std::string_view method, packet &request,
auto current_client = get_client();
if (current_client) {
try {
request.encode_top(current_client->nonce);
request.encrypt(cfg_.encryption_token);
auto current_request = base_request;
current_request.encode_top(current_client->nonce);
request = current_request;
current_request.encrypt(cfg_.encryption_token);
write_all_with_deadline(
io_context_, current_client->socket,
boost::asio::buffer(&request[0], request.get_size()),
boost::asio::buffer(&current_request[0],
current_request.get_size()),
std::chrono::milliseconds(cfg_.send_timeout_ms));
ret = read_packet(*current_client, response);
@@ -318,7 +346,6 @@ auto packet_client::send(std::string_view method, packet &request,
}
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "send failed");
close(*current_client);
if (allow_connections_ && (retry < max_read_attempts)) {
std::this_thread::sleep_for(1s);

View File

@@ -92,6 +92,7 @@ auto packet_server::handshake(std::shared_ptr<connection> conn) const -> bool {
data_buffer buffer;
packet request;
request.encode(utils::get_version_number(REPERTORY_MIN_REMOTE_VERSION));
request.encode(~utils::get_version_number(REPERTORY_MIN_REMOTE_VERSION));
request.encode(conn->nonce);
request.to_buffer(buffer);
auto to_read{buffer.size() + utils::encryption::encryption_header_size};
@@ -276,6 +277,11 @@ void packet_server::read_packet(std::shared_ptr<connection> conn,
}
};
if (data_size > comm::max_packet_bytes) {
throw std::runtime_error(
fmt::format("packet too large|size|{}", data_size));
}
auto should_send_response = true;
auto response = std::make_shared<packet>();
conn->buffer.resize(data_size);