ring buffer fixes
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
Scott E. Graves 2024-12-22 18:47:20 -06:00
parent e4a80e22f3
commit 3bdb342d24
2 changed files with 26 additions and 18 deletions

View File

@ -56,6 +56,7 @@ private:
std::unique_ptr<std::thread> chunk_reverse_thread_;
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::mutex read_mtx_;
std::size_t ring_begin_{};
std::size_t ring_end_{};
std::size_t ring_pos_{};

View File

@ -43,7 +43,7 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
: open_file_base(chunk_size, chunk_timeout, fsi, provider),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
utils::divide_with_ceiling(fsi_.size, chunk_size))) {
if ((ring_size % 2U) != 0U) {
throw std::runtime_error("ring size must be a multiple of 2");
}
@ -52,13 +52,12 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
throw std::runtime_error("ring size must be greater than or equal to 4");
}
if (not can_handle_file(fsi.size, chunk_size, ring_size)) {
if (not can_handle_file(fsi_.size, chunk_size, ring_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), true);
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
ring_state_.set(0U, ring_size, true);
buffer_directory = utils::path::absolute(buffer_directory);
if (not utils::file::directory(buffer_directory).create_directory()) {
@ -75,7 +74,7 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
utils::get_last_error_code()));
}
if (not nf_->truncate(ring_state_.size() * chunk_size)) {
if (not nf_->truncate(ring_size * chunk_size)) {
nf_->close();
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
@ -117,18 +116,22 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error {
}
if (ring_state_[chunk % ring_state_.size()]) {
auto active_download = std::make_shared<download>();
auto active_download{std::make_shared<download>()};
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
chunk_notify_.notify_all();
chunk_lock.unlock();
data_buffer buffer(chunk == (total_chunks_ - 1U) ? last_chunk_size_
: chunk_size_);
data_buffer buffer;
auto data_offset{chunk * chunk_size_};
auto data_size{
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
};
auto res =
provider_.read_file_bytes(fsi_.api_path, buffer.size(),
chunk * chunk_size_, buffer, stop_requested_);
auto res{
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
stop_requested_),
};
if (res == api_error::success) {
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
@ -238,18 +241,19 @@ auto ring_buffer_open_file::read(std::size_t read_size,
return api_error::invalid_operation;
}
data.clear();
reset_timeout();
mutex_lock lock(read_mtx_);
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
if (read_size == 0U) {
return api_error::success;
}
auto begin_chunk = static_cast<std::size_t>(read_offset / chunk_size_);
auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
read_offset = read_offset - (begin_chunk * chunk_size_);
auto res = api_error::success;
auto res{api_error::success};
for (std::size_t chunk = begin_chunk;
(res == api_error::success) && (read_size > 0U); ++chunk) {
if (chunk > ring_pos_) {
@ -267,8 +271,11 @@ auto ring_buffer_open_file::read(std::size_t read_size,
reset_timeout();
auto to_read = std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
read_size);
auto to_read{
std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
read_size),
};
res = do_io([&]() -> api_error {
data_buffer buffer(to_read);
@ -288,7 +295,7 @@ auto ring_buffer_open_file::read(std::size_t read_size,
reset_timeout();
data.insert(data.end(), buffer.begin(), buffer.end());
read_size -= buffer.size();
read_size -= bytes_read;
return result;
});