Improving libcron performance (#9)

Co-authored-by: Heinz-Peter Liechtenecker <h.liechtenecker@fh-kaernten.at>
Co-authored-by: Per Malmberg <PerMalmberg@users.noreply.github.com>
This commit is contained in:
Heinz-Peter Liechtenecker 2020-09-26 13:32:54 +02:00 committed by GitHub
parent 7ef39558a1
commit f3fddf5f19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 481 additions and 92 deletions

View File

@ -23,6 +23,8 @@ while(true)
} }
``` ```
In case there is a lot of time between you call `add_schedule` and `tick`, you can call `recalculate_schedule`.
The callback must have the following signature: The callback must have the following signature:
``` ```
@ -45,6 +47,46 @@ cron.add_schedule("Hello from Cron", "* * * * * ?", [=](auto& i) {
}); });
``` ```
- `libcron::TaskInformation::get_name` gives you the name of the current Task. This allows to add attach the same callback to multiple schedules:
```
libcron::Cron cron;
auto f = [](auto& i) {
if (i.get_name() == "Task 1")
{
do_work_task_1();
}
else if (i.get_name() == "Task 2")
{
do_work_task_2();
}
};
cron.add_schedule("Task 1", "* * * * * ?", f);
cron.add_schedule("Task 2", "* * * * * ?", f);
```
## Adding multiple tasks with individual schedules at once
libcron::cron::add_schedule needs to sort the underlying container each time you add a schedule. To improve performance when adding many tasks by only sorting once, there is a convinient way to pass either a `std::map<std::string, std::string>`, a `std::vector<std::pair<std::string, std::string>>`, a `std::vector<std::tuple<std::string, std::string>>` or a `std::unordered_map<std::string, std::string>` to `add_schedule`, where the first element corresponds to the task name and the second element to the task schedule. Only if all schedules in the container are valid, they will be added to `libcron::Cron`. The return type is a `std::tuple<bool, std::string, std::string>`, where the boolean is `true` if the schedules have been added or false otherwise. If the schedules have not been added, the second element in the tuple corresponds to the task-name with the given invalid schedule. If there are multiple invalid schedules in the container, `add_schedule` will abort at the first invalid element:
```
std::map<std::string, std::string> name_schedule_map;
for(int i = 1; i <= 1000; i++)
{
name_schedule_map["Task-" + std::to_string(i)] = "* * * * * ?";
}
name_schedule_map["Task-1000"] = "invalid";
auto res = c1.add_schedule(name_schedule_map, [](auto&) { });
if (std::get<0>(res) == false)
{
std::cout << "Task " << std::get<1>(res)
<< "has an invalid schedule: "
<< std::get<2>(res) << std::endl;
}
```
## Removing schedules from `libcron::Cron` ## Removing schedules from `libcron::Cron`

View File

@ -3,8 +3,15 @@ project(libcron)
set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD 17)
# Deactivate Iterator-Debugging on Windows
option(LIBCRON_DEACTIVATE_ITERATOR_DEBUGGING "Build with iterator-debugging (MSVC only)." OFF)
if( MSVC ) if( MSVC )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4")
if (LIBCRON_DEACTIVATE_ITERATOR_DEBUGGING)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_HAS_ITERATOR_DEBUGGING=0")
endif()
else() else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic")
endif() endif()

View File

