revert
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
2024-12-28 08:34:08 -06:00
parent 6fdda46869
commit 504556bedd
4 changed files with 100 additions and 91 deletions

View File

@ -70,8 +70,6 @@ private:
void reader_thread();
void update_position(std::size_t count, bool is_forward);
public:
auto close() -> bool override;

View File

@ -71,8 +71,6 @@ private:
void reader_thread();
void update_position(std::size_t count, bool is_forward);
public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,

View File

@ -139,7 +139,32 @@ auto direct_open_file::download_chunk(std::size_t chunk,
}
void direct_open_file::forward(std::size_t count) {
return update_position(count, true);
mutex_lock chunk_lock(chunk_mtx_);
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
if ((ring_pos_ + count) <= ring_end_) {
ring_pos_ += count;
} else {
auto added = count - (ring_end_ - ring_pos_);
if (added >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += count;
ring_begin_ += added;
} else {
for (std::size_t idx = 0U; idx < added; ++idx) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
}
ring_begin_ += added;
ring_pos_ += count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
@ -153,7 +178,30 @@ auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
}
void direct_open_file::reverse(std::size_t count) {
return update_position(count, false);
mutex_lock chunk_lock(chunk_mtx_);
count = std::min(ring_pos_, count);
if ((ring_pos_ - count) >= ring_begin_) {
ring_pos_ -= count;
} else {
auto removed = count - (ring_pos_ - ring_begin_);
if (removed >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ -= count;
ring_begin_ = ring_pos_;
} else {
for (std::size_t idx = 0U; idx < removed; ++idx) {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
ring_begin_ -= removed;
ring_pos_ -= count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
@ -272,45 +320,4 @@ void direct_open_file::set_api_path(const std::string &api_path) {
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
void direct_open_file::update_position(std::size_t count, bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) {
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
} else {
count = std::min(ring_pos_, count);
}
if (is_forward ? (ring_pos_ + count) <= ring_end_
: (ring_pos_ - count) >= ring_begin_) {
ring_pos_ += is_forward ? count : -count;
} else {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
} // namespace repertory

View File

@ -214,7 +214,32 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
}
void ring_buffer_open_file::forward(std::size_t count) {
update_position(count, true);
mutex_lock chunk_lock(chunk_mtx_);
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
if ((ring_pos_ + count) <= ring_end_) {
ring_pos_ += count;
} else {
auto added = count - (ring_end_ - ring_pos_);
if (added >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += count;
ring_begin_ += added;
} else {
for (std::size_t idx = 0U; idx < added; ++idx) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
}
ring_begin_ += added;
ring_pos_ += count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
@ -362,7 +387,30 @@ void ring_buffer_open_file::reader_thread() {
}
void ring_buffer_open_file::reverse(std::size_t count) {
update_position(count, false);
mutex_lock chunk_lock(chunk_mtx_);
count = std::min(ring_pos_, count);
if ((ring_pos_ - count) >= ring_begin_) {
ring_pos_ -= count;
} else {
auto removed = count - (ring_pos_ - ring_begin_);
if (removed >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ -= count;
ring_begin_ = ring_pos_;
} else {
for (std::size_t idx = 0U; idx < removed; ++idx) {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
ring_begin_ -= removed;
ring_pos_ -= count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
void ring_buffer_open_file::set(std::size_t first_chunk,
@ -394,46 +442,4 @@ void ring_buffer_open_file::set_api_path(const std::string &api_path) {
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
void ring_buffer_open_file::update_position(std::size_t count,
bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) {
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
} else {
count = std::min(ring_pos_, count);
}
if (is_forward ? (ring_pos_ + count) <= ring_end_
: (ring_pos_ - count) >= ring_begin_) {
ring_pos_ += is_forward ? count : -count;
} else {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
} // namespace repertory