refactor
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
Scott E. Graves 2024-12-06 13:55:05 -06:00
parent 1e2fd53b86
commit 4a7c76cc1c
4 changed files with 93 additions and 52 deletions

View File

@ -29,8 +29,11 @@ class app_config;
class tasks final {
public:
static constexpr const auto default_delay_ms{10U};
struct task_item final {
std::function<void(const stop_type &stop_requested)> action;
std::uint16_t delay_ms{default_delay_ms};
};
public:
@ -52,6 +55,7 @@ public:
private:
app_config *config_{nullptr};
std::atomic<std::uint64_t> count_{0U};
std::mutex mutex_;
std::condition_variable notify_;
std::mutex start_stop_mutex_;

View File

@ -49,8 +49,8 @@ void base_provider::add_all_items(const stop_type &stop_requested) {
}
auto base_provider::create_api_file(std::string path, std::string key,
std::uint64_t size, std::uint64_t file_time)
-> api_file {
std::uint64_t size,
std::uint64_t file_time) -> api_file {
api_file file{};
file.api_path = utils::path::create_api_path(path);
file.api_parent = utils::path::get_parent_api_path(file.api_path);
@ -82,8 +82,8 @@ auto base_provider::create_api_file(std::string path, std::uint64_t size,
}
auto base_provider::create_directory_clone_source_meta(
const std::string &source_api_path, const std::string &api_path)
-> api_error {
const std::string &source_api_path,
const std::string &api_path) -> api_error {
REPERTORY_USES_FUNCTION_NAME();
bool exists{};
@ -180,8 +180,8 @@ auto base_provider::create_directory(const std::string &api_path,
return set_item_meta(api_path, meta);
}
auto base_provider::create_file(const std::string &api_path, api_meta_map &meta)
-> api_error {
auto base_provider::create_file(const std::string &api_path,
api_meta_map &meta) -> api_error {
REPERTORY_USES_FUNCTION_NAME();
bool exists{};
@ -238,9 +238,8 @@ auto base_provider::create_file(const std::string &api_path, api_meta_map &meta)
return api_error::error;
}
auto base_provider::get_api_path_from_source(const std::string &source_path,
std::string &api_path) const
-> api_error {
auto base_provider::get_api_path_from_source(
const std::string &source_path, std::string &api_path) const -> api_error {
REPERTORY_USES_FUNCTION_NAME();
if (source_path.empty()) {
@ -253,9 +252,8 @@ auto base_provider::get_api_path_from_source(const std::string &source_path,
return db3_->get_api_path(source_path, api_path);
}
auto base_provider::get_directory_items(const std::string &api_path,
directory_item_list &list) const
-> api_error {
auto base_provider::get_directory_items(
const std::string &api_path, directory_item_list &list) const -> api_error {
REPERTORY_USES_FUNCTION_NAME();
bool exists{};
@ -319,10 +317,9 @@ auto base_provider::get_file_size(const std::string &api_path,
return api_error::success;
}
auto base_provider::get_filesystem_item(const std::string &api_path,
bool directory,
filesystem_item &fsi) const
-> api_error {
auto base_provider::get_filesystem_item(
const std::string &api_path, bool directory,
filesystem_item &fsi) const -> api_error {
bool exists{};
auto res = is_directory(api_path, exists);
if (res != api_error::success) {
@ -355,10 +352,9 @@ auto base_provider::get_filesystem_item(const std::string &api_path,
return api_error::success;
}
auto base_provider::get_filesystem_item_and_file(const std::string &api_path,
api_file &file,
filesystem_item &fsi) const
-> api_error {
auto base_provider::get_filesystem_item_and_file(
const std::string &api_path, api_file &file,
filesystem_item &fsi) const -> api_error {
auto res = get_file(api_path, file);
if (res != api_error::success) {
return res;
@ -526,23 +522,25 @@ void base_provider::process_removed_items(const stop_type &stop_requested) {
}
if (utils::string::to_bool(meta[META_DIRECTORY])) {
bool exists{};
if (is_directory(api_path, exists) != api_error::success) {
return;
}
if (exists) {
return;
}
process_removed_directories(
{
removed_item{api_path, true, ""},
},
stop_requested2);
return;
}
// bool exists{};
// if (is_directory(api_path, exists) != api_error::success) {
// return;
// }
//
// if (exists) {
// return;
// }
//
// // process_removed_directories(
// // {
// // removed_item{api_path, true, ""},
// // },
// // stop_requested2);
//
// return;
// }
bool exists{};
if (is_file(api_path, exists) != api_error::success) {

View File

@ -163,6 +163,8 @@ auto s3_provider::create_file_extra(const std::string &api_path,
auto s3_provider::create_path_directories(
const std::string &api_path, const std::string &key) const -> api_error {
REPERTORY_USES_FUNCTION_NAME();
if (api_path == "/") {
return api_error::success;
}
@ -177,6 +179,8 @@ auto s3_provider::create_path_directories(
return api_error::error;
}
auto cfg = get_config().get_s3_config();
std::string cur_key{'/'};
std::string cur_path{'/'};
for (std::size_t idx = 0U; idx < path_parts.size(); ++idx) {
@ -187,12 +191,43 @@ auto s3_provider::create_path_directories(
cur_path = utils::path::create_api_path(
utils::path::combine(cur_path, {path_parts.at(idx)}));
auto exists{false};
auto res = is_directory(cur_path, exists);
if (res != api_error::success) {
return res;
}
if (not exists) {
curl::requests::http_put_file put_file{};
put_file.allow_timeout = true;
put_file.aws_service = "aws:amz:" + cfg.region + ":s3";
put_file.path = (is_encrypted ? cur_key : cur_path) + '/';
stop_type stop_requested{false};
long response_code{};
if (not get_comm().make_request(put_file, response_code,
stop_requested)) {
utils::error::raise_api_path_error(function_name, cur_path,
api_error::comm_error,
"failed to create directory object");
return api_error::comm_error;
}
if (response_code != http_error_codes::ok) {
utils::error::raise_api_path_error(function_name, cur_path,
response_code,
"failed to create directory object");
return api_error::comm_error;
}
}
api_meta_map meta{};
auto res = get_item_meta(cur_path, meta);
res = get_item_meta(cur_path, meta);
if (res == api_error::item_not_found) {
auto dir = create_api_file(cur_path, cur_key, 0U,
get_last_modified(true, cur_path));
get_api_item_added()(true, dir);
continue;
}

View File

@ -28,15 +28,17 @@ tasks tasks::instance_;
void tasks::schedule(task_item task) {
unique_mutex_lock lock(mutex_);
while (not stop_requested_ && tasks_.size() > (task_threads_.size() * 4U)) {
notify_.wait(lock);
}
if (not stop_requested_) {
tasks_.push_back(std::move(task));
}
++count_;
tasks_.push_back(std::move(task));
notify_.notify_all();
lock.unlock();
while (not stop_requested_ && count_ >= (task_threads_.size() * 4U)) {
lock.lock();
notify_.wait(lock);
notify_.notify_all();
lock.unlock();
}
}
void tasks::start(app_config *config) {
@ -45,6 +47,7 @@ void tasks::start(app_config *config) {
return;
}
count_ = 0U;
config_ = config;
stop_requested_ = false;
for (std::uint32_t idx = 0U; idx < std::thread::hardware_concurrency();
@ -84,19 +87,15 @@ void tasks::task_thread() {
while (not stop_requested_) {
lock.lock();
while (not stop_requested_ && tasks_.empty()) {
notify_.wait(lock);
}
if (stop_requested_) {
release();
return;
}
if (tasks_.empty()) {
notify_.wait(lock);
if (stop_requested_) {
release();
return;
}
}
if (tasks_.empty()) {
release();
continue;
@ -108,9 +107,14 @@ void tasks::task_thread() {
try {
task.action(stop_requested_);
std::this_thread::sleep_for(std::chrono::milliseconds(task.delay_ms));
--count_;
} catch (const std::exception &e) {
utils::error::raise_error(function_name, e, "failed to execute task");
}
lock.lock();
release();
}
}
} // namespace repertory