refactor
Some checks are pending
BlockStorage/repertory/pipeline/head Build queued...

This commit is contained in:
2024-12-28 12:49:37 -06:00
parent b3b2a32557
commit aa7d24ef2a
5 changed files with 32 additions and 37 deletions

View File

@ -74,7 +74,6 @@ private:
std::size_t read_chunk_{};
boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
mutable std::recursive_mutex rw_mtx_;
stop_type stop_requested_{false};

View File

@ -51,7 +51,7 @@ public:
static constexpr const auto min_ring_size{5U};
private:
boost::dynamic_bitset<> ring_state_;
boost::dynamic_bitset<> read_state_;
std::size_t total_chunks_;
private:
@ -79,7 +79,7 @@ protected:
}
[[nodiscard]] auto get_ring_size() const -> std::size_t {
return ring_state_.size();
return read_state_.size();
}
[[nodiscard]] virtual auto on_check_start() -> bool = 0;

View File

@ -49,7 +49,6 @@ public:
-> ring_buffer_open_file & = delete;
private:
boost::dynamic_bitset<> ring_state_;
std::string source_path_;
private:

View File

@ -32,21 +32,20 @@
namespace repertory {
ring_buffer_base::ring_buffer_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi,
i_provider &provider,
std::size_t ring_size, bool disable_io)
std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size, bool disable_io)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io),
ring_state_(ring_size),
read_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
if (disable_io) {
if (fsi.size > 0U) {
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
read_state_.resize(std::min(total_chunks_, read_state_.size()));
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), false);
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
read_state_.set(0U, read_state_.size(), false);
}
} else {
if (ring_size < min_ring_size) {
@ -54,7 +53,7 @@ ring_buffer_base::ring_buffer_base(std::uint64_t chunk_size,
}
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
ring_state_.set(0U, ring_size, false);
read_state_.set(0U, ring_size, false);
}
}
@ -97,7 +96,7 @@ auto ring_buffer_base::close() -> bool {
}
auto ring_buffer_base::download_chunk(std::size_t chunk,
bool skip_active) -> api_error {
bool skip_active) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
@ -125,7 +124,7 @@ auto ring_buffer_base::download_chunk(std::size_t chunk,
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
if (read_state_[chunk % read_state_.size()]) {
return unlock_and_return(api_error::success);
}
@ -156,7 +155,7 @@ auto ring_buffer_base::download_chunk(std::size_t chunk,
if (result == api_error::success) {
result = on_chunk_downloaded(chunk, buffer);
if (result == api_error::success) {
ring_state_[chunk % ring_state_.size()] = true;
read_state_[chunk % read_state_.size()] = true;
auto progress = (static_cast<double>(chunk + 1U) /
static_cast<double>(total_chunks_)) *
100.0;
@ -179,16 +178,16 @@ void ring_buffer_base::forward(std::size_t count) {
auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex());
return ring_state_;
return read_state_;
}
auto ring_buffer_base::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(get_mutex());
return ring_state_[chunk % ring_state_.size()];
return read_state_[chunk % read_state_.size()];
}
auto ring_buffer_base::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
data_buffer &data) -> api_error {
if (is_directory()) {
return api_error::invalid_operation;
}
@ -280,7 +279,7 @@ void ring_buffer_base::reader_thread() {
chunk_lock.unlock();
};
if (ring_state_[next_chunk % ring_state_.size()]) {
if (read_state_[next_chunk % read_state_.size()]) {
check_and_wait();
continue;
}
@ -299,8 +298,7 @@ void ring_buffer_base::reverse(std::size_t count) {
update_position(count, false);
}
void ring_buffer_base::set(std::size_t first_chunk,
std::size_t current_chunk) {
void ring_buffer_base::set(std::size_t first_chunk, std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
@ -309,7 +307,7 @@ void ring_buffer_base::set(std::size_t first_chunk,
ring_begin_ = first_chunk;
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
if (current_chunk > ring_end_) {
chunk_notify_.notify_all();
@ -318,7 +316,7 @@ void ring_buffer_base::set(std::size_t first_chunk,
}
ring_pos_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), true);
read_state_.set(0U, read_state_.size(), true);
chunk_notify_.notify_all();
}
@ -347,16 +345,16 @@ void ring_buffer_base::update_position(std::size_t count, bool is_forward) {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
if (delta >= read_state_.size()) {
read_state_.set(0U, read_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
read_state_[(ring_begin_ + idx) % read_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
read_state_[(ring_end_ - idx) % read_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
@ -364,7 +362,7 @@ void ring_buffer_base::update_position(std::size_t count, bool is_forward) {
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
}
chunk_notify_.notify_all();

View File

@ -37,8 +37,7 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
i_provider &provider,
std::size_t ring_size)
: ring_buffer_base(chunk_size, chunk_timeout, fsi, provider, ring_size,
false),
ring_state_(ring_size),
false),
source_path_(utils::path::combine(buffer_directory,
{
utils::create_uuid_string(),
@ -98,7 +97,7 @@ auto ring_buffer_open_file::on_check_start() -> bool {
utils::get_last_error_code()));
}
if (not nf_->truncate(ring_state_.size() * get_chunk_size())) {
if (not nf_->truncate(get_ring_size() * get_chunk_size())) {
nf_->close();
nf_.reset();
@ -113,7 +112,7 @@ auto ring_buffer_open_file::on_chunk_downloaded(
std::size_t chunk, const data_buffer &buffer) -> api_error {
return do_io([&]() -> api_error {
std::size_t bytes_written{};
if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(),
if (nf_->write(buffer, (chunk % get_ring_size()) * get_chunk_size(),
&bytes_written)) {
return api_error::success;
}
@ -127,10 +126,10 @@ auto ring_buffer_open_file::on_read_chunk(
data_buffer &data, std::size_t &bytes_read) -> api_error {
data_buffer buffer(read_size);
auto res = do_io([&]() -> api_error {
return nf_->read(buffer,
(((chunk % ring_state_.size()) * get_chunk_size()) +
read_offset),
&bytes_read)
return nf_->read(
buffer,
(((chunk % get_ring_size()) * get_chunk_size()) + read_offset),
&bytes_read)
? api_error::success
: api_error::os_error;
});