@ -2,11 +2,14 @@
#include <string> #include <string>
#include <chrono> #include <chrono>
#include <queue>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <map>
#include <unordered_map>
#include <vector>
#include "Task.h" #include "Task.h"
#include "CronClock.h" #include "CronClock.h"
#include "TaskQueue.h"
namespace libcron namespace libcron
{ {
@ -32,11 +35,16 @@ namespace libcron
template<typename ClockType, typename LockType> template<typename ClockType, typename LockType>
std::ostream& operator<<(std::ostream& stream, const Cron<ClockType, LockType>& c); std::ostream& operator<<(std::ostream& stream, const Cron<ClockType, LockType>& c);
template<typename ClockType = libcron::LocalClock, typename LockType = libcron::NullLock> template<typename ClockType = libcron::LocalClock,
typename LockType = libcron::NullLock>
class Cron class Cron
{ {
public: public:
bool add_schedule(std::string name, const std::string& schedule, Task::TaskFunction work); bool add_schedule(std::string name, const std::string& schedule, Task::TaskFunction work);
template<typename Schedules = std::map<std::string, std::string>>
std::tuple<bool, std::string, std::string>
add_schedule(const Schedules& name_schedule_map, Task::TaskFunction work);
void clear_schedules(); void clear_schedules();
void remove_schedule(const std::string& name); void remove_schedule(const std::string& name);
@ -63,75 +71,23 @@ namespace libcron
return clock; return clock;
} }
void recalculate_schedule()
{
for (auto& t : tasks.get_tasks())
{
using namespace std::chrono_literals;
// Ensure that next schedule is in the future
t.calculate_next(clock.now() + 1s);
}
}
void get_time_until_expiry_for_tasks( void get_time_until_expiry_for_tasks(
std::vector<std::tuple<std::string, std::chrono::system_clock::duration>>& status) const; std::vector<std::tuple<std::string, std::chrono::system_clock::duration>>& status) const;
friend std::ostream& operator<<<>(std::ostream& stream, const Cron<ClockType, LockType>& c); friend std::ostream& operator<<<>(std::ostream& stream, const Cron<ClockType, LockType>& c);
private: private:
class Queue TaskQueue<LockType> tasks{};
// Priority queue placing smallest (i.e. nearest in time) items on top.
: public std::priority_queue<Task, std::vector<Task>, std::greater<>>
{
public:
// Inherit to allow access to the container.
const std::vector<Task>& get_tasks() const
{
return c;
}
std::vector<Task>& get_tasks()
{
return c;
}
void clear()
{
lock.lock();
while (!empty())
pop();
lock.unlock();
}
template<typename T>
void remove(T to_remove)
{
lock.lock();
/* Copy current elements */
std::vector<Task> temp{};
std::swap(temp, c);
/* Refill with elements ensuring correct order by calling push */
for (const auto& task : temp)
{
if (task != to_remove)
push(task);
}
lock.unlock();
}
void lock_queue()
{
/* Do not allow to manipulate the Queue */
lock.lock();
}
void release_queue()
{
/* Allow Access to the Queue Manipulating-Functions */
lock.unlock();
}
private:
LockType lock;
};
Queue tasks{};
ClockType clock{}; ClockType clock{};
bool first_tick = true; bool first_tick = true;
std::chrono::system_clock::time_point last_tick{}; std::chrono::system_clock::time_point last_tick{};
@ -149,6 +105,7 @@ namespace libcron
if (t.calculate_next(clock.now())) if (t.calculate_next(clock.now()))
{ {
tasks.push(t); tasks.push(t);
tasks.sort();
} }
tasks.release_queue(); tasks.release_queue();
} }
@ -156,6 +113,50 @@ namespace libcron
return res; return res;
} }
template<typename ClockType, typename LockType>
template<typename Schedules>
std::tuple<bool, std::string, std::string>
Cron<ClockType, LockType>::add_schedule(const Schedules& name_schedule_map, Task::TaskFunction work)
{
bool is_valid = true;
std::tuple<bool, std::string, std::string> res{false, "", ""};
std::vector<Task> tasks_to_add;
tasks_to_add.reserve(name_schedule_map.size());
for (auto it = name_schedule_map.begin(); is_valid && it != name_schedule_map.end(); ++it)
{
const auto& [name, schedule] = *it;
auto cron = CronData::create(schedule);
is_valid = cron.is_valid();
if (is_valid)
{
Task t{std::move(name), CronSchedule{cron}, work };
if (t.calculate_next(clock.now()))
{
tasks_to_add.push_back(std::move(t));
}
}
else
{
std::get<1>(res) = name;
std::get<2>(res) = schedule;
}
}
// Only add tasks and sort once if all elements in the map where valid
if (is_valid && tasks_to_add.size() > 0)
{
tasks.lock_queue();
tasks.push(tasks_to_add);
tasks.sort();
tasks.release_queue();
}
std::get<0>(res) = is_valid;
return res;
}
template<typename ClockType, typename LockType> template<typename ClockType, typename LockType>
void Cron<ClockType, LockType>::clear_schedules() void Cron<ClockType, LockType>::clear_schedules()
{ {
@ -241,30 +242,31 @@ namespace libcron
last_tick = now; last_tick = now;
std::vector<Task> executed{}; if (!tasks.empty())
while (!tasks.empty()
&& tasks.top().is_expired(now))
{ {
executed.push_back(tasks.top()); for (size_t i = 0; i < tasks.size(); i++)
tasks.pop(); {
auto& t = executed[executed.size() - 1]; if (tasks.at(i).is_expired(now))
{
auto& t = tasks.at(i);
t.execute(now); t.execute(now);
}
res = executed.size();
// Place executed tasks back onto the priority queue.
std::for_each(executed.begin(), executed.end(), [this, &now](Task& task)
{
// Must calculate new schedules using second after 'now', otherwise
// we'll run the same task over and over if it takes less than 1s to execute.
using namespace std::chrono_literals; using namespace std::chrono_literals;
if (task.calculate_next(now + 1s)) if (!t.calculate_next(now + 1s))
{ {
tasks.push(task); tasks.remove(t);
}
res++;
}
}
// Only sort if at least one task was executed
if (res > 0)
{
tasks.sort();
}
} }
});
tasks.release_queue(); tasks.release_queue();
return res; return res;

View File

@ -4,6 +4,7 @@
#include <regex> #include <regex>
#include <string> #include <string>
#include <vector> #include <vector>
#include <unordered_map>
#include <libcron/TimeTypes.h> #include <libcron/TimeTypes.h>
namespace libcron namespace libcron
@ -126,6 +127,7 @@ namespace libcron
static const std::vector<std::string> month_names; static const std::vector<std::string> month_names;
static const std::vector<std::string> day_names; static const std::vector<std::string> day_names;
static std::unordered_map<std::string, CronData> cache;
template<typename T> template<typename T>
void add_full_range(std::set<T>& set); void add_full_range(std::set<T>& set);

View File

@ -13,6 +13,7 @@ namespace libcron
public: public:
virtual ~TaskInformation() = default; virtual ~TaskInformation() = default;
virtual std::chrono::system_clock::duration get_delay() const = 0; virtual std::chrono::system_clock::duration get_delay() const = 0;
virtual std::string get_name() const = 0;
}; };
class Task : public TaskInformation class Task : public TaskInformation
@ -50,12 +51,17 @@ namespace libcron
return next_schedule > other.next_schedule; return next_schedule > other.next_schedule;
} }
bool operator<(const Task& other) const
{
return next_schedule < other.next_schedule;
}
bool is_expired(std::chrono::system_clock::time_point now) const; bool is_expired(std::chrono::system_clock::time_point now) const;
std::chrono::system_clock::duration std::chrono::system_clock::duration
time_until_expiry(std::chrono::system_clock::time_point now) const; time_until_expiry(std::chrono::system_clock::time_point now) const;
std::string get_name() const std::string get_name() const override
{ {
return name; return name;
} }

