2.0.0-rc (#9)
Some checks failed
BlockStorage/repertory_osx/pipeline/head This commit looks good
BlockStorage/repertory_windows/pipeline/head This commit looks good
BlockStorage/repertory/pipeline/head There was a failure building this commit
BlockStorage/repertory_linux_builds/pipeline/head This commit looks good
BlockStorage/repertory_osx_builds/pipeline/head There was a failure building this commit

### Issues

* \#1 \[bug\] Unable to mount S3 due to 'item_not_found' exception
* \#2 Require bucket name for S3 mounts
* \#3 \[bug\] File size is not being updated in S3 mount
* \#4 Upgrade to libfuse-3.x.x
* \#5 Switch to renterd for Sia support
* \#6 Switch to cpp-httplib to further reduce dependencies
* \#7 Remove global_data and calculate used disk space per provider
* \#8 Switch to libcurl for S3 mount support

### Changes from v1.x.x

* Added read-only encrypt provider
  * Pass-through mount point that transparently encrypts source data using `XChaCha20-Poly1305`
* Added S3 encryption support via `XChaCha20-Poly1305`
* Added replay protection to remote mounts
* Added support base64 writes in remote FUSE
* Created static linked Linux binaries for `amd64` and `aarch64` using `musl-libc`
* Removed legacy Sia renter support
* Removed Skynet support
* Fixed multiple remote mount WinFSP API issues on \*NIX servers
* Implemented chunked read and write
  * Writes for non-cached files are performed in chunks of 8Mib
* Removed `repertory-ui` support
* Removed `FreeBSD` support
* Switched to `libsodium` over `CryptoPP`
* Switched to `XChaCha20-Poly1305` for remote mounts
* Updated `GoogleTest` to v1.14.0
* Updated `JSON for Modern C++` to v3.11.2
* Updated `OpenSSL` to v1.1.1w
* Updated `RocksDB` to v8.5.3
* Updated `WinFSP` to 2023
* Updated `boost` to v1.78.0
* Updated `cURL` to v8.3.0
* Updated `zlib` to v1.3
* Use `upload_manager` for all providers
  * Adds a delay to uploads to prevent excessive API calls
  * Supports re-upload after mount restart for incomplete uploads
  * NOTE: Uploads for all providers are full file (no resume support)
    * Multipart upload support is planned for S3

Reviewed-on: #9
This commit is contained in:
2023-10-29 06:55:59 +00:00
parent 3ff46723b8
commit f43c41f88a
839 changed files with 98214 additions and 92959 deletions

View File

@ -0,0 +1,979 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "app_config.hpp"
#include "file_manager/events.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/encrypting_reader.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/path_utils.hpp"
#include "utils/polling.hpp"
#include "utils/rocksdb_utils.hpp"
#include "utils/unix/unix_utils.hpp"
namespace repertory {
static auto create_resume_entry(const i_open_file &o) -> json {
return {
{"chunk_size", o.get_chunk_size()},
{"path", o.get_api_path()},
{"read_state", utils::string::from_dynamic_bitset(o.get_read_state())},
{"source", o.get_source_path()},
};
}
static void restore_resume_entry(const json &resume_entry,
std::string &api_path, std::size_t &chunk_size,
boost::dynamic_bitset<> &read_state,
std::string &source_path) {
api_path = resume_entry["path"].get<std::string>();
chunk_size = resume_entry["chunk_size"].get<std::size_t>();
read_state = utils::string::to_dynamic_bitset(
resume_entry["read_state"].get<std::string>());
source_path = resume_entry["source"].get<std::string>();
}
file_manager::file_manager(app_config &config, i_provider &provider)
: config_(config), provider_(provider) {
if (not provider_.is_direct_only()) {
auto families = std::vector<rocksdb::ColumnFamilyDescriptor>();
families.emplace_back(rocksdb::kDefaultColumnFamilyName,
rocksdb::ColumnFamilyOptions());
families.emplace_back("upload", rocksdb::ColumnFamilyOptions());
families.emplace_back("upload_active", rocksdb::ColumnFamilyOptions());
auto handles = std::vector<rocksdb::ColumnFamilyHandle *>();
utils::db::create_rocksdb(config, "upload_db", families, handles, db_);
std::size_t idx{};
default_family_ = handles[idx++];
upload_family_ = handles[idx++];
upload_active_family_ = handles[idx++];
E_SUBSCRIBE_EXACT(file_upload_completed,
[this](const file_upload_completed &evt) {
this->upload_completed(evt);
});
}
}
file_manager::~file_manager() {
stop();
E_CONSUMER_RELEASE();
db_.reset();
}
void file_manager::close(std::uint64_t handle) {
unique_recur_mutex_lock file_lock(open_file_mtx_);
auto it = open_handle_lookup_.find(handle);
if (it != open_handle_lookup_.end()) {
auto *cur_file = it->second;
open_handle_lookup_.erase(handle);
cur_file->remove(handle);
if (cur_file->can_close()) {
const auto api_path = cur_file->get_api_path();
cur_file = nullptr;
auto closeable_file = open_file_lookup_.at(api_path);
open_file_lookup_.erase(api_path);
file_lock.unlock();
closeable_file->close();
}
}
}
void file_manager::close_all(const std::string &api_path) {
recur_mutex_lock file_lock(open_file_mtx_);
std::vector<std::uint64_t> handles;
auto it = open_file_lookup_.find(api_path);
if (it != open_file_lookup_.end()) {
handles = it->second->get_handles();
}
for (auto &handle : handles) {
open_file_lookup_[api_path]->remove(handle);
open_handle_lookup_.erase(handle);
}
open_file_lookup_.erase(api_path);
}
void file_manager::close_timed_out_files() {
unique_recur_mutex_lock file_lock(open_file_mtx_);
auto closeable_list = std::accumulate(
open_file_lookup_.begin(), open_file_lookup_.end(),
std::vector<std::string>{}, [](auto items, const auto &kv) -> auto {
if (kv.second->get_open_file_count() == 0U && kv.second->can_close()) {
items.emplace_back(kv.first);
}
return items;
});
std::vector<std::shared_ptr<i_closeable_open_file>> open_files{};
for (const auto &api_path : closeable_list) {
auto closeable_file = open_file_lookup_.at(api_path);
open_file_lookup_.erase(api_path);
open_files.emplace_back(closeable_file);
}
closeable_list.clear();
file_lock.unlock();
for (auto &closeable_file : open_files) {
closeable_file->close();
event_system::instance().raise<download_timeout>(
closeable_file->get_api_path());
}
}
auto file_manager::create(const std::string &api_path, api_meta_map &meta,
open_file_data ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &f) -> api_error {
recur_mutex_lock file_lock(open_file_mtx_);
auto res = provider_.create_file(api_path, meta);
if (res != api_error::success && res != api_error::item_exists) {
return res;
}
return open(api_path, false, ofd, handle, f);
}
auto file_manager::evict_file(const std::string &api_path) -> bool {
if (provider_.is_direct_only()) {
return false;
}
recur_mutex_lock open_lock(open_file_mtx_);
if (is_processing(api_path)) {
return false;
}
if (get_open_file_count(api_path) != 0U) {
return false;
}
std::string pinned;
auto res = provider_.get_item_meta(api_path, META_PINNED, pinned);
if (res != api_error::success && res != api_error::item_not_found) {
utils::error::raise_api_path_error(__FUNCTION__, api_path, res,
"failed to get pinned status");
return false;
}
if (not pinned.empty() && utils::string::to_bool(pinned)) {
return false;
}
std::string source_path{};
res = provider_.get_item_meta(api_path, META_SOURCE, source_path);
if (res != api_error::success) {
utils::error::raise_api_path_error(__FUNCTION__, api_path, res,
"failed to get source path");
return false;
}
if (source_path.empty()) {
return false;
}
auto removed = utils::file::retry_delete_file(source_path);
if (removed) {
event_system::instance().raise<filesystem_item_evicted>(api_path,
source_path);
}
return removed;
}
auto file_manager::get_directory_items(const std::string &api_path) const
-> directory_item_list {
directory_item_list list{};
auto res = provider_.get_directory_items(api_path, list);
if (res != api_error::success) {
utils::error::raise_api_path_error(__FUNCTION__, api_path, res,
"failed to get directory list");
}
return list;
}
auto file_manager::get_next_handle() -> std::uint64_t {
if (++next_handle_ == 0u) {
next_handle_++;
}
return next_handle_;
}
auto file_manager::get_open_file_count(const std::string &api_path) const
-> std::size_t {
recur_mutex_lock open_lock(open_file_mtx_);
auto it = open_file_lookup_.find(api_path);
if (it != open_file_lookup_.end()) {
return it->second->get_open_file_count();
}
return 0u;
}
auto file_manager::get_open_file(std::uint64_t handle, bool write_supported,
std::shared_ptr<i_open_file> &f) -> bool {
recur_mutex_lock open_lock(open_file_mtx_);
auto it = open_handle_lookup_.find(handle);
if (it == open_handle_lookup_.end()) {
return false;
}
auto of = open_file_lookup_.at(it->second->get_api_path());
if (write_supported && not of->is_write_supported()) {
auto new_f = std::make_shared<open_file>(
utils::encryption::encrypting_reader::get_data_chunk_size(),
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
of->get_filesystem_item(), of->get_open_data(), provider_, *this);
open_file_lookup_[of->get_api_path()] = new_f;
f = new_f;
return true;
}
f = of;
return true;
}
auto file_manager::get_open_file_count() const -> std::size_t {
recur_mutex_lock open_lock(open_file_mtx_);
return open_file_lookup_.size();
}
auto file_manager::get_open_files() const
-> std::unordered_map<std::string, std::size_t> {
std::unordered_map<std::string, std::size_t> ret;
recur_mutex_lock open_lock(open_file_mtx_);
for (const auto &kv : open_file_lookup_) {
ret[kv.first] = kv.second->get_open_file_count();
}
return ret;
}
auto file_manager::get_open_handle_count() const -> std::size_t {
recur_mutex_lock open_lock(open_file_mtx_);
return open_handle_lookup_.size();
}
auto file_manager::get_stored_downloads() const -> std::vector<json> {
std::vector<json> ret;
if (not provider_.is_direct_only()) {
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), default_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
ret.emplace_back(json::parse(iterator->value().ToString()));
}
}
return ret;
}
auto file_manager::handle_file_rename(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
auto ret = api_error::file_in_use;
if ((ret = provider_.rename_file(from_api_path, to_api_path)) ==
api_error::success) {
swap_renamed_items(from_api_path, to_api_path);
}
return ret;
}
auto file_manager::has_no_open_file_handles() const -> bool {
recur_mutex_lock open_lock(open_file_mtx_);
return open_handle_lookup_.empty();
}
auto file_manager::is_processing(const std::string &api_path) const -> bool {
if (provider_.is_direct_only()) {
return false;
}
recur_mutex_lock open_lock(open_file_mtx_);
mutex_lock upload_lock(upload_mtx_);
if (upload_lookup_.find(api_path) != upload_lookup_.end()) {
return true;
}
{
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
const auto parts = utils::string::split(iterator->key().ToString(), ':');
if (parts[1u] == api_path) {
return true;
}
}
}
auto it = open_file_lookup_.find(api_path);
if (it != open_file_lookup_.end()) {
return it->second->is_modified() || not it->second->is_complete();
}
return false;
}
auto file_manager::open(const std::string &api_path, bool directory,
const open_file_data &ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &f) -> api_error {
return open(api_path, directory, ofd, handle, f, nullptr);
}
auto file_manager::open(const std::string &api_path, bool directory,
const open_file_data &ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &f,
std::shared_ptr<i_closeable_open_file> of)
-> api_error {
const auto create_and_add_handle =
[&](std::shared_ptr<i_closeable_open_file> cur_file) {
handle = get_next_handle();
cur_file->add(handle, ofd);
open_handle_lookup_[handle] = cur_file.get();
f = cur_file;
};
recur_mutex_lock open_lock(open_file_mtx_);
auto it = open_file_lookup_.find(api_path);
if (it != open_file_lookup_.end()) {
create_and_add_handle(it->second);
return api_error::success;
}
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, directory, fsi);
if (res != api_error::success) {
return res;
}
if (fsi.source_path.empty()) {
fsi.source_path = utils::path::combine(config_.get_cache_directory(),
{utils::create_uuid_string()});
if ((res = provider_.set_item_meta(fsi.api_path, META_SOURCE,
fsi.source_path)) !=
api_error::success) {
return res;
}
}
if (not of) {
of = std::make_shared<open_file>(
utils::encryption::encrypting_reader::get_data_chunk_size(),
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
fsi, provider_, *this);
}
open_file_lookup_[api_path] = of;
create_and_add_handle(of);
return api_error::success;
}
auto file_manager::perform_locked_operation(
locked_operation_callback locked_operation) -> bool {
recur_mutex_lock open_lock(open_file_mtx_);
return locked_operation(provider_);
}
void file_manager::queue_upload(const i_open_file &o) {
return queue_upload(o.get_api_path(), o.get_source_path(), false);
}
void file_manager::queue_upload(const std::string &api_path,
const std::string &source_path, bool no_lock) {
if (provider_.is_direct_only()) {
return;
}
std::unique_ptr<mutex_lock> l;
if (not no_lock) {
l = std::make_unique<mutex_lock>(upload_mtx_);
}
remove_upload(api_path, true);
auto res = db_->Put(
rocksdb::WriteOptions(), upload_family_,
std::to_string(utils::get_file_time_now()) + ":" + api_path, source_path);
if (res.ok()) {
remove_resume(api_path, source_path);
event_system::instance().raise<file_upload_queued>(api_path, source_path);
} else {
event_system::instance().raise<file_upload_failed>(api_path, source_path,
res.ToString());
}
if (not no_lock) {
upload_notify_.notify_all();
}
}
auto file_manager::remove_file(const std::string &api_path) -> api_error {
recur_mutex_lock open_lock(open_file_mtx_);
auto it = open_file_lookup_.find(api_path);
if (it != open_file_lookup_.end() && it->second->is_modified()) {
return api_error::file_in_use;
}
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
if (res != api_error::success) {
return res;
}
if ((res = provider_.remove_file(api_path)) != api_error::success) {
return res;
}
remove_upload(api_path);
close_all(api_path);
if (not utils::file::retry_delete_file(fsi.source_path)) {
utils::error::raise_api_path_error(
__FUNCTION__, fsi.api_path, fsi.source_path,
utils::get_last_error_code(), "failed to delete source");
return api_error::success;
}
return api_error::success;
}
void file_manager::remove_resume(const std::string &api_path,
const std::string &source_path) {
auto res = db_->Delete(rocksdb::WriteOptions(), default_family_, api_path);
if (res.ok()) {
event_system::instance().raise<download_stored_removed>(api_path,
source_path);
} else {
utils::error::raise_api_path_error(__FUNCTION__, api_path, source_path,
res.code(),
"failed to remove resume entry");
}
}
void file_manager::remove_upload(const std::string &api_path) {
remove_upload(api_path, false);
}
void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
if (provider_.is_direct_only()) {
return;
}
std::unique_ptr<mutex_lock> l;
if (not no_lock) {
l = std::make_unique<mutex_lock>(upload_mtx_);
}
auto it = upload_lookup_.find(api_path);
if (it == upload_lookup_.end()) {
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
const auto parts = utils::string::split(iterator->key().ToString(), ':');
if (parts[1U] == api_path) {
if (db_->Delete(rocksdb::WriteOptions(), upload_family_,
iterator->key())
.ok()) {
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
iterator->key());
}
event_system::instance().raise<file_upload_removed>(
api_path, iterator->value().ToString());
}
}
} else {
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
const auto parts = utils::string::split(iterator->key().ToString(), ':');
if (parts[1U] == api_path) {
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
iterator->key());
}
}
event_system::instance().raise<file_upload_removed>(
api_path, it->second->get_source_path());
it->second->cancel();
upload_lookup_.erase(api_path);
}
if (not no_lock) {
upload_notify_.notify_all();
}
}
void file_manager::swap_renamed_items(std::string from_api_path,
std::string to_api_path) {
const auto it = open_file_lookup_.find(from_api_path);
if (it != open_file_lookup_.end()) {
open_file_lookup_[to_api_path] = open_file_lookup_[from_api_path];
open_file_lookup_.erase(from_api_path);
open_file_lookup_[to_api_path]->set_api_path(to_api_path);
}
}
auto file_manager::rename_directory(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}
unique_recur_mutex_lock l(open_file_mtx_);
// Ensure source directory exists
bool exists{};
auto res = provider_.is_directory(from_api_path, exists);
if (res != api_error::success) {
return res;
}
if (not exists) {
return api_error::directory_not_found;
}
// Ensure destination directory does not exist
res = provider_.is_directory(to_api_path, exists);
if (res != api_error::success) {
return res;
}
if (exists) {
return api_error::directory_exists;
}
// Ensure destination is not a file
res = provider_.is_file(from_api_path, exists);
if (res != api_error::success) {
return res;
}
if (exists) {
return api_error::item_exists;
}
// Create destination directory
res =
provider_.create_directory_clone_source_meta(from_api_path, to_api_path);
if (res != api_error::success) {
return res;
}
directory_item_list list{};
res = provider_.get_directory_items(from_api_path, list);
if (res != api_error::success) {
return res;
}
// Rename all items - directories MUST BE returned first
for (std::size_t i = 0U; (res == api_error::success) && (i < list.size());
i++) {
const auto &api_path = list[i].api_path;
if ((api_path != ".") && (api_path != "..")) {
const auto old_api_path = api_path;
const auto new_api_path =
utils::path::create_api_path(utils::path::combine(
to_api_path, {old_api_path.substr(from_api_path.size())}));
res = list[i].directory ? rename_directory(old_api_path, new_api_path)
: rename_file(old_api_path, new_api_path, false);
}
}
if (res != api_error::success) {
return res;
}
res = provider_.remove_directory(from_api_path);
if (res != api_error::success) {
return res;
}
swap_renamed_items(from_api_path, to_api_path);
return api_error::success;
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite)
-> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}
// Don't rename if paths are the same
if (from_api_path == to_api_path) {
return api_error::item_exists;
}
unique_recur_mutex_lock l(open_file_mtx_);
// Don't rename if source is directory
bool exists{};
auto res = provider_.is_directory(from_api_path, exists);
if (res != api_error::success) {
return res;
}
if (exists) {
return api_error::directory_exists;
}
// Don't rename if source does not exist
res = provider_.is_file(from_api_path, exists);
if (res != api_error::success) {
return res;
}
if (not exists) {
return api_error::item_not_found;
}
// Don't rename if destination is directory
res = provider_.is_directory(to_api_path, exists);
if (res != api_error::success) {
return res;
}
if (exists) {
res = api_error::directory_exists;
}
// Check allow overwrite if file exists
bool dest_exists{};
res = provider_.is_file(to_api_path, dest_exists);
if (res != api_error::success) {
return res;
}
if (not overwrite && dest_exists) {
return api_error::item_exists;
}
// Don't rename if destination file has open handles
if (get_open_file_count(to_api_path) != 0U) {
return api_error::file_in_use;
}
// Don't rename if destination file is uploading or downloading
if (is_processing(to_api_path)) {
return api_error::file_in_use;
}
// Handle destination file exists (should overwrite)
if (dest_exists) {
filesystem_item fsi{};
res = provider_.get_filesystem_item(to_api_path, false, fsi);
if (res != api_error::success) {
return res;
}
std::uint64_t file_size{};
if (not utils::file::get_file_size(fsi.source_path, file_size)) {
return api_error::os_error;
}
res = provider_.remove_file(to_api_path);
if ((res == api_error::success) || (res == api_error::item_not_found)) {
if (not utils::file::retry_delete_file(fsi.source_path)) {
utils::error::raise_api_path_error(
__FUNCTION__, fsi.api_path, fsi.source_path,
utils::get_last_error_code(), "failed to delete source path");
}
return handle_file_rename(from_api_path, to_api_path);
}
return res;
}
// Check destination parent directory exists
res = provider_.is_directory(utils::path::get_parent_api_path(to_api_path),
exists);
if (res != api_error::success) {
return res;
}
if (not exists) {
return api_error::directory_not_found;
}
return handle_file_rename(from_api_path, to_api_path);
}
void file_manager::start() {
if (provider_.is_direct_only()) {
stop_requested_ = false;
return;
}
if (not upload_thread_) {
stop_requested_ = false;
struct active_item {
std::string api_path;
std::string source_path;
};
std::vector<active_item> active_items{};
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
const auto parts = utils::string::split(iterator->key().ToString(), ':');
active_items.emplace_back(
active_item{parts[1U], iterator->value().ToString()});
}
for (const auto &active_item : active_items) {
queue_upload(active_item.api_path, active_item.source_path, false);
}
active_items.clear();
iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), default_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
std::string api_path;
std::string source_path;
std::size_t chunk_size{};
boost::dynamic_bitset<> read_state;
restore_resume_entry(json::parse(iterator->value().ToString()), api_path,
chunk_size, read_state, source_path);
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
if (res == api_error::success) {
if (source_path == fsi.source_path) {
std::uint64_t file_size{};
if (utils::file::get_file_size(fsi.source_path, file_size)) {
if (file_size == fsi.size) {
auto f = std::make_shared<open_file>(
chunk_size,
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
fsi, provider_, read_state, *this);
open_file_lookup_[api_path] = f;
event_system::instance().raise<download_restored>(
fsi.api_path, fsi.source_path);
} else {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"file size mismatch|expected|" + std::to_string(fsi.size) +
"|actual|" + std::to_string(file_size));
}
} else {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"failed to get file size: " +
std::to_string(utils::get_last_error_code()));
}
} else {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"source path mismatch|expected|" + source_path + "|actual|" +
fsi.source_path);
}
} else {
event_system::instance().raise<download_restore_failed>(
api_path, source_path,
"failed to get filesystem item|" + api_error_to_string(res));
}
}
upload_thread_ =
std::make_unique<std::thread>([this] { upload_handler(); });
polling::instance().set_callback(
{"timed_out_close", polling::frequency::second,
[this]() { this->close_timed_out_files(); }});
event_system::instance().raise<service_started>("file_manager");
}
}
void file_manager::stop() {
if (not stop_requested_) {
event_system::instance().raise<service_shutdown_begin>("file_manager");
polling::instance().remove_callback("timed_out_close");
stop_requested_ = true;
unique_mutex_lock upload_lock(upload_mtx_);
upload_notify_.notify_all();
upload_lock.unlock();
if (upload_thread_) {
upload_thread_->join();
upload_thread_.reset();
}
open_file_lookup_.clear();
open_handle_lookup_.clear();
upload_lock.lock();
for (auto &kv : upload_lookup_) {
kv.second->stop();
}
upload_notify_.notify_all();
upload_lock.unlock();
while (not upload_lookup_.empty()) {
upload_lock.lock();
if (not upload_lookup_.empty()) {
upload_notify_.wait_for(upload_lock, 1s);
}
upload_notify_.notify_all();
upload_lock.unlock();
}
event_system::instance().raise<service_shutdown_end>("file_manager");
}
}
void file_manager::store_resume(const i_open_file &o) {
if (provider_.is_direct_only()) {
return;
}
recur_mutex_lock open_lock(open_file_mtx_);
const auto res = db_->Put(rocksdb::WriteOptions(), default_family_,
o.get_api_path(), create_resume_entry(o).dump());
if (res.ok()) {
event_system::instance().raise<download_stored>(o.get_api_path(),
o.get_source_path());
} else {
event_system::instance().raise<download_stored_failed>(
o.get_api_path(), o.get_source_path(), res.ToString());
}
}
void file_manager::upload_completed(const file_upload_completed &e) {
unique_mutex_lock upload_lock(upload_mtx_);
if (not utils::string::to_bool(e.get_cancelled().get<std::string>())) {
const auto err = api_error_from_string(e.get_result().get<std::string>());
switch (err) {
case api_error::success: {
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), upload_active_family_));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
const auto parts =
utils::string::split(iterator->key().ToString(), ':');
if (parts[1U] == e.get_api_path().get<std::string>()) {
db_->Delete(rocksdb::WriteOptions(), upload_active_family_,
iterator->key());
break;
}
}
} break;
case api_error::upload_stopped: {
event_system::instance().raise<file_upload_retry>(e.get_api_path(),
e.get_source(), err);
queue_upload(e.get_api_path(), e.get_source(), true);
upload_notify_.wait_for(upload_lock, 5s);
} break;
default: {
bool exists{};
auto res = provider_.is_file(e.get_api_path(), exists);
if ((res == api_error::success && not exists) ||
not utils::file::is_file(e.get_source())) {
event_system::instance().raise<file_upload_not_found>(e.get_api_path(),
e.get_source());
remove_upload(e.get_api_path(), true);
return;
}
event_system::instance().raise<file_upload_retry>(e.get_api_path(),
e.get_source(), err);
queue_upload(e.get_api_path(), e.get_source(), true);
upload_notify_.wait_for(upload_lock, 5s);
} break;
}
upload_lookup_.erase(e.get_api_path());
}
upload_notify_.notify_all();
}
void file_manager::upload_handler() {
while (not stop_requested_) {
unique_mutex_lock upload_lock(upload_mtx_);
if (not stop_requested_) {
if (upload_lookup_.size() < config_.get_max_upload_count()) {
auto iterator = std::unique_ptr<rocksdb::Iterator>(
db_->NewIterator(rocksdb::ReadOptions(), upload_family_));
iterator->SeekToFirst();
if (iterator->Valid()) {
const auto parts =
utils::string::split(iterator->key().ToString(), ':');
const auto api_path = parts[1u];
const auto source_path = iterator->value().ToString();
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
switch (res) {
case api_error::item_not_found: {
event_system::instance().raise<file_upload_not_found>(api_path,
source_path);
remove_upload(parts[1u], true);
} break;
case api_error::success: {
upload_lookup_[fsi.api_path] =
std::make_unique<upload>(fsi, provider_);
if (db_->Delete(rocksdb::WriteOptions(), upload_family_,
iterator->key())
.ok()) {
db_->Put(rocksdb::WriteOptions(), upload_active_family_,
iterator->key(), iterator->value());
}
} break;
default: {
event_system::instance().raise<file_upload_retry>(api_path,
source_path, res);
queue_upload(api_path, source_path, true);
upload_notify_.wait_for(upload_lock, 5s);
} break;
}
} else {
iterator.release();
upload_notify_.wait(upload_lock);
}
} else {
upload_notify_.wait(upload_lock);
}
}
upload_notify_.notify_all();
}
}
void file_manager::update_used_space(std::uint64_t &used_space) const {
recur_mutex_lock open_lock(open_file_mtx_);
for (const auto &of : open_file_lookup_) {
std::uint64_t file_size{};
auto res = provider_.get_file_size(of.second->get_api_path(), file_size);
if ((res == api_error::success) &&
(file_size != of.second->get_file_size()) &&
(used_space >= file_size)) {
used_space -= file_size;
used_space += of.second->get_file_size();
}
}
}
} // namespace repertory

