Compare commits
2 Commits
1c2927790b
...
424a1f6cce
Author | SHA1 | Date | |
---|---|---|---|
424a1f6cce | |||
1667c18d7e |
@ -40,7 +40,7 @@ direct_open_file::direct_open_file(std::uint64_t chunk_size,
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -104,36 +104,31 @@ auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
return active_download->wait();
|
||||
}
|
||||
|
||||
if (not ring_state_[chunk % ring_state_.size()]) {
|
||||
if (ring_state_[chunk % ring_state_.size()]) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download{std::make_shared<download>()};
|
||||
active_downloads_[chunk] = active_download;
|
||||
ring_state_[chunk % ring_state_.size()] = false;
|
||||
|
||||
auto &buffer = ring_data_.at(chunk % ring_state_.size());
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
unlock_and_notify();
|
||||
|
||||
auto res{
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
chunk_lock.lock();
|
||||
if (res == api_error::success) {
|
||||
ring_state_[chunk % ring_state_.size()] = true;
|
||||
auto progress =
|
||||
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(get_api_path(), "direct",
|
||||
progress);
|
||||
res = (chunk >= ring_begin_ && chunk <= ring_end_)
|
||||
? res
|
||||
: api_error::invalid_ring_buffer_position;
|
||||
}
|
||||
|
||||
active_downloads_.erase(chunk);
|
||||
@ -154,12 +149,12 @@ void direct_open_file::forward(std::size_t count) {
|
||||
} else {
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true;
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
@ -174,13 +169,12 @@ void direct_open_file::forward(std::size_t count) {
|
||||
|
||||
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
auto read_state = ring_state_;
|
||||
return read_state.flip();
|
||||
return ring_state_;
|
||||
}
|
||||
|
||||
auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return not ring_state_[chunk % ring_state_.size()];
|
||||
return ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
void direct_open_file::reverse(std::size_t count) {
|
||||
@ -192,12 +186,12 @@ void direct_open_file::reverse(std::size_t count) {
|
||||
} else {
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = true;
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
@ -271,7 +265,7 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data.insert(data.end(), begin, end);
|
||||
|
||||
read_offset = 0U;
|
||||
read_size -= static_cast<std::uint64_t>(std::distance(begin, end));
|
||||
read_size -= to_read;
|
||||
}
|
||||
|
||||
return stop_requested_ ? api_error::download_stopped : res;
|
||||
@ -303,7 +297,7 @@ void direct_open_file::reader_thread() {
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (not ring_state_[next_chunk % ring_state_.size()]) {
|
||||
if (ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
}
|
||||
|
||||
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
|
||||
ring_state_.set(0U, ring_size, true);
|
||||
ring_state_.set(0U, ring_size, false);
|
||||
}
|
||||
|
||||
ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
@ -166,14 +166,12 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
return active_download->wait();
|
||||
}
|
||||
|
||||
if (not ring_state_[chunk % ring_state_.size()]) {
|
||||
if (ring_state_[chunk % ring_state_.size()]) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download{std::make_shared<download>()};
|
||||
active_downloads_[chunk] = active_download;
|
||||
ring_state_[chunk % ring_state_.size()] = false;
|
||||
unlock_and_notify();
|
||||
|
||||
data_buffer buffer;
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
@ -186,26 +184,25 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
chunk_lock.lock();
|
||||
if (res == api_error::success) {
|
||||
auto progress =
|
||||
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(get_api_path(),
|
||||
source_path_, progress);
|
||||
res =
|
||||
(chunk >= ring_begin_ && chunk <= ring_end_)
|
||||
? do_io([&]() -> api_error {
|
||||
std::size_t bytes_written{};
|
||||
if (nf_->write(buffer,
|
||||
(chunk % ring_state_.size()) * get_chunk_size(),
|
||||
&bytes_written)) {
|
||||
return api_error::success;
|
||||
}
|
||||
res = do_io([&]() -> api_error {
|
||||
std::size_t bytes_written{};
|
||||
if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(),
|
||||
&bytes_written)) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
return api_error::os_error;
|
||||
})
|
||||
: api_error::invalid_ring_buffer_position;
|
||||
return api_error::os_error;
|
||||
});
|
||||
|
||||
if (res == api_error::success) {
|
||||
ring_state_[chunk % ring_state_.size()] = true;
|
||||
auto progress = (static_cast<double>(chunk + 1U) /
|
||||
static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(get_api_path(),
|
||||
source_path_, progress);
|
||||
}
|
||||
}
|
||||
|
||||
active_downloads_.erase(chunk);
|
||||
@ -226,12 +223,12 @@ void ring_buffer_open_file::forward(std::size_t count) {
|
||||
} else {
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true;
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
@ -246,13 +243,12 @@ void ring_buffer_open_file::forward(std::size_t count) {
|
||||
|
||||
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
auto read_state = ring_state_;
|
||||
return read_state.flip();
|
||||
return ring_state_;
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return not ring_state_[chunk % ring_state_.size()];
|
||||
return ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::native_operation(
|
||||
@ -269,12 +265,12 @@ void ring_buffer_open_file::reverse(std::size_t count) {
|
||||
} else {
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = true;
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
@ -398,7 +394,7 @@ void ring_buffer_open_file::reader_thread() {
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (not ring_state_[next_chunk % ring_state_.size()]) {
|
||||
if (ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
@ -435,7 +431,7 @@ void ring_buffer_open_file::set(std::size_t first_chunk,
|
||||
}
|
||||
|
||||
ring_pos_ = current_chunk;
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
Reference in New Issue
Block a user