RocksDB implementations should be transactional #24
This commit is contained in:
parent
3a62c389ef
commit
5187f32346
@ -47,11 +47,11 @@ public:
|
||||
};
|
||||
|
||||
public:
|
||||
[[nodiscard]] virtual auto add_resume(resume_entry entry) -> bool = 0;
|
||||
[[nodiscard]] virtual auto add_resume(const resume_entry &entry) -> bool = 0;
|
||||
|
||||
[[nodiscard]] virtual auto add_upload(upload_entry entry) -> bool = 0;
|
||||
[[nodiscard]] virtual auto add_upload(const upload_entry &entry) -> bool = 0;
|
||||
|
||||
[[nodiscard]] virtual auto add_upload_active(upload_active_entry entry)
|
||||
[[nodiscard]] virtual auto add_upload_active(const upload_active_entry &entry)
|
||||
-> bool = 0;
|
||||
|
||||
virtual void clear() = 0;
|
||||
|
@ -41,7 +41,7 @@ private:
|
||||
const app_config &cfg_;
|
||||
|
||||
private:
|
||||
std::unique_ptr<rocksdb::DB> db_;
|
||||
std::unique_ptr<rocksdb::TransactionDB> db_{nullptr};
|
||||
std::atomic<std::uint64_t> id_{0U};
|
||||
rocksdb::ColumnFamilyHandle *resume_family_{};
|
||||
rocksdb::ColumnFamilyHandle *upload_active_family_{};
|
||||
@ -57,12 +57,23 @@ private:
|
||||
perform_action(std::string_view function_name,
|
||||
std::function<rocksdb::Status()> action) -> bool;
|
||||
|
||||
[[nodiscard]] auto perform_action(
|
||||
std::string_view function_name,
|
||||
std::function<rocksdb::Status(rocksdb::Transaction *txn)> action) -> bool;
|
||||
|
||||
[[nodiscard]] auto remove_resume(const std::string &api_path,
|
||||
rocksdb::Transaction *txn)
|
||||
-> rocksdb::Status;
|
||||
|
||||
[[nodiscard]] auto add_resume(const resume_entry &entry,
|
||||
rocksdb::Transaction *txn) -> rocksdb::Status;
|
||||
|
||||
public:
|
||||
[[nodiscard]] auto add_resume(resume_entry entry) -> bool override;
|
||||
[[nodiscard]] auto add_resume(const resume_entry &entry) -> bool override;
|
||||
|
||||
[[nodiscard]] auto add_upload(upload_entry entry) -> bool override;
|
||||
[[nodiscard]] auto add_upload(const upload_entry &entry) -> bool override;
|
||||
|
||||
[[nodiscard]] auto add_upload_active(upload_active_entry entry)
|
||||
[[nodiscard]] auto add_upload_active(const upload_active_entry &entry)
|
||||
-> bool override;
|
||||
|
||||
void clear() override;
|
||||
|
@ -40,6 +40,8 @@ public:
|
||||
|
||||
private:
|
||||
const app_config &cfg_;
|
||||
|
||||
private:
|
||||
std::unique_ptr<rocksdb::TransactionDB> db_{nullptr};
|
||||
rocksdb::ColumnFamilyHandle *default_family_{};
|
||||
rocksdb::ColumnFamilyHandle *pinned_family_{};
|
||||
|
@ -42,11 +42,11 @@ private:
|
||||
utils::db::sqlite::db3_t db_;
|
||||
|
||||
public:
|
||||
[[nodiscard]] auto add_resume(resume_entry entry) -> bool override;
|
||||
[[nodiscard]] auto add_resume(const resume_entry &entry) -> bool override;
|
||||
|
||||
[[nodiscard]] auto add_upload(upload_entry entry) -> bool override;
|
||||
[[nodiscard]] auto add_upload(const upload_entry &entry) -> bool override;
|
||||
|
||||
[[nodiscard]] auto add_upload_active(upload_active_entry entry)
|
||||
[[nodiscard]] auto add_upload_active(const upload_active_entry &entry)
|
||||
-> bool override;
|
||||
|
||||
void clear() override;
|
||||
|
@ -34,14 +34,14 @@ namespace {
|
||||
create_rocksdb(const repertory::app_config &cfg, const std::string &name,
|
||||
const std::vector<rocksdb::ColumnFamilyDescriptor> &families,
|
||||
std::vector<rocksdb::ColumnFamilyHandle *> &handles, bool clear)
|
||||
-> std::unique_ptr<rocksdb::DB> {
|
||||
-> std::unique_ptr<rocksdb::TransactionDB> {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
auto path = repertory::utils::path::combine(cfg.get_data_directory(), {name});
|
||||
if (clear &&
|
||||
not repertory::utils::file::directory{path}.remove_recursively()) {
|
||||
repertory::utils::error::raise_error(
|
||||
function_name, "failed to remove file mgr db|" + path);
|
||||
repertory::utils::error::raise_error(function_name,
|
||||
"failed to remove meta db|" + path);
|
||||
}
|
||||
|
||||
rocksdb::Options options{};
|
||||
@ -50,14 +50,17 @@ create_rocksdb(const repertory::app_config &cfg, const std::string &name,
|
||||
options.db_log_dir = cfg.get_log_directory();
|
||||
options.keep_log_file_num = 10;
|
||||
|
||||
rocksdb::DB *ptr{};
|
||||
auto status = rocksdb::DB::Open(options, path, families, &handles, &ptr);
|
||||
rocksdb::TransactionDBOptions tx_options{};
|
||||
|
||||
rocksdb::TransactionDB *ptr{};
|
||||
auto status = rocksdb::TransactionDB::Open(options, tx_options, path,
|
||||
families, &handles, &ptr);
|
||||
if (not status.ok()) {
|
||||
repertory::utils::error::raise_error(function_name, status.ToString());
|
||||
throw repertory::startup_exception(status.ToString());
|
||||
}
|
||||
|
||||
return std::unique_ptr<rocksdb::DB>(ptr);
|
||||
return std::unique_ptr<rocksdb::TransactionDB>(ptr);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
@ -86,38 +89,51 @@ void rdb_file_mgr_db::create_or_open(bool clear) {
|
||||
upload_family_ = handles[idx++];
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::add_resume(resume_entry entry) -> bool {
|
||||
auto rdb_file_mgr_db::add_resume(const resume_entry &entry) -> bool {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
return perform_action(function_name, [this, &entry]() -> rocksdb::Status {
|
||||
auto data = json({
|
||||
{"chunk_size", entry.chunk_size},
|
||||
{"read_state", utils::string::from_dynamic_bitset(entry.read_state)},
|
||||
{"source_path", entry.source_path},
|
||||
});
|
||||
return db_->Put(rocksdb::WriteOptions{}, resume_family_, entry.api_path,
|
||||
data.dump());
|
||||
});
|
||||
return perform_action(
|
||||
function_name,
|
||||
[this, &entry](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return add_resume(entry, txn);
|
||||
});
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::add_upload(upload_entry entry) -> bool {
|
||||
auto rdb_file_mgr_db::add_resume(const resume_entry &entry,
|
||||
rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
return perform_action(function_name, [this, &entry]() -> rocksdb::Status {
|
||||
return db_->Put(rocksdb::WriteOptions{}, upload_family_,
|
||||
utils::string::zero_pad(std::to_string(++id_), 20U) + '|' +
|
||||
entry.api_path,
|
||||
entry.source_path);
|
||||
auto data = json({
|
||||
{"chunk_size", entry.chunk_size},
|
||||
{"read_state", utils::string::from_dynamic_bitset(entry.read_state)},
|
||||
{"source_path", entry.source_path},
|
||||
});
|
||||
return txn->Put(resume_family_, entry.api_path, data.dump());
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::add_upload_active(upload_active_entry entry) -> bool {
|
||||
auto rdb_file_mgr_db::add_upload(const upload_entry &entry) -> bool {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
return perform_action(function_name, [this, &entry]() -> rocksdb::Status {
|
||||
return db_->Put(rocksdb::WriteOptions{}, upload_active_family_,
|
||||
entry.api_path, entry.source_path);
|
||||
});
|
||||
return perform_action(
|
||||
function_name,
|
||||
[this, &entry](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return txn->Put(upload_family_,
|
||||
utils::string::zero_pad(std::to_string(++id_), 20U) +
|
||||
'|' + entry.api_path,
|
||||
entry.source_path);
|
||||
});
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::add_upload_active(const upload_active_entry &entry)
|
||||
-> bool {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
return perform_action(
|
||||
function_name,
|
||||
[this, &entry](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return txn->Put(upload_active_family_, entry.api_path,
|
||||
entry.source_path);
|
||||
});
|
||||
}
|
||||
|
||||
void rdb_file_mgr_db::clear() { create_or_open(true); }
|
||||
@ -215,12 +231,54 @@ auto rdb_file_mgr_db::perform_action(std::string_view function_name,
|
||||
return false;
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::perform_action(
|
||||
std::string_view function_name,
|
||||
std::function<rocksdb::Status(rocksdb::Transaction *txn)> action) -> bool {
|
||||
std::unique_ptr<rocksdb::Transaction> txn{
|
||||
db_->BeginTransaction(rocksdb::WriteOptions{},
|
||||
rocksdb::TransactionOptions{}),
|
||||
};
|
||||
|
||||
try {
|
||||
auto res = action(txn.get());
|
||||
if (res.ok()) {
|
||||
auto commit_res = txn->Commit();
|
||||
if (commit_res.ok()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
utils::error::raise_error(function_name,
|
||||
"rocksdb commit failed|" + res.ToString());
|
||||
return false;
|
||||
}
|
||||
|
||||
utils::error::raise_error(function_name,
|
||||
"rocksdb action failed|" + res.ToString());
|
||||
} catch (const std::exception &ex) {
|
||||
utils::error::raise_error(function_name, ex,
|
||||
"failed to handle rocksdb action");
|
||||
}
|
||||
|
||||
auto rollback_res = txn->Rollback();
|
||||
utils::error::raise_error(function_name, "rocksdb rollback failed|" +
|
||||
rollback_res.ToString());
|
||||
return false;
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::remove_resume(const std::string &api_path) -> bool {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
return perform_action(function_name, [this, &api_path]() -> rocksdb::Status {
|
||||
return db_->Delete(rocksdb::WriteOptions{}, resume_family_, api_path);
|
||||
});
|
||||
return perform_action(
|
||||
function_name,
|
||||
[this, &api_path](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return remove_resume(api_path, txn);
|
||||
});
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::remove_resume(const std::string &api_path,
|
||||
rocksdb::Transaction *txn)
|
||||
-> rocksdb::Status {
|
||||
return txn->Delete(resume_family_, api_path);
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::remove_upload(const std::string &api_path) -> bool {
|
||||
@ -235,9 +293,11 @@ auto rdb_file_mgr_db::remove_upload(const std::string &api_path) -> bool {
|
||||
continue;
|
||||
}
|
||||
|
||||
return perform_action(function_name, [this, &iter]() -> rocksdb::Status {
|
||||
return db_->Delete(rocksdb::WriteOptions{}, upload_family_, iter->key());
|
||||
});
|
||||
return perform_action(
|
||||
function_name,
|
||||
[this, &iter](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return txn->Delete(upload_family_, iter->key());
|
||||
});
|
||||
}
|
||||
|
||||
return true;
|
||||
@ -247,10 +307,11 @@ auto rdb_file_mgr_db::remove_upload_active(const std::string &api_path)
|
||||
-> bool {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
return perform_action(function_name, [this, &api_path]() -> rocksdb::Status {
|
||||
return db_->Delete(rocksdb::WriteOptions{}, upload_active_family_,
|
||||
api_path);
|
||||
});
|
||||
return perform_action(
|
||||
function_name,
|
||||
[this, &api_path](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return txn->Delete(upload_active_family_, api_path);
|
||||
});
|
||||
}
|
||||
|
||||
auto rdb_file_mgr_db::rename_resume(const std::string &from_api_path,
|
||||
@ -279,10 +340,15 @@ auto rdb_file_mgr_db::rename_resume(const std::string &from_api_path,
|
||||
data.at("source_path").get<std::string>(),
|
||||
};
|
||||
|
||||
if (not remove_resume(from_api_path)) {
|
||||
return false;
|
||||
}
|
||||
return perform_action(function_name,
|
||||
[this, &entry, &from_api_path](
|
||||
rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
auto res = remove_resume(from_api_path, txn);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return add_resume(entry);
|
||||
return add_resume(entry, txn);
|
||||
});
|
||||
}
|
||||
} // namespace repertory
|
||||
|
@ -82,7 +82,7 @@ sqlite_file_mgr_db::sqlite_file_mgr_db(const app_config &cfg) {
|
||||
|
||||
sqlite_file_mgr_db::~sqlite_file_mgr_db() { db_.reset(); }
|
||||
|
||||
auto sqlite_file_mgr_db::add_resume(resume_entry entry) -> bool {
|
||||
auto sqlite_file_mgr_db::add_resume(const resume_entry &entry) -> bool {
|
||||
return utils::db::sqlite::db_insert{*db_, resume_table}
|
||||
.or_replace()
|
||||
.column_value("api_path", entry.api_path)
|
||||
@ -94,7 +94,7 @@ auto sqlite_file_mgr_db::add_resume(resume_entry entry) -> bool {
|
||||
.ok();
|
||||
}
|
||||
|
||||
auto sqlite_file_mgr_db::add_upload(upload_entry entry) -> bool {
|
||||
auto sqlite_file_mgr_db::add_upload(const upload_entry &entry) -> bool {
|
||||
return utils::db::sqlite::db_insert{*db_, upload_table}
|
||||
.or_replace()
|
||||
.column_value("api_path", entry.api_path)
|
||||
@ -103,7 +103,8 @@ auto sqlite_file_mgr_db::add_upload(upload_entry entry) -> bool {
|
||||
.ok();
|
||||
}
|
||||
|
||||
auto sqlite_file_mgr_db::add_upload_active(upload_active_entry entry) -> bool {
|
||||
auto sqlite_file_mgr_db::add_upload_active(const upload_active_entry &entry)
|
||||
-> bool {
|
||||
return utils::db::sqlite::db_insert{*db_, upload_active_table}
|
||||
.or_replace()
|
||||
.column_value("api_path", entry.api_path)
|
||||
|
Loading…
x
Reference in New Issue
Block a user