View File

@ -0,0 +1,115 @@
#pragma once
#include <vector>
#include <map>
#include <unordered_map>
#include <vector>
#include "Task.h"
namespace libcron
{
template<typename LockType>
class TaskQueue
{
public:
const std::vector<Task>& get_tasks() const
{
return c;
}
std::vector<Task>& get_tasks()
{
return c;
}
size_t size() const noexcept
{
return c.size();
}
bool empty() const noexcept
{
return c.empty();
}
void push(Task& t)
{
c.push_back(std::move(t));
}
void push(Task&& t)
{
c.push_back(std::move(t));
}
void push(std::vector<Task>& tasks_to_insert)
{
c.reserve(c.size() + tasks_to_insert.size());
c.insert(c.end(), std::make_move_iterator(tasks_to_insert.begin()), std::make_move_iterator(tasks_to_insert.end()));
}
const Task& top() const
{
return c[0];
}
Task& at(const size_t i)
{
return c[i];
}
void sort()
{
std::sort(c.begin(), c.end(), std::less<>());
}
void clear()
{
lock.lock();
c.clear();
lock.unlock();
}
void remove(Task& to_remove)
{
auto it = std::find_if(c.begin(), c.end(), [&to_remove] (const Task& to_compare) {
return to_remove.get_name() == to_compare;
});
if (it != c.end())
{
c.erase(it);
}
}
void remove(std::string to_remove)
{
lock.lock();
auto it = std::find_if(c.begin(), c.end(), [&to_remove] (const Task& to_compare) {
return to_remove == to_compare;
});
if (it != c.end())
{
c.erase(it);
}
lock.unlock();
}
void lock_queue()
{
/* Do not allow to manipulate the Queue */
lock.lock();
}
void release_queue()
{
/* Allow Access to the Queue Manipulating-Functions */
lock.unlock();
}
private:
LockType lock;
std::vector<Task> c;
};
}

