refactor direct_open_file

This commit is contained in:
2024-12-26 07:57:59 -06:00
parent 5d3ee92636
commit bc0e216b75
3 changed files with 338 additions and 54 deletions

View File

@ -19,8 +19,8 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_
#include "file_manager/open_file_base.hpp" #include "file_manager/open_file_base.hpp"
@ -45,11 +45,29 @@ public:
auto operator=(const direct_open_file &) noexcept auto operator=(const direct_open_file &) noexcept
-> direct_open_file & = delete; -> direct_open_file & = delete;
public:
static constexpr const auto ring_size{5U};
private: private:
std::atomic<std::uint64_t> last_progress_{0U}; boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_; std::size_t total_chunks_;
private:
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::mutex read_mtx_;
std::unique_ptr<std::thread> reader_thread_;
std::size_t ring_begin_{};
std::array<data_buffer, ring_size> ring_data_;
std::size_t ring_end_{};
std::size_t ring_pos_{};
stop_type stop_requested_{false}; stop_type stop_requested_{false};
private:
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
void background_reader_thread();
protected: protected:
[[nodiscard]] auto is_download_complete() const -> bool override { [[nodiscard]] auto is_download_complete() const -> bool override {
return false; return false;
@ -58,33 +76,39 @@ protected:
public: public:
auto close() -> bool override; auto close() -> bool override;
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return ring_pos_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return ring_begin_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; }
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return true; } [[nodiscard]] auto is_complete() const -> bool override { return true; }
[[nodiscard]] auto is_write_supported() const -> bool override { [[nodiscard]] auto is_write_supported() const -> bool override {
return false; return false;
} }
[[nodiscard]] auto get_read_state() const
-> boost::dynamic_bitset<> override {
return {};
}
[[nodiscard]] auto get_read_state(std::size_t /* chunk */) const
-> bool override {
return false;
}
[[nodiscard]] auto get_total_chunks() const -> std::uint64_t {
return total_chunks_;
}
[[nodiscard]] auto native_operation(native_operation_callback /* callback */) [[nodiscard]] auto native_operation(native_operation_callback /* callback */)
-> api_error override { -> api_error override {
return api_error::not_supported; return api_error::not_supported;
} }
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */, [[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /*callback*/) native_operation_callback /* callback */)
-> api_error override { -> api_error override {
return api_error::not_supported; return api_error::not_supported;
} }
@ -92,10 +116,16 @@ public:
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override; data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t /*size*/) -> api_error override { [[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
return api_error::not_supported; return api_error::not_supported;
} }
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &) [[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
-> api_error override { -> api_error override {
return api_error::not_supported; return api_error::not_supported;
@ -103,4 +133,4 @@ public:
}; };
} // namespace repertory } // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #endif // REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_

View File

@ -27,32 +27,234 @@
#include "providers/i_provider.hpp" #include "providers/i_provider.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "utils/common.hpp" #include "utils/common.hpp"
#include "utils/time.hpp"
namespace repertory { namespace repertory {
direct_open_file::direct_open_file(std::uint64_t chunk_size, direct_open_file::direct_open_file(std::uint64_t chunk_size,
std::uint8_t chunk_timeout, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider) filesystem_item fsi, i_provider &provider)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, true), : open_file_base(chunk_size, chunk_timeout, fsi, provider),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>( total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) { utils::divide_with_ceiling(fsi_.size, chunk_size))) {
event_system::instance().raise<download_begin>(fsi_.api_path, ""); ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), true);
if (fsi_.size > 0U) {
reader_thread_ =
std::make_unique<std::thread>([this]() { background_reader_thread(); });
}
event_system::instance().raise<download_begin>(fsi_.api_path, "direct");
} }
direct_open_file::~direct_open_file() { close(); } direct_open_file::~direct_open_file() {
REPERTORY_USES_FUNCTION_NAME();
close();
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
}
}
void direct_open_file::background_reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_;
chunk_notify_.notify_all();
chunk_lock.unlock();
while (not stop_requested_) {
chunk_lock.lock();
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
if (stop_requested_) {
chunk_notify_.notify_all();
chunk_lock.unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
next_chunk = ring_pos_;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
};
if (not ring_state_[next_chunk % ring_state_.size()]) {
check_and_wait();
continue;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
download_chunk(next_chunk, true);
chunk_lock.lock();
check_and_wait();
}
event_system::instance().raise<download_end>(fsi_.api_path, "direct",
api_error::download_stopped);
}
auto direct_open_file::close() -> bool { auto direct_open_file::close() -> bool {
stop_requested_ = true; stop_requested_ = true;
last_progress_ = 0U;
auto ret = open_file_base::close();
event_system::instance().raise<download_end>(fsi_.api_path, "", unique_mutex_lock chunk_lock(chunk_mtx_);
api_error::download_stopped); chunk_notify_.notify_all();
return ret; chunk_lock.unlock();
return open_file_base::close();
}
auto direct_open_file::download_chunk(std::size_t chunk, bool skip_active)
-> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
const auto unlock_and_return =
[&unlock_and_notify](api_error res) -> api_error {
unlock_and_notify();
return res;
};
if (chunk < ring_begin_ || chunk > ring_end_) {
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = active_downloads_.at(chunk);
unlock_and_notify();
return active_download->wait();
}
if (not ring_state_[chunk % ring_state_.size()]) {
return unlock_and_return(api_error::success);
}
auto active_download{std::make_shared<download>()};
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
unlock_and_notify();
auto &buffer = ring_data_.at(chunk % ring_state_.size());
auto data_offset{chunk * chunk_size_};
auto data_size{
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
};
event_system::instance().raise<download_chunk_begin>(
fsi_.api_path, "direct", chunk, get_read_state().size(),
get_read_state().count());
auto res{
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
stop_requested_),
};
chunk_lock.lock();
if (res == api_error::success) {
auto progress =
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(fsi_.api_path, "direct",
progress);
}
event_system::instance().raise<download_chunk_end>(
fsi_.api_path, "direct", chunk, get_read_state().size(),
get_read_state().count(), res);
active_downloads_.erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
}
void direct_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
if ((ring_pos_ + count) <= ring_end_) {
ring_pos_ += count;
} else {
auto added = count - (ring_end_ - ring_pos_);
if (added >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
ring_pos_ += count;
ring_begin_ += added;
} else {
for (std::size_t idx = 0U; idx < added; ++idx) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true;
}
ring_begin_ += added;
ring_pos_ += count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(file_mtx_);
auto read_state = ring_state_;
return read_state.flip();
}
auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()];
}
void direct_open_file::reverse(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
count = std::min(ring_pos_, count);
if ((ring_pos_ - count) >= ring_begin_) {
ring_pos_ -= count;
} else {
auto removed = count - (ring_pos_ - ring_begin_);
if (removed >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
ring_pos_ -= count;
ring_begin_ = ring_pos_;
} else {
for (std::size_t idx = 0U; idx < removed; ++idx) {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = true;
}
ring_begin_ -= removed;
ring_pos_ -= count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
} }
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset, auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error { data_buffer &data) -> api_error {
if (fsi_.directory) { if (fsi_.directory) {
return api_error::invalid_operation; return api_error::invalid_operation;
} }
@ -64,23 +266,79 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
return api_error::success; return api_error::success;
} }
auto res = provider_.read_file_bytes(fsi_.api_path, read_size, read_offset, auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
data, stop_requested_); read_offset = read_offset - (begin_chunk * chunk_size_);
if (res != api_error::success) {
return res;
}
reset_timeout(); auto res{api_error::success};
if ((utils::time::get_time_now() - last_progress_.load()) >
(2U * utils::time::NANOS_PER_SECOND)) { unique_mutex_lock read_lock(read_mtx_);
last_progress_ = utils::time::get_time_now(); for (std::size_t chunk = begin_chunk;
auto progress = (static_cast<double>(read_offset + read_size) / (res == api_error::success) && (read_size > 0U); ++chunk) {
static_cast<double>(fsi_.size)) * if (chunk > ring_pos_) {
100.0; forward(chunk - ring_pos_);
event_system::instance().raise<download_progress>(fsi_.api_path, "", } else if (chunk < ring_pos_) {
progress); reverse(ring_pos_ - chunk);
}
res = download_chunk(chunk, false);
if (res != api_error::success) {
if (not stop_requested_ &&
res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
return read(read_size, read_offset, data);
}
return res;
}
reset_timeout();
auto to_read{
std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
read_size),
};
auto &source_buffer = ring_data_.at(chunk % ring_state_.size());
auto begin = std::next(source_buffer.begin(),
static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
data.insert(data.end(), begin, end);
read_offset = 0U;
} }
return res; return res;
} }
void direct_open_file::set(std::size_t first_chunk,
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
}
ring_begin_ = first_chunk;
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
if (current_chunk > ring_end_) {
chunk_notify_.notify_all();
throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
}
ring_pos_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), false);
chunk_notify_.notify_all();
}
void direct_open_file::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
} // namespace repertory } // namespace repertory

View File

@ -45,12 +45,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
ring_state_(ring_size), ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>( total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi_.size, chunk_size))) { utils::divide_with_ceiling(fsi_.size, chunk_size))) {
if ((ring_size % 2U) != 0U) { if (ring_size < 5U) {
throw std::runtime_error("ring size must be a multiple of 2"); throw std::runtime_error("ring size must be greater than or equal to 5");
}
if (ring_size < 4U) {
throw std::runtime_error("ring size must be greater than or equal to 4");
} }
if (not can_handle_file(fsi_.size, chunk_size, ring_size)) { if (not can_handle_file(fsi_.size, chunk_size, ring_size)) {
@ -149,7 +145,7 @@ void ring_buffer_open_file::background_reader_thread() {
auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size, auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
std::size_t chunk_size, std::size_t chunk_size,
std::size_t ring_size) -> bool { std::size_t ring_size) -> bool {
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size * 2U); return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
} }
auto ring_buffer_open_file::close() -> bool { auto ring_buffer_open_file::close() -> bool {