fix runaway thread
All checks were successful
BlockStorage/repertory_linux_builds/pipeline/head This commit looks good
BlockStorage/repertory_osx_builds/pipeline/head This commit looks good

This commit is contained in:
Scott E. Graves 2024-02-03 10:41:38 -06:00
parent 99533a9687
commit a024f81e5d
2 changed files with 59 additions and 49 deletions

View File

@ -1,5 +1,5 @@
#!/bin/bash #!/bin/bash
#
NUM_JOBS=${MY_NUM_JOBS} NUM_JOBS=${MY_NUM_JOBS}
if [[ -z "${NUM_JOBS}" ]]; then if [[ -z "${NUM_JOBS}" ]]; then
NUM_JOBS=$(getconf _NPROCESSORS_ONLN 2> /dev/null || getconf NPROCESSORS_ONLN 2> /dev/null || echo 1) NUM_JOBS=$(getconf _NPROCESSORS_ONLN 2> /dev/null || getconf NPROCESSORS_ONLN 2> /dev/null || echo 1)

View File

@ -1037,65 +1037,75 @@ void file_manager::upload_handler() {
constexpr const auto *function_name = static_cast<const char *>(__FUNCTION__); constexpr const auto *function_name = static_cast<const char *>(__FUNCTION__);
while (not stop_requested_) { while (not stop_requested_) {
auto should_wait{true};
unique_mutex_lock upload_lock(upload_mtx_); unique_mutex_lock upload_lock(upload_mtx_);
if (not stop_requested_) { if (stop_requested_) {
if (upload_lookup_.size() < config_.get_max_upload_count()) { upload_notify_.notify_all();
auto result = db::db_select{*db_.get(), upload_table} continue;
.order_by("api_path", true) }
.limit(1)
.go();
try {
std::optional<db::db_select::row> row;
if (result.get_row(row) && row.has_value()) {
auto api_path =
row->get_column("api_path").get_value<std::string>();
auto source_path =
row->get_column("source_path").get_value<std::string>();
filesystem_item fsi{}; if (upload_lookup_.size() < config_.get_max_upload_count()) {
auto res = provider_.get_filesystem_item(api_path, false, fsi); auto result = db::db_select{*db_.get(), upload_table}
switch (res) { .order_by("api_path", true)
case api_error::item_not_found: { .limit(1)
event_system::instance().raise<file_upload_not_found>( .go();
api_path, source_path); try {
remove_upload(api_path, true); std::optional<db::db_select::row> row;
} break; if (result.get_row(row) && row.has_value()) {
auto api_path = row->get_column("api_path").get_value<std::string>();
auto source_path =
row->get_column("source_path").get_value<std::string>();
case api_error::success: { filesystem_item fsi{};
upload_lookup_[fsi.api_path] = auto res = provider_.get_filesystem_item(api_path, false, fsi);
std::make_unique<upload>(fsi, provider_); switch (res) {
auto del_res = db::db_select{*db_.get(), upload_table} case api_error::item_not_found: {
.delete_query() should_wait = false;
.where("api_path") //
.equals(api_path) event_system::instance().raise<file_upload_not_found>(api_path,
source_path);
remove_upload(api_path, true);
} break;
case api_error::success: {
should_wait = false;
upload_lookup_[fsi.api_path] =
std::make_unique<upload>(fsi, provider_);
auto del_res = db::db_select{*db_.get(), upload_table}
.delete_query()
.where("api_path")
.equals(api_path)
.go();
if (del_res.ok()) {
auto ins_res = db::db_insert{*db_.get(), upload_active_table}
.column_value("api_path", api_path)
.column_value("source_path", source_path)
.go(); .go();
if (del_res.ok()) { if (not ins_res.ok()) {
auto ins_res = db::db_insert{*db_.get(), upload_active_table} utils::error::raise_api_path_error(
.column_value("api_path", api_path) function_name, api_path, source_path,
.column_value("source_path", source_path) "failed to add to upload_active table");
.go();
if (not ins_res.ok()) {
utils::error::raise_api_path_error(
function_name, api_path, source_path,
"failed to add to upload_active table");
}
} }
} break;
default: {
event_system::instance().raise<file_upload_retry>(
api_path, source_path, res);
queue_upload(api_path, source_path, true);
upload_notify_.wait_for(upload_lock, 5s);
} break;
} }
} break;
default: {
event_system::instance().raise<file_upload_retry>(api_path,
source_path, res);
queue_upload(api_path, source_path, true);
} break;
} }
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
} }
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
} }
} }
if (should_wait) {
upload_notify_.wait_for(upload_lock, 5s);
}
upload_notify_.notify_all(); upload_notify_.notify_all();
} }
} }