This commit is contained in:
@@ -70,6 +70,8 @@ private:
|
||||
|
||||
void reader_thread();
|
||||
|
||||
void update_position(std::size_t count, bool is_forward);
|
||||
|
||||
public:
|
||||
auto close() -> bool override;
|
||||
|
||||
|
@@ -71,6 +71,8 @@ private:
|
||||
|
||||
void reader_thread();
|
||||
|
||||
void update_position(std::size_t count, bool is_forward);
|
||||
|
||||
public:
|
||||
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
|
@@ -116,12 +116,21 @@ auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
unlock_and_notify();
|
||||
|
||||
auto res{
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
chunk_lock.lock();
|
||||
if (chunk < ring_begin_ || chunk > ring_end_) {
|
||||
res = api_error::invalid_ring_buffer_position;
|
||||
unlock_and_notify();
|
||||
active_download->notify(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
if (res == api_error::success) {
|
||||
ring_state_[chunk % ring_state_.size()] = true;
|
||||
auto progress =
|
||||
@@ -139,32 +148,7 @@ auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
}
|
||||
|
||||
void direct_open_file::forward(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
|
||||
if ((ring_pos_ + count) <= ring_end_) {
|
||||
ring_pos_ += count;
|
||||
} else {
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
return update_position(count, true);
|
||||
}
|
||||
|
||||
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
@@ -178,30 +162,7 @@ auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
}
|
||||
|
||||
void direct_open_file::reverse(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
count = std::min(ring_pos_, count);
|
||||
|
||||
if ((ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ -= count;
|
||||
} else {
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
return update_position(count, false);
|
||||
}
|
||||
|
||||
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
@@ -240,8 +201,7 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
|
||||
res = download_chunk(chunk, false);
|
||||
if (res != api_error::success) {
|
||||
if (not stop_requested_ &&
|
||||
res == api_error::invalid_ring_buffer_position) {
|
||||
if (res == api_error::invalid_ring_buffer_position) {
|
||||
read_lock.unlock();
|
||||
|
||||
// TODO limit retry
|
||||
@@ -320,4 +280,45 @@ void direct_open_file::set_api_path(const std::string &api_path) {
|
||||
open_file_base::set_api_path(api_path);
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
void direct_open_file::update_position(std::size_t count, bool is_forward) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
|
||||
if (is_forward) {
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
} else {
|
||||
count = std::min(ring_pos_, count);
|
||||
}
|
||||
|
||||
if (is_forward ? (ring_pos_ + count) <= ring_end_
|
||||
: (ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
} else {
|
||||
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
|
||||
: count - (ring_pos_ - ring_begin_);
|
||||
|
||||
if (delta >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
ring_begin_ += is_forward ? delta : -delta;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < delta; ++idx) {
|
||||
if (is_forward) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
} else {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
}
|
||||
ring_begin_ += is_forward ? delta : -delta;
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
} // namespace repertory
|
||||
|
@@ -179,11 +179,18 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
unlock_and_notify();
|
||||
|
||||
auto res{
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
auto res = get_provider().read_file_bytes(
|
||||
get_api_path(), data_size, data_offset, buffer, stop_requested_);
|
||||
|
||||
chunk_lock.lock();
|
||||
if (chunk < ring_begin_ || chunk > ring_end_) {
|
||||
res = api_error::invalid_ring_buffer_position;
|
||||
unlock_and_notify();
|
||||
active_download->notify(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
if (res == api_error::success) {
|
||||
res = do_io([&]() -> api_error {
|
||||
@@ -214,32 +221,7 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::forward(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
|
||||
if ((ring_pos_ + count) <= ring_end_) {
|
||||
ring_pos_ += count;
|
||||
} else {
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
update_position(count, true);
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
@@ -294,8 +276,7 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
|
||||
res = download_chunk(chunk, false);
|
||||
if (res != api_error::success) {
|
||||
if (not stop_requested_ &&
|
||||
res == api_error::invalid_ring_buffer_position) {
|
||||
if (res == api_error::invalid_ring_buffer_position) {
|
||||
read_lock.unlock();
|
||||
|
||||
// TODO limit retry
|
||||
@@ -377,9 +358,6 @@ void ring_buffer_open_file::reader_thread() {
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(get_api_path(), source_path_,
|
||||
@@ -387,30 +365,7 @@ void ring_buffer_open_file::reader_thread() {
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::reverse(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
count = std::min(ring_pos_, count);
|
||||
|
||||
if ((ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ -= count;
|
||||
} else {
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
update_position(count, false);
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::set(std::size_t first_chunk,
|
||||
@@ -442,4 +397,46 @@ void ring_buffer_open_file::set_api_path(const std::string &api_path) {
|
||||
open_file_base::set_api_path(api_path);
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::update_position(std::size_t count,
|
||||
bool is_forward) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
|
||||
if (is_forward) {
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
} else {
|
||||
count = std::min(ring_pos_, count);
|
||||
}
|
||||
|
||||
if (is_forward ? (ring_pos_ + count) <= ring_end_
|
||||
: (ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
} else {
|
||||
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
|
||||
: count - (ring_pos_ - ring_begin_);
|
||||
|
||||
if (delta >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
ring_begin_ += is_forward ? delta : -delta;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < delta; ++idx) {
|
||||
if (is_forward) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
} else {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
}
|
||||
ring_begin_ += is_forward ? delta : -delta;
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
} // namespace repertory
|
||||
|
Reference in New Issue
Block a user