refactoring
This commit is contained in:
parent
dfa5e0e005
commit
aeedd114a6
@ -33,27 +33,31 @@
|
|||||||
#include "utils/rocksdb_utils.hpp"
|
#include "utils/rocksdb_utils.hpp"
|
||||||
#include "utils/unix/unix_utils.hpp"
|
#include "utils/unix/unix_utils.hpp"
|
||||||
|
|
||||||
namespace repertory {
|
namespace {
|
||||||
static auto create_resume_entry(const i_open_file &file) -> json {
|
[[nodiscard]] auto create_resume_entry(const repertory::i_open_file &file)
|
||||||
|
-> json {
|
||||||
return {
|
return {
|
||||||
{"chunk_size", file.get_chunk_size()},
|
{"chunk_size", file.get_chunk_size()},
|
||||||
{"path", file.get_api_path()},
|
{"path", file.get_api_path()},
|
||||||
{"read_state", utils::string::from_dynamic_bitset(file.get_read_state())},
|
{"read_state",
|
||||||
|
repertory::utils::string::from_dynamic_bitset(file.get_read_state())},
|
||||||
{"source", file.get_source_path()},
|
{"source", file.get_source_path()},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
static void restore_resume_entry(const json &resume_entry,
|
void restore_resume_entry(const json &resume_entry, std::string &api_path,
|
||||||
std::string &api_path, std::size_t &chunk_size,
|
std::size_t &chunk_size,
|
||||||
boost::dynamic_bitset<> &read_state,
|
boost::dynamic_bitset<> &read_state,
|
||||||
std::string &source_path) {
|
std::string &source_path) {
|
||||||
api_path = resume_entry["path"].get<std::string>();
|
api_path = resume_entry["path"].get<std::string>();
|
||||||
chunk_size = resume_entry["chunk_size"].get<std::size_t>();
|
chunk_size = resume_entry["chunk_size"].get<std::size_t>();
|
||||||
read_state = utils::string::to_dynamic_bitset(
|
read_state = repertory::utils::string::to_dynamic_bitset(
|
||||||
resume_entry["read_state"].get<std::string>());
|
resume_entry["read_state"].get<std::string>());
|
||||||
source_path = resume_entry["source"].get<std::string>();
|
source_path = resume_entry["source"].get<std::string>();
|
||||||
}
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
namespace repertory {
|
||||||
file_manager::file_manager(app_config &config, i_provider &provider)
|
file_manager::file_manager(app_config &config, i_provider &provider)
|
||||||
: config_(config), provider_(provider) {
|
: config_(config), provider_(provider) {
|
||||||
if (not provider_.is_direct_only()) {
|
if (not provider_.is_direct_only()) {
|
||||||
@ -96,12 +100,12 @@ void file_manager::close(std::uint64_t handle) {
|
|||||||
void file_manager::close_all(const std::string &api_path) {
|
void file_manager::close_all(const std::string &api_path) {
|
||||||
recur_mutex_lock file_lock(open_file_mtx_);
|
recur_mutex_lock file_lock(open_file_mtx_);
|
||||||
std::vector<std::uint64_t> handles;
|
std::vector<std::uint64_t> handles;
|
||||||
auto iter = open_file_lookup_.find(api_path);
|
auto file_iter = open_file_lookup_.find(api_path);
|
||||||
if (iter == open_file_lookup_.end()) {
|
if (file_iter == open_file_lookup_.end()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto closeable_file = iter->second;
|
auto closeable_file = file_iter->second;
|
||||||
|
|
||||||
handles = closeable_file->get_handles();
|
handles = closeable_file->get_handles();
|
||||||
for (auto handle : handles) {
|
for (auto handle : handles) {
|
||||||
@ -213,21 +217,20 @@ auto file_manager::get_next_handle() -> std::uint64_t {
|
|||||||
|
|
||||||
auto file_manager::get_open_file_by_handle(std::uint64_t handle) const
|
auto file_manager::get_open_file_by_handle(std::uint64_t handle) const
|
||||||
-> std::shared_ptr<i_closeable_open_file> {
|
-> std::shared_ptr<i_closeable_open_file> {
|
||||||
auto iter = std::find_if(open_file_lookup_.begin(), open_file_lookup_.end(),
|
auto file_iter =
|
||||||
[&handle](const auto &item) -> bool {
|
std::find_if(open_file_lookup_.begin(), open_file_lookup_.end(),
|
||||||
return item.second->has_handle(handle);
|
[&handle](const auto &item) -> bool {
|
||||||
});
|
return item.second->has_handle(handle);
|
||||||
return iter == open_file_lookup_.end() ? nullptr : iter->second;
|
});
|
||||||
|
return (file_iter == open_file_lookup_.end()) ? nullptr : file_iter->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto file_manager::get_open_file_count(const std::string &api_path) const
|
auto file_manager::get_open_file_count(const std::string &api_path) const
|
||||||
-> std::size_t {
|
-> std::size_t {
|
||||||
auto iter = open_file_lookup_.find(api_path);
|
auto file_iter = open_file_lookup_.find(api_path);
|
||||||
if (iter != open_file_lookup_.end()) {
|
return (file_iter == open_file_lookup_.end())
|
||||||
return iter->second->get_open_file_count();
|
? 0U
|
||||||
}
|
: file_iter->second->get_open_file_count();
|
||||||
|
|
||||||
return 0U;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto file_manager::get_open_file(std::uint64_t handle, bool write_supported,
|
auto file_manager::get_open_file(std::uint64_t handle, bool write_supported,
|
||||||
@ -284,10 +287,10 @@ auto file_manager::get_open_handle_count() const -> std::size_t {
|
|||||||
auto file_manager::get_stored_downloads() const -> std::vector<json> {
|
auto file_manager::get_stored_downloads() const -> std::vector<json> {
|
||||||
std::vector<json> ret;
|
std::vector<json> ret;
|
||||||
if (not provider_.is_direct_only()) {
|
if (not provider_.is_direct_only()) {
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), default_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), default_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
ret.emplace_back(json::parse(iterator->value().ToString()));
|
ret.emplace_back(json::parse(db_iter->value().ToString()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -321,10 +324,10 @@ auto file_manager::is_processing(const std::string &api_path) const -> bool {
|
|||||||
upload_lock.unlock();
|
upload_lock.unlock();
|
||||||
|
|
||||||
{
|
{
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
const auto parts = utils::string::split(iterator->key().ToString(), ':');
|
const auto parts = utils::string::split(db_iter->key().ToString(), ':');
|
||||||
if (parts.at(1U) == api_path) {
|
if (parts.at(1U) == api_path) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -332,12 +335,11 @@ auto file_manager::is_processing(const std::string &api_path) const -> bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
recur_mutex_lock open_lock(open_file_mtx_);
|
recur_mutex_lock open_lock(open_file_mtx_);
|
||||||
auto iter = open_file_lookup_.find(api_path);
|
auto file_iter = open_file_lookup_.find(api_path);
|
||||||
if (iter != open_file_lookup_.end()) {
|
return (file_iter == open_file_lookup_.end())
|
||||||
return iter->second->is_modified() || not iter->second->is_complete();
|
? false
|
||||||
}
|
: file_iter->second->is_modified() ||
|
||||||
|
not file_iter->second->is_complete();
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
auto file_manager::open(const std::string &api_path, bool directory,
|
auto file_manager::open(const std::string &api_path, bool directory,
|
||||||
@ -359,9 +361,9 @@ auto file_manager::open(const std::string &api_path, bool directory,
|
|||||||
file = cur_file;
|
file = cur_file;
|
||||||
};
|
};
|
||||||
|
|
||||||
auto iter = open_file_lookup_.find(api_path);
|
auto file_iter = open_file_lookup_.find(api_path);
|
||||||
if (iter != open_file_lookup_.end()) {
|
if (file_iter != open_file_lookup_.end()) {
|
||||||
create_and_add_handle(iter->second);
|
create_and_add_handle(file_iter->second);
|
||||||
return api_error::success;
|
return api_error::success;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -429,8 +431,9 @@ void file_manager::queue_upload(const std::string &api_path,
|
|||||||
|
|
||||||
auto file_manager::remove_file(const std::string &api_path) -> api_error {
|
auto file_manager::remove_file(const std::string &api_path) -> api_error {
|
||||||
recur_mutex_lock open_lock(open_file_mtx_);
|
recur_mutex_lock open_lock(open_file_mtx_);
|
||||||
auto iter = open_file_lookup_.find(api_path);
|
auto file_iter = open_file_lookup_.find(api_path);
|
||||||
if (iter != open_file_lookup_.end() && iter->second->is_modified()) {
|
if (file_iter != open_file_lookup_.end() &&
|
||||||
|
file_iter->second->is_modified()) {
|
||||||
return api_error::file_in_use;
|
return api_error::file_in_use;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -484,37 +487,36 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
|
|||||||
lock = std::make_unique<mutex_lock>(upload_mtx_);
|
lock = std::make_unique<mutex_lock>(upload_mtx_);
|
||||||
}
|
}
|
||||||
|
|
||||||
auto iter = upload_lookup_.find(api_path);
|
auto file_iter = upload_lookup_.find(api_path);
|
||||||
if (iter == upload_lookup_.end()) {
|
if (file_iter == upload_lookup_.end()) {
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
const auto parts = utils::string::split(iterator->key().ToString(), ':');
|
const auto parts = utils::string::split(db_iter->key().ToString(), ':');
|
||||||
if (parts.at(1U) == api_path) {
|
if (parts.at(1U) == api_path) {
|
||||||
if (db_->Delete(rocksdb::WriteOptions(), upload_family_,
|
if (db_->Delete(rocksdb::WriteOptions(), upload_family_, db_iter->key())
|
||||||
iterator->key())
|
|
||||||
.ok()) {
|
.ok()) {
|
||||||
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
||||||
iterator->key());
|
db_iter->key());
|
||||||
}
|
}
|
||||||
event_system::instance().raise<file_upload_removed>(
|
event_system::instance().raise<file_upload_removed>(
|
||||||
api_path, iterator->value().ToString());
|
api_path, db_iter->value().ToString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
const auto parts = utils::string::split(iterator->key().ToString(), ':');
|
const auto parts = utils::string::split(db_iter->key().ToString(), ':');
|
||||||
if (parts.at(1U) == api_path) {
|
if (parts.at(1U) == api_path) {
|
||||||
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
||||||
iterator->key());
|
db_iter->key());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
event_system::instance().raise<file_upload_removed>(
|
event_system::instance().raise<file_upload_removed>(
|
||||||
api_path, iter->second->get_source_path());
|
api_path, file_iter->second->get_source_path());
|
||||||
|
|
||||||
iter->second->cancel();
|
file_iter->second->cancel();
|
||||||
upload_lookup_.erase(api_path);
|
upload_lookup_.erase(api_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -525,8 +527,8 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
|
|||||||
|
|
||||||
void file_manager::swap_renamed_items(std::string from_api_path,
|
void file_manager::swap_renamed_items(std::string from_api_path,
|
||||||
std::string to_api_path) {
|
std::string to_api_path) {
|
||||||
const auto iter = open_file_lookup_.find(from_api_path);
|
auto file_iter = open_file_lookup_.find(from_api_path);
|
||||||
if (iter != open_file_lookup_.end()) {
|
if (file_iter != open_file_lookup_.end()) {
|
||||||
open_file_lookup_[to_api_path] = open_file_lookup_[from_api_path];
|
open_file_lookup_[to_api_path] = open_file_lookup_[from_api_path];
|
||||||
open_file_lookup_.erase(from_api_path);
|
open_file_lookup_.erase(from_api_path);
|
||||||
open_file_lookup_[to_api_path]->set_api_path(to_api_path);
|
open_file_lookup_[to_api_path]->set_api_path(to_api_path);
|
||||||
@ -730,26 +732,26 @@ void file_manager::start() {
|
|||||||
|
|
||||||
std::vector<active_item> active_items{};
|
std::vector<active_item> active_items{};
|
||||||
|
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
const auto parts = utils::string::split(iterator->key().ToString(), ':');
|
const auto parts = utils::string::split(db_iter->key().ToString(), ':');
|
||||||
active_items.emplace_back(
|
active_items.emplace_back(
|
||||||
active_item{parts.at(1U), iterator->value().ToString()});
|
active_item{parts.at(1U), db_iter->value().ToString()});
|
||||||
}
|
}
|
||||||
for (const auto &active_item : active_items) {
|
for (const auto &active_item : active_items) {
|
||||||
queue_upload(active_item.api_path, active_item.source_path, false);
|
queue_upload(active_item.api_path, active_item.source_path, false);
|
||||||
}
|
}
|
||||||
active_items.clear();
|
active_items.clear();
|
||||||
|
|
||||||
iterator = std::unique_ptr<rocksdb::Iterator>(
|
db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), default_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), default_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
std::string api_path;
|
std::string api_path;
|
||||||
std::string source_path;
|
std::string source_path;
|
||||||
std::size_t chunk_size{};
|
std::size_t chunk_size{};
|
||||||
boost::dynamic_bitset<> read_state;
|
boost::dynamic_bitset<> read_state;
|
||||||
restore_resume_entry(json::parse(iterator->value().ToString()), api_path,
|
restore_resume_entry(json::parse(db_iter->value().ToString()), api_path,
|
||||||
chunk_size, read_state, source_path);
|
chunk_size, read_state, source_path);
|
||||||
|
|
||||||
filesystem_item fsi{};
|
filesystem_item fsi{};
|
||||||
@ -860,14 +862,13 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
|
|||||||
const auto err = api_error_from_string(evt.get_result().get<std::string>());
|
const auto err = api_error_from_string(evt.get_result().get<std::string>());
|
||||||
switch (err) {
|
switch (err) {
|
||||||
case api_error::success: {
|
case api_error::success: {
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
|
||||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
|
||||||
const auto parts =
|
const auto parts = utils::string::split(db_iter->key().ToString(), ':');
|
||||||
utils::string::split(iterator->key().ToString(), ':');
|
|
||||||
if (parts.at(1U) == evt.get_api_path().get<std::string>()) {
|
if (parts.at(1U) == evt.get_api_path().get<std::string>()) {
|
||||||
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
||||||
iterator->key());
|
db_iter->key());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -905,14 +906,14 @@ void file_manager::upload_handler() {
|
|||||||
unique_mutex_lock upload_lock(upload_mtx_);
|
unique_mutex_lock upload_lock(upload_mtx_);
|
||||||
if (not stop_requested_) {
|
if (not stop_requested_) {
|
||||||
if (upload_lookup_.size() < config_.get_max_upload_count()) {
|
if (upload_lookup_.size() < config_.get_max_upload_count()) {
|
||||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
auto db_iter = std::unique_ptr<rocksdb::Iterator>(
|
||||||
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
|
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
|
||||||
iterator->SeekToFirst();
|
db_iter->SeekToFirst();
|
||||||
if (iterator->Valid()) {
|
if (db_iter->Valid()) {
|
||||||
const auto parts =
|
const auto parts =
|
||||||
utils::string::split(iterator->key().ToString(), ':');
|
utils::string::split(db_iter->key().ToString(), ':');
|
||||||
const auto api_path = parts.at(1U);
|
const auto api_path = parts.at(1U);
|
||||||
const auto source_path = iterator->value().ToString();
|
const auto source_path = db_iter->value().ToString();
|
||||||
|
|
||||||
filesystem_item fsi{};
|
filesystem_item fsi{};
|
||||||
auto res = provider_.get_filesystem_item(api_path, false, fsi);
|
auto res = provider_.get_filesystem_item(api_path, false, fsi);
|
||||||
@ -926,10 +927,10 @@ void file_manager::upload_handler() {
|
|||||||
upload_lookup_[fsi.api_path] =
|
upload_lookup_[fsi.api_path] =
|
||||||
std::make_unique<upload>(fsi, provider_);
|
std::make_unique<upload>(fsi, provider_);
|
||||||
if (db_->Delete(rocksdb::WriteOptions(), upload_family_,
|
if (db_->Delete(rocksdb::WriteOptions(), upload_family_,
|
||||||
iterator->key())
|
db_iter->key())
|
||||||
.ok()) {
|
.ok()) {
|
||||||
db_->Put(rocksdb::WriteOptions(), upload_active_family_,
|
db_->Put(rocksdb::WriteOptions(), upload_active_family_,
|
||||||
iterator->key(), iterator->value());
|
db_iter->key(), db_iter->value());
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
default: {
|
default: {
|
||||||
@ -940,7 +941,7 @@ void file_manager::upload_handler() {
|
|||||||
} break;
|
} break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
iterator.reset();
|
db_iter.reset();
|
||||||
upload_notify_.wait(upload_lock);
|
upload_notify_.wait(upload_lock);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user