View File

@ -15,11 +15,22 @@ namespace libcron
const std::vector<std::string> CronData::month_names{ "JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" }; const std::vector<std::string> CronData::month_names{ "JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC" };
const std::vector<std::string> CronData::day_names{ "SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT" }; const std::vector<std::string> CronData::day_names{ "SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT" };
std::unordered_map<std::string, CronData> CronData::cache{};
CronData CronData::create(const std::string& cron_expression) CronData CronData::create(const std::string& cron_expression)
{ {
CronData c; CronData c;
auto found = cache.find(cron_expression);
if (found == cache.end())
{
c.parse(cron_expression); c.parse(cron_expression);
cache[cron_expression] = c;
}
else
{
c = found->second;
}
return c; return c;
} }

View File

@ -3,8 +3,15 @@ project(cron_test)
set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD 17)
# Deactivate Iterator-Debugging on Windows
option(LIBCRON_DEACTIVATE_ITERATOR_DEBUGGING "Build with iterator-debugging (MSVC only)." OFF)
if( MSVC ) if( MSVC )
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /W4")
if (LIBCRON_DEACTIVATE_ITERATOR_DEBUGGING)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_HAS_ITERATOR_DEBUGGING=0")
endif()
else() else()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra -Wpedantic")
endif() endif()

View File

