/* Copyright <2018-2023> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ #include "file_manager/file_manager.hpp" #include "app_config.hpp" #include "file_manager/events.hpp" #include "providers/i_provider.hpp" #include "types/repertory.hpp" #include "utils/encrypting_reader.hpp" #include "utils/error_utils.hpp" #include "utils/file_utils.hpp" #include "utils/path_utils.hpp" #include "utils/polling.hpp" #include "utils/rocksdb_utils.hpp" #include "utils/unix/unix_utils.hpp" namespace repertory { static auto create_resume_entry(const i_open_file &file) -> json { return { {"chunk_size", file.get_chunk_size()}, {"path", file.get_api_path()}, {"read_state", utils::string::from_dynamic_bitset(file.get_read_state())}, {"source", file.get_source_path()}, }; } static void restore_resume_entry(const json &resume_entry, std::string &api_path, std::size_t &chunk_size, boost::dynamic_bitset<> &read_state, std::string &source_path) { api_path = resume_entry["path"].get(); chunk_size = resume_entry["chunk_size"].get(); read_state = utils::string::to_dynamic_bitset( resume_entry["read_state"].get()); source_path = resume_entry["source"].get(); } file_manager::file_manager(app_config &config, i_provider &provider) : config_(config), provider_(provider) { if (not provider_.is_direct_only()) { auto families = std::vector(); families.emplace_back(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions()); families.emplace_back("upload", rocksdb::ColumnFamilyOptions()); families.emplace_back("upload_active", rocksdb::ColumnFamilyOptions()); auto handles = std::vector(); utils::db::create_rocksdb(config, "upload_db", families, handles, db_); std::size_t idx{}; default_family_ = handles[idx++]; upload_family_ = handles[idx++]; upload_active_family_ = handles[idx++]; E_SUBSCRIBE_EXACT(file_upload_completed, [this](const file_upload_completed &completed) { this->upload_completed(completed); }); } } file_manager::~file_manager() { stop(); E_CONSUMER_RELEASE(); db_.reset(); } void file_manager::close(std::uint64_t handle) { unique_recur_mutex_lock file_lock(open_file_mtx_); auto closeable_file = get_open_file_by_handle(handle); if (closeable_file) { closeable_file->remove(handle); if (closeable_file->can_close()) { auto api_path = closeable_file->get_api_path(); open_file_lookup_.erase(api_path); file_lock.unlock(); closeable_file->close(); } } } void file_manager::close_all(const std::string &api_path) { unique_recur_mutex_lock file_lock(open_file_mtx_); std::vector handles; auto iter = open_file_lookup_.find(api_path); if (iter == open_file_lookup_.end()) { return; } handles = iter->second->get_handles(); for (auto &handle : handles) { open_file_lookup_[api_path]->remove(handle); } auto file = open_file_lookup_.at(api_path); open_file_lookup_.erase(api_path); file_lock.unlock(); file->close(); } void file_manager::close_timed_out_files() { unique_recur_mutex_lock file_lock(open_file_mtx_); auto closeable_list = std::accumulate( open_file_lookup_.begin(), open_file_lookup_.end(), std::vector{}, [](auto items, const auto &item) -> auto { if (item.second->get_open_file_count() == 0U && item.second->can_close()) { items.emplace_back(item.first); } return items; }); std::vector> open_files{}; for (const auto &api_path : closeable_list) { auto closeable_file = open_file_lookup_.at(api_path); open_file_lookup_.erase(api_path); open_files.push_back(closeable_file); } closeable_list.clear(); file_lock.unlock(); for (auto &closeable_file : open_files) { closeable_file->close(); event_system::instance().raise( closeable_file->get_api_path()); } } auto file_manager::create(const std::string &api_path, api_meta_map &meta, open_file_data ofd, std::uint64_t &handle, std::shared_ptr &file) -> api_error { recur_mutex_lock file_lock(open_file_mtx_); auto res = provider_.create_file(api_path, meta); if (res != api_error::success && res != api_error::item_exists) { return res; } return open(api_path, false, ofd, handle, file); } auto file_manager::evict_file(const std::string &api_path) -> bool { if (provider_.is_direct_only()) { return false; } recur_mutex_lock open_lock(open_file_mtx_); if (is_processing(api_path)) { return false; } if (get_open_file_count(api_path) != 0U) { return false; } std::string pinned; auto res = provider_.get_item_meta(api_path, META_PINNED, pinned); if (res != api_error::success && res != api_error::item_not_found) { utils::error::raise_api_path_error(__FUNCTION__, api_path, res, "failed to get pinned status"); return false; } if (not pinned.empty() && utils::string::to_bool(pinned)) { return false; } std::string source_path{}; res = provider_.get_item_meta(api_path, META_SOURCE, source_path); if (res != api_error::success) { utils::error::raise_api_path_error(__FUNCTION__, api_path, res, "failed to get source path"); return false; } if (source_path.empty()) { return false; } auto removed = utils::file::retry_delete_file(source_path); if (removed) { event_system::instance().raise(api_path, source_path); } return removed; } auto file_manager::get_directory_items(const std::string &api_path) const -> directory_item_list { directory_item_list list{}; auto res = provider_.get_directory_items(api_path, list); if (res != api_error::success) { utils::error::raise_api_path_error(__FUNCTION__, api_path, res, "failed to get directory list"); } return list; } auto file_manager::get_next_handle() -> std::uint64_t { if (++next_handle_ == 0U) { next_handle_++; } return next_handle_; } auto file_manager::get_open_file_by_handle(std::uint64_t handle) const -> std::shared_ptr { auto iter = std::find_if(open_file_lookup_.begin(), open_file_lookup_.end(), [&handle](const auto &item) -> bool { return item.second->has_handle(handle); }); return iter == open_file_lookup_.end() ? nullptr : iter->second; } auto file_manager::get_open_file_count(const std::string &api_path) const -> std::size_t { auto iter = open_file_lookup_.find(api_path); if (iter != open_file_lookup_.end()) { return iter->second->get_open_file_count(); } return 0U; } auto file_manager::get_open_file(std::uint64_t handle, bool write_supported, std::shared_ptr &file) -> bool { recur_mutex_lock open_lock(open_file_mtx_); auto file_ptr = get_open_file_by_handle(handle); if (not file_ptr) { return false; } if (write_supported && not file_ptr->is_write_supported()) { auto writeable_file = std::make_shared( utils::encryption::encrypting_reader::get_data_chunk_size(), config_.get_enable_chunk_download_timeout() ? config_.get_chunk_downloader_timeout_secs() : 0U, file_ptr->get_filesystem_item(), file_ptr->get_open_data(), provider_, *this); open_file_lookup_[file_ptr->get_api_path()] = writeable_file; file = writeable_file; return true; } file = file_ptr; return true; } auto file_manager::get_open_file_count() const -> std::size_t { recur_mutex_lock open_lock(open_file_mtx_); return open_file_lookup_.size(); } auto file_manager::get_open_files() const -> std::unordered_map { std::unordered_map ret; recur_mutex_lock open_lock(open_file_mtx_); for (const auto &item : open_file_lookup_) { ret[item.first] = item.second->get_open_file_count(); } return ret; } auto file_manager::get_open_handle_count() const -> std::size_t { recur_mutex_lock open_lock(open_file_mtx_); return std::accumulate( open_file_lookup_.begin(), open_file_lookup_.end(), std::size_t(0U), [](std::size_t count, const auto &item) -> std::size_t { return count + item.second->get_open_file_count(); }); } auto file_manager::get_stored_downloads() const -> std::vector { std::vector ret; if (not provider_.is_direct_only()) { auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), default_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { ret.emplace_back(json::parse(iterator->value().ToString())); } } return ret; } auto file_manager::handle_file_rename(const std::string &from_api_path, const std::string &to_api_path) -> api_error { auto ret = api_error::file_in_use; if ((ret = provider_.rename_file(from_api_path, to_api_path)) == api_error::success) { swap_renamed_items(from_api_path, to_api_path); } return ret; } auto file_manager::has_no_open_file_handles() const -> bool { return get_open_handle_count() == 0U; } auto file_manager::is_processing(const std::string &api_path) const -> bool { if (provider_.is_direct_only()) { return false; } unique_mutex_lock upload_lock(upload_mtx_); if (upload_lookup_.find(api_path) != upload_lookup_.end()) { return true; } upload_lock.unlock(); { auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), upload_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { const auto parts = utils::string::split(iterator->key().ToString(), ':'); if (parts.at(1U) == api_path) { return true; } } } recur_mutex_lock open_lock(open_file_mtx_); auto iter = open_file_lookup_.find(api_path); if (iter != open_file_lookup_.end()) { return iter->second->is_modified() || not iter->second->is_complete(); } return false; } auto file_manager::open(const std::string &api_path, bool directory, const open_file_data &ofd, std::uint64_t &handle, std::shared_ptr &file) -> api_error { recur_mutex_lock open_lock(open_file_mtx_); return open(api_path, directory, ofd, handle, file, nullptr); } auto file_manager::open(const std::string &api_path, bool directory, const open_file_data &ofd, std::uint64_t &handle, std::shared_ptr &file, std::shared_ptr closeable_file) -> api_error { const auto create_and_add_handle = [&](std::shared_ptr cur_file) { handle = get_next_handle(); cur_file->add(handle, ofd); file = cur_file; }; auto iter = open_file_lookup_.find(api_path); if (iter != open_file_lookup_.end()) { create_and_add_handle(iter->second); return api_error::success; } filesystem_item fsi{}; auto res = provider_.get_filesystem_item(api_path, directory, fsi); if (res != api_error::success) { return res; } if (fsi.source_path.empty()) { fsi.source_path = utils::path::combine(config_.get_cache_directory(), {utils::create_uuid_string()}); if ((res = provider_.set_item_meta(fsi.api_path, META_SOURCE, fsi.source_path)) != api_error::success) { return res; } } if (not closeable_file) { closeable_file = std::make_shared( utils::encryption::encrypting_reader::get_data_chunk_size(), config_.get_enable_chunk_download_timeout() ? config_.get_chunk_downloader_timeout_secs() : 0U, fsi, provider_, *this); } open_file_lookup_[api_path] = closeable_file; create_and_add_handle(closeable_file); return api_error::success; } void file_manager::queue_upload(const i_open_file &file) { return queue_upload(file.get_api_path(), file.get_source_path(), false); } void file_manager::queue_upload(const std::string &api_path, const std::string &source_path, bool no_lock) { if (provider_.is_direct_only()) { return; } std::unique_ptr lock; if (not no_lock) { lock = std::make_unique(upload_mtx_); } remove_upload(api_path, true); auto res = db_->Put( rocksdb::WriteOptions(), upload_family_, std::to_string(utils::get_file_time_now()) + ":" + api_path, source_path); if (res.ok()) { remove_resume(api_path, source_path); event_system::instance().raise(api_path, source_path); } else { event_system::instance().raise(api_path, source_path, res.ToString()); } if (not no_lock) { upload_notify_.notify_all(); } } auto file_manager::remove_file(const std::string &api_path) -> api_error { recur_mutex_lock open_lock(open_file_mtx_); auto iter = open_file_lookup_.find(api_path); if (iter != open_file_lookup_.end() && iter->second->is_modified()) { return api_error::file_in_use; } filesystem_item fsi{}; auto res = provider_.get_filesystem_item(api_path, false, fsi); if (res != api_error::success) { return res; } remove_upload(api_path); close_all(api_path); if ((res = provider_.remove_file(api_path)) != api_error::success) { return res; } if (not utils::file::retry_delete_file(fsi.source_path)) { utils::error::raise_api_path_error( __FUNCTION__, fsi.api_path, fsi.source_path, utils::get_last_error_code(), "failed to delete source"); return api_error::success; } return api_error::success; } void file_manager::remove_resume(const std::string &api_path, const std::string &source_path) { auto res = db_->Delete(rocksdb::WriteOptions(), default_family_, api_path); if (res.ok()) { event_system::instance().raise(api_path, source_path); } else { utils::error::raise_api_path_error(__FUNCTION__, api_path, source_path, res.code(), "failed to remove resume entry"); } } void file_manager::remove_upload(const std::string &api_path) { remove_upload(api_path, false); } void file_manager::remove_upload(const std::string &api_path, bool no_lock) { if (provider_.is_direct_only()) { return; } std::unique_ptr lock; if (not no_lock) { lock = std::make_unique(upload_mtx_); } auto iter = upload_lookup_.find(api_path); if (iter == upload_lookup_.end()) { auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), upload_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { const auto parts = utils::string::split(iterator->key().ToString(), ':'); if (parts.at(1U) == api_path) { if (db_->Delete(rocksdb::WriteOptions(), upload_family_, iterator->key()) .ok()) { db_->Delete(rocksdb::WriteOptions(), upload_active_family_, iterator->key()); } event_system::instance().raise( api_path, iterator->value().ToString()); } } } else { auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { const auto parts = utils::string::split(iterator->key().ToString(), ':'); if (parts.at(1U) == api_path) { db_->Delete(rocksdb::WriteOptions(), upload_active_family_, iterator->key()); } } event_system::instance().raise( api_path, iter->second->get_source_path()); iter->second->cancel(); upload_lookup_.erase(api_path); } if (not no_lock) { upload_notify_.notify_all(); } } void file_manager::swap_renamed_items(std::string from_api_path, std::string to_api_path) { const auto iter = open_file_lookup_.find(from_api_path); if (iter != open_file_lookup_.end()) { open_file_lookup_[to_api_path] = open_file_lookup_[from_api_path]; open_file_lookup_.erase(from_api_path); open_file_lookup_[to_api_path]->set_api_path(to_api_path); } } auto file_manager::rename_directory(const std::string &from_api_path, const std::string &to_api_path) -> api_error { if (not provider_.is_rename_supported()) { return api_error::not_implemented; } unique_recur_mutex_lock lock(open_file_mtx_); // Ensure source directory exists bool exists{}; auto res = provider_.is_directory(from_api_path, exists); if (res != api_error::success) { return res; } if (not exists) { return api_error::directory_not_found; } // Ensure destination directory does not exist res = provider_.is_directory(to_api_path, exists); if (res != api_error::success) { return res; } if (exists) { return api_error::directory_exists; } // Ensure destination is not a file res = provider_.is_file(from_api_path, exists); if (res != api_error::success) { return res; } if (exists) { return api_error::item_exists; } // Create destination directory res = provider_.create_directory_clone_source_meta(from_api_path, to_api_path); if (res != api_error::success) { return res; } directory_item_list list{}; res = provider_.get_directory_items(from_api_path, list); if (res != api_error::success) { return res; } // Rename all items - directories MUST BE returned first for (std::size_t i = 0U; (res == api_error::success) && (i < list.size()); i++) { const auto &api_path = list[i].api_path; if ((api_path != ".") && (api_path != "..")) { const auto old_api_path = api_path; const auto new_api_path = utils::path::create_api_path(utils::path::combine( to_api_path, {old_api_path.substr(from_api_path.size())})); res = list[i].directory ? rename_directory(old_api_path, new_api_path) : rename_file(old_api_path, new_api_path, false); } } if (res != api_error::success) { return res; } res = provider_.remove_directory(from_api_path); if (res != api_error::success) { return res; } swap_renamed_items(from_api_path, to_api_path); return api_error::success; } auto file_manager::rename_file(const std::string &from_api_path, const std::string &to_api_path, bool overwrite) -> api_error { if (not provider_.is_rename_supported()) { return api_error::not_implemented; } // Don't rename if paths are the same if (from_api_path == to_api_path) { return api_error::item_exists; } unique_recur_mutex_lock lock(open_file_mtx_); // Don't rename if source is directory bool exists{}; auto res = provider_.is_directory(from_api_path, exists); if (res != api_error::success) { return res; } if (exists) { return api_error::directory_exists; } // Don't rename if source does not exist res = provider_.is_file(from_api_path, exists); if (res != api_error::success) { return res; } if (not exists) { return api_error::item_not_found; } // Don't rename if destination is directory res = provider_.is_directory(to_api_path, exists); if (res != api_error::success) { return res; } if (exists) { res = api_error::directory_exists; } // Check allow overwrite if file exists bool dest_exists{}; res = provider_.is_file(to_api_path, dest_exists); if (res != api_error::success) { return res; } if (not overwrite && dest_exists) { return api_error::item_exists; } // Don't rename if destination file has open handles if (get_open_file_count(to_api_path) != 0U) { return api_error::file_in_use; } // Don't rename if destination file is uploading or downloading if (is_processing(to_api_path)) { return api_error::file_in_use; } // Handle destination file exists (should overwrite) if (dest_exists) { filesystem_item fsi{}; res = provider_.get_filesystem_item(to_api_path, false, fsi); if (res != api_error::success) { return res; } std::uint64_t file_size{}; if (not utils::file::get_file_size(fsi.source_path, file_size)) { return api_error::os_error; } res = provider_.remove_file(to_api_path); if ((res == api_error::success) || (res == api_error::item_not_found)) { if (not utils::file::retry_delete_file(fsi.source_path)) { utils::error::raise_api_path_error( __FUNCTION__, fsi.api_path, fsi.source_path, utils::get_last_error_code(), "failed to delete source path"); } return handle_file_rename(from_api_path, to_api_path); } return res; } // Check destination parent directory exists res = provider_.is_directory(utils::path::get_parent_api_path(to_api_path), exists); if (res != api_error::success) { return res; } if (not exists) { return api_error::directory_not_found; } return handle_file_rename(from_api_path, to_api_path); } void file_manager::start() { if (provider_.is_direct_only()) { stop_requested_ = false; return; } if (not upload_thread_) { stop_requested_ = false; struct active_item { std::string api_path; std::string source_path; }; std::vector active_items{}; auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { const auto parts = utils::string::split(iterator->key().ToString(), ':'); active_items.emplace_back( active_item{parts.at(1U), iterator->value().ToString()}); } for (const auto &active_item : active_items) { queue_upload(active_item.api_path, active_item.source_path, false); } active_items.clear(); iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), default_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { std::string api_path; std::string source_path; std::size_t chunk_size{}; boost::dynamic_bitset<> read_state; restore_resume_entry(json::parse(iterator->value().ToString()), api_path, chunk_size, read_state, source_path); filesystem_item fsi{}; auto res = provider_.get_filesystem_item(api_path, false, fsi); if (res == api_error::success) { if (source_path == fsi.source_path) { std::uint64_t file_size{}; if (utils::file::get_file_size(fsi.source_path, file_size)) { if (file_size == fsi.size) { auto closeable_file = std::make_shared( chunk_size, config_.get_enable_chunk_download_timeout() ? config_.get_chunk_downloader_timeout_secs() : 0U, fsi, provider_, read_state, *this); open_file_lookup_[api_path] = closeable_file; event_system::instance().raise( fsi.api_path, fsi.source_path); } else { event_system::instance().raise( fsi.api_path, fsi.source_path, "file size mismatch|expected|" + std::to_string(fsi.size) + "|actual|" + std::to_string(file_size)); } } else { event_system::instance().raise( fsi.api_path, fsi.source_path, "failed to get file size: " + std::to_string(utils::get_last_error_code())); } } else { event_system::instance().raise( fsi.api_path, fsi.source_path, "source path mismatch|expected|" + source_path + "|actual|" + fsi.source_path); } } else { event_system::instance().raise( api_path, source_path, "failed to get filesystem item|" + api_error_to_string(res)); } } upload_thread_ = std::make_unique([this] { upload_handler(); }); polling::instance().set_callback( {"timed_out_close", polling::frequency::second, [this]() { this->close_timed_out_files(); }}); event_system::instance().raise("file_manager"); } } void file_manager::stop() { if (not stop_requested_) { event_system::instance().raise("file_manager"); polling::instance().remove_callback("timed_out_close"); stop_requested_ = true; unique_mutex_lock upload_lock(upload_mtx_); upload_notify_.notify_all(); upload_lock.unlock(); if (upload_thread_) { upload_thread_->join(); upload_thread_.reset(); } open_file_lookup_.clear(); upload_lock.lock(); for (auto &item : upload_lookup_) { item.second->stop(); } upload_notify_.notify_all(); upload_lock.unlock(); while (not upload_lookup_.empty()) { upload_lock.lock(); if (not upload_lookup_.empty()) { upload_notify_.wait_for(upload_lock, 1s); } upload_notify_.notify_all(); upload_lock.unlock(); } event_system::instance().raise("file_manager"); } } void file_manager::store_resume(const i_open_file &file) { if (provider_.is_direct_only()) { return; } const auto res = db_->Put(rocksdb::WriteOptions(), default_family_, file.get_api_path(), create_resume_entry(file).dump()); if (res.ok()) { event_system::instance().raise(file.get_api_path(), file.get_source_path()); } else { event_system::instance().raise( file.get_api_path(), file.get_source_path(), res.ToString()); } } void file_manager::upload_completed(const file_upload_completed &evt) { unique_mutex_lock upload_lock(upload_mtx_); if (not utils::string::to_bool(evt.get_cancelled().get())) { const auto err = api_error_from_string(evt.get_result().get()); switch (err) { case api_error::success: { auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_)); for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { const auto parts = utils::string::split(iterator->key().ToString(), ':'); if (parts.at(1U) == evt.get_api_path().get()) { db_->Delete(rocksdb::WriteOptions(), upload_active_family_, iterator->key()); break; } } } break; case api_error::upload_stopped: { event_system::instance().raise(evt.get_api_path(), evt.get_source(), err); queue_upload(evt.get_api_path(), evt.get_source(), true); upload_notify_.wait_for(upload_lock, 5s); } break; default: { bool exists{}; auto res = provider_.is_file(evt.get_api_path(), exists); if ((res == api_error::success && not exists) || not utils::file::is_file(evt.get_source())) { event_system::instance().raise( evt.get_api_path(), evt.get_source()); remove_upload(evt.get_api_path(), true); return; } event_system::instance().raise(evt.get_api_path(), evt.get_source(), err); queue_upload(evt.get_api_path(), evt.get_source(), true); upload_notify_.wait_for(upload_lock, 5s); } break; } upload_lookup_.erase(evt.get_api_path()); } upload_notify_.notify_all(); } void file_manager::upload_handler() { while (not stop_requested_) { unique_mutex_lock upload_lock(upload_mtx_); if (not stop_requested_) { if (upload_lookup_.size() < config_.get_max_upload_count()) { auto iterator = std::unique_ptr( db_->NewIterator(rocksdb::ReadOptions(), upload_family_)); iterator->SeekToFirst(); if (iterator->Valid()) { const auto parts = utils::string::split(iterator->key().ToString(), ':'); const auto api_path = parts.at(1U); const auto source_path = iterator->value().ToString(); filesystem_item fsi{}; auto res = provider_.get_filesystem_item(api_path, false, fsi); switch (res) { case api_error::item_not_found: { event_system::instance().raise(api_path, source_path); remove_upload(parts.at(1U), true); } break; case api_error::success: { upload_lookup_[fsi.api_path] = std::make_unique(fsi, provider_); if (db_->Delete(rocksdb::WriteOptions(), upload_family_, iterator->key()) .ok()) { db_->Put(rocksdb::WriteOptions(), upload_active_family_, iterator->key(), iterator->value()); } } break; default: { event_system::instance().raise(api_path, source_path, res); queue_upload(api_path, source_path, true); upload_notify_.wait_for(upload_lock, 5s); } break; } } else { iterator.reset(); upload_notify_.wait(upload_lock); } } else { upload_notify_.wait(upload_lock); } } upload_notify_.notify_all(); } } void file_manager::update_used_space(std::uint64_t &used_space) const { recur_mutex_lock open_lock(open_file_mtx_); for (const auto &item : open_file_lookup_) { std::uint64_t file_size{}; auto res = provider_.get_file_size(item.second->get_api_path(), file_size); if ((res == api_error::success) && (file_size != item.second->get_file_size()) && (used_space >= file_size)) { used_space -= file_size; used_space += item.second->get_file_size(); } } } } // namespace repertory