Refactored polling to be more accurate on scheduling tasks

This commit is contained in:
Scott E. Graves 2025-02-14 08:35:12 -06:00
parent fc573e165b
commit 1b4c1db44d
2 changed files with 41 additions and 30 deletions

View File

@ -13,6 +13,7 @@
### Changes from v2.0.3-rc
* Continue documentation updates
* Refactored polling to be more accurate on scheduling tasks
## v2.0.3-rc

View File

@ -38,47 +38,57 @@ void polling::frequency_thread(
std::function<std::uint32_t()> get_frequency_seconds, frequency freq) {
REPERTORY_USES_FUNCTION_NAME();
auto last_run = std::chrono::system_clock::time_point::min();
while (not get_stop_requested()) {
unique_mutex_lock lock(mutex_);
auto futures = std::accumulate(
items_.begin(), items_.end(), std::deque<tasks::task_ptr>{},
[this, &freq](auto &&list, auto &&item) -> auto {
if (item.second.freq != freq) {
auto elapsed = std::chrono::system_clock::now() - last_run;
auto max_elapsed = std::chrono::seconds(get_frequency_seconds());
if (std::chrono::duration_cast<std::chrono::seconds>(elapsed) >=
max_elapsed) {
unique_mutex_lock lock(mutex_);
auto futures = std::accumulate(
items_.begin(), items_.end(), std::deque<tasks::task_ptr>{},
[this, &freq](auto &&list, auto &&item) -> auto {
if (item.second.freq != freq) {
return list;
}
auto future = tasks::instance().schedule({
[this, &freq, item](auto &&task_stopped) {
if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) {
event_system::instance().raise<polling_item_begin>(
function_name, item.first);
}
item.second.action(task_stopped);
if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) {
event_system::instance().raise<polling_item_end>(
function_name, item.first);
}
},
});
list.emplace_back(future);
return list;
}
auto future = tasks::instance().schedule({
[this, &freq, item](auto &&task_stopped) {
if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) {
event_system::instance().raise<polling_item_begin>(
function_name, item.first);
}
item.second.action(task_stopped);
if (config_->get_event_level() == event_level::trace ||
freq != frequency::second) {
event_system::instance().raise<polling_item_end>(
function_name, item.first);
}
},
});
lock.unlock();
list.emplace_back(future);
return list;
});
lock.unlock();
while (not futures.empty()) {
futures.front()->wait();
futures.pop_front();
}
while (not futures.empty()) {
futures.front()->wait();
futures.pop_front();
last_run = std::chrono::system_clock::now();
elapsed = std::chrono::seconds(0U);
}
unique_mutex_lock lock(mutex_);
if (get_stop_requested()) {
return;
}
lock.lock();
notify_.wait_for(lock, std::chrono::seconds(get_frequency_seconds()));
notify_.wait_for(lock, max_elapsed - elapsed);
}
}