Compare commits
2 Commits
cc70aadc03
...
1c2927790b
Author | SHA1 | Date | |
---|---|---|---|
1c2927790b | |||
c6870c0299 |
@ -98,7 +98,7 @@ public:
|
||||
[[nodiscard]] auto get_result() -> api_error;
|
||||
};
|
||||
|
||||
protected:
|
||||
private:
|
||||
std::uint64_t chunk_size_;
|
||||
std::uint8_t chunk_timeout_;
|
||||
filesystem_item fsi_;
|
||||
@ -109,30 +109,59 @@ protected:
|
||||
private:
|
||||
api_error error_{api_error::success};
|
||||
mutable std::mutex error_mtx_;
|
||||
mutable std::recursive_mutex file_mtx_;
|
||||
stop_type io_stop_requested_{false};
|
||||
std::unique_ptr<std::thread> io_thread_;
|
||||
|
||||
protected:
|
||||
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
|
||||
mutable std::recursive_mutex file_mtx_;
|
||||
std::atomic<std::chrono::system_clock::time_point> last_access_{
|
||||
std::chrono::system_clock::now()};
|
||||
bool modified_{false};
|
||||
std::unique_ptr<utils::file::i_file> nf_;
|
||||
mutable std::mutex io_thread_mtx_;
|
||||
std::condition_variable io_thread_notify_;
|
||||
std::deque<std::shared_ptr<io_item>> io_thread_queue_;
|
||||
std::atomic<std::chrono::system_clock::time_point> last_access_{
|
||||
std::chrono::system_clock::now(),
|
||||
};
|
||||
bool modified_{false};
|
||||
bool removed_{false};
|
||||
|
||||
protected:
|
||||
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
|
||||
std::unique_ptr<utils::file::i_file> nf_;
|
||||
|
||||
private:
|
||||
void file_io_thread();
|
||||
|
||||
protected:
|
||||
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
|
||||
|
||||
[[nodiscard]] auto get_mutex() const -> std::recursive_mutex & {
|
||||
return file_mtx_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_last_chunk_size() const -> std::size_t;
|
||||
|
||||
[[nodiscard]] auto get_provider() -> i_provider & { return provider_; }
|
||||
|
||||
[[nodiscard]] auto get_provider() const -> const i_provider & {
|
||||
return provider_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto is_removed() const -> bool;
|
||||
|
||||
void notify_io();
|
||||
|
||||
void reset_timeout();
|
||||
|
||||
auto set_api_error(const api_error &e) -> api_error;
|
||||
auto set_api_error(const api_error &err) -> api_error;
|
||||
|
||||
void set_file_size(std::uint64_t size);
|
||||
|
||||
void set_last_chunk_size(std::size_t size);
|
||||
|
||||
void set_modified(bool modified);
|
||||
|
||||
void set_removed(bool removed);
|
||||
|
||||
void set_source_path(std::string source_path);
|
||||
|
||||
void wait_for_io(stop_type &stop_requested);
|
||||
|
||||
public:
|
||||
void add(std::uint64_t handle, open_file_data ofd) override;
|
||||
@ -171,9 +200,7 @@ public:
|
||||
|
||||
[[nodiscard]] auto get_open_file_count() const -> std::size_t override;
|
||||
|
||||
[[nodiscard]] auto get_source_path() const -> std::string override {
|
||||
return fsi_.source_path;
|
||||
}
|
||||
[[nodiscard]] auto get_source_path() const -> std::string override;
|
||||
|
||||
[[nodiscard]] auto has_handle(std::uint64_t handle) const -> bool override;
|
||||
|
||||
|
@ -34,8 +34,8 @@ direct_open_file::direct_open_file(std::uint64_t chunk_size,
|
||||
filesystem_item fsi, i_provider &provider)
|
||||
: open_file_base(chunk_size, chunk_timeout, fsi, provider, true),
|
||||
total_chunks_(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi_.size, chunk_size))) {
|
||||
if (fsi_.size > 0U) {
|
||||
utils::divide_with_ceiling(fsi.size, chunk_size))) {
|
||||
if (fsi.size > 0U) {
|
||||
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
|
||||
|
||||
ring_end_ =
|
||||
@ -56,11 +56,11 @@ direct_open_file::~direct_open_file() {
|
||||
}
|
||||
|
||||
auto direct_open_file::check_start() -> api_error {
|
||||
if (fsi_.size == 0U || reader_thread_) {
|
||||
if (get_file_size() == 0U || reader_thread_) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path, "direct");
|
||||
event_system::instance().raise<download_begin>(get_api_path(), "direct");
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
}
|
||||
@ -113,15 +113,15 @@ auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
ring_state_[chunk % ring_state_.size()] = false;
|
||||
|
||||
auto &buffer = ring_data_.at(chunk % ring_state_.size());
|
||||
auto data_offset{chunk * chunk_size_};
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
unlock_and_notify();
|
||||
|
||||
auto res{
|
||||
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
|
||||
stop_requested_),
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
chunk_lock.lock();
|
||||
@ -129,7 +129,7 @@ auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
auto progress =
|
||||
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(fsi_.api_path, "direct",
|
||||
event_system::instance().raise<download_progress>(get_api_path(), "direct",
|
||||
progress);
|
||||
res = (chunk >= ring_begin_ && chunk <= ring_end_)
|
||||
? res
|
||||
@ -173,13 +173,13 @@ void direct_open_file::forward(std::size_t count) {
|
||||
}
|
||||
|
||||
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
auto read_state = ring_state_;
|
||||
return read_state.flip();
|
||||
}
|
||||
|
||||
auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return not ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
@ -212,19 +212,20 @@ void direct_open_file::reverse(std::size_t count) {
|
||||
|
||||
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (fsi_.directory) {
|
||||
if (is_directory()) {
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
|
||||
read_size =
|
||||
utils::calculate_read_size(get_file_size(), read_size, read_offset);
|
||||
if (read_size == 0U) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
|
||||
read_offset = read_offset - (begin_chunk * chunk_size_);
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
|
||||
read_offset = read_offset - (begin_chunk * get_chunk_size());
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_start();
|
||||
@ -259,7 +260,7 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
reset_timeout();
|
||||
|
||||
auto to_read{
|
||||
std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
|
||||
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
|
||||
read_size),
|
||||
};
|
||||
|
||||
@ -316,7 +317,7 @@ void direct_open_file::reader_thread() {
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(fsi_.api_path, "direct",
|
||||
event_system::instance().raise<download_end>(get_api_path(), "direct",
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,7 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
mgr_(mgr) {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (fsi_.directory) {
|
||||
if (fsi.directory) {
|
||||
if (read_state.has_value()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi.api_path, fsi.source_path,
|
||||
@ -79,7 +79,7 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
}
|
||||
|
||||
nf_ = utils::file::file::open_or_create_file(fsi.source_path,
|
||||
provider_.is_read_only());
|
||||
get_provider().is_read_only());
|
||||
set_api_error(*nf_ ? api_error::success : api_error::os_error);
|
||||
if (get_api_error() != api_error::success) {
|
||||
return;
|
||||
@ -92,12 +92,12 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
return;
|
||||
}
|
||||
|
||||
if (fsi_.size == 0U) {
|
||||
if (fsi.size == 0U) {
|
||||
return;
|
||||
}
|
||||
|
||||
read_state_.resize(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi_.size, chunk_size)),
|
||||
utils::divide_with_ceiling(fsi.size, chunk_size)),
|
||||
false);
|
||||
|
||||
auto file_size = nf_->size();
|
||||
@ -109,7 +109,7 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
return;
|
||||
}
|
||||
|
||||
if (provider_.is_read_only() || file_size.value() == fsi.size) {
|
||||
if (get_provider().is_read_only() || file_size.value() == fsi.size) {
|
||||
read_state_.set(0U, read_state_.size(), true);
|
||||
allocated = true;
|
||||
}
|
||||
@ -125,12 +125,12 @@ auto open_file::adjust_cache_size(std::uint64_t file_size,
|
||||
bool shrink) -> api_error {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (file_size == fsi_.size) {
|
||||
if (file_size == get_file_size()) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
if (file_size > fsi_.size) {
|
||||
auto size = file_size - fsi_.size;
|
||||
if (file_size > get_file_size()) {
|
||||
auto size = file_size - get_file_size();
|
||||
auto res = shrink ? cache_size_mgr::instance().shrink(size)
|
||||
: cache_size_mgr::instance().expand(size);
|
||||
if (res == api_error::success) {
|
||||
@ -138,13 +138,13 @@ auto open_file::adjust_cache_size(std::uint64_t file_size,
|
||||
}
|
||||
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, fsi_.source_path, res,
|
||||
function_name, get_api_path(), get_source_path(), res,
|
||||
fmt::format("failed to {} cache|size|{}",
|
||||
(shrink ? "shrink" : "expand"), size));
|
||||
return set_api_error(res);
|
||||
}
|
||||
|
||||
auto size = fsi_.size - file_size;
|
||||
auto size = get_file_size() - file_size;
|
||||
auto res = shrink ? cache_size_mgr::instance().expand(size)
|
||||
: cache_size_mgr::instance().shrink(size);
|
||||
if (res == api_error::success) {
|
||||
@ -152,7 +152,7 @@ auto open_file::adjust_cache_size(std::uint64_t file_size,
|
||||
}
|
||||
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, fsi_.source_path, res,
|
||||
function_name, get_api_path(), get_source_path(), res,
|
||||
fmt::format("failed to {} cache|size|{}", (shrink ? "expand" : "shrink"),
|
||||
size));
|
||||
return set_api_error(res);
|
||||
@ -161,7 +161,7 @@ auto open_file::adjust_cache_size(std::uint64_t file_size,
|
||||
auto open_file::check_start() -> api_error {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
unique_recur_mutex_lock file_lock(get_mutex());
|
||||
if (allocated) {
|
||||
return api_error::success;
|
||||
}
|
||||
@ -169,12 +169,12 @@ auto open_file::check_start() -> api_error {
|
||||
auto file_size = nf_->size();
|
||||
if (not file_size.has_value()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, fsi_.source_path,
|
||||
function_name, get_api_path(), get_source_path(),
|
||||
utils::get_last_error_code(), "failed to get file size");
|
||||
return set_api_error(api_error::os_error);
|
||||
}
|
||||
|
||||
if (file_size.value() == fsi_.size) {
|
||||
if (file_size.value() == get_file_size()) {
|
||||
allocated = true;
|
||||
return api_error::success;
|
||||
}
|
||||
@ -186,11 +186,11 @@ auto open_file::check_start() -> api_error {
|
||||
}
|
||||
file_lock.lock();
|
||||
|
||||
if (not nf_->truncate(fsi_.size)) {
|
||||
if (not nf_->truncate(get_file_size())) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, fsi_.source_path,
|
||||
function_name, get_api_path(), get_source_path(),
|
||||
utils::get_last_error_code(),
|
||||
fmt::format("failed to truncate file|size|{}", fsi_.size));
|
||||
fmt::format("failed to truncate file|size|{}", get_file_size()));
|
||||
return set_api_error(res);
|
||||
}
|
||||
|
||||
@ -201,15 +201,13 @@ auto open_file::check_start() -> api_error {
|
||||
auto open_file::close() -> bool {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (fsi_.directory || stop_requested_) {
|
||||
if (is_directory() || stop_requested_) {
|
||||
return false;
|
||||
}
|
||||
|
||||
stop_requested_ = true;
|
||||
|
||||
unique_mutex_lock reader_lock(io_thread_mtx_);
|
||||
io_thread_notify_.notify_all();
|
||||
reader_lock.unlock();
|
||||
notify_io();
|
||||
|
||||
if (reader_thread_) {
|
||||
reader_thread_->join();
|
||||
@ -224,9 +222,10 @@ auto open_file::close() -> bool {
|
||||
auto err = get_api_error();
|
||||
if (err == api_error::success || err == api_error::download_incomplete ||
|
||||
err == api_error::download_stopped) {
|
||||
if (modified_ && not read_state.all()) {
|
||||
if (is_modified() && not read_state.all()) {
|
||||
set_api_error(api_error::download_incomplete);
|
||||
} else if (not modified_ && (fsi_.size > 0U) && not read_state.all()) {
|
||||
} else if (not is_modified() && (get_file_size() > 0U) &&
|
||||
not read_state.all()) {
|
||||
set_api_error(api_error::download_stopped);
|
||||
}
|
||||
|
||||
@ -235,7 +234,7 @@ auto open_file::close() -> bool {
|
||||
|
||||
nf_->close();
|
||||
|
||||
if (modified_) {
|
||||
if (is_modified()) {
|
||||
if (err == api_error::success) {
|
||||
mgr_.queue_upload(*this);
|
||||
return true;
|
||||
@ -248,24 +247,24 @@ auto open_file::close() -> bool {
|
||||
}
|
||||
|
||||
if (err != api_error::success || read_state.all()) {
|
||||
mgr_.remove_resume(fsi_.api_path, get_source_path());
|
||||
mgr_.remove_resume(get_api_path(), get_source_path());
|
||||
}
|
||||
|
||||
if (err == api_error::success) {
|
||||
return true;
|
||||
}
|
||||
|
||||
file_manager::remove_source_and_shrink_cache(fsi_.api_path, fsi_.source_path,
|
||||
fsi_.size, allocated);
|
||||
file_manager::remove_source_and_shrink_cache(
|
||||
get_api_path(), get_source_path(), get_file_size(), allocated);
|
||||
|
||||
auto parent = utils::path::get_parent_path(fsi_.source_path);
|
||||
fsi_.source_path =
|
||||
utils::path::combine(parent, {utils::create_uuid_string()});
|
||||
auto res =
|
||||
provider_.set_item_meta(fsi_.api_path, META_SOURCE, fsi_.source_path);
|
||||
auto parent = utils::path::get_parent_path(get_source_path());
|
||||
set_source_path(utils::path::combine(parent, {utils::create_uuid_string()}));
|
||||
|
||||
auto res = get_provider().set_item_meta(get_api_path(), META_SOURCE,
|
||||
get_source_path());
|
||||
if (res != api_error::success) {
|
||||
utils::error::raise_api_path_error(function_name, fsi_.api_path,
|
||||
fsi_.source_path, res,
|
||||
utils::error::raise_api_path_error(function_name, get_api_path(),
|
||||
get_source_path(), res,
|
||||
"failed to set new source path");
|
||||
}
|
||||
|
||||
@ -294,12 +293,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
return;
|
||||
}
|
||||
|
||||
auto data_offset = chunk * chunk_size_;
|
||||
auto data_size =
|
||||
(chunk == read_state.size() - 1U) ? last_chunk_size_ : chunk_size_;
|
||||
auto data_offset = chunk * get_chunk_size();
|
||||
auto data_size = (chunk == read_state.size() - 1U) ? get_last_chunk_size()
|
||||
: get_chunk_size();
|
||||
if (active_downloads_.empty() && (read_state.count() == 0U)) {
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path,
|
||||
fsi_.source_path);
|
||||
event_system::instance().raise<download_begin>(get_api_path(),
|
||||
get_source_path());
|
||||
}
|
||||
|
||||
active_downloads_[chunk] = std::make_shared<download>();
|
||||
@ -322,16 +321,16 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
static_cast<double>(state.size())) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(
|
||||
fsi_.api_path, fsi_.source_path, progress);
|
||||
get_api_path(), get_source_path(), progress);
|
||||
if (state.all() && not notified_) {
|
||||
notified_ = true;
|
||||
event_system::instance().raise<download_end>(
|
||||
fsi_.api_path, fsi_.source_path, get_api_error());
|
||||
get_api_path(), get_source_path(), get_api_error());
|
||||
}
|
||||
} else if (not notified_) {
|
||||
notified_ = true;
|
||||
event_system::instance().raise<download_end>(
|
||||
fsi_.api_path, fsi_.source_path, get_api_error());
|
||||
get_api_path(), get_source_path(), get_api_error());
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
@ -343,7 +342,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
};
|
||||
|
||||
data_buffer buffer;
|
||||
auto res = provider_.read_file_bytes(
|
||||
auto res = get_provider().read_file_bytes(
|
||||
get_api_path(), data_size, data_offset, buffer, stop_requested_);
|
||||
if (res != api_error::success) {
|
||||
set_api_error(res);
|
||||
@ -389,12 +388,12 @@ void open_file::download_range(std::size_t begin_chunk, std::size_t end_chunk,
|
||||
}
|
||||
|
||||
auto open_file::get_allocated() const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return allocated;
|
||||
}
|
||||
|
||||
auto open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return read_state_;
|
||||
}
|
||||
|
||||
@ -424,7 +423,7 @@ auto open_file::native_operation(
|
||||
i_open_file::native_operation_callback callback) -> api_error {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (fsi_.directory) {
|
||||
if (is_directory()) {
|
||||
return set_api_error(api_error::invalid_operation);
|
||||
}
|
||||
|
||||
@ -446,7 +445,7 @@ auto open_file::native_operation(
|
||||
auto last_chunk = is_empty_file
|
||||
? std::size_t(0U)
|
||||
: static_cast<std::size_t>(utils::divide_with_ceiling(
|
||||
new_file_size, chunk_size_)) -
|
||||
new_file_size, get_chunk_size())) -
|
||||
1U;
|
||||
|
||||
unique_recur_mutex_lock rw_lock(rw_mtx_);
|
||||
@ -503,10 +502,11 @@ auto open_file::native_operation(
|
||||
}
|
||||
set_read_state(read_state);
|
||||
|
||||
last_chunk_size_ = static_cast<std::size_t>(
|
||||
new_file_size <= chunk_size_ ? new_file_size
|
||||
: (new_file_size % chunk_size_) == 0U ? chunk_size_
|
||||
: new_file_size % chunk_size_);
|
||||
set_last_chunk_size(static_cast<std::size_t>(
|
||||
new_file_size <= get_chunk_size() ? new_file_size
|
||||
: (new_file_size % get_chunk_size()) == 0U
|
||||
? get_chunk_size()
|
||||
: new_file_size % get_chunk_size()));
|
||||
}
|
||||
|
||||
if (original_file_size == new_file_size) {
|
||||
@ -514,15 +514,15 @@ auto open_file::native_operation(
|
||||
}
|
||||
set_modified();
|
||||
|
||||
fsi_.size = new_file_size;
|
||||
set_file_size(new_file_size);
|
||||
auto now = std::to_string(utils::time::get_time_now());
|
||||
res = provider_.set_item_meta(fsi_.api_path,
|
||||
{
|
||||
{META_CHANGED, now},
|
||||
{META_MODIFIED, now},
|
||||
{META_SIZE, std::to_string(new_file_size)},
|
||||
{META_WRITTEN, now},
|
||||
});
|
||||
res = get_provider().set_item_meta(
|
||||
get_api_path(), {
|
||||
{META_CHANGED, now},
|
||||
{META_MODIFIED, now},
|
||||
{META_SIZE, std::to_string(new_file_size)},
|
||||
{META_WRITTEN, now},
|
||||
});
|
||||
if (res == api_error::success) {
|
||||
return res;
|
||||
}
|
||||
@ -534,7 +534,7 @@ auto open_file::native_operation(
|
||||
|
||||
auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (fsi_.directory) {
|
||||
if (is_directory()) {
|
||||
return set_api_error(api_error::invalid_operation);
|
||||
}
|
||||
|
||||
@ -556,9 +556,9 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
const auto read_from_source = [this, &data, &read_offset,
|
||||
&read_size]() -> api_error {
|
||||
return do_io([this, &data, &read_offset, &read_size]() -> api_error {
|
||||
if (provider_.is_read_only()) {
|
||||
return provider_.read_file_bytes(fsi_.api_path, read_size, read_offset,
|
||||
data, stop_requested_);
|
||||
if (get_provider().is_read_only()) {
|
||||
return get_provider().read_file_bytes(
|
||||
get_api_path(), read_size, read_offset, data, stop_requested_);
|
||||
}
|
||||
|
||||
data.resize(read_size);
|
||||
@ -574,9 +574,9 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
return read_from_source();
|
||||
}
|
||||
|
||||
auto begin_chunk = static_cast<std::size_t>(read_offset / chunk_size_);
|
||||
auto begin_chunk = static_cast<std::size_t>(read_offset / get_chunk_size());
|
||||
auto end_chunk =
|
||||
static_cast<std::size_t>((read_size + read_offset) / chunk_size_);
|
||||
static_cast<std::size_t>((read_size + read_offset) / get_chunk_size());
|
||||
|
||||
update_background_reader(begin_chunk);
|
||||
|
||||
@ -594,14 +594,14 @@ void open_file::remove(std::uint64_t handle) {
|
||||
open_file_base::remove(handle);
|
||||
|
||||
recur_mutex_lock rw_lock(rw_mtx_);
|
||||
if (modified_ && get_read_state().all() &&
|
||||
if (is_modified() && get_read_state().all() &&
|
||||
(get_api_error() == api_error::success)) {
|
||||
mgr_.queue_upload(*this);
|
||||
modified_ = false;
|
||||
open_file_base::set_modified(false);
|
||||
}
|
||||
|
||||
if (removed_ && (get_open_file_count() == 0U)) {
|
||||
removed_ = false;
|
||||
if (is_removed() && (get_open_file_count() == 0U)) {
|
||||
open_file_base::set_removed(false);
|
||||
}
|
||||
}
|
||||
|
||||
@ -609,8 +609,8 @@ void open_file::remove_all() {
|
||||
open_file_base::remove_all();
|
||||
|
||||
recur_mutex_lock rw_lock(rw_mtx_);
|
||||
modified_ = false;
|
||||
removed_ = true;
|
||||
open_file_base::set_modified(false);
|
||||
open_file_base::set_removed(true);
|
||||
|
||||
mgr_.remove_upload(get_api_path());
|
||||
|
||||
@ -618,11 +618,11 @@ void open_file::remove_all() {
|
||||
}
|
||||
|
||||
auto open_file::resize(std::uint64_t new_file_size) -> api_error {
|
||||
if (fsi_.directory) {
|
||||
if (is_directory()) {
|
||||
return set_api_error(api_error::invalid_operation);
|
||||
}
|
||||
|
||||
if (new_file_size == fsi_.size) {
|
||||
if (new_file_size == get_file_size()) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
@ -634,24 +634,24 @@ auto open_file::resize(std::uint64_t new_file_size) -> api_error {
|
||||
}
|
||||
|
||||
void open_file::set_modified() {
|
||||
if (not modified_) {
|
||||
modified_ = true;
|
||||
if (not is_modified()) {
|
||||
open_file_base::set_modified(true);
|
||||
mgr_.store_resume(*this);
|
||||
}
|
||||
|
||||
if (not removed_) {
|
||||
removed_ = true;
|
||||
if (not is_removed()) {
|
||||
open_file_base::set_removed(true);
|
||||
mgr_.remove_upload(get_api_path());
|
||||
}
|
||||
}
|
||||
|
||||
void open_file::set_read_state(std::size_t chunk) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
read_state_.set(chunk);
|
||||
}
|
||||
|
||||
void open_file::set_read_state(boost::dynamic_bitset<> read_state) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
read_state_ = std::move(read_state);
|
||||
}
|
||||
|
||||
@ -668,15 +668,9 @@ void open_file::update_background_reader(std::size_t read_chunk) {
|
||||
while (not stop_requested_) {
|
||||
unique_recur_mutex_lock lock(rw_mtx_);
|
||||
auto read_state = get_read_state();
|
||||
if ((fsi_.size == 0U) || read_state.all()) {
|
||||
if ((get_file_size() == 0U) || read_state.all()) {
|
||||
lock.unlock();
|
||||
|
||||
unique_mutex_lock io_lock(io_thread_mtx_);
|
||||
if (not stop_requested_ && io_thread_queue_.empty()) {
|
||||
io_thread_notify_.wait(io_lock);
|
||||
}
|
||||
io_thread_notify_.notify_all();
|
||||
io_lock.unlock();
|
||||
wait_for_io(stop_requested_);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -699,7 +693,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
|
||||
|
||||
bytes_written = 0U;
|
||||
|
||||
if (fsi_.directory || provider_.is_read_only()) {
|
||||
if (is_directory() || get_provider().is_read_only()) {
|
||||
return set_api_error(api_error::invalid_operation);
|
||||
}
|
||||
|
||||
@ -716,9 +710,9 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
|
||||
return res;
|
||||
}
|
||||
|
||||
auto begin_chunk = static_cast<std::size_t>(write_offset / chunk_size_);
|
||||
auto begin_chunk = static_cast<std::size_t>(write_offset / get_chunk_size());
|
||||
auto end_chunk =
|
||||
static_cast<std::size_t>((write_offset + data.size()) / chunk_size_);
|
||||
static_cast<std::size_t>((write_offset + data.size()) / get_chunk_size());
|
||||
|
||||
update_background_reader(begin_chunk);
|
||||
|
||||
@ -729,7 +723,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock rw_lock(rw_mtx_);
|
||||
if ((write_offset + data.size()) > fsi_.size) {
|
||||
if ((write_offset + data.size()) > get_file_size()) {
|
||||
res = resize(write_offset + data.size());
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
@ -749,11 +743,11 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
|
||||
}
|
||||
|
||||
auto now = std::to_string(utils::time::get_time_now());
|
||||
res = provider_.set_item_meta(fsi_.api_path, {
|
||||
{META_CHANGED, now},
|
||||
{META_MODIFIED, now},
|
||||
{META_WRITTEN, now},
|
||||
});
|
||||
res = get_provider().set_item_meta(get_api_path(), {
|
||||
{META_CHANGED, now},
|
||||
{META_MODIFIED, now},
|
||||
{META_WRITTEN, now},
|
||||
});
|
||||
if (res != api_error::success) {
|
||||
utils::error::raise_api_path_error(function_name, get_api_path(), res,
|
||||
"failed to set file meta");
|
||||
|
@ -133,6 +133,24 @@ auto open_file_base::can_close() const -> bool {
|
||||
return (duration.count() >= chunk_timeout_);
|
||||
}
|
||||
|
||||
auto open_file_base::close() -> bool {
|
||||
unique_mutex_lock io_lock(io_thread_mtx_);
|
||||
if (io_stop_requested_ || not io_thread_) {
|
||||
io_thread_notify_.notify_all();
|
||||
io_lock.unlock();
|
||||
return false;
|
||||
}
|
||||
|
||||
io_stop_requested_ = true;
|
||||
io_thread_notify_.notify_all();
|
||||
io_lock.unlock();
|
||||
|
||||
io_thread_->join();
|
||||
io_thread_.reset();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
auto open_file_base::do_io(std::function<api_error()> action) -> api_error {
|
||||
unique_mutex_lock io_lock(io_thread_mtx_);
|
||||
auto item = std::make_shared<io_item>(action);
|
||||
@ -191,6 +209,36 @@ auto open_file_base::get_file_size() const -> std::uint64_t {
|
||||
return fsi_.size;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto open_file_base::get_last_chunk_size() const -> std::size_t {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
return last_chunk_size_;
|
||||
}
|
||||
|
||||
void open_file_base::set_file_size(std::uint64_t size) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
fsi_.size = size;
|
||||
}
|
||||
|
||||
void open_file_base::set_last_chunk_size(std::size_t size) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
last_chunk_size_ = size;
|
||||
}
|
||||
|
||||
void open_file_base::set_modified(bool modified) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
modified_ = modified;
|
||||
}
|
||||
|
||||
void open_file_base::set_removed(bool removed) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
removed_ = removed;
|
||||
}
|
||||
|
||||
void open_file_base::set_source_path(std::string source_path) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
fsi_.source_path = std::move(source_path);
|
||||
}
|
||||
|
||||
auto open_file_base::get_filesystem_item() const -> filesystem_item {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
return fsi_;
|
||||
@ -235,6 +283,11 @@ auto open_file_base::get_open_file_count() const -> std::size_t {
|
||||
return open_data_.size();
|
||||
}
|
||||
|
||||
auto open_file_base::get_source_path() const -> std::string {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
return fsi_.source_path;
|
||||
}
|
||||
|
||||
auto open_file_base::has_handle(std::uint64_t handle) const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
return open_data_.find(handle) != open_data_.end();
|
||||
@ -245,6 +298,16 @@ auto open_file_base::is_modified() const -> bool {
|
||||
return modified_;
|
||||
}
|
||||
|
||||
auto open_file_base::is_removed() const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
return removed_;
|
||||
}
|
||||
|
||||
void open_file_base::notify_io() {
|
||||
mutex_lock io_lock(io_thread_mtx_);
|
||||
io_thread_notify_.notify_all();
|
||||
}
|
||||
|
||||
void open_file_base::remove(std::uint64_t handle) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
if (open_data_.find(handle) == open_data_.end()) {
|
||||
@ -303,21 +366,12 @@ void open_file_base::set_api_path(const std::string &api_path) {
|
||||
fsi_.api_parent = utils::path::get_parent_api_path(api_path);
|
||||
}
|
||||
|
||||
auto open_file_base::close() -> bool {
|
||||
void open_file_base::wait_for_io(stop_type &stop_requested) {
|
||||
unique_mutex_lock io_lock(io_thread_mtx_);
|
||||
if (io_stop_requested_ || not io_thread_) {
|
||||
io_thread_notify_.notify_all();
|
||||
io_lock.unlock();
|
||||
return false;
|
||||
if (not stop_requested && io_thread_queue_.empty()) {
|
||||
io_thread_notify_.wait(io_lock);
|
||||
}
|
||||
|
||||
io_stop_requested_ = true;
|
||||
io_thread_notify_.notify_all();
|
||||
io_lock.unlock();
|
||||
|
||||
io_thread_->join();
|
||||
io_thread_.reset();
|
||||
|
||||
return true;
|
||||
}
|
||||
} // namespace repertory
|
||||
|
@ -45,12 +45,12 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
source_path_(utils::path::combine(buffer_directory,
|
||||
{utils::create_uuid_string()})),
|
||||
total_chunks_(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi_.size, chunk_size))) {
|
||||
utils::divide_with_ceiling(fsi.size, chunk_size))) {
|
||||
if (ring_size < 5U) {
|
||||
throw std::runtime_error("ring size must be greater than or equal to 5");
|
||||
}
|
||||
|
||||
if (not can_handle_file(fsi_.size, chunk_size, ring_size)) {
|
||||
if (not can_handle_file(fsi.size, chunk_size, ring_size)) {
|
||||
throw std::runtime_error("file size is less than ring buffer size");
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
|
||||
if (not utils::file::file(source_path_).remove()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
function_name, get_api_path(), source_path_,
|
||||
utils::get_last_error_code(), "failed to delete file");
|
||||
}
|
||||
}
|
||||
@ -96,7 +96,7 @@ auto ring_buffer_open_file::check_start() -> api_error {
|
||||
auto buffer_directory{utils::path::get_parent_path(source_path_)};
|
||||
if (not utils::file::directory(buffer_directory).create_directory()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
function_name, get_api_path(), source_path_,
|
||||
fmt::format("failed to create buffer directory|path|{}|err|{}",
|
||||
buffer_directory, utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
@ -105,24 +105,24 @@ auto ring_buffer_open_file::check_start() -> api_error {
|
||||
nf_ = utils::file::file::open_or_create_file(source_path_);
|
||||
if (not nf_ || not *nf_) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
function_name, get_api_path(), source_path_,
|
||||
fmt::format("failed to create buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
if (not nf_->truncate(ring_state_.size() * chunk_size_)) {
|
||||
if (not nf_->truncate(ring_state_.size() * get_chunk_size())) {
|
||||
nf_->close();
|
||||
nf_.reset();
|
||||
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
function_name, get_api_path(), source_path_,
|
||||
fmt::format("failed to resize buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path, source_path_);
|
||||
event_system::instance().raise<download_begin>(get_api_path(), source_path_);
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
}
|
||||
@ -176,14 +176,14 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
unlock_and_notify();
|
||||
|
||||
data_buffer buffer;
|
||||
auto data_offset{chunk * chunk_size_};
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
|
||||
auto res{
|
||||
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
|
||||
stop_requested_),
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
chunk_lock.lock();
|
||||
@ -191,20 +191,21 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
auto progress =
|
||||
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(fsi_.api_path,
|
||||
event_system::instance().raise<download_progress>(get_api_path(),
|
||||
source_path_, progress);
|
||||
res = (chunk >= ring_begin_ && chunk <= ring_end_)
|
||||
? do_io([&]() -> api_error {
|
||||
std::size_t bytes_written{};
|
||||
if (nf_->write(buffer,
|
||||
(chunk % ring_state_.size()) * chunk_size_,
|
||||
&bytes_written)) {
|
||||
return api_error::success;
|
||||
}
|
||||
res =
|
||||
(chunk >= ring_begin_ && chunk <= ring_end_)
|
||||
? do_io([&]() -> api_error {
|
||||
std::size_t bytes_written{};
|
||||
if (nf_->write(buffer,
|
||||
(chunk % ring_state_.size()) * get_chunk_size(),
|
||||
&bytes_written)) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
return api_error::os_error;
|
||||
})
|
||||
: api_error::invalid_ring_buffer_position;
|
||||
return api_error::os_error;
|
||||
})
|
||||
: api_error::invalid_ring_buffer_position;
|
||||
}
|
||||
|
||||
active_downloads_.erase(chunk);
|
||||
@ -244,13 +245,13 @@ void ring_buffer_open_file::forward(std::size_t count) {
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
auto read_state = ring_state_;
|
||||
return read_state.flip();
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return not ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
@ -289,19 +290,20 @@ void ring_buffer_open_file::reverse(std::size_t count) {
|
||||
auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (fsi_.directory) {
|
||||
if (is_directory()) {
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
|
||||
read_size =
|
||||
utils::calculate_read_size(get_file_size(), read_size, read_offset);
|
||||
if (read_size == 0U) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
|
||||
read_offset = read_offset - (begin_chunk * chunk_size_);
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
|
||||
read_offset = read_offset - (begin_chunk * get_chunk_size());
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_start();
|
||||
@ -336,7 +338,7 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
reset_timeout();
|
||||
|
||||
auto to_read{
|
||||
std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
|
||||
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
|
||||
read_size),
|
||||
};
|
||||
|
||||
@ -344,12 +346,13 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
data_buffer buffer(to_read);
|
||||
|
||||
std::size_t bytes_read{};
|
||||
auto result = nf_->read(buffer,
|
||||
(((chunk % ring_state_.size()) * chunk_size_) +
|
||||
read_offset),
|
||||
&bytes_read)
|
||||
? api_error::success
|
||||
: api_error::os_error;
|
||||
auto result =
|
||||
nf_->read(
|
||||
buffer,
|
||||
(((chunk % ring_state_.size()) * get_chunk_size()) + read_offset),
|
||||
&bytes_read)
|
||||
? api_error::success
|
||||
: api_error::os_error;
|
||||
|
||||
if (result != api_error::success) {
|
||||
return result;
|
||||
@ -409,7 +412,7 @@ void ring_buffer_open_file::reader_thread() {
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(fsi_.api_path, source_path_,
|
||||
event_system::instance().raise<download_end>(get_api_path(), source_path_,
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user