fix crash
This commit is contained in:
parent
b64477fd7e
commit
310e436163
@ -245,16 +245,17 @@ private: \
|
||||
|
||||
#define E_SUBSCRIBE(name, callback) \
|
||||
event_consumers_.emplace_back(std::make_shared<repertory::event_consumer>( \
|
||||
#name, [this](const event &e) { callback(e); }))
|
||||
#name, [this](const event &evt) { callback(evt); }))
|
||||
|
||||
#define E_SUBSCRIBE_EXACT(name, callback) \
|
||||
event_consumers_.emplace_back(std::make_shared<repertory::event_consumer>( \
|
||||
#name, \
|
||||
[this](const event &e) { callback(dynamic_cast<const name &>(e)); }))
|
||||
#name, [this](const event &evt) { \
|
||||
callback(dynamic_cast<const name &>(evt)); \
|
||||
}))
|
||||
|
||||
#define E_SUBSCRIBE_ALL(callback) \
|
||||
event_consumers_.emplace_back(std::make_shared<repertory::event_consumer>( \
|
||||
[this](const event &e) { callback(e); }))
|
||||
[this](const event &evt) { callback(evt); }))
|
||||
} // namespace repertory
|
||||
|
||||
#endif // INCLUDE_EVENTS_EVENT_SYSTEM_HPP_
|
||||
|
@ -470,8 +470,8 @@ private:
|
||||
|
||||
auto open(const std::string &api_path, bool directory,
|
||||
const open_file_data &ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f,
|
||||
std::shared_ptr<i_closeable_open_file> of) -> api_error;
|
||||
std::shared_ptr<i_open_file> &file,
|
||||
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error;
|
||||
|
||||
void queue_upload(const std::string &api_path, const std::string &source_path,
|
||||
bool no_lock);
|
||||
@ -480,7 +480,7 @@ private:
|
||||
|
||||
void swap_renamed_items(std::string from_api_path, std::string to_api_path);
|
||||
|
||||
void upload_completed(const file_upload_completed &e);
|
||||
void upload_completed(const file_upload_completed &evt);
|
||||
|
||||
void upload_handler();
|
||||
|
||||
@ -490,14 +490,14 @@ public:
|
||||
auto handle_file_rename(const std::string &from_api_path,
|
||||
const std::string &to_api_path) -> api_error;
|
||||
|
||||
void queue_upload(const i_open_file &o) override;
|
||||
void queue_upload(const i_open_file &file) override;
|
||||
|
||||
void remove_resume(const std::string &api_path,
|
||||
const std::string &source_path) override;
|
||||
|
||||
void remove_upload(const std::string &api_path) override;
|
||||
|
||||
void store_resume(const i_open_file &o) override;
|
||||
void store_resume(const i_open_file &file) override;
|
||||
|
||||
public:
|
||||
void close(std::uint64_t handle);
|
||||
@ -506,7 +506,7 @@ public:
|
||||
|
||||
[[nodiscard]] auto create(const std::string &api_path, api_meta_map &meta,
|
||||
open_file_data ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f) -> api_error;
|
||||
std::shared_ptr<i_open_file> &file) -> api_error;
|
||||
|
||||
[[nodiscard]] auto evict_file(const std::string &api_path) -> bool override;
|
||||
|
||||
@ -514,7 +514,7 @@ public:
|
||||
-> directory_item_list override;
|
||||
|
||||
[[nodiscard]] auto get_open_file(std::uint64_t handle, bool write_supported,
|
||||
std::shared_ptr<i_open_file> &f) -> bool;
|
||||
std::shared_ptr<i_open_file> &file) -> bool;
|
||||
|
||||
[[nodiscard]] auto get_open_file_count() const -> std::size_t;
|
||||
|
||||
@ -533,11 +533,11 @@ public:
|
||||
#ifdef REPERTORY_TESTING
|
||||
[[nodiscard]] auto open(std::shared_ptr<i_closeable_open_file> of,
|
||||
const open_file_data &ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f) -> api_error;
|
||||
std::shared_ptr<i_open_file> &file) -> api_error;
|
||||
#endif
|
||||
[[nodiscard]] auto open(const std::string &api_path, bool directory,
|
||||
const open_file_data &ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f) -> api_error;
|
||||
std::shared_ptr<i_open_file> &file) -> api_error;
|
||||
|
||||
[[nodiscard]] auto remove_file(const std::string &api_path) -> api_error;
|
||||
|
||||
|
@ -34,12 +34,12 @@
|
||||
#include "utils/unix/unix_utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
static auto create_resume_entry(const i_open_file &o) -> json {
|
||||
static auto create_resume_entry(const i_open_file &file) -> json {
|
||||
return {
|
||||
{"chunk_size", o.get_chunk_size()},
|
||||
{"path", o.get_api_path()},
|
||||
{"read_state", utils::string::from_dynamic_bitset(o.get_read_state())},
|
||||
{"source", o.get_source_path()},
|
||||
{"chunk_size", file.get_chunk_size()},
|
||||
{"path", file.get_api_path()},
|
||||
{"read_state", utils::string::from_dynamic_bitset(file.get_read_state())},
|
||||
{"source", file.get_source_path()},
|
||||
};
|
||||
}
|
||||
|
||||
@ -110,16 +110,18 @@ void file_manager::close_all(const std::string &api_path) {
|
||||
unique_recur_mutex_lock file_lock(open_file_mtx_);
|
||||
std::vector<std::uint64_t> handles;
|
||||
auto iter = open_file_lookup_.find(api_path);
|
||||
if (iter != open_file_lookup_.end()) {
|
||||
handles = iter->second->get_handles();
|
||||
if (iter == open_file_lookup_.end()) {
|
||||
return;
|
||||
}
|
||||
|
||||
handles = iter->second->get_handles();
|
||||
|
||||
for (auto &handle : handles) {
|
||||
open_file_lookup_[api_path]->remove(handle);
|
||||
open_handle_lookup_.erase(handle);
|
||||
}
|
||||
|
||||
auto closeable_file = open_file_lookup_.at(api_path);
|
||||
auto file = open_file_lookup_.at(api_path);
|
||||
open_file_lookup_.erase(api_path);
|
||||
file_lock.unlock();
|
||||
}
|
||||
@ -128,9 +130,10 @@ void file_manager::close_timed_out_files() {
|
||||
unique_recur_mutex_lock file_lock(open_file_mtx_);
|
||||
auto closeable_list = std::accumulate(
|
||||
open_file_lookup_.begin(), open_file_lookup_.end(),
|
||||
std::vector<std::string>{}, [](auto items, const auto &kv) -> auto {
|
||||
if (kv.second->get_open_file_count() == 0U && kv.second->can_close()) {
|
||||
items.emplace_back(kv.first);
|
||||
std::vector<std::string>{}, [](auto items, const auto &item) -> auto {
|
||||
if (item.second->get_open_file_count() == 0U &&
|
||||
item.second->can_close()) {
|
||||
items.emplace_back(item.first);
|
||||
}
|
||||
return items;
|
||||
});
|
||||
@ -153,14 +156,14 @@ void file_manager::close_timed_out_files() {
|
||||
|
||||
auto file_manager::create(const std::string &api_path, api_meta_map &meta,
|
||||
open_file_data ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f) -> api_error {
|
||||
std::shared_ptr<i_open_file> &file) -> api_error {
|
||||
recur_mutex_lock file_lock(open_file_mtx_);
|
||||
auto res = provider_.create_file(api_path, meta);
|
||||
if (res != api_error::success && res != api_error::item_exists) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return open(api_path, false, ofd, handle, f);
|
||||
return open(api_path, false, ofd, handle, file);
|
||||
}
|
||||
|
||||
auto file_manager::evict_file(const std::string &api_path) -> bool {
|
||||
@ -221,7 +224,7 @@ auto file_manager::get_directory_items(const std::string &api_path) const
|
||||
}
|
||||
|
||||
auto file_manager::get_next_handle() -> std::uint64_t {
|
||||
if (++next_handle_ == 0u) {
|
||||
if (++next_handle_ == 0U) {
|
||||
next_handle_++;
|
||||
}
|
||||
|
||||
@ -231,36 +234,37 @@ auto file_manager::get_next_handle() -> std::uint64_t {
|
||||
auto file_manager::get_open_file_count(const std::string &api_path) const
|
||||
-> std::size_t {
|
||||
recur_mutex_lock open_lock(open_file_mtx_);
|
||||
auto it = open_file_lookup_.find(api_path);
|
||||
if (it != open_file_lookup_.end()) {
|
||||
return it->second->get_open_file_count();
|
||||
auto iter = open_file_lookup_.find(api_path);
|
||||
if (iter != open_file_lookup_.end()) {
|
||||
return iter->second->get_open_file_count();
|
||||
}
|
||||
|
||||
return 0u;
|
||||
return 0U;
|
||||
}
|
||||
|
||||
auto file_manager::get_open_file(std::uint64_t handle, bool write_supported,
|
||||
std::shared_ptr<i_open_file> &f) -> bool {
|
||||
std::shared_ptr<i_open_file> &file) -> bool {
|
||||
recur_mutex_lock open_lock(open_file_mtx_);
|
||||
auto it = open_handle_lookup_.find(handle);
|
||||
if (it == open_handle_lookup_.end()) {
|
||||
auto iter = open_handle_lookup_.find(handle);
|
||||
if (iter == open_handle_lookup_.end()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
auto of = open_file_lookup_.at(it->second->get_api_path());
|
||||
if (write_supported && not of->is_write_supported()) {
|
||||
auto new_f = std::make_shared<open_file>(
|
||||
auto current_file = open_file_lookup_.at(iter->second->get_api_path());
|
||||
if (write_supported && not current_file->is_write_supported()) {
|
||||
auto writeable_file = std::make_shared<open_file>(
|
||||
utils::encryption::encrypting_reader::get_data_chunk_size(),
|
||||
config_.get_enable_chunk_download_timeout()
|
||||
? config_.get_chunk_downloader_timeout_secs()
|
||||
: 0U,
|
||||
of->get_filesystem_item(), of->get_open_data(), provider_, *this);
|
||||
open_file_lookup_[of->get_api_path()] = new_f;
|
||||
f = new_f;
|
||||
current_file->get_filesystem_item(), current_file->get_open_data(),
|
||||
provider_, *this);
|
||||
open_file_lookup_[current_file->get_api_path()] = writeable_file;
|
||||
file = writeable_file;
|
||||
return true;
|
||||
}
|
||||
|
||||
f = of;
|
||||
file = current_file;
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -274,8 +278,8 @@ auto file_manager::get_open_files() const
|
||||
std::unordered_map<std::string, std::size_t> ret;
|
||||
|
||||
recur_mutex_lock open_lock(open_file_mtx_);
|
||||
for (const auto &kv : open_file_lookup_) {
|
||||
ret[kv.first] = kv.second->get_open_file_count();
|
||||
for (const auto &item : open_file_lookup_) {
|
||||
ret[item.first] = item.second->get_open_file_count();
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -348,21 +352,21 @@ auto file_manager::is_processing(const std::string &api_path) const -> bool {
|
||||
|
||||
auto file_manager::open(const std::string &api_path, bool directory,
|
||||
const open_file_data &ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f) -> api_error {
|
||||
return open(api_path, directory, ofd, handle, f, nullptr);
|
||||
std::shared_ptr<i_open_file> &file) -> api_error {
|
||||
return open(api_path, directory, ofd, handle, file, nullptr);
|
||||
}
|
||||
|
||||
auto file_manager::open(const std::string &api_path, bool directory,
|
||||
const open_file_data &ofd, std::uint64_t &handle,
|
||||
std::shared_ptr<i_open_file> &f,
|
||||
std::shared_ptr<i_closeable_open_file> of)
|
||||
std::shared_ptr<i_open_file> &file,
|
||||
std::shared_ptr<i_closeable_open_file> closeable_file)
|
||||
-> api_error {
|
||||
const auto create_and_add_handle =
|
||||
[&](std::shared_ptr<i_closeable_open_file> cur_file) {
|
||||
handle = get_next_handle();
|
||||
cur_file->add(handle, ofd);
|
||||
open_handle_lookup_[handle] = cur_file.get();
|
||||
f = cur_file;
|
||||
file = cur_file;
|
||||
};
|
||||
|
||||
recur_mutex_lock open_lock(open_file_mtx_);
|
||||
@ -388,22 +392,22 @@ auto file_manager::open(const std::string &api_path, bool directory,
|
||||
}
|
||||
}
|
||||
|
||||
if (not of) {
|
||||
of = std::make_shared<open_file>(
|
||||
if (not closeable_file) {
|
||||
closeable_file = std::make_shared<open_file>(
|
||||
utils::encryption::encrypting_reader::get_data_chunk_size(),
|
||||
config_.get_enable_chunk_download_timeout()
|
||||
? config_.get_chunk_downloader_timeout_secs()
|
||||
: 0U,
|
||||
fsi, provider_, *this);
|
||||
}
|
||||
open_file_lookup_[api_path] = of;
|
||||
create_and_add_handle(of);
|
||||
open_file_lookup_[api_path] = closeable_file;
|
||||
create_and_add_handle(closeable_file);
|
||||
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
void file_manager::queue_upload(const i_open_file &o) {
|
||||
return queue_upload(o.get_api_path(), o.get_source_path(), false);
|
||||
void file_manager::queue_upload(const i_open_file &file) {
|
||||
return queue_upload(file.get_api_path(), file.get_source_path(), false);
|
||||
}
|
||||
|
||||
void file_manager::queue_upload(const std::string &api_path,
|
||||
@ -412,9 +416,9 @@ void file_manager::queue_upload(const std::string &api_path,
|
||||
return;
|
||||
}
|
||||
|
||||
std::unique_ptr<mutex_lock> l;
|
||||
std::unique_ptr<mutex_lock> lock;
|
||||
if (not no_lock) {
|
||||
l = std::make_unique<mutex_lock>(upload_mtx_);
|
||||
lock = std::make_unique<mutex_lock>(upload_mtx_);
|
||||
}
|
||||
remove_upload(api_path, true);
|
||||
|
||||
@ -532,8 +536,8 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
|
||||
|
||||
void file_manager::swap_renamed_items(std::string from_api_path,
|
||||
std::string to_api_path) {
|
||||
const auto it = open_file_lookup_.find(from_api_path);
|
||||
if (it != open_file_lookup_.end()) {
|
||||
const auto iter = open_file_lookup_.find(from_api_path);
|
||||
if (iter != open_file_lookup_.end()) {
|
||||
open_file_lookup_[to_api_path] = open_file_lookup_[from_api_path];
|
||||
open_file_lookup_.erase(from_api_path);
|
||||
open_file_lookup_[to_api_path]->set_api_path(to_api_path);
|
||||
@ -547,7 +551,7 @@ auto file_manager::rename_directory(const std::string &from_api_path,
|
||||
return api_error::not_implemented;
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock l(open_file_mtx_);
|
||||
unique_recur_mutex_lock lock(open_file_mtx_);
|
||||
// Ensure source directory exists
|
||||
bool exists{};
|
||||
auto res = provider_.is_directory(from_api_path, exists);
|
||||
@ -628,7 +632,7 @@ auto file_manager::rename_file(const std::string &from_api_path,
|
||||
return api_error::item_exists;
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock l(open_file_mtx_);
|
||||
unique_recur_mutex_lock lock(open_file_mtx_);
|
||||
|
||||
// Don't rename if source is directory
|
||||
bool exists{};
|
||||
@ -762,13 +766,13 @@ void file_manager::start() {
|
||||
std::uint64_t file_size{};
|
||||
if (utils::file::get_file_size(fsi.source_path, file_size)) {
|
||||
if (file_size == fsi.size) {
|
||||
auto f = std::make_shared<open_file>(
|
||||
auto file = std::make_shared<open_file>(
|
||||
chunk_size,
|
||||
config_.get_enable_chunk_download_timeout()
|
||||
? config_.get_chunk_downloader_timeout_secs()
|
||||
: 0U,
|
||||
fsi, provider_, read_state, *this);
|
||||
open_file_lookup_[api_path] = f;
|
||||
open_file_lookup_[api_path] = file;
|
||||
event_system::instance().raise<download_restored>(
|
||||
fsi.api_path, fsi.source_path);
|
||||
} else {
|
||||
@ -824,8 +828,8 @@ void file_manager::stop() {
|
||||
open_handle_lookup_.clear();
|
||||
|
||||
upload_lock.lock();
|
||||
for (auto &kv : upload_lookup_) {
|
||||
kv.second->stop();
|
||||
for (auto &item : upload_lookup_) {
|
||||
item.second->stop();
|
||||
}
|
||||
upload_notify_.notify_all();
|
||||
upload_lock.unlock();
|
||||
@ -843,27 +847,28 @@ void file_manager::stop() {
|
||||
}
|
||||
}
|
||||
|
||||
void file_manager::store_resume(const i_open_file &o) {
|
||||
void file_manager::store_resume(const i_open_file &file) {
|
||||
if (provider_.is_direct_only()) {
|
||||
return;
|
||||
}
|
||||
|
||||
const auto res = db_->Put(rocksdb::WriteOptions(), default_family_,
|
||||
o.get_api_path(), create_resume_entry(o).dump());
|
||||
const auto res =
|
||||
db_->Put(rocksdb::WriteOptions(), default_family_, file.get_api_path(),
|
||||
create_resume_entry(file).dump());
|
||||
if (res.ok()) {
|
||||
event_system::instance().raise<download_stored>(o.get_api_path(),
|
||||
o.get_source_path());
|
||||
event_system::instance().raise<download_stored>(file.get_api_path(),
|
||||
file.get_source_path());
|
||||
} else {
|
||||
event_system::instance().raise<download_stored_failed>(
|
||||
o.get_api_path(), o.get_source_path(), res.ToString());
|
||||
file.get_api_path(), file.get_source_path(), res.ToString());
|
||||
}
|
||||
}
|
||||
|
||||
void file_manager::upload_completed(const file_upload_completed &e) {
|
||||
void file_manager::upload_completed(const file_upload_completed &evt) {
|
||||
unique_mutex_lock upload_lock(upload_mtx_);
|
||||
|
||||
if (not utils::string::to_bool(e.get_cancelled().get<std::string>())) {
|
||||
const auto err = api_error_from_string(e.get_result().get<std::string>());
|
||||
if (not utils::string::to_bool(evt.get_cancelled().get<std::string>())) {
|
||||
const auto err = api_error_from_string(evt.get_result().get<std::string>());
|
||||
switch (err) {
|
||||
case api_error::success: {
|
||||
auto iterator = std::unique_ptr<rocksdb::Iterator>(
|
||||
@ -871,7 +876,7 @@ void file_manager::upload_completed(const file_upload_completed &e) {
|
||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
|
||||
const auto parts =
|
||||
utils::string::split(iterator->key().ToString(), ':');
|
||||
if (parts[1U] == e.get_api_path().get<std::string>()) {
|
||||
if (parts[1U] == evt.get_api_path().get<std::string>()) {
|
||||
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
|
||||
iterator->key());
|
||||
break;
|
||||
@ -879,29 +884,29 @@ void file_manager::upload_completed(const file_upload_completed &e) {
|
||||
}
|
||||
} break;
|
||||
case api_error::upload_stopped: {
|
||||
event_system::instance().raise<file_upload_retry>(e.get_api_path(),
|
||||
e.get_source(), err);
|
||||
queue_upload(e.get_api_path(), e.get_source(), true);
|
||||
event_system::instance().raise<file_upload_retry>(evt.get_api_path(),
|
||||
evt.get_source(), err);
|
||||
queue_upload(evt.get_api_path(), evt.get_source(), true);
|
||||
upload_notify_.wait_for(upload_lock, 5s);
|
||||
} break;
|
||||
default: {
|
||||
bool exists{};
|
||||
auto res = provider_.is_file(e.get_api_path(), exists);
|
||||
auto res = provider_.is_file(evt.get_api_path(), exists);
|
||||
if ((res == api_error::success && not exists) ||
|
||||
not utils::file::is_file(e.get_source())) {
|
||||
event_system::instance().raise<file_upload_not_found>(e.get_api_path(),
|
||||
e.get_source());
|
||||
remove_upload(e.get_api_path(), true);
|
||||
not utils::file::is_file(evt.get_source())) {
|
||||
event_system::instance().raise<file_upload_not_found>(
|
||||
evt.get_api_path(), evt.get_source());
|
||||
remove_upload(evt.get_api_path(), true);
|
||||
return;
|
||||
}
|
||||
|
||||
event_system::instance().raise<file_upload_retry>(e.get_api_path(),
|
||||
e.get_source(), err);
|
||||
queue_upload(e.get_api_path(), e.get_source(), true);
|
||||
event_system::instance().raise<file_upload_retry>(evt.get_api_path(),
|
||||
evt.get_source(), err);
|
||||
queue_upload(evt.get_api_path(), evt.get_source(), true);
|
||||
upload_notify_.wait_for(upload_lock, 5s);
|
||||
} break;
|
||||
}
|
||||
upload_lookup_.erase(e.get_api_path());
|
||||
upload_lookup_.erase(evt.get_api_path());
|
||||
}
|
||||
upload_notify_.notify_all();
|
||||
}
|
||||
@ -917,7 +922,7 @@ void file_manager::upload_handler() {
|
||||
if (iterator->Valid()) {
|
||||
const auto parts =
|
||||
utils::string::split(iterator->key().ToString(), ':');
|
||||
const auto api_path = parts[1u];
|
||||
const auto api_path = parts.at(1U);
|
||||
const auto source_path = iterator->value().ToString();
|
||||
|
||||
filesystem_item fsi{};
|
||||
@ -926,7 +931,7 @@ void file_manager::upload_handler() {
|
||||
case api_error::item_not_found: {
|
||||
event_system::instance().raise<file_upload_not_found>(api_path,
|
||||
source_path);
|
||||
remove_upload(parts[1u], true);
|
||||
remove_upload(parts.at(1U), true);
|
||||
} break;
|
||||
case api_error::success: {
|
||||
upload_lookup_[fsi.api_path] =
|
||||
@ -959,14 +964,14 @@ void file_manager::upload_handler() {
|
||||
|
||||
void file_manager::update_used_space(std::uint64_t &used_space) const {
|
||||
recur_mutex_lock open_lock(open_file_mtx_);
|
||||
for (const auto &of : open_file_lookup_) {
|
||||
for (const auto &item : open_file_lookup_) {
|
||||
std::uint64_t file_size{};
|
||||
auto res = provider_.get_file_size(of.second->get_api_path(), file_size);
|
||||
auto res = provider_.get_file_size(item.second->get_api_path(), file_size);
|
||||
if ((res == api_error::success) &&
|
||||
(file_size != of.second->get_file_size()) &&
|
||||
(file_size != item.second->get_file_size()) &&
|
||||
(used_space >= file_size)) {
|
||||
used_space -= file_size;
|
||||
used_space += of.second->get_file_size();
|
||||
used_space += item.second->get_file_size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user