refactor
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
Scott E. Graves 2024-11-30 18:57:42 -06:00
parent e1939d4d11
commit 4c70641d8f
2 changed files with 18 additions and 17 deletions

View File

@ -68,7 +68,7 @@ void packet_server::add_client(connection &conn, const std::string &client_id) {
void packet_server::initialize(const uint16_t &port, uint8_t pool_size) { void packet_server::initialize(const uint16_t &port, uint8_t pool_size) {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
pool_size = std::max(uint8_t(1U), pool_size); pool_size = std::max(std::uint8_t(1U), pool_size);
server_thread_ = std::make_unique<std::thread>([this, port, pool_size]() { server_thread_ = std::make_unique<std::thread>([this, port, pool_size]() {
tcp::acceptor acceptor(io_context_); tcp::acceptor acceptor(io_context_);
try { try {

View File

@ -30,27 +30,28 @@ void polling::frequency_thread(
std::function<std::uint32_t()> get_frequency_seconds, frequency freq) { std::function<std::uint32_t()> get_frequency_seconds, frequency freq) {
while (not stop_requested_) { while (not stop_requested_) {
std::deque<std::future<void>> futures; std::deque<std::future<void>> futures;
unique_mutex_lock l(mutex_); unique_mutex_lock lock(mutex_);
if (not stop_requested_ && if (not stop_requested_ &&
notify_.wait_for(l, std::chrono::seconds(get_frequency_seconds())) == notify_.wait_for(lock, std::chrono::seconds(get_frequency_seconds())) ==
std::cv_status::timeout) { std::cv_status::timeout) {
for (const auto &kv : items_) { for (const auto &item : items_) {
if (kv.second.freq == freq) { if (item.second.freq == freq) {
futures.emplace_back( futures.emplace_back(
std::async(std::launch::async, [this, &freq, kv]() -> void { std::async(std::launch::async, [this, &freq, item]() -> void {
if (config_->get_event_level() == event_level::trace || if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) { freq != frequency::second) {
event_system::instance().raise<polling_item_begin>(kv.first); event_system::instance().raise<polling_item_begin>(
item.first);
} }
kv.second.action(); item.second.action();
if (config_->get_event_level() == event_level::trace || if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) { freq != frequency::second) {
event_system::instance().raise<polling_item_end>(kv.first); event_system::instance().raise<polling_item_end>(item.first);
} }
})); }));
} }
} }
l.unlock(); lock.unlock();
while (not futures.empty()) { while (not futures.empty()) {
futures.front().wait(); futures.front().wait();
@ -61,17 +62,17 @@ void polling::frequency_thread(
} }
void polling::remove_callback(const std::string &name) { void polling::remove_callback(const std::string &name) {
mutex_lock l(mutex_); mutex_lock lock(mutex_);
items_.erase(name); items_.erase(name);
} }
void polling::set_callback(const polling_item &pi) { void polling::set_callback(const polling_item &item) {
mutex_lock l(mutex_); mutex_lock lock(mutex_);
items_[pi.name] = pi; items_[item.name] = item;
} }
void polling::start(app_config *config) { void polling::start(app_config *config) {
mutex_lock l(start_stop_mutex_); mutex_lock lock(start_stop_mutex_);
if (not high_frequency_thread_) { if (not high_frequency_thread_) {
event_system::instance().raise<service_started>("polling"); event_system::instance().raise<service_started>("polling");
config_ = config; config_ = config;
@ -100,12 +101,12 @@ void polling::start(app_config *config) {
void polling::stop() { void polling::stop() {
if (high_frequency_thread_) { if (high_frequency_thread_) {
event_system::instance().raise<service_shutdown_begin>("polling"); event_system::instance().raise<service_shutdown_begin>("polling");
mutex_lock l(start_stop_mutex_); mutex_lock lock(start_stop_mutex_);
if (high_frequency_thread_) { if (high_frequency_thread_) {
{ {
stop_requested_ = true; stop_requested_ = true;
mutex_lock l2(mutex_); mutex_lock lock2(mutex_);
notify_.notify_all(); notify_.notify_all();
} }
high_frequency_thread_->join(); high_frequency_thread_->join();