@@ -1,160 +1,160 @@
|
||||
/*
|
||||
Copyright <2018-2023> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "comm/packet/client_pool.hpp"
|
||||
|
||||
#include "events/event_system.hpp"
|
||||
#include "events/events.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
void client_pool::pool::execute(
|
||||
std::uint64_t thread_id, const worker_callback &worker,
|
||||
const worker_complete_callback &worker_complete) {
|
||||
const auto index = thread_id % pool_queues_.size();
|
||||
auto wi = std::make_shared<work_item>(worker, worker_complete);
|
||||
auto &pool_queue = pool_queues_[index];
|
||||
|
||||
unique_mutex_lock queue_lock(pool_queue->mutex);
|
||||
pool_queue->queue.emplace_back(wi);
|
||||
pool_queue->notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
}
|
||||
|
||||
client_pool::pool::pool(std::uint8_t pool_size) {
|
||||
event_system::instance().raise<service_started>("client_pool");
|
||||
thread_index_ = 0u;
|
||||
for (std::uint8_t i = 0u; i < pool_size; i++) {
|
||||
pool_queues_.emplace_back(std::make_unique<work_queue>());
|
||||
}
|
||||
|
||||
for (std::size_t i = 0u; i < pool_queues_.size(); i++) {
|
||||
pool_threads_.emplace_back([this]() {
|
||||
const auto thread_index = thread_index_++;
|
||||
|
||||
auto &pool_queue = pool_queues_[thread_index];
|
||||
auto &queue = pool_queue->queue;
|
||||
auto &queue_mutex = pool_queue->mutex;
|
||||
auto &queue_notify = pool_queue->notify;
|
||||
|
||||
unique_mutex_lock queue_lock(queue_mutex);
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
while (not shutdown_) {
|
||||
queue_lock.lock();
|
||||
if (queue.empty()) {
|
||||
queue_notify.wait(queue_lock);
|
||||
}
|
||||
|
||||
while (not queue.empty()) {
|
||||
auto item = queue.front();
|
||||
queue.pop_front();
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
|
||||
try {
|
||||
const auto result = item->work();
|
||||
item->work_complete(result);
|
||||
} catch (const std::exception &e) {
|
||||
item->work_complete(utils::from_api_error(api_error::error));
|
||||
utils::error::raise_error(__FUNCTION__, e,
|
||||
"exception occurred in work item");
|
||||
}
|
||||
|
||||
queue_lock.lock();
|
||||
}
|
||||
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
}
|
||||
|
||||
queue_lock.lock();
|
||||
while (not queue.empty()) {
|
||||
auto wi = queue.front();
|
||||
queue.pop_front();
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
|
||||
wi->work_complete(utils::from_api_error(api_error::download_stopped));
|
||||
|
||||
queue_lock.lock();
|
||||
}
|
||||
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void client_pool::pool::shutdown() {
|
||||
shutdown_ = true;
|
||||
|
||||
for (auto &pool_queue : pool_queues_) {
|
||||
unique_mutex_lock l(pool_queue->mutex);
|
||||
pool_queue->notify.notify_all();
|
||||
}
|
||||
|
||||
for (auto &thread : pool_threads_) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
pool_queues_.clear();
|
||||
pool_threads_.clear();
|
||||
}
|
||||
|
||||
void client_pool::execute(const std::string &client_id, std::uint64_t thread_id,
|
||||
const worker_callback &worker,
|
||||
const worker_complete_callback &worker_complete) {
|
||||
unique_mutex_lock pool_lock(pool_mutex_);
|
||||
if (shutdown_) {
|
||||
pool_lock.unlock();
|
||||
throw std::runtime_error("Client pool is shutdown");
|
||||
}
|
||||
|
||||
if (not pool_lookup_[client_id]) {
|
||||
pool_lookup_[client_id] = std::make_shared<pool>(pool_size_);
|
||||
}
|
||||
|
||||
pool_lookup_[client_id]->execute(thread_id, worker, worker_complete);
|
||||
pool_lock.unlock();
|
||||
}
|
||||
|
||||
void client_pool::remove_client(const std::string &client_id) {
|
||||
mutex_lock pool_lock(pool_mutex_);
|
||||
pool_lookup_.erase(client_id);
|
||||
}
|
||||
|
||||
void client_pool::shutdown() {
|
||||
if (not shutdown_) {
|
||||
event_system::instance().raise<service_shutdown_begin>("client_pool");
|
||||
unique_mutex_lock pool_lock(pool_mutex_);
|
||||
if (not shutdown_) {
|
||||
shutdown_ = true;
|
||||
for (auto &kv : pool_lookup_) {
|
||||
kv.second->shutdown();
|
||||
}
|
||||
pool_lookup_.clear();
|
||||
}
|
||||
pool_lock.unlock();
|
||||
event_system::instance().raise<service_shutdown_end>("client_pool");
|
||||
}
|
||||
}
|
||||
} // namespace repertory
|
||||
/*
|
||||
Copyright <2018-2023> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "comm/packet/client_pool.hpp"
|
||||
|
||||
#include "events/event_system.hpp"
|
||||
#include "events/events.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
void client_pool::pool::execute(
|
||||
std::uint64_t thread_id, const worker_callback &worker,
|
||||
const worker_complete_callback &worker_complete) {
|
||||
const auto index = thread_id % pool_queues_.size();
|
||||
auto job = std::make_shared<work_item>(worker, worker_complete);
|
||||
auto &pool_queue = pool_queues_[index];
|
||||
|
||||
unique_mutex_lock queue_lock(pool_queue->mutex);
|
||||
pool_queue->queue.emplace_back(job);
|
||||
pool_queue->notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
}
|
||||
|
||||
client_pool::pool::pool(std::uint8_t pool_size) {
|
||||
event_system::instance().raise<service_started>("client_pool");
|
||||
|
||||
for (std::uint8_t i = 0U; i < pool_size; i++) {
|
||||
pool_queues_.emplace_back(std::make_unique<work_queue>());
|
||||
}
|
||||
|
||||
for (std::size_t i = 0U; i < pool_queues_.size(); i++) {
|
||||
pool_threads_.emplace_back([this]() {
|
||||
const auto thread_index = thread_index_++;
|
||||
|
||||
auto &pool_queue = pool_queues_[thread_index];
|
||||
auto &queue = pool_queue->queue;
|
||||
auto &queue_mutex = pool_queue->mutex;
|
||||
auto &queue_notify = pool_queue->notify;
|
||||
|
||||
unique_mutex_lock queue_lock(queue_mutex);
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
while (not shutdown_) {
|
||||
queue_lock.lock();
|
||||
if (queue.empty()) {
|
||||
queue_notify.wait(queue_lock);
|
||||
}
|
||||
|
||||
while (not queue.empty()) {
|
||||
auto item = queue.front();
|
||||
queue.pop_front();
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
|
||||
try {
|
||||
const auto result = item->work();
|
||||
item->work_complete(result);
|
||||
} catch (const std::exception &e) {
|
||||
item->work_complete(utils::from_api_error(api_error::error));
|
||||
utils::error::raise_error(__FUNCTION__, e,
|
||||
"exception occurred in work item");
|
||||
}
|
||||
|
||||
queue_lock.lock();
|
||||
}
|
||||
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
}
|
||||
|
||||
queue_lock.lock();
|
||||
while (not queue.empty()) {
|
||||
auto job = queue.front();
|
||||
queue.pop_front();
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
|
||||
job->work_complete(utils::from_api_error(api_error::download_stopped));
|
||||
|
||||
queue_lock.lock();
|
||||
}
|
||||
|
||||
queue_notify.notify_all();
|
||||
queue_lock.unlock();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void client_pool::pool::shutdown() {
|
||||
shutdown_ = true;
|
||||
|
||||
for (auto &pool_queue : pool_queues_) {
|
||||
mutex_lock lock(pool_queue->mutex);
|
||||
pool_queue->notify.notify_all();
|
||||
}
|
||||
|
||||
for (auto &thread : pool_threads_) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
pool_queues_.clear();
|
||||
pool_threads_.clear();
|
||||
}
|
||||
|
||||
void client_pool::execute(const std::string &client_id, std::uint64_t thread_id,
|
||||
const worker_callback &worker,
|
||||
const worker_complete_callback &worker_complete) {
|
||||
unique_mutex_lock pool_lock(pool_mutex_);
|
||||
if (shutdown_) {
|
||||
pool_lock.unlock();
|
||||
throw std::runtime_error("Client pool is shutdown");
|
||||
}
|
||||
|
||||
if (not pool_lookup_[client_id]) {
|
||||
pool_lookup_[client_id] = std::make_shared<pool>(pool_size_);
|
||||
}
|
||||
|
||||
pool_lookup_[client_id]->execute(thread_id, worker, worker_complete);
|
||||
pool_lock.unlock();
|
||||
}
|
||||
|
||||
void client_pool::remove_client(const std::string &client_id) {
|
||||
mutex_lock pool_lock(pool_mutex_);
|
||||
pool_lookup_.erase(client_id);
|
||||
}
|
||||
|
||||
void client_pool::shutdown() {
|
||||
if (not shutdown_) {
|
||||
event_system::instance().raise<service_shutdown_begin>("client_pool");
|
||||
unique_mutex_lock pool_lock(pool_mutex_);
|
||||
if (not shutdown_) {
|
||||
shutdown_ = true;
|
||||
for (auto &pool_entry : pool_lookup_) {
|
||||
pool_entry.second->shutdown();
|
||||
}
|
||||
pool_lookup_.clear();
|
||||
}
|
||||
pool_lock.unlock();
|
||||
event_system::instance().raise<service_shutdown_end>("client_pool");
|
||||
}
|
||||
}
|
||||
} // namespace repertory
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,247 +1,247 @@
|
||||
/*
|
||||
Copyright <2018-2023> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "comm/packet/packet_client.hpp"
|
||||
|
||||
#include "events/events.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
#include "utils/timeout.hpp"
|
||||
|
||||
namespace repertory {
|
||||
// clang-format off
|
||||
E_SIMPLE2(packet_client_timeout, error, true,
|
||||
std::string, event_name, en, E_STRING,
|
||||
std::string, message, msg, E_STRING
|
||||
);
|
||||
// clang-format on
|
||||
|
||||
packet_client::packet_client(std::string host_name_or_ip,
|
||||
std::uint8_t max_connections, std::uint16_t port,
|
||||
std::uint16_t receive_timeout,
|
||||
std::uint16_t send_timeout,
|
||||
std::string encryption_token)
|
||||
: io_context_(),
|
||||
host_name_or_ip_(std::move(host_name_or_ip)),
|
||||
max_connections_(max_connections ? max_connections : 20u),
|
||||
port_(port),
|
||||
receive_timeout_(receive_timeout),
|
||||
send_timeout_(send_timeout),
|
||||
encryption_token_(std::move(encryption_token)),
|
||||
unique_id_(utils::create_uuid_string()) {}
|
||||
|
||||
packet_client::~packet_client() {
|
||||
allow_connections_ = false;
|
||||
close_all();
|
||||
io_context_.stop();
|
||||
}
|
||||
|
||||
void packet_client::close(client &cli) const {
|
||||
try {
|
||||
boost::system::error_code ec;
|
||||
cli.socket.close(ec);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
void packet_client::close_all() {
|
||||
unique_mutex_lock clients_lock(clients_mutex_);
|
||||
for (auto &c : clients_) {
|
||||
close(*c.get());
|
||||
}
|
||||
|
||||
clients_.clear();
|
||||
unique_id_ = utils::create_uuid_string();
|
||||
}
|
||||
|
||||
void packet_client::connect(client &c) {
|
||||
try {
|
||||
resolve();
|
||||
boost::asio::connect(c.socket, resolve_results_);
|
||||
c.socket.set_option(boost::asio::ip::tcp::no_delay(true));
|
||||
c.socket.set_option(boost::asio::socket_base::linger(false, 0));
|
||||
|
||||
packet response;
|
||||
const auto res = read_packet(c, response);
|
||||
if (res != 0) {
|
||||
throw std::runtime_error(std::to_string(res));
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
utils::error::raise_error(__FUNCTION__, e, "connection handshake failed");
|
||||
}
|
||||
}
|
||||
|
||||
auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
|
||||
std::shared_ptr<client> ret;
|
||||
|
||||
unique_mutex_lock clients_lock(clients_mutex_);
|
||||
if (allow_connections_) {
|
||||
if (clients_.empty()) {
|
||||
clients_lock.unlock();
|
||||
ret = std::make_shared<client>(io_context_);
|
||||
connect(*ret);
|
||||
} else {
|
||||
ret = clients_[0u];
|
||||
utils::remove_element_from(clients_, ret);
|
||||
clients_lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void packet_client::put_client(std::shared_ptr<client> &c) {
|
||||
mutex_lock clientsLock(clients_mutex_);
|
||||
if (clients_.size() < max_connections_) {
|
||||
clients_.emplace_back(c);
|
||||
}
|
||||
}
|
||||
|
||||
auto packet_client::read_packet(client &c, packet &response)
|
||||
-> packet::error_type {
|
||||
data_buffer buffer(sizeof(std::uint32_t));
|
||||
const auto read_buffer = [&]() {
|
||||
std::uint32_t offset = 0u;
|
||||
while (offset < buffer.size()) {
|
||||
const auto bytes_read = boost::asio::read(
|
||||
c.socket,
|
||||
boost::asio::buffer(&buffer[offset], buffer.size() - offset));
|
||||
if (bytes_read <= 0) {
|
||||
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
|
||||
}
|
||||
offset += static_cast<std::uint32_t>(bytes_read);
|
||||
}
|
||||
};
|
||||
read_buffer();
|
||||
|
||||
const auto size = boost::endian::big_to_native(
|
||||
*reinterpret_cast<std::uint32_t *>(&buffer[0u]));
|
||||
buffer.resize(size);
|
||||
|
||||
read_buffer();
|
||||
response = std::move(buffer);
|
||||
|
||||
auto ret = response.decrypt(encryption_token_);
|
||||
if (ret == 0) {
|
||||
ret = response.decode(c.nonce);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void packet_client::resolve() {
|
||||
if (resolve_results_.empty()) {
|
||||
resolve_results_ = tcp::resolver(io_context_)
|
||||
.resolve({host_name_or_ip_, std::to_string(port_)});
|
||||
}
|
||||
}
|
||||
|
||||
auto packet_client::send(const std::string &method,
|
||||
std::uint32_t &service_flags) -> packet::error_type {
|
||||
packet request;
|
||||
return send(method, request, service_flags);
|
||||
}
|
||||
|
||||
auto packet_client::send(const std::string &method, packet &request,
|
||||
std::uint32_t &service_flags) -> packet::error_type {
|
||||
packet response;
|
||||
return send(method, request, response, service_flags);
|
||||
}
|
||||
|
||||
auto packet_client::send(const std::string &method, packet &request,
|
||||
packet &response, std::uint32_t &service_flags)
|
||||
-> packet::error_type {
|
||||
auto success = false;
|
||||
packet::error_type ret = utils::from_api_error(api_error::error);
|
||||
request.encode_top(method);
|
||||
request.encode_top(utils::get_thread_id());
|
||||
request.encode_top(unique_id_);
|
||||
request.encode_top(PACKET_SERVICE_FLAGS);
|
||||
request.encode_top(get_repertory_version());
|
||||
|
||||
static const std::uint8_t max_attempts = 5u;
|
||||
for (std::uint8_t i = 1u;
|
||||
allow_connections_ && not success && (i <= max_attempts); i++) {
|
||||
auto c = get_client();
|
||||
if (c) {
|
||||
try {
|
||||
request.encode_top(c->nonce);
|
||||
request.encrypt(encryption_token_);
|
||||
|
||||
timeout request_timeout(
|
||||
[this, method, c]() {
|
||||
event_system::instance().raise<packet_client_timeout>("request",
|
||||
method);
|
||||
close(*c.get());
|
||||
},
|
||||
std::chrono::seconds(send_timeout_));
|
||||
|
||||
std::uint32_t offset = 0u;
|
||||
while (offset < request.get_size()) {
|
||||
const auto bytes_written = boost::asio::write(
|
||||
c->socket, boost::asio::buffer(&request[offset],
|
||||
request.get_size() - offset));
|
||||
if (bytes_written <= 0) {
|
||||
throw std::runtime_error("write failed|" +
|
||||
std::to_string(bytes_written));
|
||||
}
|
||||
offset += static_cast<std::uint32_t>(bytes_written);
|
||||
}
|
||||
request_timeout.disable();
|
||||
|
||||
timeout response_timeout(
|
||||
[this, method, c]() {
|
||||
event_system::instance().raise<packet_client_timeout>("response",
|
||||
method);
|
||||
close(*c.get());
|
||||
},
|
||||
std::chrono::seconds(receive_timeout_));
|
||||
|
||||
ret = read_packet(*c, response);
|
||||
response_timeout.disable();
|
||||
if (ret == 0) {
|
||||
if ((ret = response.decode(service_flags)) == 0) {
|
||||
packet::error_type res{};
|
||||
if ((ret = response.decode(res)) == 0) {
|
||||
ret = res;
|
||||
success = true;
|
||||
put_client(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
utils::error::raise_error(__FUNCTION__, e, "send failed");
|
||||
close_all();
|
||||
if (allow_connections_ && (i < max_attempts)) {
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (not allow_connections_) {
|
||||
ret = utils::from_api_error(api_error::error);
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
|
||||
return CONVERT_STATUS_NOT_IMPLEMENTED(ret);
|
||||
}
|
||||
} // namespace repertory
|
||||
/*
|
||||
Copyright <2018-2023> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "comm/packet/packet_client.hpp"
|
||||
|
||||
#include "events/events.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
#include "utils/timeout.hpp"
|
||||
|
||||
namespace repertory {
|
||||
// clang-format off
|
||||
E_SIMPLE2(packet_client_timeout, error, true,
|
||||
std::string, event_name, en, E_STRING,
|
||||
std::string, message, msg, E_STRING
|
||||
);
|
||||
// clang-format on
|
||||
|
||||
packet_client::packet_client(std::string host_name_or_ip,
|
||||
std::uint8_t max_connections, std::uint16_t port,
|
||||
std::uint16_t receive_timeout,
|
||||
std::uint16_t send_timeout,
|
||||
std::string encryption_token)
|
||||
: io_context_(),
|
||||
host_name_or_ip_(std::move(host_name_or_ip)),
|
||||
max_connections_(max_connections ? max_connections : 20u),
|
||||
port_(port),
|
||||
receive_timeout_(receive_timeout),
|
||||
send_timeout_(send_timeout),
|
||||
encryption_token_(std::move(encryption_token)),
|
||||
unique_id_(utils::create_uuid_string()) {}
|
||||
|
||||
packet_client::~packet_client() {
|
||||
allow_connections_ = false;
|
||||
close_all();
|
||||
io_context_.stop();
|
||||
}
|
||||
|
||||
void packet_client::close(client &cli) const {
|
||||
try {
|
||||
boost::system::error_code ec;
|
||||
cli.socket.close(ec);
|
||||
} catch (...) {
|
||||
}
|
||||
}
|
||||
|
||||
void packet_client::close_all() {
|
||||
unique_mutex_lock clients_lock(clients_mutex_);
|
||||
for (auto &c : clients_) {
|
||||
close(*c.get());
|
||||
}
|
||||
|
||||
clients_.clear();
|
||||
unique_id_ = utils::create_uuid_string();
|
||||
}
|
||||
|
||||
void packet_client::connect(client &c) {
|
||||
try {
|
||||
resolve();
|
||||
boost::asio::connect(c.socket, resolve_results_);
|
||||
c.socket.set_option(boost::asio::ip::tcp::no_delay(true));
|
||||
c.socket.set_option(boost::asio::socket_base::linger(false, 0));
|
||||
|
||||
packet response;
|
||||
const auto res = read_packet(c, response);
|
||||
if (res != 0) {
|
||||
throw std::runtime_error(std::to_string(res));
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
utils::error::raise_error(__FUNCTION__, e, "connection handshake failed");
|
||||
}
|
||||
}
|
||||
|
||||
auto packet_client::get_client() -> std::shared_ptr<packet_client::client> {
|
||||
std::shared_ptr<client> ret;
|
||||
|
||||
unique_mutex_lock clients_lock(clients_mutex_);
|
||||
if (allow_connections_) {
|
||||
if (clients_.empty()) {
|
||||
clients_lock.unlock();
|
||||
ret = std::make_shared<client>(io_context_);
|
||||
connect(*ret);
|
||||
} else {
|
||||
ret = clients_[0u];
|
||||
utils::remove_element_from(clients_, ret);
|
||||
clients_lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void packet_client::put_client(std::shared_ptr<client> &c) {
|
||||
mutex_lock clientsLock(clients_mutex_);
|
||||
if (clients_.size() < max_connections_) {
|
||||
clients_.emplace_back(c);
|
||||
}
|
||||
}
|
||||
|
||||
auto packet_client::read_packet(client &c, packet &response)
|
||||
-> packet::error_type {
|
||||
data_buffer buffer(sizeof(std::uint32_t));
|
||||
const auto read_buffer = [&]() {
|
||||
std::uint32_t offset = 0u;
|
||||
while (offset < buffer.size()) {
|
||||
const auto bytes_read = boost::asio::read(
|
||||
c.socket,
|
||||
boost::asio::buffer(&buffer[offset], buffer.size() - offset));
|
||||
if (bytes_read <= 0) {
|
||||
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
|
||||
}
|
||||
offset += static_cast<std::uint32_t>(bytes_read);
|
||||
}
|
||||
};
|
||||
read_buffer();
|
||||
|
||||
const auto size = boost::endian::big_to_native(
|
||||
*reinterpret_cast<std::uint32_t *>(&buffer[0u]));
|
||||
buffer.resize(size);
|
||||
|
||||
read_buffer();
|
||||
response = std::move(buffer);
|
||||
|
||||
auto ret = response.decrypt(encryption_token_);
|
||||
if (ret == 0) {
|
||||
ret = response.decode(c.nonce);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void packet_client::resolve() {
|
||||
if (resolve_results_.empty()) {
|
||||
resolve_results_ = tcp::resolver(io_context_)
|
||||
.resolve({host_name_or_ip_, std::to_string(port_)});
|
||||
}
|
||||
}
|
||||
|
||||
auto packet_client::send(const std::string &method,
|
||||
std::uint32_t &service_flags) -> packet::error_type {
|
||||
packet request;
|
||||
return send(method, request, service_flags);
|
||||
}
|
||||
|
||||
auto packet_client::send(const std::string &method, packet &request,
|
||||
std::uint32_t &service_flags) -> packet::error_type {
|
||||
packet response;
|
||||
return send(method, request, response, service_flags);
|
||||
}
|
||||
|
||||
auto packet_client::send(const std::string &method, packet &request,
|
||||
packet &response, std::uint32_t &service_flags)
|
||||
-> packet::error_type {
|
||||
auto success = false;
|
||||
packet::error_type ret = utils::from_api_error(api_error::error);
|
||||
request.encode_top(method);
|
||||
request.encode_top(utils::get_thread_id());
|
||||
request.encode_top(unique_id_);
|
||||
request.encode_top(PACKET_SERVICE_FLAGS);
|
||||
request.encode_top(get_repertory_version());
|
||||
|
||||
static const std::uint8_t max_attempts = 5u;
|
||||
for (std::uint8_t i = 1u;
|
||||
allow_connections_ && not success && (i <= max_attempts); i++) {
|
||||
auto c = get_client();
|
||||
if (c) {
|
||||
try {
|
||||
request.encode_top(c->nonce);
|
||||
request.encrypt(encryption_token_);
|
||||
|
||||
timeout request_timeout(
|
||||
[this, method, c]() {
|
||||
event_system::instance().raise<packet_client_timeout>("request",
|
||||
method);
|
||||
close(*c.get());
|
||||
},
|
||||
std::chrono::seconds(send_timeout_));
|
||||
|
||||
std::uint32_t offset = 0u;
|
||||
while (offset < request.get_size()) {
|
||||
const auto bytes_written = boost::asio::write(
|
||||
c->socket, boost::asio::buffer(&request[offset],
|
||||
request.get_size() - offset));
|
||||
if (bytes_written <= 0) {
|
||||
throw std::runtime_error("write failed|" +
|
||||
std::to_string(bytes_written));
|
||||
}
|
||||
offset += static_cast<std::uint32_t>(bytes_written);
|
||||
}
|
||||
request_timeout.disable();
|
||||
|
||||
timeout response_timeout(
|
||||
[this, method, c]() {
|
||||
event_system::instance().raise<packet_client_timeout>("response",
|
||||
method);
|
||||
close(*c.get());
|
||||
},
|
||||
std::chrono::seconds(receive_timeout_));
|
||||
|
||||
ret = read_packet(*c, response);
|
||||
response_timeout.disable();
|
||||
if (ret == 0) {
|
||||
if ((ret = response.decode(service_flags)) == 0) {
|
||||
packet::error_type res{};
|
||||
if ((ret = response.decode(res)) == 0) {
|
||||
ret = res;
|
||||
success = true;
|
||||
put_client(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
utils::error::raise_error(__FUNCTION__, e, "send failed");
|
||||
close_all();
|
||||
if (allow_connections_ && (i < max_attempts)) {
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (not allow_connections_) {
|
||||
ret = utils::from_api_error(api_error::error);
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
|
||||
return CONVERT_STATUS_NOT_IMPLEMENTED(ret);
|
||||
}
|
||||
} // namespace repertory
|
||||
|
||||
@@ -1,250 +1,250 @@
|
||||
/*
|
||||
Copyright <2018-2023> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "comm/packet/packet_server.hpp"
|
||||
|
||||
#include "comm/packet/packet.hpp"
|
||||
#include "events/event_system.hpp"
|
||||
#include "events/events.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
#include "utils/string_utils.hpp"
|
||||
#include "utils/utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
using std::thread;
|
||||
|
||||
packet_server::packet_server(std::uint16_t port, std::string token,
|
||||
std::uint8_t pool_size, closed_callback closed,
|
||||
message_handler_callback message_handler)
|
||||
: encryption_token_(std::move(token)),
|
||||
closed_(std::move(closed)),
|
||||
message_handler_(std::move(message_handler)) {
|
||||
initialize(port, pool_size);
|
||||
event_system::instance().raise<service_started>("packet_server");
|
||||
}
|
||||
|
||||
packet_server::~packet_server() {
|
||||
event_system::instance().raise<service_shutdown_begin>("packet_server");
|
||||
std::thread([this]() {
|
||||
for (std::size_t i = 0u; i < service_threads_.size(); i++) {
|
||||
io_context_.stop();
|
||||
}
|
||||
}).detach();
|
||||
|
||||
server_thread_->join();
|
||||
server_thread_.reset();
|
||||
event_system::instance().raise<service_shutdown_end>("packet_server");
|
||||
}
|
||||
|
||||
void packet_server::add_client(connection &c, const std::string &client_id) {
|
||||
c.client_id = client_id;
|
||||
|
||||
recur_mutex_lock connection_lock(connection_mutex_);
|
||||
if (connection_lookup_.find(client_id) == connection_lookup_.end()) {
|
||||
connection_lookup_[client_id] = 1u;
|
||||
} else {
|
||||
connection_lookup_[client_id]++;
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::initialize(const uint16_t &port, uint8_t pool_size) {
|
||||
pool_size = std::max(uint8_t(1u), pool_size);
|
||||
server_thread_ = std::make_unique<std::thread>([this, port, pool_size]() {
|
||||
tcp::acceptor acceptor(io_context_);
|
||||
try {
|
||||
const auto endpoint = tcp::endpoint(tcp::v4(), port);
|
||||
acceptor.open(endpoint.protocol());
|
||||
acceptor.set_option(socket_base::reuse_address(true));
|
||||
acceptor.bind(endpoint);
|
||||
acceptor.listen();
|
||||
} catch (const std::exception &e) {
|
||||
repertory::utils::error::raise_error(__FUNCTION__, e,
|
||||
"exception occurred");
|
||||
}
|
||||
listen_for_connection(acceptor);
|
||||
|
||||
for (std::uint8_t i = 0u; i < pool_size; i++) {
|
||||
service_threads_.emplace_back([this]() { io_context_.run(); });
|
||||
}
|
||||
|
||||
for (auto &th : service_threads_) {
|
||||
th.join();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void packet_server::listen_for_connection(tcp::acceptor &acceptor) {
|
||||
auto c = std::make_shared<packet_server::connection>(io_context_, acceptor);
|
||||
acceptor.async_accept(c->socket,
|
||||
boost::bind(&packet_server::on_accept, this, c,
|
||||
boost::asio::placeholders::error));
|
||||
}
|
||||
|
||||
void packet_server::on_accept(std::shared_ptr<connection> c,
|
||||
boost::system::error_code ec) {
|
||||
listen_for_connection(c->acceptor);
|
||||
if (ec) {
|
||||
utils::error::raise_error(__FUNCTION__, ec.message());
|
||||
std::this_thread::sleep_for(1s);
|
||||
} else {
|
||||
c->socket.set_option(boost::asio::ip::tcp::no_delay(true));
|
||||
c->socket.set_option(boost::asio::socket_base::linger(false, 0));
|
||||
|
||||
c->generate_nonce();
|
||||
|
||||
packet response;
|
||||
send_response(c, 0, response);
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::read_header(std::shared_ptr<connection> c) {
|
||||
static const std::string function_name = __FUNCTION__;
|
||||
|
||||
c->buffer.resize(sizeof(std::uint32_t));
|
||||
boost::asio::async_read(
|
||||
c->socket, boost::asio::buffer(&c->buffer[0u], c->buffer.size()),
|
||||
[this, c](boost::system::error_code ec, std::size_t) {
|
||||
if (ec) {
|
||||
remove_client(*c);
|
||||
repertory::utils::error::raise_error(function_name, ec.message());
|
||||
} else {
|
||||
auto to_read = *reinterpret_cast<std::uint32_t *>(&c->buffer[0u]);
|
||||
boost::endian::big_to_native_inplace(to_read);
|
||||
read_packet(c, to_read);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void packet_server::read_packet(std::shared_ptr<connection> c,
|
||||
std::uint32_t data_size) {
|
||||
try {
|
||||
const auto read_buffer = [&]() {
|
||||
std::uint32_t offset = 0u;
|
||||
while (offset < c->buffer.size()) {
|
||||
const auto bytes_read = boost::asio::read(
|
||||
c->socket,
|
||||
boost::asio::buffer(&c->buffer[offset], c->buffer.size() - offset));
|
||||
if (bytes_read <= 0) {
|
||||
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
|
||||
}
|
||||
offset += static_cast<std::uint32_t>(bytes_read);
|
||||
}
|
||||
};
|
||||
|
||||
auto should_send_response = true;
|
||||
auto response = std::make_shared<packet>();
|
||||
c->buffer.resize(data_size);
|
||||
read_buffer();
|
||||
|
||||
packet::error_type ret;
|
||||
auto request = std::make_shared<packet>(c->buffer);
|
||||
if (request->decrypt(encryption_token_) == 0) {
|
||||
std::string nonce;
|
||||
if ((ret = request->decode(nonce)) == 0) {
|
||||
if (nonce != c->nonce) {
|
||||
throw std::runtime_error("invalid nonce");
|
||||
}
|
||||
c->generate_nonce();
|
||||
|
||||
std::string version;
|
||||
if ((ret = request->decode(version)) == 0) {
|
||||
if (utils::compare_version_strings(
|
||||
version, REPERTORY_MIN_REMOTE_VERSION) >= 0) {
|
||||
std::uint32_t service_flags = 0u;
|
||||
DECODE_OR_IGNORE(request, service_flags);
|
||||
|
||||
std::string client_id;
|
||||
DECODE_OR_IGNORE(request, client_id);
|
||||
|
||||
std::uint64_t thread_id = 0u;
|
||||
DECODE_OR_IGNORE(request, thread_id);
|
||||
|
||||
std::string method;
|
||||
DECODE_OR_IGNORE(request, method);
|
||||
|
||||
if (ret == 0) {
|
||||
if (c->client_id.empty()) {
|
||||
add_client(*c, client_id);
|
||||
}
|
||||
|
||||
should_send_response = false;
|
||||
message_handler_(service_flags, client_id, thread_id, method,
|
||||
request.get(), *response,
|
||||
[this, c, request,
|
||||
response](const packet::error_type &result) {
|
||||
this->send_response(c, result, *response);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
ret = utils::from_api_error(api_error::incompatible_version);
|
||||
}
|
||||
} else {
|
||||
ret = utils::from_api_error(api_error::invalid_version);
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error("invalid nonce");
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error("decryption failed");
|
||||
}
|
||||
if (should_send_response) {
|
||||
send_response(c, ret, *response);
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
remove_client(*c);
|
||||
utils::error::raise_error(__FUNCTION__, e, "exception occurred");
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::remove_client(connection &c) {
|
||||
if (not c.client_id.empty()) {
|
||||
recur_mutex_lock connection_lock(connection_mutex_);
|
||||
if (not --connection_lookup_[c.client_id]) {
|
||||
connection_lookup_.erase(c.client_id);
|
||||
closed_(c.client_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::send_response(std::shared_ptr<connection> c,
|
||||
const packet::error_type &result,
|
||||
packet &response) {
|
||||
static const std::string function_name = __FUNCTION__;
|
||||
|
||||
response.encode_top(result);
|
||||
response.encode_top(PACKET_SERVICE_FLAGS);
|
||||
response.encode_top(c->nonce);
|
||||
response.encrypt(encryption_token_);
|
||||
response.transfer_into(c->buffer);
|
||||
|
||||
boost::asio::async_write(
|
||||
c->socket, boost::asio::buffer(c->buffer),
|
||||
[this, c](boost::system::error_code ec, std::size_t /*length*/) {
|
||||
if (ec) {
|
||||
remove_client(*c);
|
||||
utils::error::raise_error(function_name, ec.message());
|
||||
} else {
|
||||
read_header(c);
|
||||
}
|
||||
});
|
||||
}
|
||||
} // namespace repertory
|
||||
/*
|
||||
Copyright <2018-2023> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "comm/packet/packet_server.hpp"
|
||||
|
||||
#include "comm/packet/packet.hpp"
|
||||
#include "events/event_system.hpp"
|
||||
#include "events/events.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
#include "utils/string_utils.hpp"
|
||||
#include "utils/utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
using std::thread;
|
||||
|
||||
packet_server::packet_server(std::uint16_t port, std::string token,
|
||||
std::uint8_t pool_size, closed_callback closed,
|
||||
message_handler_callback message_handler)
|
||||
: encryption_token_(std::move(token)),
|
||||
closed_(std::move(closed)),
|
||||
message_handler_(std::move(message_handler)) {
|
||||
initialize(port, pool_size);
|
||||
event_system::instance().raise<service_started>("packet_server");
|
||||
}
|
||||
|
||||
packet_server::~packet_server() {
|
||||
event_system::instance().raise<service_shutdown_begin>("packet_server");
|
||||
std::thread([this]() {
|
||||
for (std::size_t i = 0u; i < service_threads_.size(); i++) {
|
||||
io_context_.stop();
|
||||
}
|
||||
}).detach();
|
||||
|
||||
server_thread_->join();
|
||||
server_thread_.reset();
|
||||
event_system::instance().raise<service_shutdown_end>("packet_server");
|
||||
}
|
||||
|
||||
void packet_server::add_client(connection &c, const std::string &client_id) {
|
||||
c.client_id = client_id;
|
||||
|
||||
recur_mutex_lock connection_lock(connection_mutex_);
|
||||
if (connection_lookup_.find(client_id) == connection_lookup_.end()) {
|
||||
connection_lookup_[client_id] = 1u;
|
||||
} else {
|
||||
connection_lookup_[client_id]++;
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::initialize(const uint16_t &port, uint8_t pool_size) {
|
||||
pool_size = std::max(uint8_t(1u), pool_size);
|
||||
server_thread_ = std::make_unique<std::thread>([this, port, pool_size]() {
|
||||
tcp::acceptor acceptor(io_context_);
|
||||
try {
|
||||
const auto endpoint = tcp::endpoint(tcp::v4(), port);
|
||||
acceptor.open(endpoint.protocol());
|
||||
acceptor.set_option(socket_base::reuse_address(true));
|
||||
acceptor.bind(endpoint);
|
||||
acceptor.listen();
|
||||
} catch (const std::exception &e) {
|
||||
repertory::utils::error::raise_error(__FUNCTION__, e,
|
||||
"exception occurred");
|
||||
}
|
||||
listen_for_connection(acceptor);
|
||||
|
||||
for (std::uint8_t i = 0u; i < pool_size; i++) {
|
||||
service_threads_.emplace_back([this]() { io_context_.run(); });
|
||||
}
|
||||
|
||||
for (auto &th : service_threads_) {
|
||||
th.join();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void packet_server::listen_for_connection(tcp::acceptor &acceptor) {
|
||||
auto c = std::make_shared<packet_server::connection>(io_context_, acceptor);
|
||||
acceptor.async_accept(c->socket,
|
||||
boost::bind(&packet_server::on_accept, this, c,
|
||||
boost::asio::placeholders::error));
|
||||
}
|
||||
|
||||
void packet_server::on_accept(std::shared_ptr<connection> c,
|
||||
boost::system::error_code ec) {
|
||||
listen_for_connection(c->acceptor);
|
||||
if (ec) {
|
||||
utils::error::raise_error(__FUNCTION__, ec.message());
|
||||
std::this_thread::sleep_for(1s);
|
||||
} else {
|
||||
c->socket.set_option(boost::asio::ip::tcp::no_delay(true));
|
||||
c->socket.set_option(boost::asio::socket_base::linger(false, 0));
|
||||
|
||||
c->generate_nonce();
|
||||
|
||||
packet response;
|
||||
send_response(c, 0, response);
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::read_header(std::shared_ptr<connection> c) {
|
||||
static const std::string function_name = __FUNCTION__;
|
||||
|
||||
c->buffer.resize(sizeof(std::uint32_t));
|
||||
boost::asio::async_read(
|
||||
c->socket, boost::asio::buffer(&c->buffer[0u], c->buffer.size()),
|
||||
[this, c](boost::system::error_code ec, std::size_t) {
|
||||
if (ec) {
|
||||
remove_client(*c);
|
||||
repertory::utils::error::raise_error(function_name, ec.message());
|
||||
} else {
|
||||
auto to_read = *reinterpret_cast<std::uint32_t *>(&c->buffer[0u]);
|
||||
boost::endian::big_to_native_inplace(to_read);
|
||||
read_packet(c, to_read);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void packet_server::read_packet(std::shared_ptr<connection> c,
|
||||
std::uint32_t data_size) {
|
||||
try {
|
||||
const auto read_buffer = [&]() {
|
||||
std::uint32_t offset = 0u;
|
||||
while (offset < c->buffer.size()) {
|
||||
const auto bytes_read = boost::asio::read(
|
||||
c->socket,
|
||||
boost::asio::buffer(&c->buffer[offset], c->buffer.size() - offset));
|
||||
if (bytes_read <= 0) {
|
||||
throw std::runtime_error("read failed|" + std::to_string(bytes_read));
|
||||
}
|
||||
offset += static_cast<std::uint32_t>(bytes_read);
|
||||
}
|
||||
};
|
||||
|
||||
auto should_send_response = true;
|
||||
auto response = std::make_shared<packet>();
|
||||
c->buffer.resize(data_size);
|
||||
read_buffer();
|
||||
|
||||
packet::error_type ret;
|
||||
auto request = std::make_shared<packet>(c->buffer);
|
||||
if (request->decrypt(encryption_token_) == 0) {
|
||||
std::string nonce;
|
||||
if ((ret = request->decode(nonce)) == 0) {
|
||||
if (nonce != c->nonce) {
|
||||
throw std::runtime_error("invalid nonce");
|
||||
}
|
||||
c->generate_nonce();
|
||||
|
||||
std::string version;
|
||||
if ((ret = request->decode(version)) == 0) {
|
||||
if (utils::compare_version_strings(
|
||||
version, REPERTORY_MIN_REMOTE_VERSION) >= 0) {
|
||||
std::uint32_t service_flags = 0u;
|
||||
DECODE_OR_IGNORE(request, service_flags);
|
||||
|
||||
std::string client_id;
|
||||
DECODE_OR_IGNORE(request, client_id);
|
||||
|
||||
std::uint64_t thread_id = 0u;
|
||||
DECODE_OR_IGNORE(request, thread_id);
|
||||
|
||||
std::string method;
|
||||
DECODE_OR_IGNORE(request, method);
|
||||
|
||||
if (ret == 0) {
|
||||
if (c->client_id.empty()) {
|
||||
add_client(*c, client_id);
|
||||
}
|
||||
|
||||
should_send_response = false;
|
||||
message_handler_(service_flags, client_id, thread_id, method,
|
||||
request.get(), *response,
|
||||
[this, c, request,
|
||||
response](const packet::error_type &result) {
|
||||
this->send_response(c, result, *response);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
ret = utils::from_api_error(api_error::incompatible_version);
|
||||
}
|
||||
} else {
|
||||
ret = utils::from_api_error(api_error::invalid_version);
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error("invalid nonce");
|
||||
}
|
||||
} else {
|
||||
throw std::runtime_error("decryption failed");
|
||||
}
|
||||
if (should_send_response) {
|
||||
send_response(c, ret, *response);
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
remove_client(*c);
|
||||
utils::error::raise_error(__FUNCTION__, e, "exception occurred");
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::remove_client(connection &c) {
|
||||
if (not c.client_id.empty()) {
|
||||
recur_mutex_lock connection_lock(connection_mutex_);
|
||||
if (not --connection_lookup_[c.client_id]) {
|
||||
connection_lookup_.erase(c.client_id);
|
||||
closed_(c.client_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void packet_server::send_response(std::shared_ptr<connection> c,
|
||||
const packet::error_type &result,
|
||||
packet &response) {
|
||||
static const std::string function_name = __FUNCTION__;
|
||||
|
||||
response.encode_top(result);
|
||||
response.encode_top(PACKET_SERVICE_FLAGS);
|
||||
response.encode_top(c->nonce);
|
||||
response.encrypt(encryption_token_);
|
||||
response.transfer_into(c->buffer);
|
||||
|
||||
boost::asio::async_write(
|
||||
c->socket, boost::asio::buffer(c->buffer),
|
||||
[this, c](boost::system::error_code ec, std::size_t /*length*/) {
|
||||
if (ec) {
|
||||
remove_client(*c);
|
||||
utils::error::raise_error(function_name, ec.message());
|
||||
} else {
|
||||
read_header(c);
|
||||
}
|
||||
});
|
||||
}
|
||||
} // namespace repertory
|
||||
|
||||
Reference in New Issue
Block a user