Merge branch 'development' of https://git.fifthgrid.com/blockstorage/repertory into development
This commit is contained in:
		| @@ -1,5 +1,5 @@ | ||||
| #!/bin/bash | ||||
| # | ||||
|  | ||||
| NUM_JOBS=${MY_NUM_JOBS} | ||||
| if [[ -z "${NUM_JOBS}" ]]; then | ||||
|   NUM_JOBS=$(getconf _NPROCESSORS_ONLN 2> /dev/null || getconf NPROCESSORS_ONLN 2> /dev/null || echo 1) | ||||
|   | ||||
| @@ -1037,65 +1037,75 @@ void file_manager::upload_handler() { | ||||
|   constexpr const auto *function_name = static_cast<const char *>(__FUNCTION__); | ||||
|  | ||||
|   while (not stop_requested_) { | ||||
|     auto should_wait{true}; | ||||
|     unique_mutex_lock upload_lock(upload_mtx_); | ||||
|     if (not stop_requested_) { | ||||
|       if (upload_lookup_.size() < config_.get_max_upload_count()) { | ||||
|         auto result = db::db_select{*db_.get(), upload_table} | ||||
|                           .order_by("api_path", true) | ||||
|                           .limit(1) | ||||
|                           .go(); | ||||
|         try { | ||||
|           std::optional<db::db_select::row> row; | ||||
|           if (result.get_row(row) && row.has_value()) { | ||||
|             auto api_path = | ||||
|                 row->get_column("api_path").get_value<std::string>(); | ||||
|             auto source_path = | ||||
|                 row->get_column("source_path").get_value<std::string>(); | ||||
|     if (stop_requested_) { | ||||
|       upload_notify_.notify_all(); | ||||
|       continue; | ||||
|     } | ||||
|  | ||||
|             filesystem_item fsi{}; | ||||
|             auto res = provider_.get_filesystem_item(api_path, false, fsi); | ||||
|             switch (res) { | ||||
|             case api_error::item_not_found: { | ||||
|               event_system::instance().raise<file_upload_not_found>( | ||||
|                   api_path, source_path); | ||||
|               remove_upload(api_path, true); | ||||
|             } break; | ||||
|     if (upload_lookup_.size() < config_.get_max_upload_count()) { | ||||
|       auto result = db::db_select{*db_.get(), upload_table} | ||||
|                         .order_by("api_path", true) | ||||
|                         .limit(1) | ||||
|                         .go(); | ||||
|       try { | ||||
|         std::optional<db::db_select::row> row; | ||||
|         if (result.get_row(row) && row.has_value()) { | ||||
|           auto api_path = row->get_column("api_path").get_value<std::string>(); | ||||
|           auto source_path = | ||||
|               row->get_column("source_path").get_value<std::string>(); | ||||
|  | ||||
|             case api_error::success: { | ||||
|               upload_lookup_[fsi.api_path] = | ||||
|                   std::make_unique<upload>(fsi, provider_); | ||||
|               auto del_res = db::db_select{*db_.get(), upload_table} | ||||
|                                  .delete_query() | ||||
|                                  .where("api_path") | ||||
|                                  .equals(api_path) | ||||
|           filesystem_item fsi{}; | ||||
|           auto res = provider_.get_filesystem_item(api_path, false, fsi); | ||||
|           switch (res) { | ||||
|           case api_error::item_not_found: { | ||||
|             should_wait = false; | ||||
|             // | ||||
|             event_system::instance().raise<file_upload_not_found>(api_path, | ||||
|                                                                   source_path); | ||||
|             remove_upload(api_path, true); | ||||
|           } break; | ||||
|  | ||||
|           case api_error::success: { | ||||
|             should_wait = false; | ||||
|  | ||||
|             upload_lookup_[fsi.api_path] = | ||||
|                 std::make_unique<upload>(fsi, provider_); | ||||
|             auto del_res = db::db_select{*db_.get(), upload_table} | ||||
|                                .delete_query() | ||||
|                                .where("api_path") | ||||
|                                .equals(api_path) | ||||
|                                .go(); | ||||
|             if (del_res.ok()) { | ||||
|               auto ins_res = db::db_insert{*db_.get(), upload_active_table} | ||||
|                                  .column_value("api_path", api_path) | ||||
|                                  .column_value("source_path", source_path) | ||||
|                                  .go(); | ||||
|               if (del_res.ok()) { | ||||
|                 auto ins_res = db::db_insert{*db_.get(), upload_active_table} | ||||
|                                    .column_value("api_path", api_path) | ||||
|                                    .column_value("source_path", source_path) | ||||
|                                    .go(); | ||||
|                 if (not ins_res.ok()) { | ||||
|                   utils::error::raise_api_path_error( | ||||
|                       function_name, api_path, source_path, | ||||
|                       "failed to add to upload_active table"); | ||||
|                 } | ||||
|               if (not ins_res.ok()) { | ||||
|                 utils::error::raise_api_path_error( | ||||
|                     function_name, api_path, source_path, | ||||
|                     "failed to add to upload_active table"); | ||||
|               } | ||||
|             } break; | ||||
|  | ||||
|             default: { | ||||
|               event_system::instance().raise<file_upload_retry>( | ||||
|                   api_path, source_path, res); | ||||
|               queue_upload(api_path, source_path, true); | ||||
|               upload_notify_.wait_for(upload_lock, 5s); | ||||
|             } break; | ||||
|             } | ||||
|           } break; | ||||
|  | ||||
|           default: { | ||||
|             event_system::instance().raise<file_upload_retry>(api_path, | ||||
|                                                               source_path, res); | ||||
|             queue_upload(api_path, source_path, true); | ||||
|           } break; | ||||
|           } | ||||
|         } catch (const std::exception &ex) { | ||||
|           utils::error::raise_error(function_name, ex, "query error"); | ||||
|         } | ||||
|       } catch (const std::exception &ex) { | ||||
|         utils::error::raise_error(function_name, ex, "query error"); | ||||
|       } | ||||
|     } | ||||
|  | ||||
|     if (should_wait) { | ||||
|       upload_notify_.wait_for(upload_lock, 5s); | ||||
|     } | ||||
|  | ||||
|     upload_notify_.notify_all(); | ||||
|   } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user