From 5187f323461fc6582b4c37ae9d7429a7cebf7540 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Wed, 11 Dec 2024 13:31:16 -0600 Subject: [PATCH] RocksDB implementations should be transactional #24 --- .../librepertory/include/db/i_file_mgr_db.hpp | 6 +- .../include/db/impl/rdb_file_mgr_db.hpp | 19 ++- .../include/db/impl/rdb_meta_db.hpp | 2 + .../include/db/impl/sqlite_file_mgr_db.hpp | 6 +- .../src/db/impl/rdb_file_mgr_db.cpp | 148 +++++++++++++----- .../src/db/impl/sqlite_file_mgr_db.cpp | 7 +- 6 files changed, 134 insertions(+), 54 deletions(-) diff --git a/repertory/librepertory/include/db/i_file_mgr_db.hpp b/repertory/librepertory/include/db/i_file_mgr_db.hpp index 1e0e1354..0829261a 100644 --- a/repertory/librepertory/include/db/i_file_mgr_db.hpp +++ b/repertory/librepertory/include/db/i_file_mgr_db.hpp @@ -47,11 +47,11 @@ public: }; public: - [[nodiscard]] virtual auto add_resume(resume_entry entry) -> bool = 0; + [[nodiscard]] virtual auto add_resume(const resume_entry &entry) -> bool = 0; - [[nodiscard]] virtual auto add_upload(upload_entry entry) -> bool = 0; + [[nodiscard]] virtual auto add_upload(const upload_entry &entry) -> bool = 0; - [[nodiscard]] virtual auto add_upload_active(upload_active_entry entry) + [[nodiscard]] virtual auto add_upload_active(const upload_active_entry &entry) -> bool = 0; virtual void clear() = 0; diff --git a/repertory/librepertory/include/db/impl/rdb_file_mgr_db.hpp b/repertory/librepertory/include/db/impl/rdb_file_mgr_db.hpp index 08a2c9f0..56647321 100644 --- a/repertory/librepertory/include/db/impl/rdb_file_mgr_db.hpp +++ b/repertory/librepertory/include/db/impl/rdb_file_mgr_db.hpp @@ -41,7 +41,7 @@ private: const app_config &cfg_; private: - std::unique_ptr db_; + std::unique_ptr db_{nullptr}; std::atomic id_{0U}; rocksdb::ColumnFamilyHandle *resume_family_{}; rocksdb::ColumnFamilyHandle *upload_active_family_{}; @@ -57,12 +57,23 @@ private: perform_action(std::string_view function_name, std::function action) -> bool; + [[nodiscard]] auto perform_action( + std::string_view function_name, + std::function action) -> bool; + + [[nodiscard]] auto remove_resume(const std::string &api_path, + rocksdb::Transaction *txn) + -> rocksdb::Status; + + [[nodiscard]] auto add_resume(const resume_entry &entry, + rocksdb::Transaction *txn) -> rocksdb::Status; + public: - [[nodiscard]] auto add_resume(resume_entry entry) -> bool override; + [[nodiscard]] auto add_resume(const resume_entry &entry) -> bool override; - [[nodiscard]] auto add_upload(upload_entry entry) -> bool override; + [[nodiscard]] auto add_upload(const upload_entry &entry) -> bool override; - [[nodiscard]] auto add_upload_active(upload_active_entry entry) + [[nodiscard]] auto add_upload_active(const upload_active_entry &entry) -> bool override; void clear() override; diff --git a/repertory/librepertory/include/db/impl/rdb_meta_db.hpp b/repertory/librepertory/include/db/impl/rdb_meta_db.hpp index a4433167..9b5788b7 100644 --- a/repertory/librepertory/include/db/impl/rdb_meta_db.hpp +++ b/repertory/librepertory/include/db/impl/rdb_meta_db.hpp @@ -40,6 +40,8 @@ public: private: const app_config &cfg_; + +private: std::unique_ptr db_{nullptr}; rocksdb::ColumnFamilyHandle *default_family_{}; rocksdb::ColumnFamilyHandle *pinned_family_{}; diff --git a/repertory/librepertory/include/db/impl/sqlite_file_mgr_db.hpp b/repertory/librepertory/include/db/impl/sqlite_file_mgr_db.hpp index 223045d3..bf7a8ed2 100644 --- a/repertory/librepertory/include/db/impl/sqlite_file_mgr_db.hpp +++ b/repertory/librepertory/include/db/impl/sqlite_file_mgr_db.hpp @@ -42,11 +42,11 @@ private: utils::db::sqlite::db3_t db_; public: - [[nodiscard]] auto add_resume(resume_entry entry) -> bool override; + [[nodiscard]] auto add_resume(const resume_entry &entry) -> bool override; - [[nodiscard]] auto add_upload(upload_entry entry) -> bool override; + [[nodiscard]] auto add_upload(const upload_entry &entry) -> bool override; - [[nodiscard]] auto add_upload_active(upload_active_entry entry) + [[nodiscard]] auto add_upload_active(const upload_active_entry &entry) -> bool override; void clear() override; diff --git a/repertory/librepertory/src/db/impl/rdb_file_mgr_db.cpp b/repertory/librepertory/src/db/impl/rdb_file_mgr_db.cpp index 0892a50d..585f5095 100644 --- a/repertory/librepertory/src/db/impl/rdb_file_mgr_db.cpp +++ b/repertory/librepertory/src/db/impl/rdb_file_mgr_db.cpp @@ -34,14 +34,14 @@ namespace { create_rocksdb(const repertory::app_config &cfg, const std::string &name, const std::vector &families, std::vector &handles, bool clear) - -> std::unique_ptr { + -> std::unique_ptr { REPERTORY_USES_FUNCTION_NAME(); auto path = repertory::utils::path::combine(cfg.get_data_directory(), {name}); if (clear && not repertory::utils::file::directory{path}.remove_recursively()) { - repertory::utils::error::raise_error( - function_name, "failed to remove file mgr db|" + path); + repertory::utils::error::raise_error(function_name, + "failed to remove meta db|" + path); } rocksdb::Options options{}; @@ -50,14 +50,17 @@ create_rocksdb(const repertory::app_config &cfg, const std::string &name, options.db_log_dir = cfg.get_log_directory(); options.keep_log_file_num = 10; - rocksdb::DB *ptr{}; - auto status = rocksdb::DB::Open(options, path, families, &handles, &ptr); + rocksdb::TransactionDBOptions tx_options{}; + + rocksdb::TransactionDB *ptr{}; + auto status = rocksdb::TransactionDB::Open(options, tx_options, path, + families, &handles, &ptr); if (not status.ok()) { repertory::utils::error::raise_error(function_name, status.ToString()); throw repertory::startup_exception(status.ToString()); } - return std::unique_ptr(ptr); + return std::unique_ptr(ptr); } } // namespace @@ -86,38 +89,51 @@ void rdb_file_mgr_db::create_or_open(bool clear) { upload_family_ = handles[idx++]; } -auto rdb_file_mgr_db::add_resume(resume_entry entry) -> bool { +auto rdb_file_mgr_db::add_resume(const resume_entry &entry) -> bool { REPERTORY_USES_FUNCTION_NAME(); - return perform_action(function_name, [this, &entry]() -> rocksdb::Status { - auto data = json({ - {"chunk_size", entry.chunk_size}, - {"read_state", utils::string::from_dynamic_bitset(entry.read_state)}, - {"source_path", entry.source_path}, - }); - return db_->Put(rocksdb::WriteOptions{}, resume_family_, entry.api_path, - data.dump()); - }); + return perform_action( + function_name, + [this, &entry](rocksdb::Transaction *txn) -> rocksdb::Status { + return add_resume(entry, txn); + }); } -auto rdb_file_mgr_db::add_upload(upload_entry entry) -> bool { +auto rdb_file_mgr_db::add_resume(const resume_entry &entry, + rocksdb::Transaction *txn) -> rocksdb::Status { REPERTORY_USES_FUNCTION_NAME(); - return perform_action(function_name, [this, &entry]() -> rocksdb::Status { - return db_->Put(rocksdb::WriteOptions{}, upload_family_, - utils::string::zero_pad(std::to_string(++id_), 20U) + '|' + - entry.api_path, - entry.source_path); + auto data = json({ + {"chunk_size", entry.chunk_size}, + {"read_state", utils::string::from_dynamic_bitset(entry.read_state)}, + {"source_path", entry.source_path}, }); + return txn->Put(resume_family_, entry.api_path, data.dump()); } -auto rdb_file_mgr_db::add_upload_active(upload_active_entry entry) -> bool { +auto rdb_file_mgr_db::add_upload(const upload_entry &entry) -> bool { REPERTORY_USES_FUNCTION_NAME(); - return perform_action(function_name, [this, &entry]() -> rocksdb::Status { - return db_->Put(rocksdb::WriteOptions{}, upload_active_family_, - entry.api_path, entry.source_path); - }); + return perform_action( + function_name, + [this, &entry](rocksdb::Transaction *txn) -> rocksdb::Status { + return txn->Put(upload_family_, + utils::string::zero_pad(std::to_string(++id_), 20U) + + '|' + entry.api_path, + entry.source_path); + }); +} + +auto rdb_file_mgr_db::add_upload_active(const upload_active_entry &entry) + -> bool { + REPERTORY_USES_FUNCTION_NAME(); + + return perform_action( + function_name, + [this, &entry](rocksdb::Transaction *txn) -> rocksdb::Status { + return txn->Put(upload_active_family_, entry.api_path, + entry.source_path); + }); } void rdb_file_mgr_db::clear() { create_or_open(true); } @@ -215,12 +231,54 @@ auto rdb_file_mgr_db::perform_action(std::string_view function_name, return false; } +auto rdb_file_mgr_db::perform_action( + std::string_view function_name, + std::function action) -> bool { + std::unique_ptr txn{ + db_->BeginTransaction(rocksdb::WriteOptions{}, + rocksdb::TransactionOptions{}), + }; + + try { + auto res = action(txn.get()); + if (res.ok()) { + auto commit_res = txn->Commit(); + if (commit_res.ok()) { + return true; + } + + utils::error::raise_error(function_name, + "rocksdb commit failed|" + res.ToString()); + return false; + } + + utils::error::raise_error(function_name, + "rocksdb action failed|" + res.ToString()); + } catch (const std::exception &ex) { + utils::error::raise_error(function_name, ex, + "failed to handle rocksdb action"); + } + + auto rollback_res = txn->Rollback(); + utils::error::raise_error(function_name, "rocksdb rollback failed|" + + rollback_res.ToString()); + return false; +} + auto rdb_file_mgr_db::remove_resume(const std::string &api_path) -> bool { REPERTORY_USES_FUNCTION_NAME(); - return perform_action(function_name, [this, &api_path]() -> rocksdb::Status { - return db_->Delete(rocksdb::WriteOptions{}, resume_family_, api_path); - }); + return perform_action( + function_name, + [this, &api_path](rocksdb::Transaction *txn) -> rocksdb::Status { + return remove_resume(api_path, txn); + }); +} + +auto rdb_file_mgr_db::remove_resume(const std::string &api_path, + rocksdb::Transaction *txn) + -> rocksdb::Status { + return txn->Delete(resume_family_, api_path); } auto rdb_file_mgr_db::remove_upload(const std::string &api_path) -> bool { @@ -235,9 +293,11 @@ auto rdb_file_mgr_db::remove_upload(const std::string &api_path) -> bool { continue; } - return perform_action(function_name, [this, &iter]() -> rocksdb::Status { - return db_->Delete(rocksdb::WriteOptions{}, upload_family_, iter->key()); - }); + return perform_action( + function_name, + [this, &iter](rocksdb::Transaction *txn) -> rocksdb::Status { + return txn->Delete(upload_family_, iter->key()); + }); } return true; @@ -247,10 +307,11 @@ auto rdb_file_mgr_db::remove_upload_active(const std::string &api_path) -> bool { REPERTORY_USES_FUNCTION_NAME(); - return perform_action(function_name, [this, &api_path]() -> rocksdb::Status { - return db_->Delete(rocksdb::WriteOptions{}, upload_active_family_, - api_path); - }); + return perform_action( + function_name, + [this, &api_path](rocksdb::Transaction *txn) -> rocksdb::Status { + return txn->Delete(upload_active_family_, api_path); + }); } auto rdb_file_mgr_db::rename_resume(const std::string &from_api_path, @@ -279,10 +340,15 @@ auto rdb_file_mgr_db::rename_resume(const std::string &from_api_path, data.at("source_path").get(), }; - if (not remove_resume(from_api_path)) { - return false; - } + return perform_action(function_name, + [this, &entry, &from_api_path]( + rocksdb::Transaction *txn) -> rocksdb::Status { + auto res = remove_resume(from_api_path, txn); + if (not res.ok()) { + return res; + } - return add_resume(entry); + return add_resume(entry, txn); + }); } } // namespace repertory diff --git a/repertory/librepertory/src/db/impl/sqlite_file_mgr_db.cpp b/repertory/librepertory/src/db/impl/sqlite_file_mgr_db.cpp index 1e193339..8d25321c 100644 --- a/repertory/librepertory/src/db/impl/sqlite_file_mgr_db.cpp +++ b/repertory/librepertory/src/db/impl/sqlite_file_mgr_db.cpp @@ -82,7 +82,7 @@ sqlite_file_mgr_db::sqlite_file_mgr_db(const app_config &cfg) { sqlite_file_mgr_db::~sqlite_file_mgr_db() { db_.reset(); } -auto sqlite_file_mgr_db::add_resume(resume_entry entry) -> bool { +auto sqlite_file_mgr_db::add_resume(const resume_entry &entry) -> bool { return utils::db::sqlite::db_insert{*db_, resume_table} .or_replace() .column_value("api_path", entry.api_path) @@ -94,7 +94,7 @@ auto sqlite_file_mgr_db::add_resume(resume_entry entry) -> bool { .ok(); } -auto sqlite_file_mgr_db::add_upload(upload_entry entry) -> bool { +auto sqlite_file_mgr_db::add_upload(const upload_entry &entry) -> bool { return utils::db::sqlite::db_insert{*db_, upload_table} .or_replace() .column_value("api_path", entry.api_path) @@ -103,7 +103,8 @@ auto sqlite_file_mgr_db::add_upload(upload_entry entry) -> bool { .ok(); } -auto sqlite_file_mgr_db::add_upload_active(upload_active_entry entry) -> bool { +auto sqlite_file_mgr_db::add_upload_active(const upload_active_entry &entry) + -> bool { return utils::db::sqlite::db_insert{*db_, upload_active_table} .or_replace() .column_value("api_path", entry.api_path)