try to fix ring buffer
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good
This commit is contained in:
parent
5df2a5c3c0
commit
e4a80e22f3
@ -32,10 +32,6 @@ class i_upload_manager;
|
||||
|
||||
class ring_buffer_open_file final : public open_file_base {
|
||||
public:
|
||||
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout, filesystem_item fsi,
|
||||
i_provider &provider);
|
||||
|
||||
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout, filesystem_item fsi,
|
||||
i_provider &provider, std::size_t ring_size);
|
||||
@ -60,9 +56,9 @@ private:
|
||||
std::unique_ptr<std::thread> chunk_reverse_thread_;
|
||||
std::condition_variable chunk_notify_;
|
||||
mutable std::mutex chunk_mtx_;
|
||||
std::size_t current_chunk_{};
|
||||
std::size_t first_chunk_{};
|
||||
std::size_t last_chunk_;
|
||||
std::size_t ring_begin_{};
|
||||
std::size_t ring_end_{};
|
||||
std::size_t ring_pos_{};
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
private:
|
||||
@ -87,16 +83,14 @@ public:
|
||||
void forward(std::size_t count);
|
||||
|
||||
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
|
||||
return current_chunk_;
|
||||
return ring_pos_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
|
||||
return first_chunk_;
|
||||
return ring_begin_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_last_chunk() const -> std::size_t {
|
||||
return last_chunk_;
|
||||
}
|
||||
[[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; }
|
||||
|
||||
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
|
||||
|
||||
|
@ -34,15 +34,6 @@
|
||||
#include "utils/utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout,
|
||||
filesystem_item fsi,
|
||||
i_provider &provider)
|
||||
: ring_buffer_open_file(std::move(buffer_directory), chunk_size,
|
||||
chunk_timeout, std::move(fsi), provider,
|
||||
(1024ULL * 1024ULL * 1024ULL) / chunk_size) {}
|
||||
|
||||
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout,
|
||||
@ -65,7 +56,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
throw std::runtime_error("file size is less than ring buffer size");
|
||||
}
|
||||
|
||||
last_chunk_ = ring_state_.size() - 1U;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
|
||||
buffer_directory = utils::path::absolute(buffer_directory);
|
||||
@ -131,8 +123,8 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
data_buffer buffer((chunk == (total_chunks_ - 1U)) ? last_chunk_size_
|
||||
: chunk_size_);
|
||||
data_buffer buffer(chunk == (total_chunks_ - 1U) ? last_chunk_size_
|
||||
: chunk_size_);
|
||||
|
||||
auto res =
|
||||
provider_.read_file_bytes(fsi_.api_path, buffer.size(),
|
||||
@ -165,28 +157,28 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error {
|
||||
|
||||
void ring_buffer_open_file::forward(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if ((current_chunk_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - current_chunk_;
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
|
||||
if ((current_chunk_ + count) <= last_chunk_) {
|
||||
current_chunk_ += count;
|
||||
if ((ring_pos_ + count) <= ring_end_) {
|
||||
ring_pos_ += count;
|
||||
} else {
|
||||
const auto added = count - (last_chunk_ - current_chunk_);
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
current_chunk_ += count;
|
||||
first_chunk_ += added;
|
||||
last_chunk_ =
|
||||
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(first_chunk_ + idx) % ring_state_.size()] = true;
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true;
|
||||
}
|
||||
first_chunk_ += added;
|
||||
current_chunk_ += count;
|
||||
last_chunk_ =
|
||||
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,28 +203,28 @@ auto ring_buffer_open_file::native_operation(
|
||||
|
||||
void ring_buffer_open_file::reverse(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if (current_chunk_ < count) {
|
||||
count = current_chunk_;
|
||||
if (ring_pos_ < count) {
|
||||
count = ring_pos_;
|
||||
}
|
||||
|
||||
if ((current_chunk_ - count) >= first_chunk_) {
|
||||
current_chunk_ -= count;
|
||||
if ((ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ -= count;
|
||||
} else {
|
||||
const auto removed = count - (current_chunk_ - first_chunk_);
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
current_chunk_ -= count;
|
||||
first_chunk_ = current_chunk_;
|
||||
last_chunk_ =
|
||||
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(last_chunk_ - idx) % ring_state_.size()] = true;
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = true;
|
||||
}
|
||||
first_chunk_ -= removed;
|
||||
current_chunk_ -= count;
|
||||
last_chunk_ =
|
||||
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,6 +238,7 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
data.clear();
|
||||
reset_timeout();
|
||||
|
||||
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
|
||||
@ -253,46 +246,54 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
const auto start_chunk_index =
|
||||
static_cast<std::size_t>(read_offset / chunk_size_);
|
||||
read_offset = read_offset - (start_chunk_index * chunk_size_);
|
||||
data_buffer buffer(chunk_size_);
|
||||
auto begin_chunk = static_cast<std::size_t>(read_offset / chunk_size_);
|
||||
read_offset = read_offset - (begin_chunk * chunk_size_);
|
||||
|
||||
auto res = api_error::success;
|
||||
for (std::size_t chunk = start_chunk_index;
|
||||
for (std::size_t chunk = begin_chunk;
|
||||
(res == api_error::success) && (read_size > 0U); ++chunk) {
|
||||
if (chunk > current_chunk_) {
|
||||
forward(chunk - current_chunk_);
|
||||
} else if (chunk < current_chunk_) {
|
||||
reverse(current_chunk_ - chunk);
|
||||
if (chunk > ring_pos_) {
|
||||
forward(chunk - ring_pos_);
|
||||
} else if (chunk < ring_pos_) {
|
||||
reverse(ring_pos_ - chunk);
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
res = download_chunk(chunk);
|
||||
if (res != api_error::success) {
|
||||
continue;
|
||||
return res;
|
||||
}
|
||||
|
||||
const auto to_read = std::min(
|
||||
static_cast<std::size_t>(chunk_size_ - read_offset), read_size);
|
||||
res = do_io([this, &buffer, &chunk, &data, read_offset,
|
||||
&to_read]() -> api_error {
|
||||
reset_timeout();
|
||||
|
||||
auto to_read = std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
|
||||
read_size);
|
||||
res = do_io([&]() -> api_error {
|
||||
data_buffer buffer(to_read);
|
||||
|
||||
std::size_t bytes_read{};
|
||||
auto ret = nf_->read(buffer, ((chunk % ring_state_.size()) * chunk_size_),
|
||||
&bytes_read)
|
||||
? api_error::success
|
||||
: api_error::os_error;
|
||||
if (ret == api_error::success) {
|
||||
data.insert(
|
||||
data.end(), buffer.begin() + static_cast<std::int64_t>(read_offset),
|
||||
buffer.begin() + static_cast<std::int64_t>(read_offset + to_read));
|
||||
reset_timeout();
|
||||
auto result = nf_->read(buffer,
|
||||
(((chunk % ring_state_.size()) * chunk_size_) +
|
||||
read_offset),
|
||||
&bytes_read)
|
||||
? api_error::success
|
||||
: api_error::os_error;
|
||||
buffer.resize(bytes_read);
|
||||
|
||||
if (result != api_error::success) {
|
||||
return result;
|
||||
}
|
||||
|
||||
return ret;
|
||||
reset_timeout();
|
||||
|
||||
data.insert(data.end(), buffer.begin(), buffer.end());
|
||||
read_size -= buffer.size();
|
||||
|
||||
return result;
|
||||
});
|
||||
|
||||
read_offset = 0U;
|
||||
read_size -= to_read;
|
||||
}
|
||||
|
||||
return res;
|
||||
@ -306,16 +307,17 @@ void ring_buffer_open_file::set(std::size_t first_chunk,
|
||||
throw std::runtime_error("first chunk must be less than total chunks");
|
||||
}
|
||||
|
||||
first_chunk_ = first_chunk;
|
||||
last_chunk_ = first_chunk_ + ring_state_.size() - 1U;
|
||||
ring_begin_ = first_chunk;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
|
||||
if (current_chunk > last_chunk_) {
|
||||
if (current_chunk > ring_end_) {
|
||||
chunk_notify_.notify_all();
|
||||
throw std::runtime_error(
|
||||
"current chunk must be less than or equal to last chunk");
|
||||
}
|
||||
|
||||
current_chunk_ = current_chunk;
|
||||
ring_pos_ = current_chunk;
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
|
@ -315,7 +315,7 @@ TEST(ring_buffer_open_file, can_reverse_full_ring) {
|
||||
}
|
||||
|
||||
TEST(ring_buffer_open_file, read_full_file) {
|
||||
auto &nf = test::create_random_file(test_chunk_size * 32u);
|
||||
auto &nf = test::create_random_file(test_chunk_size * 33u + 11u);
|
||||
auto download_source_path = nf.get_path();
|
||||
|
||||
auto dest_path = test::generate_test_file_name("ring_buffer_open_file");
|
||||
@ -327,7 +327,7 @@ TEST(ring_buffer_open_file, read_full_file) {
|
||||
filesystem_item fsi;
|
||||
fsi.directory = false;
|
||||
fsi.api_path = "/test.txt";
|
||||
fsi.size = test_chunk_size * 32u;
|
||||
fsi.size = test_chunk_size * 33u + 11u;
|
||||
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
|
||||
|
||||
EXPECT_CALL(mp, read_file_bytes)
|
||||
|
Loading…
x
Reference in New Issue
Block a user