This commit is contained in:
2025-01-22 12:58:02 -06:00
parent 29fb395758
commit 57a591c51e
2 changed files with 35 additions and 30 deletions

View File

@ -134,14 +134,12 @@ private:
const event_type &event) { const event_type &event) {
std::deque<std::future<void>> futures; std::deque<std::future<void>> futures;
recur_mutex_lock consumer_lock(consumer_mutex_); recur_mutex_lock consumer_lock(consumer_mutex_);
if (event_consumers_.find(name) != event_consumers_.end()) {
for (auto *consumer : event_consumers_[name]) { for (auto *consumer : event_consumers_[name]) {
futures.emplace_back( futures.emplace_back(
std::async(std::launch::async, [consumer, &event]() { std::async(std::launch::async, [consumer, &event]() {
consumer->notify_event(event); consumer->notify_event(event);
})); }));
} }
}
while (not futures.empty()) { while (not futures.empty()) {
futures.front().get(); futures.front().get();

View File

@ -38,7 +38,10 @@ void single_thread_service_base::notify_all() const {
void single_thread_service_base::start() { void single_thread_service_base::start() {
mutex_lock lock(mtx_); mutex_lock lock(mtx_);
if (not thread_) { if (thread_) {
return;
}
stop_requested_ = false; stop_requested_ = false;
on_start(); on_start();
thread_ = std::make_unique<std::thread>([this]() { thread_ = std::make_unique<std::thread>([this]() {
@ -47,24 +50,28 @@ void single_thread_service_base::start() {
service_function(); service_function();
} }
}); });
}
} }
void single_thread_service_base::stop() { void single_thread_service_base::stop() {
if (thread_) {
event_system::instance().raise<service_shutdown_begin>(service_name_);
unique_mutex_lock lock(mtx_); unique_mutex_lock lock(mtx_);
if (thread_) { if (not thread_) {
return;
}
event_system::instance().raise<service_shutdown_begin>(service_name_);
std::unique_ptr<std::thread> thread{nullptr};
std::swap(thread, thread_);
stop_requested_ = true; stop_requested_ = true;
notify_.notify_all(); notify_.notify_all();
lock.unlock(); lock.unlock();
thread_->join(); thread->join();
thread_.reset(); thread.reset();
on_stop(); on_stop();
}
event_system::instance().raise<service_shutdown_end>(service_name_); event_system::instance().raise<service_shutdown_end>(service_name_);
}
} }
} // namespace repertory } // namespace repertory