This commit is contained in:
2024-12-27 20:56:55 -06:00
parent 7b98f26d34
commit ecd24784aa
7 changed files with 34 additions and 26 deletions

View File

@ -68,6 +68,7 @@ private:
private:
bool allocated{false};
std::unique_ptr<utils::file::i_file> nf_;
bool notified_{false};
std::size_t read_chunk_{};
boost::dynamic_bitset<> read_state_;

View File

@ -107,6 +107,7 @@ private:
i_provider &provider_;
private:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
api_error error_{api_error::success};
mutable std::mutex error_mtx_;
mutable std::recursive_mutex file_mtx_;
@ -121,16 +122,17 @@ private:
bool modified_{false};
bool removed_{false};
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
std::unique_ptr<utils::file::i_file> nf_;
private:
void file_io_thread();
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
[[nodiscard]] auto get_active_downloads()
-> std::unordered_map<std::size_t, std::shared_ptr<download>> & {
return active_downloads_;
}
[[nodiscard]] auto get_mutex() const -> std::recursive_mutex & {
return file_mtx_;
}

View File

@ -50,6 +50,9 @@ public:
private:
std::string source_path_;
private:
std::unique_ptr<utils::file::i_file> nf_;
protected:
[[nodiscard]] auto handle_read_buffer(
std::size_t chunk,
@ -67,6 +70,9 @@ protected:
-> api_error override;
public:
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto get_source_path() const -> std::string override {
return source_path_;
}

View File

@ -128,8 +128,10 @@ public:
return false;
}
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)

View File

@ -28,13 +28,10 @@
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "types/startup_exception.hpp"
#include "utils/common.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/path.hpp"
#include "utils/time.hpp"
#include "utils/utils.hpp"
namespace repertory {
open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
@ -281,12 +278,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto read_state = get_read_state();
if ((get_api_error() == api_error::success) && (chunk < read_state.size()) &&
not read_state[chunk]) {
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return;
}
auto active_download = active_downloads_.at(chunk);
auto active_download = get_active_downloads().at(chunk);
rw_lock.unlock();
active_download->wait();
@ -296,12 +293,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto data_offset = chunk * get_chunk_size();
auto data_size = (chunk == read_state.size() - 1U) ? get_last_chunk_size()
: get_chunk_size();
if (active_downloads_.empty() && (read_state.count() == 0U)) {
if (get_active_downloads().empty() && (read_state.count() == 0U)) {
event_system::instance().raise<download_begin>(get_api_path(),
get_source_path());
}
active_downloads_[chunk] = std::make_shared<download>();
get_active_downloads()[chunk] = std::make_shared<download>();
rw_lock.unlock();
if (should_reset) {
@ -314,8 +311,8 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto state = get_read_state();
unique_recur_mutex_lock lock(rw_mtx_);
auto active_download = active_downloads_.at(chunk);
active_downloads_.erase(chunk);
auto active_download = get_active_downloads().at(chunk);
get_active_downloads().erase(chunk);
if (get_api_error() == api_error::success) {
auto progress = (static_cast<double>(state.count()) /
static_cast<double>(state.size())) *
@ -677,8 +674,8 @@ void open_file::update_background_reader(std::size_t read_chunk) {
do {
next_chunk = read_chunk_ =
((read_chunk_ + 1U) >= read_state.size()) ? 0U : read_chunk_ + 1U;
} while ((next_chunk != 0U) &&
(active_downloads_.find(next_chunk) != active_downloads_.end()));
} while ((next_chunk != 0U) && (get_active_downloads().find(next_chunk) !=
get_active_downloads().end()));
lock.unlock();

View File

@ -110,6 +110,11 @@ auto ring_buffer_open_file::on_chunk_downloaded(
});
}
auto ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_buffer_open_file::use_buffer(
std::size_t chunk,
std::function<api_error(const data_buffer &data)> func) -> api_error {

View File

@ -122,12 +122,12 @@ auto ring_file_base::download_chunk(std::size_t chunk,
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = active_downloads_.at(chunk);
auto active_download = get_active_downloads().at(chunk);
unlock_and_notify();
return active_download->wait();
@ -138,7 +138,7 @@ auto ring_file_base::download_chunk(std::size_t chunk,
}
auto active_download{std::make_shared<download>()};
active_downloads_[chunk] = active_download;
get_active_downloads()[chunk] = active_download;
auto res = handle_read_buffer(chunk, [&](auto &&buffer) {
auto data_offset{chunk * get_chunk_size()};
@ -169,7 +169,7 @@ auto ring_file_base::download_chunk(std::size_t chunk,
return api_error::success;
});
active_downloads_.erase(chunk);
get_active_downloads().erase(chunk);
unlock_and_notify();
active_download->notify(res);
@ -195,11 +195,6 @@ auto ring_file_base::get_read_state_size() const -> std::size_t {
return ring_state_.size();
}
auto ring_file_base::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (is_directory()) {