View File

@ -0,0 +1,596 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "file_manager/events.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "types/startup_exception.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/path_utils.hpp"
#include "utils/unix/unix_utils.hpp"
#include "utils/utils.hpp"
namespace repertory {
file_manager::open_file::open_file(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
i_upload_manager &um)
: open_file(chunk_size, chunk_timeout, fsi, {}, provider, std::nullopt,
um) {}
file_manager::open_file::open_file(
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data, i_provider &provider,
i_upload_manager &um)
: open_file(chunk_size, chunk_timeout, fsi, open_data, provider,
std::nullopt, um) {}
file_manager::open_file::open_file(
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider, std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &um)
: open_file(chunk_size, chunk_timeout, fsi, {}, provider, read_state, um) {}
file_manager::open_file::open_file(
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data, i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state, i_upload_manager &um)
: open_file_base(chunk_size, chunk_timeout, fsi, open_data, provider),
um_(um) {
if (fsi_.directory && read_state.has_value()) {
throw startup_exception("cannot resume a directory|" + fsi.api_path);
}
if (not fsi.directory) {
set_api_error(native_file::create_or_open(
fsi.source_path, not provider_.is_direct_only(), nf_));
if (get_api_error() == api_error::success) {
if (read_state.has_value()) {
modified_ = true;
um_.store_resume(*this);
read_state_ = read_state.value();
} else if (fsi_.size > 0u) {
read_state_.resize(static_cast<std::size_t>(utils::divide_with_ceiling(
fsi_.size, chunk_size)),
false);
std::uint64_t file_size{};
if (nf_->get_file_size(file_size)) {
if (provider_.is_direct_only() || file_size == fsi.size) {
read_state_.set(0u, read_state_.size(), true);
} else if (not nf_->truncate(fsi.size)) {
set_api_error(api_error::os_error);
}
} else {
set_api_error(api_error::os_error);
}
}
if (get_api_error() != api_error::success && nf_) {
nf_->close();
}
}
}
}
file_manager::open_file::~open_file() { close(); }
void file_manager::open_file::download_chunk(std::size_t chunk,
bool skip_active,
bool should_reset) {
if (should_reset) {
reset_timeout();
}
unique_recur_mutex_lock file_lock(file_mtx_);
if ((get_api_error() == api_error::success) && (chunk < read_state_.size()) &&
not read_state_[chunk]) {
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (not skip_active) {
auto active_download = active_downloads_.at(chunk);
file_lock.unlock();
active_download->wait();
}
return;
}
const auto data_offset = chunk * chunk_size_;
const auto data_size =
(chunk == read_state_.size() - 1u) ? last_chunk_size_ : chunk_size_;
if (active_downloads_.empty() && (read_state_.count() == 0u)) {
event_system::instance().raise<download_begin>(fsi_.api_path,
fsi_.source_path);
}
event_system::instance().raise<download_chunk_begin>(
fsi_.api_path, fsi_.source_path, chunk, read_state_.size(),
read_state_.count());
active_downloads_[chunk] = std::make_shared<download>();
file_lock.unlock();
if (should_reset) {
reset_timeout();
}
std::async(std::launch::async, [this, chunk, data_size, data_offset,
should_reset]() {
const auto notify_complete = [this, chunk, should_reset]() {
unique_recur_mutex_lock file_lock(file_mtx_);
auto active_download = active_downloads_.at(chunk);
active_downloads_.erase(chunk);
event_system::instance().raise<download_chunk_end>(
fsi_.api_path, fsi_.source_path, chunk, read_state_.size(),
read_state_.count(), get_api_error());
if (get_api_error() == api_error::success) {
auto progress = (static_cast<double>(read_state_.count()) /
static_cast<double>(read_state_.size()) * 100.0);
event_system::instance().raise<download_progress>(
fsi_.api_path, fsi_.source_path, progress);
if (read_state_.all() && not notified_) {
notified_ = true;
event_system::instance().raise<download_end>(
fsi_.api_path, fsi_.source_path, get_api_error());
}
} else if (not notified_) {
notified_ = true;
event_system::instance().raise<download_end>(
fsi_.api_path, fsi_.source_path, get_api_error());
}
file_lock.unlock();
active_download->notify(get_api_error());
if (should_reset) {
reset_timeout();
}
};
data_buffer data;
auto res = provider_.read_file_bytes(get_api_path(), data_size,
data_offset, data, stop_requested_);
if (res != api_error::success) {
set_api_error(res);
notify_complete();
return;
}
if (should_reset) {
reset_timeout();
}
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
if (not nf_->write_bytes(data.data(), data.size(), data_offset,
bytes_written)) {
return api_error::os_error;
}
if (should_reset) {
reset_timeout();
}
return api_error::success;
});
if (res != api_error::success) {
set_api_error(res);
notify_complete();
return;
}
unique_recur_mutex_lock file_lock(file_mtx_);
read_state_.set(chunk);
file_lock.unlock();
notify_complete();
}).wait();
}
}
void file_manager::open_file::download_range(
std::size_t start_chunk_index, std::size_t end_chunk_index_inclusive,
bool should_reset) {
for (std::size_t chunk = start_chunk_index;
chunk <= end_chunk_index_inclusive; chunk++) {
download_chunk(chunk, false, should_reset);
if (get_api_error() != api_error::success) {
return;
}
}
}
auto file_manager::open_file::get_read_state() const
-> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(file_mtx_);
return read_state_;
}
auto file_manager::open_file::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return read_state_[chunk];
}
auto file_manager::open_file::is_complete() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return read_state_.all();
}
auto file_manager::open_file::native_operation(
const i_open_file::native_operation_callback &operation) -> api_error {
unique_recur_mutex_lock file_lock(file_mtx_);
if (stop_requested_) {
return api_error::download_stopped;
}
file_lock.unlock();
return do_io([&]() -> api_error { return operation(nf_->get_handle()); });
}
auto file_manager::open_file::native_operation(
std::uint64_t new_file_size,
const i_open_file::native_operation_callback &operation) -> api_error {
if (fsi_.directory) {
return api_error::invalid_operation;
}
unique_recur_mutex_lock file_lock(file_mtx_);
if (stop_requested_) {
return api_error::download_stopped;
}
file_lock.unlock();
const auto is_empty_file = new_file_size == 0u;
const auto last_chunk =
is_empty_file ? std::size_t(0u)
: static_cast<std::size_t>(utils::divide_with_ceiling(
new_file_size, chunk_size_)) -
1u;
file_lock.lock();
if (not is_empty_file && (last_chunk < read_state_.size())) {
file_lock.unlock();
update_background_reader(0u);
download_chunk(last_chunk, false, true);
if (get_api_error() != api_error::success) {
return get_api_error();
}
file_lock.lock();
}
const auto original_file_size = get_file_size();
auto res = do_io([&]() -> api_error { return operation(nf_->get_handle()); });
if (res != api_error::success) {
utils::error::raise_api_path_error(__FUNCTION__, get_api_path(),
utils::get_last_error_code(),
"failed to allocate file");
return res;
}
{
std::uint64_t file_size{};
if (not nf_->get_file_size(file_size)) {
utils::error::raise_api_path_error(__FUNCTION__, get_api_path(),
utils::get_last_error_code(),
"failed to get file size");
return set_api_error(api_error::os_error);
}
if (file_size != new_file_size) {
utils::error::raise_api_path_error(
__FUNCTION__, get_api_path(), api_error::file_size_mismatch,
"allocated file size mismatch|expected|" +
std::to_string(new_file_size) + "|actual|" +
std::to_string(file_size));
return set_api_error(api_error::error);
}
}
if (is_empty_file || (read_state_.size() != (last_chunk + 1u))) {
read_state_.resize(is_empty_file ? 0u : last_chunk + 1u);
if (not is_empty_file) {
read_state_[last_chunk] = true;
}
last_chunk_size_ = static_cast<std::size_t>(
new_file_size <= chunk_size_ ? new_file_size
: new_file_size % chunk_size_ ? new_file_size % chunk_size_
: chunk_size_);
}
if (original_file_size != new_file_size) {
if (not modified_) {
um_.store_resume(*this);
}
modified_ = true;
um_.remove_upload(get_api_path());
fsi_.size = new_file_size;
const auto now = std::to_string(utils::get_file_time_now());
res = provider_.set_item_meta(
fsi_.api_path, {
{META_CHANGED, now},
{META_MODIFIED, now},
{META_SIZE, std::to_string(new_file_size)},
{META_WRITTEN, now},
});
if (res != api_error::success) {
utils::error::raise_api_path_error(__FUNCTION__, get_api_path(), res,
"failed to set file meta");
return set_api_error(res);
}
}
return res;
}
auto file_manager::open_file::read(std::size_t read_size,
std::uint64_t read_offset, data_buffer &data)
-> api_error {
if (fsi_.directory) {
return api_error::invalid_operation;
}
read_size =
utils::calculate_read_size(get_file_size(), read_size, read_offset);
if (read_size == 0u) {
return api_error::success;
}
const auto read_from_source = [this, &data, &read_offset,
&read_size]() -> api_error {
return do_io([this, &data, &read_offset, &read_size]() -> api_error {
if (provider_.is_direct_only()) {
return provider_.read_file_bytes(fsi_.api_path, read_size, read_offset,
data, stop_requested_);
}
data.resize(read_size);
std::size_t bytes_read{};
return nf_->read_bytes(data.data(), read_size, read_offset, bytes_read)
? api_error::success
: api_error::os_error;
});
};
unique_recur_mutex_lock file_lock(file_mtx_);
if (read_state_.all()) {
reset_timeout();
return read_from_source();
}
file_lock.unlock();
const auto start_chunk_index =
static_cast<std::size_t>(read_offset / chunk_size_);
const auto end_chunk_index =
static_cast<std::size_t>((read_size + read_offset) / chunk_size_);
update_background_reader(start_chunk_index);
download_range(start_chunk_index, end_chunk_index, true);
if (get_api_error() != api_error::success) {
return get_api_error();
}
file_lock.lock();
return get_api_error() == api_error::success ? read_from_source()
: get_api_error();
}
void file_manager::open_file::remove(std::uint64_t handle) {
recur_mutex_lock file_lock(file_mtx_);
open_file_base::remove(handle);
if (modified_ && read_state_.all() &&
(get_api_error() == api_error::success)) {
um_.queue_upload(*this);
modified_ = false;
}
}
auto file_manager::open_file::resize(std::uint64_t new_file_size) -> api_error {
if (fsi_.directory) {
return api_error::invalid_operation;
}
return native_operation(
new_file_size, [this, &new_file_size](native_handle) -> api_error {
return nf_->truncate(new_file_size) ? api_error::success
: api_error::os_error;
});
}
auto file_manager::open_file::close() -> bool {
if (not fsi_.directory && not stop_requested_) {
stop_requested_ = true;
unique_mutex_lock reader_lock(io_thread_mtx_);
io_thread_notify_.notify_all();
reader_lock.unlock();
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
}
if (open_file_base::close()) {
{
const auto err = get_api_error();
if (err == api_error::success ||
err == api_error::download_incomplete ||
err == api_error::download_stopped) {
if (modified_ && not read_state_.all()) {
set_api_error(api_error::download_incomplete);
} else if (not modified_ && (fsi_.size > 0u) &&
not read_state_.all()) {
set_api_error(api_error::download_stopped);
}
}
}
nf_->close();
nf_.reset();
if (modified_ && (get_api_error() == api_error::success)) {
um_.queue_upload(*this);
} else if (modified_ &&
(get_api_error() == api_error::download_incomplete)) {
um_.store_resume(*this);
} else if (get_api_error() != api_error::success) {
um_.remove_resume(get_api_path(), get_source_path());
if (not utils::file::retry_delete_file(fsi_.source_path)) {
utils::error::raise_api_path_error(
__FUNCTION__, get_api_path(), fsi_.source_path,
utils::get_last_error_code(), "failed to delete file");
}
auto parent = utils::path::remove_file_name(fsi_.source_path);
fsi_.source_path =
utils::path::combine(parent, {utils::create_uuid_string()});
const auto res = provider_.set_item_meta(fsi_.api_path, META_SOURCE,
fsi_.source_path);
if (res != api_error::success) {
utils::error::raise_api_path_error(__FUNCTION__, get_api_path(),
fsi_.source_path, res,
"failed to set file meta");
}
}
}
return true;
}
return false;
}
void file_manager::open_file::update_background_reader(std::size_t read_chunk) {
recur_mutex_lock file_lock(file_mtx_);
read_chunk_index_ = read_chunk;
if (not reader_thread_ && not stop_requested_) {
reader_thread_ = std::make_unique<std::thread>([this]() {
auto next_chunk = 0u;
while (not stop_requested_) {
unique_recur_mutex_lock file_lock(file_mtx_);
if ((fsi_.size == 0u) || read_state_.all()) {
file_lock.unlock();
unique_mutex_lock io_lock(io_thread_mtx_);
if (not stop_requested_ && io_thread_queue_.empty()) {
io_thread_notify_.wait(io_lock);
}
io_thread_notify_.notify_all();
io_lock.unlock();
} else {
do {
next_chunk = read_chunk_index_ =
((read_chunk_index_ + 1u) >= read_state_.size())
? 0u
: read_chunk_index_ + 1u;
} while ((next_chunk != 0u) && (active_downloads_.find(next_chunk) !=
active_downloads_.end()));
file_lock.unlock();
download_chunk(next_chunk, true, false);
}
}
});
}
}
auto file_manager::open_file::write(std::uint64_t write_offset,
const data_buffer &data,
std::size_t &bytes_written) -> api_error {
bytes_written = 0u;
if (fsi_.directory || provider_.is_direct_only()) {
return api_error::invalid_operation;
}
if (data.empty()) {
return api_error::success;
}
unique_recur_mutex_lock file_lock(file_mtx_);
if (stop_requested_) {
return api_error::download_stopped;
}
file_lock.unlock();
const auto start_chunk_index =
static_cast<std::size_t>(write_offset / chunk_size_);
const auto end_chunk_index =
static_cast<std::size_t>((write_offset + data.size()) / chunk_size_);
update_background_reader(start_chunk_index);
download_range(start_chunk_index,
std::min(read_state_.size() - 1u, end_chunk_index), true);
if (get_api_error() != api_error::success) {
return get_api_error();
}
file_lock.lock();
if ((write_offset + data.size()) > fsi_.size) {
auto res = resize(write_offset + data.size());
if (res != api_error::success) {
return res;
}
}
auto res = do_io([&]() -> api_error {
if (not nf_->write_bytes(data.data(), data.size(), write_offset,
bytes_written)) {
return api_error::os_error;
}
reset_timeout();
return api_error::success;
});
if (res != api_error::success) {
return set_api_error(res);
}
const auto now = std::to_string(utils::get_file_time_now());
res = provider_.set_item_meta(fsi_.api_path, {
{META_CHANGED, now},
{META_MODIFIED, now},
{META_WRITTEN, now},
});
if (res != api_error::success) {
utils::error::raise_api_path_error(__FUNCTION__, get_api_path(), res,
"failed to set file meta");
return set_api_error(res);
}
if (not modified_) {
um_.store_resume(*this);
}
modified_ = true;
um_.remove_upload(get_api_path());
return api_error::success;
}
} // namespace repertory

