ring buffer background reader

This commit is contained in:
Scott E. Graves 2024-12-23 07:46:25 -06:00
parent 0a70469cd0
commit 75a4676eac
4 changed files with 131 additions and 78 deletions

View File

@ -42,8 +42,8 @@ public:
ring_buffer_open_file() = delete; ring_buffer_open_file() = delete;
ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete; ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete;
ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete; ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete;
auto operator=(ring_buffer_open_file &&) noexcept -> ring_buffer_open_file & = auto operator=(ring_buffer_open_file &&) noexcept
delete; -> ring_buffer_open_file & = delete;
auto operator=(const ring_buffer_open_file &) noexcept auto operator=(const ring_buffer_open_file &) noexcept
-> ring_buffer_open_file & = delete; -> ring_buffer_open_file & = delete;
@ -64,11 +64,9 @@ private:
stop_type stop_requested_{false}; stop_type stop_requested_{false};
private: private:
auto download_chunk(std::size_t chunk) -> api_error; auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
void forward_reader_thread(std::size_t count); void background_reader_thread();
void reverse_reader_thread(std::size_t count);
protected: protected:
[[nodiscard]] auto is_download_complete() const -> bool override { [[nodiscard]] auto is_download_complete() const -> bool override {
@ -108,8 +106,8 @@ public:
return false; return false;
} }
[[nodiscard]] auto [[nodiscard]] auto native_operation(native_operation_callback callback)
native_operation(native_operation_callback callback) -> api_error override; -> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t, native_operation_callback) [[nodiscard]] auto native_operation(std::uint64_t, native_operation_callback)
-> api_error override { -> api_error override {
@ -129,8 +127,8 @@ public:
void set_api_path(const std::string &api_path) override; void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, [[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
std::size_t &) -> api_error override { -> api_error override {
return api_error::not_supported; return api_error::not_supported;
} }
}; };

View File

@ -194,6 +194,7 @@ enum class api_error {
invalid_handle, invalid_handle,
invalid_operation, invalid_operation,
invalid_ring_buffer_multiple, invalid_ring_buffer_multiple,
invalid_ring_buffer_position,
invalid_ring_buffer_size, invalid_ring_buffer_size,
invalid_version, invalid_version,
item_exists, item_exists,
@ -215,31 +216,33 @@ enum class api_error {
[[nodiscard]] auto api_error_from_string(std::string_view str) -> api_error; [[nodiscard]] auto api_error_from_string(std::string_view str) -> api_error;
[[nodiscard]] auto [[nodiscard]] auto api_error_to_string(const api_error &error)
api_error_to_string(const api_error &error) -> const std::string &; -> const std::string &;
enum class database_type { enum class database_type {
rocksdb, rocksdb,
sqlite, sqlite,
}; };
[[nodiscard]] auto database_type_from_string(
std::string type,
database_type default_type = database_type::rocksdb) -> database_type;
[[nodiscard]] auto [[nodiscard]] auto
database_type_to_string(const database_type &type) -> std::string; database_type_from_string(std::string type,
database_type default_type = database_type::rocksdb)
-> database_type;
[[nodiscard]] auto database_type_to_string(const database_type &type)
-> std::string;
enum class download_type { enum class download_type {
direct, direct,
fallback, fallback,
ring_buffer, ring_buffer,
}; };
[[nodiscard]] auto download_type_from_string(
std::string type,
download_type default_type = download_type::fallback) -> download_type;
[[nodiscard]] auto [[nodiscard]] auto
download_type_to_string(const download_type &type) -> std::string; download_type_from_string(std::string type,
download_type default_type = download_type::fallback)
-> download_type;
[[nodiscard]] auto download_type_to_string(const download_type &type)
-> std::string;
enum class exit_code : std::int32_t { enum class exit_code : std::int32_t {
success = 0, success = 0,

View File

@ -19,6 +19,8 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include <algorithm>
#include "file_manager/ring_buffer_open_file.hpp" #include "file_manager/ring_buffer_open_file.hpp"
#include "app_config.hpp" #include "app_config.hpp"
@ -69,7 +71,7 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
source_path_ = source_path_ =
utils::path::combine(buffer_directory, {utils::create_uuid_string()}); utils::path::combine(buffer_directory, {utils::create_uuid_string()});
nf_ = utils::file::file::open_or_create_file(source_path_); nf_ = utils::file::file::open_or_create_file(source_path_);
if (not *nf_) { if (not*nf_) {
throw std::runtime_error(fmt::format("failed to create buffer file|err|{}", throw std::runtime_error(fmt::format("failed to create buffer file|err|{}",
utils::get_last_error_code())); utils::get_last_error_code()));
} }
@ -105,57 +107,76 @@ auto ring_buffer_open_file::close() -> bool {
return open_file_base::close(); return open_file_base::close();
} }
auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error { auto ring_buffer_open_file::download_chunk(std::size_t chunk, bool skip_active)
-> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_); unique_mutex_lock chunk_lock(chunk_mtx_);
if (active_downloads_.find(chunk) != active_downloads_.end()) { const auto unlock_and_notify = [this, &chunk_lock]() {
auto active_download = active_downloads_.at(chunk);
chunk_notify_.notify_all(); chunk_notify_.notify_all();
chunk_lock.unlock(); chunk_lock.unlock();
};
const auto unlock_and_return =
[&unlock_and_notify](api_error res) -> api_error {
unlock_and_notify();
return res;
};
if (chunk < ring_begin_ || chunk > ring_end_) {
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = active_downloads_.at(chunk);
unlock_and_notify();
return active_download->wait(); return active_download->wait();
} }
if (ring_state_[chunk % ring_state_.size()]) { if (not ring_state_[chunk % ring_state_.size()]) {
auto active_download{std::make_shared<download>()}; return unlock_and_return(api_error::success);
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
chunk_notify_.notify_all();
chunk_lock.unlock();
data_buffer buffer;
auto data_offset{chunk * chunk_size_};
auto data_size{
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
};
auto res{
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
stop_requested_),
};
if (res == api_error::success) {
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
if (nf_->write(buffer, (chunk % ring_state_.size()) * chunk_size_,
&bytes_written)) {
return api_error::success;
}
return api_error::os_error;
});
}
active_download->notify(res);
chunk_lock.lock();
active_downloads_.erase(chunk);
chunk_notify_.notify_all();
return res;
} }
chunk_notify_.notify_all(); auto active_download{std::make_shared<download>()};
chunk_lock.unlock(); active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
unlock_and_notify();
return api_error::success; data_buffer buffer;
auto data_offset{chunk * chunk_size_};
auto data_size{
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
};
auto res{
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
stop_requested_),
};
chunk_lock.lock();
if (res == api_error::success) {
res = (chunk >= ring_begin_ && chunk <= ring_end_)
? do_io([&]() -> api_error {
std::size_t bytes_written{};
if (nf_->write(buffer,
(chunk % ring_state_.size()) * chunk_size_,
&bytes_written)) {
return api_error::success;
}
return api_error::os_error;
})
: api_error::invalid_ring_buffer_position;
}
active_downloads_.erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
} }
void ring_buffer_open_file::forward(std::size_t count) { void ring_buffer_open_file::forward(std::size_t count) {
@ -179,6 +200,7 @@ void ring_buffer_open_file::forward(std::size_t count) {
ring_begin_ += added; ring_begin_ += added;
ring_pos_ += count; ring_pos_ += count;
} }
ring_end_ = ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
} }
@ -204,9 +226,7 @@ auto ring_buffer_open_file::native_operation(
void ring_buffer_open_file::reverse(std::size_t count) { void ring_buffer_open_file::reverse(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_); mutex_lock chunk_lock(chunk_mtx_);
if (ring_pos_ < count) { count = std::min(ring_pos_, count);
count = ring_pos_;
}
if ((ring_pos_ - count) >= ring_begin_) { if ((ring_pos_ - count) >= ring_begin_) {
ring_pos_ -= count; ring_pos_ -= count;
@ -232,16 +252,14 @@ void ring_buffer_open_file::reverse(std::size_t count) {
} }
auto ring_buffer_open_file::read(std::size_t read_size, auto ring_buffer_open_file::read(std::size_t read_size,
std::uint64_t read_offset, std::uint64_t read_offset, data_buffer &data)
data_buffer &data) -> api_error { -> api_error {
if (fsi_.directory) { if (fsi_.directory) {
return api_error::invalid_operation; return api_error::invalid_operation;
} }
reset_timeout(); reset_timeout();
mutex_lock lock(read_mtx_);
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset); read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
if (read_size == 0U) { if (read_size == 0U) {
return api_error::success; return api_error::success;
@ -251,6 +269,8 @@ auto ring_buffer_open_file::read(std::size_t read_size,
read_offset = read_offset - (begin_chunk * chunk_size_); read_offset = read_offset - (begin_chunk * chunk_size_);
auto res{api_error::success}; auto res{api_error::success};
unique_mutex_lock read_lock(read_mtx_);
for (std::size_t chunk = begin_chunk; for (std::size_t chunk = begin_chunk;
(res == api_error::success) && (read_size > 0U); ++chunk) { (res == api_error::success) && (read_size > 0U); ++chunk) {
if (chunk > ring_pos_) { if (chunk > ring_pos_) {
@ -259,10 +279,16 @@ auto ring_buffer_open_file::read(std::size_t read_size,
reverse(ring_pos_ - chunk); reverse(ring_pos_ - chunk);
} }
reset_timeout(); res = download_chunk(chunk, false);
res = download_chunk(chunk);
if (res != api_error::success) { if (res != api_error::success) {
if (not stop_requested_ &&
res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
return read(read_size, read_offset, data);
}
return res; return res;
} }
@ -331,4 +357,29 @@ void ring_buffer_open_file::set_api_path(const std::string &api_path) {
open_file_base::set_api_path(api_path); open_file_base::set_api_path(api_path);
chunk_notify_.notify_all(); chunk_notify_.notify_all();
} }
void ring_buffer_open_file::background_reader_thread() {
unique_mutex_lock read_lock(read_mtx_);
read_lock.unlock();
while (not stop_requested_) {
read_lock.lock();
auto next_chunk =
ring_pos_ + 1U >= ring_state_.size() ? 0U : ring_pos_ + 1U;
if (not ring_state_[next_chunk % ring_state_.size()]) {
read_lock.unlock();
continue;
}
read_lock.unlock();
download_chunk(next_chunk, true);
unique_mutex_lock chunk_lock(chunk_mtx_);
if (ring_state_.none()) {
chunk_notify_.wait(chunk_lock);
}
chunk_notify_.notify_all();
}
}
} // namespace repertory } // namespace repertory

View File

@ -25,8 +25,8 @@
#include "utils/string.hpp" #include "utils/string.hpp"
namespace repertory { namespace repertory {
auto database_type_from_string(std::string type, auto database_type_from_string(std::string type, database_type default_type)
database_type default_type) -> database_type { -> database_type {
type = utils::string::to_lower(utils::string::trim(type)); type = utils::string::to_lower(utils::string::trim(type));
if (type == "rocksdb") { if (type == "rocksdb") {
return database_type::rocksdb; return database_type::rocksdb;
@ -50,8 +50,8 @@ auto database_type_to_string(const database_type &type) -> std::string {
} }
} }
auto download_type_from_string(std::string type, auto download_type_from_string(std::string type, download_type default_type)
download_type default_type) -> download_type { -> download_type {
type = utils::string::to_lower(utils::string::trim(type)); type = utils::string::to_lower(utils::string::trim(type));
if (type == "direct") { if (type == "direct") {
return download_type::direct; return download_type::direct;
@ -106,6 +106,7 @@ static const std::unordered_map<api_error, std::string> LOOKUP = {
{api_error::invalid_handle, "invalid_handle"}, {api_error::invalid_handle, "invalid_handle"},
{api_error::invalid_operation, "invalid_operation"}, {api_error::invalid_operation, "invalid_operation"},
{api_error::invalid_ring_buffer_multiple, "invalid_ring_buffer_multiple"}, {api_error::invalid_ring_buffer_multiple, "invalid_ring_buffer_multiple"},
{api_error::invalid_ring_buffer_position, "invalid_ring_buffer_position"},
{api_error::invalid_ring_buffer_size, "invalid_ring_buffer_size"}, {api_error::invalid_ring_buffer_size, "invalid_ring_buffer_size"},
{api_error::invalid_version, "invalid_version"}, {api_error::invalid_version, "invalid_version"},
{api_error::item_exists, "item_exists"}, {api_error::item_exists, "item_exists"},