This commit is contained in:
Scott E. Graves 2024-10-18 11:38:27 -05:00
parent a0a5ca3390
commit c216df9b73
12 changed files with 1960 additions and 1937 deletions

View File

@ -57,8 +57,8 @@ public:
open_file_base(const open_file_base &) noexcept = delete;
open_file_base(open_file_base &&) noexcept = delete;
auto operator=(open_file_base &&) noexcept -> open_file_base & = delete;
auto
operator=(const open_file_base &) noexcept -> open_file_base & = delete;
auto operator=(const open_file_base &) noexcept
-> open_file_base & = delete;
public:
class download final {
@ -168,17 +168,17 @@ public:
[[nodiscard]] auto get_filesystem_item() const -> filesystem_item override;
[[nodiscard]] auto
get_handles() const -> std::vector<std::uint64_t> override;
[[nodiscard]] auto get_handles() const
-> std::vector<std::uint64_t> override;
[[nodiscard]] auto
get_open_data() -> std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data()
-> std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto
get_open_data(std::uint64_t handle) -> open_file_data & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle)
-> open_file_data & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle) const
-> const open_file_data & override;
@ -267,8 +267,8 @@ public:
public:
auto close() -> bool override;
[[nodiscard]] auto
get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state() const
-> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
@ -276,20 +276,20 @@ public:
auto is_write_supported() const -> bool override { return true; }
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto
native_operation(std::uint64_t new_file_size,
native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t new_file_size,
native_operation_callback callback)
-> api_error override;
void remove(std::uint64_t handle) override;
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto
resize(std::uint64_t new_file_size) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t new_file_size)
-> api_error override;
[[nodiscard]] auto write(std::uint64_t write_offset,
const data_buffer &data,
@ -356,8 +356,8 @@ public:
return last_chunk_;
}
[[nodiscard]] auto
get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state() const
-> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
@ -369,12 +369,12 @@ public:
auto is_write_supported() const -> bool override { return false; }
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto
native_operation(std::uint64_t,
native_operation_callback) -> api_error override {
[[nodiscard]] auto native_operation(std::uint64_t,
native_operation_callback)
-> api_error override {
return api_error::not_supported;
}
@ -391,8 +391,8 @@ public:
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &,
std::size_t &) -> api_error override {
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
-> api_error override {
return api_error::not_supported;
}
};
@ -458,8 +458,7 @@ private:
i_provider &provider_;
private:
utils::db::sqlite::db3_t db_;
std::uint64_t next_handle_{0U};
std::atomic<std::uint64_t> next_handle_{0U};
mutable std::recursive_mutex open_file_mtx_;
std::unordered_map<std::string, std::shared_ptr<i_closeable_open_file>>
open_file_lookup_;
@ -470,6 +469,8 @@ private:
std::unique_ptr<std::thread> upload_thread_;
private:
[[nodiscard]] auto create_db() const -> utils::db::sqlite::db3_t;
void close_timed_out_files();
auto get_open_file_by_handle(std::uint64_t handle) const
@ -483,12 +484,22 @@ private:
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error;
void queue_upload(const std::string &api_path, const std::string &source_path,
bool no_lock);
bool no_lock, sqlite3 *db);
void remove_upload(const std::string &api_path, bool no_lock);
void remove_resume(const std::string &api_path,
const std::string &source_path, sqlite3 *db);
void remove_upload(const std::string &api_path, bool no_lock, sqlite3 *db);
[[nodiscard]] auto rename_directory(const std::string &from_api_path,
const std::string &to_api_path,
sqlite3 *db) -> api_error;
[[nodiscard]] auto rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite,
sqlite3 *db) -> api_error;
void swap_renamed_items(std::string from_api_path, std::string to_api_path,
bool directory);
bool directory, sqlite3 *db);
void upload_completed(const file_upload_completed &evt);
@ -498,7 +509,8 @@ public:
[[nodiscard]] auto get_next_handle() -> std::uint64_t;
auto handle_file_rename(const std::string &from_api_path,
const std::string &to_api_path) -> api_error;
const std::string &to_api_path, sqlite3 *db)
-> api_error;
void queue_upload(const i_open_file &file) override;
@ -537,8 +549,8 @@ public:
[[nodiscard]] auto has_no_open_file_handles() const -> bool override;
[[nodiscard]] auto
is_processing(const std::string &api_path) const -> bool override;
[[nodiscard]] auto is_processing(const std::string &api_path) const
-> bool override;
#if defined(PROJECT_TESTING)
[[nodiscard]] auto open(std::shared_ptr<i_closeable_open_file> of,
@ -551,13 +563,13 @@ public:
[[nodiscard]] auto remove_file(const std::string &api_path) -> api_error;
[[nodiscard]] auto
rename_directory(const std::string &from_api_path,
const std::string &to_api_path) -> api_error;
[[nodiscard]] auto rename_directory(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error;
[[nodiscard]] auto rename_file(const std::string &from_api_path,
const std::string &to_api_path,
bool overwrite) -> api_error;
const std::string &to_api_path, bool overwrite)
-> api_error;
void start();

View File

@ -25,7 +25,6 @@
#include "file_manager/events.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "types/startup_exception.hpp"
#include "utils/common.hpp"
#include "utils/db/sqlite/db_common.hpp"
#include "utils/db/sqlite/db_delete.hpp"
@ -40,8 +39,8 @@
#include "utils/time.hpp"
namespace {
[[nodiscard]] auto
create_resume_entry(const repertory::i_open_file &file) -> json {
[[nodiscard]] auto create_resume_entry(const repertory::i_open_file &file)
-> json {
return {
{"chunk_size", file.get_chunk_size()},
{"path", file.get_api_path()},
@ -104,32 +103,6 @@ namespace repertory {
file_manager::file_manager(app_config &config, i_provider &provider)
: config_(config), provider_(provider) {
if (not provider_.is_direct_only()) {
auto db_path =
utils::path::combine(config.get_data_directory(), {"file_manager.db"});
sqlite3 *db3{nullptr};
auto res =
sqlite3_open_v2(db_path.c_str(), &db3,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
if (res != SQLITE_OK) {
throw startup_exception("failed to open db|" + db_path + '|' +
std::to_string(res) + '|' + sqlite3_errstr(res));
}
db_ = utils::db::sqlite::db3_t{
db3,
utils::db::sqlite::sqlite3_deleter(),
};
for (auto &&create_item : sql_create_tables) {
std::string err;
if (not utils::db::sqlite::execute_sql(*db_, create_item.second, err)) {
db_.reset();
throw startup_exception(err);
}
}
utils::db::sqlite::set_journal_mode(*db_);
E_SUBSCRIBE_EXACT(file_upload_completed,
[this](const file_upload_completed &completed) {
this->upload_completed(completed);
@ -139,7 +112,6 @@ file_manager::file_manager(app_config &config, i_provider &provider)
file_manager::~file_manager() {
stop();
db_.reset();
E_CONSUMER_RELEASE();
}
@ -194,6 +166,38 @@ void file_manager::close_timed_out_files() {
closeable_list.clear();
}
auto file_manager::create_db() const -> utils::db::sqlite::db3_t {
auto db_path =
utils::path::combine(config_.get_data_directory(), {"file_manager.db"});
sqlite3 *db3{nullptr};
auto db_res =
sqlite3_open_v2(db_path.c_str(), &db3,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
if (db_res != SQLITE_OK) {
throw std::runtime_error("failed to open db|" + db_path + '|' +
std::to_string(db_res) + '|' +
sqlite3_errstr(db_res));
}
auto db = utils::db::sqlite::db3_t{
db3,
utils::db::sqlite::sqlite3_deleter(),
};
for (auto &&create_item : sql_create_tables) {
std::string err;
if (not utils::db::sqlite::execute_sql(*db, create_item.second, err)) {
db.reset();
throw std::runtime_error(err);
}
}
utils::db::sqlite::set_journal_mode(*db);
return db;
}
auto file_manager::create(const std::string &api_path, api_meta_map &meta,
open_file_data ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &file) -> api_error {
@ -271,7 +275,7 @@ auto file_manager::get_directory_items(const std::string &api_path) const
auto file_manager::get_next_handle() -> std::uint64_t {
if (++next_handle_ == 0U) {
next_handle_++;
++next_handle_;
}
return next_handle_;
@ -349,23 +353,27 @@ auto file_manager::get_open_handle_count() const -> std::size_t {
auto file_manager::get_stored_downloads() const -> std::vector<json> {
REPERTORY_USES_FUNCTION_NAME();
std::vector<json> ret;
if (not provider_.is_direct_only()) {
auto result = utils::db::sqlite::db_select{*db_.get(), resume_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not result.get_row(row)) {
continue;
}
if (not row.has_value()) {
continue;
}
if (provider_.is_direct_only()) {
return {};
}
ret.push_back(row.value().get_column("data").get_value_as_json());
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
auto db = create_db();
std::vector<json> ret;
auto result = utils::db::sqlite::db_select{*db.get(), resume_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not result.get_row(row)) {
continue;
}
if (not row.has_value()) {
continue;
}
ret.push_back(row.value().get_column("data").get_value_as_json());
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
}
}
@ -373,8 +381,8 @@ auto file_manager::get_stored_downloads() const -> std::vector<json> {
}
auto file_manager::handle_file_rename(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
const std::string &to_api_path,
sqlite3 *db) -> api_error {
std::string source_path{};
auto file_iter = open_file_lookup_.find(from_api_path);
if (file_iter != open_file_lookup_.end()) {
@ -387,7 +395,7 @@ auto file_manager::handle_file_rename(const std::string &from_api_path,
source_path = upload_lookup_.at(from_api_path)->get_source_path();
}
} else {
auto result = utils::db::sqlite::db_select{*db_.get(), upload_table}
auto result = utils::db::sqlite::db_select{*db, upload_table}
.column("source_path")
.where("api_path")
.equals(from_api_path)
@ -399,22 +407,22 @@ auto file_manager::handle_file_rename(const std::string &from_api_path,
}
}
remove_upload(from_api_path);
remove_upload(from_api_path, true, db);
auto ret = provider_.rename_file(from_api_path, to_api_path);
if (ret != api_error::success) {
queue_upload(from_api_path, source_path, false);
queue_upload(from_api_path, source_path, false, db);
return ret;
}
swap_renamed_items(from_api_path, to_api_path, false);
swap_renamed_items(from_api_path, to_api_path, false, db);
ret = source_path.empty()
? api_error::success
: provider_.set_item_meta(to_api_path, META_SOURCE, source_path);
if (should_upload) {
queue_upload(to_api_path, source_path, false);
queue_upload(to_api_path, source_path, false, db);
}
return ret;
@ -435,7 +443,9 @@ auto file_manager::is_processing(const std::string &api_path) const -> bool {
}
upload_lock.unlock();
utils::db::sqlite::db_select query{*db_.get(), upload_table};
auto db = create_db();
utils::db::sqlite::db_select query{*db.get(), upload_table};
if (query.where("api_path").equals(api_path).go().has_row()) {
return true;
};
@ -455,10 +465,11 @@ auto file_manager::open(const std::string &api_path, bool directory,
return open(api_path, directory, ofd, handle, file, nullptr);
}
auto file_manager::open(
const std::string &api_path, bool directory, const open_file_data &ofd,
std::uint64_t &handle, std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error {
auto file_manager::open(const std::string &api_path, bool directory,
const open_file_data &ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file)
-> api_error {
const auto create_and_add_handle =
[&](std::shared_ptr<i_closeable_open_file> cur_file) {
handle = get_next_handle();
@ -502,11 +513,14 @@ auto file_manager::open(
}
void file_manager::queue_upload(const i_open_file &file) {
return queue_upload(file.get_api_path(), file.get_source_path(), false);
auto db = create_db();
return queue_upload(file.get_api_path(), file.get_source_path(), false,
db.get());
}
void file_manager::queue_upload(const std::string &api_path,
const std::string &source_path, bool no_lock) {
const std::string &source_path, bool no_lock,
sqlite3 *db) {
if (provider_.is_direct_only()) {
return;
}
@ -515,10 +529,10 @@ void file_manager::queue_upload(const std::string &api_path,
if (not no_lock) {
lock = std::make_unique<mutex_lock>(upload_mtx_);
}
remove_upload(api_path, true);
remove_upload(api_path, true, db);
auto result =
utils::db::sqlite::db_insert{*db_.get(), upload_table}
utils::db::sqlite::db_insert{*db, upload_table}
.or_replace()
.column_value("api_path", api_path)
.column_value("date_time",
@ -526,7 +540,7 @@ void file_manager::queue_upload(const std::string &api_path,
.column_value("source_path", source_path)
.go();
if (result.ok()) {
remove_resume(api_path, source_path);
remove_resume(api_path, source_path, db);
event_system::instance().raise<file_upload_queued>(api_path, source_path);
} else {
event_system::instance().raise<file_upload_failed>(
@ -552,9 +566,10 @@ auto file_manager::remove_file(const std::string &api_path) -> api_error {
close_all(api_path);
remove_upload(api_path);
auto db = create_db();
remove_upload(api_path, true, db.get());
auto result = utils::db::sqlite::db_delete{*db_.get(), resume_table}
auto result = utils::db::sqlite::db_delete{*db.get(), resume_table}
.where("api_path")
.equals(api_path)
.go();
@ -580,7 +595,13 @@ auto file_manager::remove_file(const std::string &api_path) -> api_error {
void file_manager::remove_resume(const std::string &api_path,
const std::string &source_path) {
auto result = utils::db::sqlite::db_delete{*db_.get(), resume_table}
auto db = create_db();
return remove_resume(api_path, source_path, db.get());
}
void file_manager::remove_resume(const std::string &api_path,
const std::string &source_path, sqlite3 *db) {
auto result = utils::db::sqlite::db_delete{*db, resume_table}
.where("api_path")
.equals(api_path)
.go();
@ -591,10 +612,12 @@ void file_manager::remove_resume(const std::string &api_path,
}
void file_manager::remove_upload(const std::string &api_path) {
remove_upload(api_path, false);
auto db = create_db();
remove_upload(api_path, false, db.get());
}
void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
void file_manager::remove_upload(const std::string &api_path, bool no_lock,
sqlite3 *db) {
REPERTORY_USES_FUNCTION_NAME();
if (provider_.is_direct_only()) {
@ -606,7 +629,7 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
lock = std::make_unique<mutex_lock>(upload_mtx_);
}
auto result = utils::db::sqlite::db_delete{*db_.get(), upload_table}
auto result = utils::db::sqlite::db_delete{*db, upload_table}
.where("api_path")
.equals(api_path)
.go();
@ -616,7 +639,7 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
"failed to remove from upload table");
}
result = utils::db::sqlite::db_delete{*db_.get(), upload_active_table}
result = utils::db::sqlite::db_delete{*db, upload_active_table}
.where("api_path")
.equals(api_path)
.go();
@ -643,6 +666,13 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
auto file_manager::rename_directory(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
auto db = create_db();
return rename_directory(from_api_path, to_api_path, db.get());
}
auto file_manager::rename_directory(const std::string &from_api_path,
const std::string &to_api_path, sqlite3 *db)
-> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}
@ -698,8 +728,9 @@ auto file_manager::rename_directory(const std::string &from_api_path,
auto old_api_path = api_path;
auto new_api_path = utils::path::create_api_path(utils::path::combine(
to_api_path, {old_api_path.substr(from_api_path.size())}));
res = list[i].directory ? rename_directory(old_api_path, new_api_path)
: rename_file(old_api_path, new_api_path, false);
res = list[i].directory
? rename_directory(old_api_path, new_api_path, db)
: rename_file(old_api_path, new_api_path, false, db);
}
}
@ -712,13 +743,20 @@ auto file_manager::rename_directory(const std::string &from_api_path,
return res;
}
swap_renamed_items(from_api_path, to_api_path, true);
swap_renamed_items(from_api_path, to_api_path, true, db);
return api_error::success;
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path,
bool overwrite) -> api_error {
const std::string &to_api_path, bool overwrite)
-> api_error {
auto db = create_db();
return rename_file(from_api_path, to_api_path, overwrite, db.get());
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite,
sqlite3 *db) -> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}
@ -786,12 +824,18 @@ auto file_manager::rename_file(const std::string &from_api_path,
}
}
return handle_file_rename(from_api_path, to_api_path);
return handle_file_rename(from_api_path, to_api_path, db);
}
void file_manager::start() {
REPERTORY_USES_FUNCTION_NAME();
if (upload_thread_) {
return;
}
stop_requested_ = false;
polling::instance().set_callback(
{"timed_out_close", polling::frequency::second,
[this]() { this->close_timed_out_files(); }});
@ -801,106 +845,104 @@ void file_manager::start() {
return;
}
if (not upload_thread_) {
stop_requested_ = false;
auto db = create_db();
struct active_item final {
std::string api_path;
std::string source_path;
};
std::vector<active_item> active_items{};
auto result =
utils::db::sqlite::db_select{*db.get(), upload_active_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (result.get_row(row) && row.has_value()) {
active_items.emplace_back(active_item{
row->get_column("api_path").get_value<std::string>(),
row->get_column("source_path").get_value<std::string>(),
});
}
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
}
}
for (auto &&active_item : active_items) {
queue_upload(active_item.api_path, active_item.source_path, false,
db.get());
}
active_items.clear();
result = utils::db::sqlite::db_select{*db.get(), resume_table}.go();
if (not result.ok()) {
return;
}
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not(result.get_row(row) && row.has_value())) {
return;
}
auto resume_entry = row.value().get_column("data").get_value_as_json();
struct active_item final {
std::string api_path;
std::string source_path;
};
std::size_t chunk_size{};
boost::dynamic_bitset<> read_state;
restore_resume_entry(resume_entry, api_path, chunk_size, read_state,
source_path);
std::vector<active_item> active_items{};
auto result =
utils::db::sqlite::db_select{*db_.get(), upload_active_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (result.get_row(row) && row.has_value()) {
active_items.emplace_back(active_item{
row->get_column("api_path").get_value<std::string>(),
row->get_column("source_path").get_value<std::string>(),
});
}
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
std::abort();
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
if (res != api_error::success) {
event_system::instance().raise<download_restore_failed>(
api_path, source_path,
"failed to get filesystem item|" + api_error_to_string(res));
continue;
}
}
for (auto &&active_item : active_items) {
queue_upload(active_item.api_path, active_item.source_path, false);
}
active_items.clear();
result = utils::db::sqlite::db_select{*db_.get(), resume_table}.go();
if (not result.ok()) {
return;
}
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not(result.get_row(row) && row.has_value())) {
return;
}
auto resume_entry = row.value().get_column("data").get_value_as_json();
std::string api_path;
std::string source_path;
std::size_t chunk_size{};
boost::dynamic_bitset<> read_state;
restore_resume_entry(resume_entry, api_path, chunk_size, read_state,
source_path);
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
if (res != api_error::success) {
event_system::instance().raise<download_restore_failed>(
api_path, source_path,
"failed to get filesystem item|" + api_error_to_string(res));
continue;
}
if (source_path != fsi.source_path) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"source path mismatch|expected|" + source_path + "|actual|" +
fsi.source_path);
continue;
}
auto opt_size = utils::file::file{fsi.source_path}.size();
if (not opt_size.has_value()) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"failed to get file size: " +
std::to_string(utils::get_last_error_code()));
continue;
}
auto file_size{opt_size.value()};
if (file_size != fsi.size) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"file size mismatch|expected|" + std::to_string(fsi.size) +
"|actual|" + std::to_string(file_size));
continue;
}
auto closeable_file = std::make_shared<open_file>(
chunk_size,
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
fsi, provider_, read_state, *this);
open_file_lookup_[api_path] = closeable_file;
event_system::instance().raise<download_restored>(fsi.api_path,
fsi.source_path);
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
if (source_path != fsi.source_path) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"source path mismatch|expected|" + source_path + "|actual|" +
fsi.source_path);
continue;
}
auto opt_size = utils::file::file{fsi.source_path}.size();
if (not opt_size.has_value()) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"failed to get file size: " +
std::to_string(utils::get_last_error_code()));
continue;
}
auto file_size{opt_size.value()};
if (file_size != fsi.size) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"file size mismatch|expected|" + std::to_string(fsi.size) +
"|actual|" + std::to_string(file_size));
continue;
}
auto closeable_file = std::make_shared<open_file>(
chunk_size,
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
fsi, provider_, read_state, *this);
open_file_lookup_[api_path] = closeable_file;
event_system::instance().raise<download_restored>(fsi.api_path,
fsi.source_path);
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
}
}
@ -909,40 +951,43 @@ void file_manager::start() {
}
void file_manager::stop() {
if (not stop_requested_) {
event_system::instance().raise<service_shutdown_begin>("file_manager");
polling::instance().remove_callback("timed_out_close");
stop_requested_ = true;
unique_mutex_lock upload_lock(upload_mtx_);
upload_notify_.notify_all();
upload_lock.unlock();
if (upload_thread_) {
upload_thread_->join();
upload_thread_.reset();
}
open_file_lookup_.clear();
upload_lock.lock();
for (auto &&item : upload_lookup_) {
item.second->stop();
}
upload_notify_.notify_all();
upload_lock.unlock();
while (not upload_lookup_.empty()) {
upload_lock.lock();
if (not upload_lookup_.empty()) {
upload_notify_.wait_for(upload_lock, 1ms);
}
upload_notify_.notify_all();
upload_lock.unlock();
}
event_system::instance().raise<service_shutdown_end>("file_manager");
if (stop_requested_) {
return;
}
event_system::instance().raise<service_shutdown_begin>("file_manager");
polling::instance().remove_callback("timed_out_close");
stop_requested_ = true;
unique_mutex_lock upload_lock(upload_mtx_);
upload_notify_.notify_all();
upload_lock.unlock();
if (upload_thread_) {
upload_thread_->join();
}
open_file_lookup_.clear();
upload_lock.lock();
for (auto &&item : upload_lookup_) {
item.second->stop();
}
upload_notify_.notify_all();
upload_lock.unlock();
while (not upload_lookup_.empty()) {
upload_lock.lock();
if (not upload_lookup_.empty()) {
upload_notify_.wait_for(upload_lock, 1ms);
}
upload_notify_.notify_all();
upload_lock.unlock();
}
upload_thread_.reset();
event_system::instance().raise<service_shutdown_end>("file_manager");
}
void file_manager::store_resume(const i_open_file &file) {
@ -950,7 +995,8 @@ void file_manager::store_resume(const i_open_file &file) {
return;
}
auto result = utils::db::sqlite::db_insert{*db_.get(), resume_table}
auto db = create_db();
auto result = utils::db::sqlite::db_insert{*db.get(), resume_table}
.or_replace()
.column_value("api_path", file.get_api_path())
.column_value("data", create_resume_entry(file).dump())
@ -968,7 +1014,8 @@ void file_manager::store_resume(const i_open_file &file) {
}
void file_manager::swap_renamed_items(std::string from_api_path,
std::string to_api_path, bool directory) {
std::string to_api_path, bool directory,
sqlite3 *db) {
REPERTORY_USES_FUNCTION_NAME();
auto file_iter = open_file_lookup_.find(from_api_path);
@ -983,7 +1030,7 @@ void file_manager::swap_renamed_items(std::string from_api_path,
return;
}
auto result = utils::db::sqlite::db_update{*db_.get(), resume_table}
auto result = utils::db::sqlite::db_update{*db, resume_table}
.column_value("api_path", to_api_path)
.where("api_path")
.equals(from_api_path)
@ -1001,13 +1048,14 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
unique_mutex_lock upload_lock(upload_mtx_);
if (not utils::string::to_bool(evt.get_cancelled().get<std::string>())) {
auto db = create_db();
auto err = api_error_from_string(evt.get_result().get<std::string>());
if (err == api_error::success) {
auto result =
utils::db::sqlite::db_delete{*db_.get(), upload_active_table}
.where("api_path")
.equals(evt.get_api_path().get<std::string>())
.go();
auto result = utils::db::sqlite::db_delete{*db.get(), upload_active_table}
.where("api_path")
.equals(evt.get_api_path().get<std::string>())
.go();
if (not result.ok()) {
utils::error::raise_api_path_error(
function_name, evt.get_api_path().get<std::string>(),
@ -1023,12 +1071,12 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
not utils::file::file(evt.get_source().get<std::string>()).exists()) {
event_system::instance().raise<file_upload_not_found>(
evt.get_api_path(), evt.get_source());
remove_upload(evt.get_api_path(), true);
remove_upload(evt.get_api_path(), true, db.get());
} else {
event_system::instance().raise<file_upload_retry>(
evt.get_api_path(), evt.get_source(), err);
queue_upload(evt.get_api_path(), evt.get_source(), true);
queue_upload(evt.get_api_path(), evt.get_source(), true, db.get());
upload_notify_.wait_for(upload_lock, 5s);
}
}
@ -1040,6 +1088,8 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
void file_manager::upload_handler() {
REPERTORY_USES_FUNCTION_NAME();
auto db = create_db();
while (not stop_requested_) {
auto should_wait{true};
unique_mutex_lock upload_lock(upload_mtx_);
@ -1049,7 +1099,7 @@ void file_manager::upload_handler() {
}
if (upload_lookup_.size() < config_.get_max_upload_count()) {
auto result = utils::db::sqlite::db_select{*db_.get(), upload_table}
auto result = utils::db::sqlite::db_select{*db.get(), upload_table}
.order_by("api_path", true)
.limit(1)
.go();
@ -1067,7 +1117,7 @@ void file_manager::upload_handler() {
should_wait = false;
event_system::instance().raise<file_upload_not_found>(api_path,
source_path);
remove_upload(api_path, true);
remove_upload(api_path, true, db.get());
} break;
case api_error::success: {
@ -1075,14 +1125,13 @@ void file_manager::upload_handler() {
upload_lookup_[fsi.api_path] =
std::make_unique<upload>(fsi, provider_);
auto del_res =
utils::db::sqlite::db_delete{*db_.get(), upload_table}
.where("api_path")
.equals(api_path)
.go();
auto del_res = utils::db::sqlite::db_delete{*db.get(), upload_table}
.where("api_path")
.equals(api_path)
.go();
if (del_res.ok()) {
auto ins_res =
utils::db::sqlite::db_insert{*db_.get(), upload_active_table}
utils::db::sqlite::db_insert{*db.get(), upload_active_table}
.column_value("api_path", api_path)
.column_value("source_path", source_path)
.go();
@ -1097,7 +1146,7 @@ void file_manager::upload_handler() {
default: {
event_system::instance().raise<file_upload_retry>(api_path,
source_path, res);
queue_upload(api_path, source_path, true);
queue_upload(api_path, source_path, true, db.get());
} break;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -28,17 +28,13 @@
namespace repertory::utils::db::sqlite {
using db_types_t = std::variant<std::int64_t, std::string>;
struct sqlite3_deleter {
void operator()(sqlite3 *db3) const {
if (db3 != nullptr) {
sqlite3_close_v2(db3);
}
}
struct sqlite3_deleter final {
void operator()(sqlite3 *db3) const;
};
using db3_t = std::unique_ptr<sqlite3, sqlite3_deleter>;
struct sqlite3_statement_deleter {
struct sqlite3_statement_deleter final {
void operator()(sqlite3_stmt *stmt) const {
if (stmt != nullptr) {
sqlite3_finalize(stmt);
@ -104,16 +100,7 @@ public:
}
#if defined(PROJECT_ENABLE_JSON)
[[nodiscard]] auto get_value_as_json() const -> nlohmann::json {
return std::visit(
overloaded{
[this](std::int64_t value) -> auto {
return nlohmann::json({{name_, value}});
},
[](auto &&value) -> auto { return nlohmann::json::parse(value); },
},
value_);
}
[[nodiscard]] auto get_value_as_json() const -> nlohmann::json;
#endif // defined(PROJECT_ENABLE_JSON)
};
@ -211,7 +198,8 @@ public:
[[nodiscard]] auto get_error() const -> std::int32_t { return res_; }
[[nodiscard]] auto get_error_str() const -> std::string {
return sqlite3_errstr(res_);
auto &&err_msg = sqlite3_errstr(res_);
return err_msg == nullptr ? std::to_string(res_) : err_msg;
}
[[nodiscard]] auto get_row(std::optional<db_row<ctx_t>> &row) const -> bool {

View File

@ -72,8 +72,7 @@ extern std::atomic<const i_exception_handler *> exception_handler;
}
#endif // defined(PROJECT_ENABLE_TESTING)
[[nodiscard]] auto create_error_message(std::string_view function_name,
std::vector<std::string_view> items)
[[nodiscard]] auto create_error_message(std::vector<std::string_view> items)
-> std::string;
void handle_error(std::string_view function_name, std::string_view msg);

View File

@ -19,13 +19,63 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/db/sqlite/db_common.hpp"
#if defined(PROJECT_ENABLE_SQLITE)
#include "utils/db/sqlite/db_common.hpp"
#include "utils/common.hpp"
#include "utils/error.hpp"
namespace repertory::utils::db::sqlite {
auto execute_sql(sqlite3 &db3, const std::string &sql,
std::string &err) -> bool {
// Closes a sqlite3 handle owned by a db3_t: flushes dirty pages,
// checkpoints the WAL, then retries sqlite3_close_v2() until it succeeds
// or the retry budget is exhausted. Null handles are ignored.
void sqlite3_deleter::operator()(sqlite3 *db3) const {
  REPERTORY_USES_FUNCTION_NAME();

  if (db3 == nullptr) {
    return;
  }

  // NOTE(review): this reports "closing database handle" through the error
  // channel on every close, not only on failure — confirm it is intended as
  // trace logging rather than an error report.
  utils::error::handle_error(function_name, "closing database handle");

  // Best-effort cleanup before closing; checkpoint errors are ignored
  // (err_msg is filled but never examined).
  sqlite3_db_cacheflush(db3);
  std::string err_msg;
  execute_sql(*db3, "PRAGMA wal_checkpoint(full);", err_msg);

  // sqlite3_close_v2() can fail while resources are still in use; retry
  // via utils::retry_action — 60U is presumably the attempt count, confirm.
  if (not utils::retry_action(
          [&db3]() -> bool {
            auto res = sqlite3_close_v2(db3);
            if (res == SQLITE_OK) {
              return true;
            }
            // Fall back to the numeric code if no error string is available.
            auto &&err_str = sqlite3_errstr(res);
            utils::error::handle_error(
                function_name,
                utils::error::create_error_message({
                    "failed to close database",
                    (err_str == nullptr ? std::to_string(res) : err_str),
                }));
            return false;
          },
          60U)) {
    utils::error::handle_error(function_name, "failed to close database");
  }
}
#if defined(PROJECT_ENABLE_JSON)
// Converts the column's stored value to JSON: integer values become a
// single-entry object keyed by the column name; string values hold
// serialized JSON text and are parsed back into a document.
auto db_column::get_value_as_json() const -> nlohmann::json {
  if (std::holds_alternative<std::int64_t>(value_)) {
    return nlohmann::json({{name_, std::get<std::int64_t>(value_)}});
  }
  return nlohmann::json::parse(std::get<std::string>(value_));
}
#endif // defined(PROJECT_ENABLE_JSON)
auto execute_sql(sqlite3 &db3, const std::string &sql, std::string &err)
-> bool {
char *err_msg{nullptr};
auto res = sqlite3_exec(&db3, sql.c_str(), nullptr, nullptr, &err_msg);
if (err_msg != nullptr) {

View File

@ -24,7 +24,10 @@
#if defined(PROJECT_ENABLE_SQLITE)
namespace repertory::utils::db::sqlite {
void db_delete::context::clear() { where_data.reset(); }
// Resets the builder's accumulated state so this context can be reused
// for a fresh DELETE statement.
void db_delete::context::clear() {
  // stmt.reset();
  where_data.reset();
}
auto db_delete::context::db_delete_op_t::dump() const -> std::string {
return db_delete{ctx}.dump();

View File

@ -24,10 +24,13 @@
#if defined(PROJECT_ENABLE_SQLITE)
namespace repertory::utils::db::sqlite {
void db_insert::context::clear() { values.clear(); }
// Resets the builder's accumulated column/value pairs so this context can
// be reused for a fresh INSERT statement.
void db_insert::context::clear() {
  // stmt.reset();
  values.clear();
}
auto db_insert::column_value(std::string column_name,
db_types_t value) -> db_insert & {
// Records (or overwrites) the value to insert for the given column.
// Returns *this so calls can be chained fluently.
auto db_insert::column_value(std::string column_name, db_types_t value)
    -> db_insert & {
  auto &column_map = ctx_->values;
  column_map[column_name] = value;
  return *this;
}

View File

@ -31,6 +31,7 @@ void db_select::context::clear() {
limit.reset();
offset.reset();
order_by.reset();
// stmt.reset();
where_data.reset();
}
@ -72,8 +73,8 @@ auto db_select::column(std::string column_name) -> db_select & {
return *this;
}
auto db_select::count(std::string column_name,
std::string as_column_name) -> db_select & {
// Registers a COUNT() projection over `column_name`, aliased in the result
// set as `as_column_name`. Returns *this for fluent chaining.
auto db_select::count(std::string column_name, std::string as_column_name)
    -> db_select & {
  auto &aliases = ctx_->count_columns;
  aliases[column_name] = as_column_name;
  return *this;
}
@ -209,8 +210,8 @@ auto db_select::offset(std::int32_t value) -> db_select & {
return *this;
}
auto db_select::order_by(std::string column_name,
bool ascending) -> db_select & {
// Sets the ORDER BY column and direction, replacing any previous setting.
// Returns *this for fluent chaining.
auto db_select::order_by(std::string column_name, bool ascending)
    -> db_select & {
  ctx_->order_by.emplace(column_name, ascending);
  return *this;
}

View File

@ -28,6 +28,7 @@ void db_update::context::clear() {
column_values.clear();
limit.reset();
order_by.reset();
// stmt.reset();
where_data.reset();
}
@ -52,8 +53,8 @@ auto db_update::context::db_update_op_t::order_by(std::string column_name,
return *this;
}
auto db_update::column_value(std::string column_name,
db_types_t value) -> db_update & {
// Records (or overwrites) the new value for the given column in the
// pending UPDATE. Returns *this so calls can be chained fluently.
auto db_update::column_value(std::string column_name, db_types_t value)
    -> db_update & {
  auto &column_map = ctx_->column_values;
  column_map[column_name] = value;
  return *this;
}
@ -173,8 +174,8 @@ auto db_update::limit(std::int32_t value) -> db_update & {
return *this;
}
auto db_update::order_by(std::string column_name,
bool ascending) -> db_update & {
// Sets the ORDER BY column and direction, replacing any previous setting.
// Returns *this for fluent chaining.
auto db_update::order_by(std::string column_name, bool ascending)
    -> db_update & {
  ctx_->order_by.emplace(column_name, ascending);
  return *this;
}

View File

@ -25,10 +25,8 @@ namespace repertory::utils::error {
std::atomic<const i_exception_handler *> exception_handler{
&default_exception_handler};
auto create_error_message(std::string_view function_name,
std::vector<std::string_view> items) -> std::string {
auto create_error_message(std::vector<std::string_view> items) -> std::string {
std::stringstream stream{};
stream << function_name;
for (auto &&item : items) {
stream << '|' << item;
}

View File

@ -486,7 +486,7 @@ auto file::write(const unsigned char *data, std::size_t to_write,
break;
}
bytes_written += res;
bytes_written += written;
}
flush();