RocksDB implementations should be transactional #24

This commit is contained in:
Scott E. Graves 2024-12-11 11:59:14 -06:00
parent 7d5d52afe3
commit 48ed06a255
2 changed files with 111 additions and 48 deletions

View File

@ -64,8 +64,16 @@ private:
std::function<rocksdb::Status(rocksdb::Transaction *txn)> action)
-> api_error;
[[nodiscard]] auto remove_api_path(const std::string &api_path,
const std::string &source_path,
rocksdb::Transaction *txn)
-> rocksdb::Status;
[[nodiscard]] auto update_item_meta(const std::string &api_path,
json json_data) -> api_error;
json json_data,
rocksdb::Transaction *base_txn = nullptr,
rocksdb::Status *status = nullptr)
-> api_error;
public:
void clear() override;

View File

@ -322,24 +322,7 @@ void rdb_meta_db::remove_api_path(const std::string &api_path) {
res = perform_action(function_name,
[this, &api_path, &source_path](
rocksdb::Transaction *txn) -> rocksdb::Status {
auto txn_res = txn->Delete(pinned_family_, api_path);
if (not txn_res.ok()) {
return txn_res;
}
txn_res = txn->Delete(size_family_, api_path);
if (not txn_res.ok()) {
return txn_res;
}
if (not source_path.empty()) {
txn_res = txn->Delete(source_family_, source_path);
if (not txn_res.ok()) {
return txn_res;
}
}
return txn->Delete(default_family_, api_path);
return remove_api_path(api_path, source_path, txn);
});
if (res != api_error::success) {
utils::error::raise_api_path_error(function_name, api_path, res,
@ -347,6 +330,30 @@ void rdb_meta_db::remove_api_path(const std::string &api_path) {
}
}
auto rdb_meta_db::remove_api_path(const std::string &api_path,
const std::string &source_path,
rocksdb::Transaction *txn)
-> rocksdb::Status {
auto txn_res = txn->Delete(pinned_family_, api_path);
if (not txn_res.ok()) {
return txn_res;
}
txn_res = txn->Delete(size_family_, api_path);
if (not txn_res.ok()) {
return txn_res;
}
if (not source_path.empty()) {
txn_res = txn->Delete(source_family_, source_path);
if (not txn_res.ok()) {
return txn_res;
}
}
return txn->Delete(default_family_, api_path);
}
auto rdb_meta_db::remove_item_meta(const std::string &api_path,
const std::string &key) -> api_error {
if (key == META_DIRECTORY || key == META_PINNED || key == META_SIZE ||
@ -368,14 +375,27 @@ auto rdb_meta_db::remove_item_meta(const std::string &api_path,
auto rdb_meta_db::rename_item_meta(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
REPERTORY_USES_FUNCTION_NAME();
json json_data;
auto res = get_item_meta_json(from_api_path, json_data);
if (res != api_error::success) {
return res;
}
remove_api_path(from_api_path);
return update_item_meta(to_api_path, json_data);
return perform_action(
function_name, [&](rocksdb::Transaction *txn) -> rocksdb::Status {
auto txn_res = remove_api_path(
from_api_path, json_data[META_SOURCE].get<std::string>(), txn);
if (not txn_res.ok()) {
return txn_res;
}
rocksdb::Status status;
[[maybe_unused]] auto api_res =
update_item_meta(to_api_path, json_data, txn, &status);
return status;
});
}
auto rdb_meta_db::set_item_meta(const std::string &api_path,
@ -423,8 +443,9 @@ auto rdb_meta_db::set_item_meta(const std::string &api_path,
return update_item_meta(api_path, json_data);
}
auto rdb_meta_db::update_item_meta(const std::string &api_path, json json_data)
-> api_error {
auto rdb_meta_db::update_item_meta(const std::string &api_path, json json_data,
rocksdb::Transaction *base_txn,
rocksdb::Status *status) -> api_error {
REPERTORY_USES_FUNCTION_NAME();
try {
@ -469,35 +490,69 @@ auto rdb_meta_db::update_item_meta(const std::string &api_path, json json_data)
json_data.erase(META_PINNED);
json_data.erase(META_SIZE);
return perform_action(
function_name, [&](rocksdb::Transaction *txn) -> rocksdb::Status {
if (should_del_source) {
auto res = txn->Delete(source_family_, orig_source_path);
if (not res.ok()) {
return res;
}
}
const auto do_transaction =
[&](rocksdb::Transaction *txn) -> rocksdb::Status {
if (should_del_source) {
auto res = txn->Delete(source_family_, orig_source_path);
if (status != nullptr) {
*status = res;
}
auto res = txn->Put(pinned_family_, api_path,
utils::string::from_bool(pinned));
if (not res.ok()) {
return res;
}
if (not res.ok()) {
return res;
}
}
res = txn->Put(size_family_, api_path, std::to_string(size));
if (not res.ok()) {
return res;
}
auto res =
txn->Put(pinned_family_, api_path, utils::string::from_bool(pinned));
if (status != nullptr) {
*status = res;
}
if (not source_path.empty()) {
res = txn->Put(source_family_, source_path, api_path);
if (not res.ok()) {
return res;
}
}
if (not res.ok()) {
return res;
}
return txn->Put(default_family_, api_path, json_data.dump());
});
res = txn->Put(size_family_, api_path, std::to_string(size));
if (status != nullptr) {
*status = res;
}
if (not res.ok()) {
return res;
}
if (not source_path.empty()) {
res = txn->Put(source_family_, source_path, api_path);
if (status != nullptr) {
*status = res;
}
if (not res.ok()) {
return res;
}
}
res = txn->Put(default_family_, api_path, json_data.dump());
if (status != nullptr) {
*status = res;
}
return res;
};
if (base_txn == nullptr) {
return perform_action(function_name, do_transaction);
}
auto res = do_transaction(base_txn);
if (status != nullptr) {
*status = res;
}
if (res.ok()) {
return api_error::success;
}
} catch (const std::exception &e) {
utils::error::raise_api_path_error(function_name, api_path, e,
"failed to update item meta");