View File

@ -0,0 +1,242 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "utils/path_utils.hpp"
namespace repertory {
file_manager::open_file_base::open_file_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi,
i_provider &provider)
: open_file_base(chunk_size, chunk_timeout, fsi, {}, provider) {}
file_manager::open_file_base::open_file_base(
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data, i_provider &provider)
: chunk_size_(chunk_size),
chunk_timeout_(chunk_timeout),
fsi_(std::move(fsi)),
last_chunk_size_(static_cast<std::size_t>(
fsi.size <= chunk_size ? fsi.size
: fsi.size % chunk_size ? fsi.size % chunk_size
: chunk_size)),
open_data_(std::move(open_data)),
provider_(provider) {
if (not fsi.directory) {
io_thread_ = std::make_unique<std::thread>([this] { file_io_thread(); });
}
}
void file_manager::open_file_base::add(std::uint64_t handle,
open_file_data ofd) {
recur_mutex_lock file_lock(file_mtx_);
open_data_[handle] = ofd;
if (open_data_.size() == 1u) {
event_system::instance().raise<filesystem_item_opened>(
fsi_.api_path, fsi_.source_path, fsi_.directory);
}
event_system::instance().raise<filesystem_item_handle_opened>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory);
}
auto file_manager::open_file_base::can_close() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
if (fsi_.directory) {
return true;
}
if (not open_data_.empty()) {
return false;
}
if (modified_) {
return false;
}
if (get_api_error() != api_error::success) {
return true;
}
if (is_download_complete()) {
return true;
}
const std::chrono::system_clock::time_point last_access = last_access_;
const auto duration = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - last_access);
return (duration.count() >= chunk_timeout_);
}
auto file_manager::open_file_base::do_io(std::function<api_error()> action)
-> api_error {
unique_mutex_lock io_lock(io_thread_mtx_);
auto item = std::make_shared<io_item>(action);
io_thread_queue_.emplace_back(item);
io_thread_notify_.notify_all();
io_lock.unlock();
return item->get_result();
}
void file_manager::open_file_base::file_io_thread() {
unique_mutex_lock io_lock(io_thread_mtx_);
io_thread_notify_.notify_all();
io_lock.unlock();
const auto process_queue = [&]() {
io_lock.lock();
if (not io_stop_requested_ && io_thread_queue_.empty()) {
io_thread_notify_.wait(io_lock);
}
while (not io_thread_queue_.empty()) {
auto *item = io_thread_queue_.front().get();
io_thread_notify_.notify_all();
io_lock.unlock();
item->action();
io_lock.lock();
io_thread_queue_.pop_front();
}
io_thread_notify_.notify_all();
io_lock.unlock();
};
while (not io_stop_requested_) {
process_queue();
}
process_queue();
}
auto file_manager::open_file_base::get_api_error() const -> api_error {
mutex_lock error_lock(error_mtx_);
return error_;
}
auto file_manager::open_file_base::get_api_path() const -> std::string {
recur_mutex_lock file_lock(file_mtx_);
return fsi_.api_path;
}
auto file_manager::open_file_base::get_file_size() const -> std::uint64_t {
recur_mutex_lock file_lock(file_mtx_);
return fsi_.size;
}
auto file_manager::open_file_base::get_filesystem_item() const
-> filesystem_item {
recur_mutex_lock file_lock(file_mtx_);
return fsi_;
}
auto file_manager::open_file_base::get_handles() const
-> std::vector<std::uint64_t> {
recur_mutex_lock file_lock(file_mtx_);
std::vector<std::uint64_t> ret;
for (const auto &kv : open_data_) {
ret.emplace_back(kv.first);
}
return ret;
}
auto file_manager::open_file_base::get_open_data() const
-> std::map<std::uint64_t, open_file_data> {
recur_mutex_lock file_lock(file_mtx_);
return open_data_;
}
auto file_manager::open_file_base::get_open_data(std::uint64_t handle) const
-> open_file_data {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.at(handle);
}
auto file_manager::open_file_base::get_open_file_count() const -> std::size_t {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.size();
}
auto file_manager::open_file_base::is_modified() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return modified_;
}
void file_manager::open_file_base::remove(std::uint64_t handle) {
recur_mutex_lock file_lock(file_mtx_);
open_data_.erase(handle);
event_system::instance().raise<filesystem_item_handle_closed>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory, modified_);
if (open_data_.empty()) {
event_system::instance().raise<filesystem_item_closed>(
fsi_.api_path, fsi_.source_path, fsi_.directory, modified_);
}
}
void file_manager::open_file_base::reset_timeout() {
last_access_ = std::chrono::system_clock::now();
}
auto file_manager::open_file_base::set_api_error(const api_error &e)
-> api_error {
mutex_lock error_lock(error_mtx_);
if (error_ != e) {
return ((error_ = (error_ == api_error::success ||
error_ == api_error::download_incomplete ||
error_ == api_error::download_stopped
? e
: error_)));
}
return error_;
}
void file_manager::open_file_base::set_api_path(const std::string &api_path) {
recur_mutex_lock file_lock(file_mtx_);
fsi_.api_path = api_path;
fsi_.api_parent = utils::path::get_parent_api_path(api_path);
}
auto file_manager::open_file_base::close() -> bool {
unique_mutex_lock io_lock(io_thread_mtx_);
if (not fsi_.directory && not io_stop_requested_) {
io_stop_requested_ = true;
io_thread_notify_.notify_all();
io_lock.unlock();
if (io_thread_) {
io_thread_->join();
io_thread_.reset();
return true;
}
}
io_thread_notify_.notify_all();
io_lock.unlock();
return false;
}
} // namespace repertory

