This commit is contained in:
Scott E. Graves 2024-12-07 07:12:52 -06:00
parent 18c5948e3f
commit f0ddbe7a8c
3 changed files with 34 additions and 26 deletions

View File

@ -23,16 +23,12 @@
#define REPERTORY_INCLUDE_UTILS_TASKS_HPP_ #define REPERTORY_INCLUDE_UTILS_TASKS_HPP_
#include "common.hpp" #include "common.hpp"
#include "types/repertory.hpp"
#include <memory>
namespace repertory { namespace repertory {
class app_config; class app_config;
class tasks final { class tasks final {
public: public:
static constexpr const auto default_delay_ms{10U};
struct task final { struct task final {
std::function<void(const stop_type &task_stopped)> action; std::function<void(const stop_type &task_stopped)> action;
}; };
@ -47,7 +43,17 @@ public:
using task_ptr = std::shared_ptr<i_task>; using task_ptr = std::shared_ptr<i_task>;
private: private:
class task_wait : public i_task { class task_wait final : public i_task {
public:
task_wait() = default;
task_wait(const task_wait &) = delete;
task_wait(task_wait &&) = delete;
~task_wait() override { set_result(false); }
auto operator=(const task_wait &) -> task_wait & = delete;
auto operator=(task_wait &&) -> task_wait & = delete;
private: private:
bool complete{false}; bool complete{false};
mutable std::mutex mtx; mutable std::mutex mtx;
@ -63,10 +69,6 @@ private:
struct scheduled_task final { struct scheduled_task final {
task item; task item;
std::uint16_t delay_ms{
default_delay_ms,
};
std::shared_ptr<task_wait> wait{ std::shared_ptr<task_wait> wait{
std::make_shared<task_wait>(), std::make_shared<task_wait>(),
}; };

View File

@ -507,6 +507,7 @@ void base_provider::process_removed_files(std::deque<removed_item> removed_list,
} }
void base_provider::process_removed_items(const stop_type &stop_requested) { void base_provider::process_removed_items(const stop_type &stop_requested) {
event_system::instance().raise<polling_item_begin>("duh");
auto list = db3_->get_api_path_list(); auto list = db3_->get_api_path_list();
[[maybe_unused]] auto res = [[maybe_unused]] auto res =
std::all_of(list.begin(), list.end(), [&](auto &&api_path) -> bool { std::all_of(list.begin(), list.end(), [&](auto &&api_path) -> bool {
@ -515,7 +516,7 @@ void base_provider::process_removed_items(const stop_type &stop_requested) {
} }
tasks::instance().schedule({ tasks::instance().schedule({
[this, api_path](auto &&stop_requested2) { [this, api_path](auto &&task_stopped) {
api_meta_map meta{}; api_meta_map meta{};
if (get_item_meta(api_path, meta) != api_error::success) { if (get_item_meta(api_path, meta) != api_error::success) {
return; return;
@ -555,12 +556,13 @@ void base_provider::process_removed_items(const stop_type &stop_requested) {
{ {
removed_item{api_path, false, meta[META_SOURCE]}, removed_item{api_path, false, meta[META_SOURCE]},
}, },
stop_requested2); task_stopped);
}, },
}); });
return true; return not stop_requested;
}); });
event_system::instance().raise<polling_item_end>("duh");
} }
void base_provider::remove_deleted_items(const stop_type &stop_requested) { void base_provider::remove_deleted_items(const stop_type &stop_requested) {

View File

@ -48,21 +48,23 @@ auto tasks::task_wait::wait() const -> bool {
} }
auto tasks::schedule(task item) -> task_ptr { auto tasks::schedule(task item) -> task_ptr {
++count_;
while (not stop_requested_ && (count_ >= task_threads_.size())) {
std::this_thread::sleep_for(10ms);
}
scheduled_task runnable{item}; scheduled_task runnable{item};
unique_mutex_lock lock(mutex_); unique_mutex_lock lock(mutex_);
++count_; if (stop_requested_) {
notify_.notify_all();
return runnable.wait;
}
tasks_.push_back(runnable); tasks_.push_back(runnable);
notify_.notify_all(); notify_.notify_all();
lock.unlock(); lock.unlock();
while (not stop_requested_ && count_ >= (task_threads_.size() * 4U)) {
lock.lock();
notify_.wait(lock);
notify_.notify_all();
lock.unlock();
}
return runnable.wait; return runnable.wait;
} }
@ -72,9 +74,11 @@ void tasks::start(app_config *config) {
return; return;
} }
count_ = 0U;
config_ = config; config_ = config;
count_ = 0U;
stop_requested_ = false; stop_requested_ = false;
tasks_.clear();
for (std::uint32_t idx = 0U; idx < std::thread::hardware_concurrency(); for (std::uint32_t idx = 0U; idx < std::thread::hardware_concurrency();
++idx) { ++idx) {
task_threads_.emplace_back( task_threads_.emplace_back(
@ -95,6 +99,11 @@ void tasks::stop() {
lock.unlock(); lock.unlock();
task_threads_.clear(); task_threads_.clear();
lock.lock();
tasks_.clear();
notify_.notify_all();
lock.unlock();
} }
void tasks::task_thread() { void tasks::task_thread() {
@ -134,11 +143,6 @@ void tasks::task_thread() {
runnable.item.action(stop_requested_); runnable.item.action(stop_requested_);
runnable.wait->set_result(true); runnable.wait->set_result(true);
if (stop_requested_) {
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(runnable.delay_ms));
--count_; --count_;
} catch (const std::exception &e) { } catch (const std::exception &e) {
runnable.wait->set_result(false); runnable.wait->set_result(false);