RocksDB implementations should be transactional #24
This commit is contained in:
parent
f0c774de5a
commit
bb9892cc84
@ -40,7 +40,7 @@ public:
|
||||
|
||||
private:
|
||||
const app_config &cfg_;
|
||||
std::unique_ptr<rocksdb::DB> db_{nullptr};
|
||||
std::unique_ptr<rocksdb::TransactionDB> db_{nullptr};
|
||||
rocksdb::ColumnFamilyHandle *default_family_{};
|
||||
rocksdb::ColumnFamilyHandle *pinned_family_{};
|
||||
rocksdb::ColumnFamilyHandle *size_family_{};
|
||||
@ -59,6 +59,11 @@ private:
|
||||
perform_action(std::string_view function_name,
|
||||
std::function<rocksdb::Status()> action) -> api_error;
|
||||
|
||||
[[nodiscard]] auto perform_action(
|
||||
std::string_view function_name,
|
||||
std::function<rocksdb::Status(rocksdb::Transaction *txn)> action)
|
||||
-> api_error;
|
||||
|
||||
[[nodiscard]] auto update_item_meta(const std::string &api_path,
|
||||
json json_data) -> api_error;
|
||||
|
||||
|
@ -33,7 +33,7 @@ namespace {
|
||||
create_rocksdb(const repertory::app_config &cfg, const std::string &name,
|
||||
const std::vector<rocksdb::ColumnFamilyDescriptor> &families,
|
||||
std::vector<rocksdb::ColumnFamilyHandle *> &handles, bool clear)
|
||||
-> std::unique_ptr<rocksdb::DB> {
|
||||
-> std::unique_ptr<rocksdb::TransactionDB> {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
auto path = repertory::utils::path::combine(cfg.get_data_directory(), {name});
|
||||
@ -49,14 +49,17 @@ create_rocksdb(const repertory::app_config &cfg, const std::string &name,
|
||||
options.db_log_dir = cfg.get_log_directory();
|
||||
options.keep_log_file_num = 10;
|
||||
|
||||
rocksdb::DB *ptr{};
|
||||
auto status = rocksdb::DB::Open(options, path, families, &handles, &ptr);
|
||||
rocksdb::TransactionDBOptions tx_options{};
|
||||
|
||||
rocksdb::TransactionDB *ptr{};
|
||||
auto status = rocksdb::TransactionDB::Open(options, tx_options, path,
|
||||
families, &handles, &ptr);
|
||||
if (not status.ok()) {
|
||||
repertory::utils::error::raise_error(function_name, status.ToString());
|
||||
throw repertory::startup_exception(status.ToString());
|
||||
}
|
||||
|
||||
return std::unique_ptr<rocksdb::DB>(ptr);
|
||||
return std::unique_ptr<rocksdb::TransactionDB>(ptr);
|
||||
}
|
||||
} // namespace
|
||||
|
||||
@ -92,7 +95,7 @@ void rdb_meta_db::clear() { create_or_open(true); }
|
||||
auto rdb_meta_db::create_iterator(rocksdb::ColumnFamilyHandle *family) const
|
||||
-> std::shared_ptr<rocksdb::Iterator> {
|
||||
return std::shared_ptr<rocksdb::Iterator>(
|
||||
db_->NewIterator(rocksdb::ReadOptions(), family));
|
||||
db_->NewIterator(rocksdb::ReadOptions{}, family));
|
||||
}
|
||||
|
||||
auto rdb_meta_db::get_api_path(const std::string &source_path,
|
||||
@ -104,7 +107,7 @@ auto rdb_meta_db::get_api_path(const std::string &source_path,
|
||||
}
|
||||
|
||||
return perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Get(rocksdb::ReadOptions(), source_family_, source_path,
|
||||
return db_->Get(rocksdb::ReadOptions{}, source_family_, source_path,
|
||||
&api_path);
|
||||
});
|
||||
}
|
||||
@ -129,7 +132,7 @@ auto rdb_meta_db::get_item_meta_json(const std::string &api_path,
|
||||
{
|
||||
std::string value;
|
||||
auto res = perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Get(rocksdb::ReadOptions(), default_family_, api_path,
|
||||
return db_->Get(rocksdb::ReadOptions{}, default_family_, api_path,
|
||||
&value);
|
||||
});
|
||||
if (res != api_error::success) {
|
||||
@ -144,7 +147,7 @@ auto rdb_meta_db::get_item_meta_json(const std::string &api_path,
|
||||
{
|
||||
std::string value;
|
||||
auto res = perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Get(rocksdb::ReadOptions(), pinned_family_, api_path,
|
||||
return db_->Get(rocksdb::ReadOptions{}, pinned_family_, api_path,
|
||||
&value);
|
||||
});
|
||||
if (res != api_error::success) {
|
||||
@ -158,7 +161,7 @@ auto rdb_meta_db::get_item_meta_json(const std::string &api_path,
|
||||
{
|
||||
std::string value;
|
||||
auto res = perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Get(rocksdb::ReadOptions(), size_family_, api_path, &value);
|
||||
return db_->Get(rocksdb::ReadOptions{}, size_family_, api_path, &value);
|
||||
});
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
@ -199,13 +202,13 @@ auto rdb_meta_db::get_item_meta(const std::string &api_path,
|
||||
|
||||
if (key == META_PINNED) {
|
||||
return perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Get(rocksdb::ReadOptions(), pinned_family_, api_path, &value);
|
||||
return db_->Get(rocksdb::ReadOptions{}, pinned_family_, api_path, &value);
|
||||
});
|
||||
}
|
||||
|
||||
if (key == META_SIZE) {
|
||||
return perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Get(rocksdb::ReadOptions(), size_family_, api_path, &value);
|
||||
return db_->Get(rocksdb::ReadOptions{}, size_family_, api_path, &value);
|
||||
});
|
||||
}
|
||||
|
||||
@ -271,21 +274,77 @@ auto rdb_meta_db::perform_action(std::string_view function_name,
|
||||
return res.IsNotFound() ? api_error::item_not_found : api_error::error;
|
||||
}
|
||||
|
||||
auto rdb_meta_db::perform_action(
|
||||
std::string_view function_name,
|
||||
std::function<rocksdb::Status(rocksdb::Transaction *txn)> action)
|
||||
-> api_error {
|
||||
std::unique_ptr<rocksdb::Transaction> txn{
|
||||
db_->BeginTransaction(rocksdb::WriteOptions{},
|
||||
rocksdb::TransactionOptions{}),
|
||||
};
|
||||
|
||||
try {
|
||||
auto res = action(txn.get());
|
||||
if (res.ok()) {
|
||||
auto commit_res = txn->Commit();
|
||||
if (commit_res.ok()) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
utils::error::raise_error(function_name,
|
||||
"rocksdb commit failed|" + res.ToString());
|
||||
return api_error::error;
|
||||
}
|
||||
|
||||
utils::error::raise_error(function_name,
|
||||
"rocksdb action failed|" + res.ToString());
|
||||
} catch (const std::exception &ex) {
|
||||
utils::error::raise_error(function_name, ex,
|
||||
"failed to handle rocksdb action");
|
||||
}
|
||||
|
||||
auto rollback_res = txn->Rollback();
|
||||
utils::error::raise_error(function_name, "rocksdb rollback failed|" +
|
||||
rollback_res.ToString());
|
||||
return api_error::error;
|
||||
}
|
||||
|
||||
void rdb_meta_db::remove_api_path(const std::string &api_path) {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
std::string source_path;
|
||||
[[maybe_unused]] auto res = get_item_meta(api_path, META_SOURCE, source_path);
|
||||
auto res = get_item_meta(api_path, META_SOURCE, source_path);
|
||||
if (res != api_error::success) {
|
||||
utils::error::raise_api_path_error(function_name, api_path, res,
|
||||
"failed to get source path");
|
||||
}
|
||||
|
||||
res = perform_action(
|
||||
function_name, [this, &api_path, &source_path]() -> rocksdb::Status {
|
||||
db_->Delete(rocksdb::WriteOptions(), pinned_family_, api_path);
|
||||
db_->Delete(rocksdb::WriteOptions(), size_family_, api_path);
|
||||
if (not source_path.empty()) {
|
||||
db_->Delete(rocksdb::WriteOptions(), source_family_, source_path);
|
||||
}
|
||||
return db_->Delete(rocksdb::WriteOptions(), default_family_, api_path);
|
||||
});
|
||||
res = perform_action(function_name,
|
||||
[this, &api_path, &source_path](
|
||||
rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
auto res = txn->Delete(pinned_family_, api_path);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
res = txn->Delete(size_family_, api_path);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
if (not source_path.empty()) {
|
||||
res = txn->Delete(source_family_, source_path);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
return txn->Delete(default_family_, api_path);
|
||||
});
|
||||
if (res != api_error::success) {
|
||||
utils::error::raise_api_path_error(function_name, api_path, res,
|
||||
"failed to remove api path");
|
||||
}
|
||||
}
|
||||
|
||||
auto rdb_meta_db::remove_item_meta(const std::string &api_path,
|
||||
@ -325,15 +384,17 @@ auto rdb_meta_db::set_item_meta(const std::string &api_path,
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (key == META_PINNED) {
|
||||
return perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Put(rocksdb::WriteOptions(), pinned_family_, api_path, value);
|
||||
});
|
||||
return perform_action(function_name,
|
||||
[&](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return txn->Put(pinned_family_, api_path, value);
|
||||
});
|
||||
}
|
||||
|
||||
if (key == META_SIZE) {
|
||||
return perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Put(rocksdb::WriteOptions(), size_family_, api_path, value);
|
||||
});
|
||||
return perform_action(function_name,
|
||||
[&](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
return txn->Put(size_family_, api_path, value);
|
||||
});
|
||||
}
|
||||
|
||||
json json_data;
|
||||
@ -393,51 +454,50 @@ auto rdb_meta_db::update_item_meta(const std::string &api_path, json json_data)
|
||||
json_data[META_SIZE] = std::to_string(size);
|
||||
json_data[META_SOURCE] = source_path;
|
||||
|
||||
auto should_del_source{false};
|
||||
std::string orig_source_path;
|
||||
if (not directory) {
|
||||
std::string orig_source_path;
|
||||
auto res = get_item_meta(api_path, META_SOURCE, orig_source_path);
|
||||
if (res != api_error::success && res != api_error::item_not_found) {
|
||||
return res;
|
||||
}
|
||||
|
||||
if (not orig_source_path.empty() && orig_source_path != source_path) {
|
||||
res = perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
return db_->Delete(rocksdb::WriteOptions(), source_family_,
|
||||
orig_source_path);
|
||||
});
|
||||
if (res != api_error::success && res != api_error::item_not_found) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
should_del_source =
|
||||
not orig_source_path.empty() && orig_source_path != source_path;
|
||||
}
|
||||
|
||||
json_data.erase(META_PINNED);
|
||||
json_data.erase(META_SIZE);
|
||||
|
||||
return perform_action(function_name, [&]() -> rocksdb::Status {
|
||||
auto res = db_->Put(rocksdb::WriteOptions(), pinned_family_, api_path,
|
||||
utils::string::from_bool(pinned));
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
return perform_action(
|
||||
function_name, [&](rocksdb::Transaction *txn) -> rocksdb::Status {
|
||||
if (should_del_source) {
|
||||
auto res = txn->Delete(source_family_, orig_source_path);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
res = db_->Put(rocksdb::WriteOptions(), size_family_, api_path,
|
||||
std::to_string(size));
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
auto res = txn->Put(pinned_family_, api_path,
|
||||
utils::string::from_bool(pinned));
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
if (not source_path.empty()) {
|
||||
res = db_->Put(rocksdb::WriteOptions(), source_family_, source_path,
|
||||
api_path);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
res = txn->Put(size_family_, api_path, std::to_string(size));
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return db_->Put(rocksdb::WriteOptions(), default_family_, api_path,
|
||||
json_data.dump());
|
||||
});
|
||||
if (not source_path.empty()) {
|
||||
res = txn->Put(source_family_, source_path, api_path);
|
||||
if (not res.ok()) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
return txn->Put(default_family_, api_path, json_data.dump());
|
||||
});
|
||||
} catch (const std::exception &e) {
|
||||
utils::error::raise_api_path_error(function_name, api_path, e,
|
||||
"failed to update item meta");
|
||||
|
Loading…
x
Reference in New Issue
Block a user