@ -1,5 +1,6 @@
#include <catch.hpp> #include <catch.hpp>
#include <libcron/include/libcron/Cron.h> #include <libcron/include/libcron/Cron.h>
#include <libcron/externals/date/include/date/date.h>
#include <thread> #include <thread>
#include <iostream> #include <iostream>
@ -209,6 +210,7 @@ SCENARIO("Task priority")
} }
AND_WHEN("Waiting based on the time given by the Cron instance") AND_WHEN("Waiting based on the time given by the Cron instance")
{ {
auto msec = std::chrono::duration_cast<std::chrono::milliseconds>(c.time_until_next());
std::this_thread::sleep_for(c.time_until_next()); std::this_thread::sleep_for(c.time_until_next());
c.tick(); c.tick();
@ -446,3 +448,198 @@ SCENARIO("Tasks can be added and removed from the scheduler")
} }
} }
} }
SCENARIO("Testing CRON-Tick Performance")
{
GIVEN("A Cron instance with no task")
{
using namespace std::chrono_literals;
using clock = std::chrono::high_resolution_clock;
using std::chrono::milliseconds;
using std::chrono::duration_cast;
Cron<TestClock> c1{};
auto& cron_clock1 = c1.get_clock();
Cron<TestClock> c2{};
auto& cron_clock2 = c2.get_clock();
Cron<TestClock> c3{};
auto& cron_clock3 = c3.get_clock();
int count1 = 0;
int count2 = 0;
int count3 = 0;
WHEN("Creating 1000 CronData Objects")
{
std::string cron_job = "* * * * * ?";
auto begin_cron_data = clock::now();
for(int i = 1; i <= 1000; i++)
{
auto cron = CronData::create(cron_job);
}
auto end_cron_data = clock::now();
auto msec_cron_data = duration_cast<milliseconds>(end_cron_data - begin_cron_data);
// Hopefully Creating a lot of Cron Objects does not take more than a second
REQUIRE(msec_cron_data <= 1000ms);
}
WHEN("Adding 1000 Tasks where with an invalid CRON-String with std::map<std::string, std::string>")
{
std::map<std::string, std::string> name_schedule_map;
for(int i = 1; i <= 1000; i++)
{
name_schedule_map["Task-" + std::to_string(i)] = "* * * * * ?";
}
name_schedule_map["Task-1000"] = "invalid";
auto res = c1.add_schedule(name_schedule_map,
[](auto&) { });
REQUIRE_FALSE(std::get<0>(res));
REQUIRE(std::get<1>(res) == "Task-1000");
REQUIRE(std::get<2>(res) == "invalid");
}
WHEN("Adding a std::vector<std::tuple<std::string, std::string>>")
{
std::vector<std::tuple<std::string, std::string>> name_schedule_map;
for(int i = 1; i <= 1000; i++)
{
name_schedule_map.push_back(std::make_tuple("Task-" + std::to_string(i), "* * * * * ?"));
}
auto res = c1.add_schedule(name_schedule_map,
[](auto&) { });
REQUIRE(std::get<0>(res));
}
WHEN("Adding a std::vector<std::pair<std::string, std::string>>")
{
std::vector<std::pair<std::string, std::string>> name_schedule_map;
for(int i = 1; i <= 1000; i++)
{
name_schedule_map.push_back(std::make_pair("Task-" + std::to_string(i), "* * * * * ?"));
}
auto res = c1.add_schedule(name_schedule_map,
[](auto&) { });
REQUIRE(std::get<0>(res));
}
WHEN("Adding a std::unordered_map<std::string, std::string>")
{
std::unordered_map<std::string, std::string> name_schedule_map;
for(int i = 1; i <= 1000; i++)
{
name_schedule_map["Task-" + std::to_string(i)] = "* * * * * ?";
}
auto res = c1.add_schedule(name_schedule_map,
[](auto&) { });
REQUIRE(std::get<0>(res));
}
WHEN("Adding 1000 Tasks to two Cron-Objects expiring after 1 second calling add_schedule")
{
auto begin_add_sequential = clock::now();
for(int i = 1; i <= 1000; i++)
{
REQUIRE(c1.add_schedule("Task-" + std::to_string(i), "* * * * * ?",
[&count1](auto&)
{
count1++;
})
);
REQUIRE(c2.add_schedule("Task-" + std::to_string(i), "* * * * * ?",
[&count2](auto&)
{
count2++;
})
);
}
auto end_add_sequential = clock::now();
std::map<std::string, std::string> name_schedule_map{};
for(int i = 1; i <= 1000; i++)
{
name_schedule_map["Task-" + std::to_string(i)] = "* * * * * ?";
}
auto begin_add_batch = clock::now();
REQUIRE(std::get<0>(c3.add_schedule(name_schedule_map,
[&count3](auto&)
{
count3++;
}))
);
auto end_add_batch = clock::now();
auto time_sequential = duration_cast<milliseconds>(end_add_sequential - begin_add_sequential)/2;
auto time_batch = duration_cast<milliseconds>(end_add_batch - begin_add_batch);
// This should hopefully take only a few second?
REQUIRE(time_sequential < 10000ms);
REQUIRE(time_batch < 5000ms);
REQUIRE(time_batch < time_sequential);
REQUIRE(c1.count() == 1000);
REQUIRE(c2.count() == 1000);
REQUIRE(c3.count() == 1000);
THEN("Execute all Tasks 10 Times")
{
milliseconds msec1;
auto t1 = std::thread([&cron_clock1, &c1, &msec1]() {
auto begin_tick = clock::now();
for(auto i = 0; i < 10; ++i)
{
cron_clock1.add(std::chrono::seconds{1});
c1.tick();
}
auto end_tick = clock::now();
msec1 = duration_cast<milliseconds>(end_tick - begin_tick)/10;
});
milliseconds msec2;
auto t2 = std::thread([&cron_clock2, &c2, &msec2]() {
auto begin_tick = clock::now();
for(auto i = 0; i < 10; ++i)
{
cron_clock2.add(std::chrono::seconds{1});
c2.tick();
}
auto end_tick = clock::now();
msec2 = duration_cast<milliseconds>(end_tick - begin_tick)/10;
});
milliseconds msec3;
auto t3 = std::thread([&cron_clock3, &c3, &msec3]() {
auto begin_tick = clock::now();
for(auto i = 0; i < 10; ++i)
{
cron_clock3.add(std::chrono::seconds{1});
c3.tick();
}
auto end_tick = clock::now();
msec3 = duration_cast<milliseconds>(end_tick - begin_tick)/10;
});
t1.join();
t2.join();
t3.join();
REQUIRE(count1 == 10000);
REQUIRE(count2 == 10000);
REQUIRE(count3 == 10000);
// Hopefully executing a more or less empty task does only take some milliseconds
REQUIRE(msec1 <= 10ms);
REQUIRE(msec2 <= 10ms);
REQUIRE(msec3 <= 10ms);
}
}
}
}