View File

@ -0,0 +1,43 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
namespace repertory {
void file_manager::open_file_base::download::notify(const api_error &e) {
complete_ = true;
error_ = e;
unique_mutex_lock lock(mtx_);
notify_.notify_all();
}
auto file_manager::open_file_base::download::wait() -> api_error {
if (not complete_) {
unique_mutex_lock lock(mtx_);
if (not complete_) {
notify_.wait(lock);
}
notify_.notify_all();
}
return error_;
}
} // namespace repertory

View File

@ -0,0 +1,41 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
namespace repertory {
void file_manager::open_file_base::io_item::action() {
result_ = action_();
mutex_lock lock(mtx_);
notify_.notify_all();
}
auto file_manager::open_file_base::io_item::get_result() -> api_error {
unique_mutex_lock lock(mtx_);
if (result_.has_value()) {
return result_.value();
}
notify_.wait(lock);
return result_.value();
}
} // namespace repertory

View File

@ -0,0 +1,317 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "app_config.hpp"
#include "file_manager/events.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/encrypting_reader.hpp"
#include "utils/file_utils.hpp"
#include "utils/path_utils.hpp"
#include "utils/unix/unix_utils.hpp"
#include "utils/utils.hpp"
namespace repertory {
file_manager::ring_buffer_open_file::ring_buffer_open_file(
std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider)
: ring_buffer_open_file(std::move(buffer_directory), chunk_size,
chunk_timeout, std::move(fsi), provider,
(1024ull * 1024ull * 1024ull) / chunk_size) {}
file_manager::ring_buffer_open_file::ring_buffer_open_file(
std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider,
std::size_t ring_size)
: open_file_base(chunk_size, chunk_timeout, fsi, provider),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size_))) {
if (ring_size % 2u) {
throw std::runtime_error("ring size must be a multiple of 2");
}
if (ring_size < 4u) {
throw std::runtime_error("ring size must be greater than or equal to 4");
}
if (fsi.size < (ring_state_.size() * chunk_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
last_chunk_ = ring_state_.size() - 1u;
ring_state_.set(0u, ring_state_.size(), true);
buffer_directory = utils::path::absolute(buffer_directory);
if (not utils::file::create_full_directory_path(buffer_directory)) {
throw std::runtime_error("failed to create buffer directory|path|" +
buffer_directory + "|err|" +
std::to_string(utils::get_last_error_code()));
}
fsi_.source_path =
utils::path::combine(buffer_directory, {utils::create_uuid_string()});
auto res = native_file::create_or_open(fsi_.source_path, nf_);
if (res != api_error::success) {
throw std::runtime_error("failed to create buffer file|err|" +
std::to_string(utils::get_last_error_code()));
}
if (not nf_->truncate(ring_state_.size() * chunk_size)) {
nf_->close();
throw std::runtime_error("failed to resize buffer file|err|" +
std::to_string(utils::get_last_error_code()));
}
}
file_manager::ring_buffer_open_file::~ring_buffer_open_file() {
close();
nf_->close();
if (not utils::file::retry_delete_file(fsi_.source_path)) {
utils::error::raise_api_path_error(
__FUNCTION__, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to delete file");
}
}
auto file_manager::file_manager::ring_buffer_open_file::download_chunk(
std::size_t chunk) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
if (active_downloads_.find(chunk) != active_downloads_.end()) {
auto active_download = active_downloads_.at(chunk);
chunk_notify_.notify_all();
chunk_lock.unlock();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
auto active_download = std::make_shared<download>();
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
chunk_notify_.notify_all();
chunk_lock.unlock();
data_buffer buffer((chunk == (total_chunks_ - 1u)) ? last_chunk_size_
: chunk_size_);
stop_type stop_requested = !!ring_state_[chunk % ring_state_.size()];
auto res =
provider_.read_file_bytes(fsi_.api_path, buffer.size(),
chunk * chunk_size_, buffer, stop_requested);
if (res == api_error::success) {
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
if (not nf_->write_bytes(buffer.data(), buffer.size(),
(chunk % ring_state_.size()) * chunk_size_,
bytes_written)) {
return api_error::os_error;
}
return api_error::success;
});
}
active_download->notify(res);
chunk_lock.lock();
active_downloads_.erase(chunk);
chunk_notify_.notify_all();
return res;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
return api_error::success;
}
void file_manager::ring_buffer_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if ((current_chunk_ + count) > (total_chunks_ - 1u)) {
count = (total_chunks_ - 1u) - current_chunk_;
}
if ((current_chunk_ + count) <= last_chunk_) {
current_chunk_ += count;
} else {
const auto added = count - (last_chunk_ - current_chunk_);
if (added >= ring_state_.size()) {
ring_state_.set(0u, ring_state_.size(), true);
current_chunk_ += count;
first_chunk_ += added;
last_chunk_ =
std::min(total_chunks_ - 1u, first_chunk_ + ring_state_.size() - 1u);
} else {
for (std::size_t i = 0u; i < added; i++) {
ring_state_[(first_chunk_ + i) % ring_state_.size()] = true;
}
first_chunk_ += added;
current_chunk_ += count;
last_chunk_ =
std::min(total_chunks_ - 1u, first_chunk_ + ring_state_.size() - 1u);
}
}
chunk_notify_.notify_all();
}
auto file_manager::ring_buffer_open_file::get_read_state() const
-> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(file_mtx_);
auto read_state = ring_state_;
return read_state.flip();
}
auto file_manager::ring_buffer_open_file::get_read_state(
std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()];
}
auto file_manager::ring_buffer_open_file::is_download_complete() const -> bool {
return false;
}
auto file_manager::ring_buffer_open_file::native_operation(
const i_open_file::native_operation_callback &operation) -> api_error {
return do_io([&]() -> api_error { return operation(nf_->get_handle()); });
}
void file_manager::ring_buffer_open_file::reverse(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if (current_chunk_ < count) {
count = current_chunk_;
}
if ((current_chunk_ - count) >= first_chunk_) {
current_chunk_ -= count;
} else {
const auto removed = count - (current_chunk_ - first_chunk_);
if (removed >= ring_state_.size()) {
ring_state_.set(0u, ring_state_.size(), true);
current_chunk_ -= count;
first_chunk_ = current_chunk_;
last_chunk_ =
std::min(total_chunks_ - 1u, first_chunk_ + ring_state_.size() - 1u);
} else {
for (std::size_t i = 0u; i < removed; i++) {
ring_state_[(last_chunk_ - i) % ring_state_.size()] = true;
}
first_chunk_ -= removed;
current_chunk_ -= count;
last_chunk_ =
std::min(total_chunks_ - 1u, first_chunk_ + ring_state_.size() - 1u);
}
}
chunk_notify_.notify_all();
}
auto file_manager::ring_buffer_open_file::read(std::size_t read_size,
std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (fsi_.directory) {
return api_error::invalid_operation;
}
reset_timeout();
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
if (read_size == 0u) {
return api_error::success;
}
const auto start_chunk_index =
static_cast<std::size_t>(read_offset / chunk_size_);
read_offset = read_offset - (start_chunk_index * chunk_size_);
data_buffer buffer(chunk_size_);
auto res = api_error::success;
for (std::size_t chunk = start_chunk_index;
(res == api_error::success) && (read_size > 0u); chunk++) {
if (chunk > current_chunk_) {
forward(chunk - current_chunk_);
} else if (chunk < current_chunk_) {
reverse(current_chunk_ - chunk);
}
reset_timeout();
if ((res = download_chunk(chunk)) == api_error::success) {
const auto to_read = std::min(
static_cast<std::size_t>(chunk_size_ - read_offset), read_size);
res = do_io([this, &buffer, &chunk, &data, read_offset,
&to_read]() -> api_error {
std::size_t bytes_read{};
auto res = nf_->read_bytes(buffer.data(), buffer.size(),
((chunk % ring_state_.size()) * chunk_size_),
bytes_read)
? api_error::success
: api_error::os_error;
if (res == api_error::success) {
data.insert(data.end(), buffer.begin() + read_offset,
buffer.begin() + read_offset + to_read);
reset_timeout();
}
return res;
});
read_offset = 0u;
read_size -= to_read;
}
}
return res;
}
void file_manager::ring_buffer_open_file::set(std::size_t first_chunk,
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
}
first_chunk_ = first_chunk;
last_chunk_ = first_chunk_ + ring_state_.size() - 1u;
if (current_chunk > last_chunk_) {
chunk_notify_.notify_all();
throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
}
current_chunk_ = current_chunk;
ring_state_.set(0u, ring_state_.size(), false);
chunk_notify_.notify_all();
}
void file_manager::ring_buffer_open_file::set_api_path(
const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
} // namespace repertory

View File

@ -0,0 +1,62 @@
/*
Copyright <2018-2023> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "providers/i_provider.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/unix/unix_utils.hpp"
namespace repertory {
file_manager::upload::upload(filesystem_item fsi, i_provider &provider)
: fsi_(fsi), provider_(provider) {
thread_ =
std::make_unique<std::thread>(std::bind(&upload::upload_thread, this));
}
file_manager::upload::~upload() {
stop();
thread_->join();
thread_.reset();
}
void file_manager::upload::cancel() {
cancelled_ = true;
stop();
}
void file_manager::upload::stop() { stop_requested_ = true; }
void file_manager::upload::upload_thread() {
error_ = provider_.upload_file(fsi_.api_path, fsi_.source_path,
fsi_.encryption_token, stop_requested_);
if (not utils::file::reset_modified_time(fsi_.source_path)) {
utils::error::raise_api_path_error(
__FUNCTION__, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to reset modified time");
}
event_system::instance().raise<file_upload_completed>(
get_api_path(), get_source_path(), get_api_error(), cancelled_);
}
} // namespace repertory