Compare commits
2 Commits
2a80d4053c
...
66df44a8dd
Author | SHA1 | Date | |
---|---|---|---|
66df44a8dd | |||
7ec7544995 |
@ -50,7 +50,7 @@ private:
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
public:
|
||||
[[nodiscard]] auto expand(std::uint64_t size) -> api_error;
|
||||
[[nodiscard]] auto expand(std::uint64_t size, bool should_wait) -> api_error;
|
||||
|
||||
void initialize(app_config *cfg);
|
||||
|
||||
|
@ -72,6 +72,7 @@ private:
|
||||
boost::dynamic_bitset<> read_state_;
|
||||
std::unique_ptr<std::thread> reader_thread_;
|
||||
std::unique_ptr<std::thread> download_thread_;
|
||||
mutable std::recursive_mutex rw_mtx_;
|
||||
stop_type stop_requested_ = false;
|
||||
|
||||
private:
|
||||
@ -102,12 +103,12 @@ public:
|
||||
return true;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto
|
||||
native_operation(native_operation_callback callback) -> api_error override;
|
||||
[[nodiscard]] auto native_operation(native_operation_callback callback)
|
||||
-> api_error override;
|
||||
|
||||
[[nodiscard]] auto
|
||||
native_operation(std::uint64_t new_file_size,
|
||||
native_operation_callback callback) -> api_error override;
|
||||
[[nodiscard]] auto native_operation(std::uint64_t new_file_size,
|
||||
native_operation_callback callback)
|
||||
-> api_error override;
|
||||
|
||||
void remove(std::uint64_t handle) override;
|
||||
|
||||
|
@ -42,7 +42,7 @@ E_SIMPLE2(max_cache_size_reached, warn, true,
|
||||
|
||||
cache_size_mgr cache_size_mgr::instance_{};
|
||||
|
||||
auto cache_size_mgr::expand(std::uint64_t size) -> api_error {
|
||||
auto cache_size_mgr::expand(std::uint64_t size, bool should_wait) -> api_error {
|
||||
if (size == 0U) {
|
||||
return api_error::success;
|
||||
}
|
||||
@ -56,16 +56,21 @@ auto cache_size_mgr::expand(std::uint64_t size) -> api_error {
|
||||
|
||||
auto max_cache_size = cfg_->get_max_cache_size_bytes();
|
||||
|
||||
while (not stop_requested_ && cache_size_ > max_cache_size) {
|
||||
auto cache_dir = utils::file::directory{cfg_->get_cache_directory()};
|
||||
while (not stop_requested_ && cache_size_ > max_cache_size &&
|
||||
cache_dir.count() > 1U) {
|
||||
event_system::instance().raise<max_cache_size_reached>(cache_size_,
|
||||
max_cache_size);
|
||||
notify_.notify_all();
|
||||
if (not should_wait) {
|
||||
break;
|
||||
}
|
||||
|
||||
notify_.wait(lock);
|
||||
}
|
||||
|
||||
notify_.notify_all();
|
||||
|
||||
return stop_requested_ ? api_error::error : api_error::success;
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
void cache_size_mgr::initialize(app_config *cfg) {
|
||||
@ -104,7 +109,7 @@ auto cache_size_mgr::shrink(std::uint64_t size) -> api_error {
|
||||
|
||||
notify_.notify_all();
|
||||
|
||||
return stop_requested_ ? api_error::error : api_error::success;
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto cache_size_mgr::size() const -> std::uint64_t {
|
||||
|
@ -64,12 +64,15 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
i_upload_manager &mgr)
|
||||
: open_file_base(chunk_size, chunk_timeout, fsi, open_data, provider),
|
||||
mgr_(mgr) {
|
||||
if (fsi_.directory && read_state.has_value()) {
|
||||
throw startup_exception(
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (fsi_.directory) {
|
||||
if (read_state.has_value()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi.api_path, fsi.source_path,
|
||||
fmt::format("cannot resume a directory|sp|", fsi.api_path));
|
||||
}
|
||||
|
||||
if (fsi.directory) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -88,16 +91,15 @@ open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
utils::divide_with_ceiling(fsi_.size, chunk_size)),
|
||||
false);
|
||||
|
||||
auto file_size = nf_->size();
|
||||
if (provider_.is_read_only() || file_size.value() == fsi.size) {
|
||||
auto file_size = nf_->size().value_or(0U);
|
||||
if (provider_.is_read_only() || file_size == fsi.size) {
|
||||
read_state_.set(0U, read_state_.size(), true);
|
||||
} else if (nf_->truncate(fsi.size)) {
|
||||
if (file_size.value() > fsi.size) {
|
||||
set_api_error(
|
||||
cache_size_mgr::instance().shrink(file_size.value() - fsi.size));
|
||||
if (file_size > fsi.size) {
|
||||
set_api_error(cache_size_mgr::instance().shrink(file_size - fsi.size));
|
||||
} else {
|
||||
set_api_error(
|
||||
cache_size_mgr::instance().expand(fsi.size - file_size.value()));
|
||||
cache_size_mgr::instance().expand(fsi.size - file_size, false));
|
||||
}
|
||||
} else {
|
||||
set_api_error(api_error::os_error);
|
||||
@ -202,7 +204,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
reset_timeout();
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock download_lock(file_mtx_);
|
||||
unique_recur_mutex_lock file_lock(rw_mtx_);
|
||||
if ((get_api_error() == api_error::success) && (chunk < read_state_.size()) &&
|
||||
not read_state_[chunk]) {
|
||||
if (active_downloads_.find(chunk) != active_downloads_.end()) {
|
||||
@ -211,7 +213,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
}
|
||||
|
||||
auto active_download = active_downloads_.at(chunk);
|
||||
download_lock.unlock();
|
||||
file_lock.unlock();
|
||||
|
||||
active_download->wait();
|
||||
return;
|
||||
@ -229,7 +231,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
read_state_.count());
|
||||
|
||||
active_downloads_[chunk] = std::make_shared<download>();
|
||||
download_lock.unlock();
|
||||
file_lock.unlock();
|
||||
|
||||
if (should_reset) {
|
||||
reset_timeout();
|
||||
@ -238,7 +240,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
std::async(std::launch::async, [this, chunk, data_size, data_offset,
|
||||
should_reset]() {
|
||||
const auto notify_complete = [this, chunk, should_reset]() {
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
unique_recur_mutex_lock lock(rw_mtx_);
|
||||
auto active_download = active_downloads_.at(chunk);
|
||||
active_downloads_.erase(chunk);
|
||||
event_system::instance().raise<download_chunk_end>(
|
||||
@ -260,7 +262,7 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
event_system::instance().raise<download_end>(
|
||||
fsi_.api_path, fsi_.source_path, get_api_error());
|
||||
}
|
||||
file_lock.unlock();
|
||||
lock.unlock();
|
||||
|
||||
active_download->notify(get_api_error());
|
||||
|
||||
@ -299,9 +301,9 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
|
||||
return;
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
unique_recur_mutex_lock lock(rw_mtx_);
|
||||
read_state_.set(chunk);
|
||||
file_lock.unlock();
|
||||
lock.unlock();
|
||||
|
||||
notify_complete();
|
||||
}).wait();
|
||||
@ -318,28 +320,26 @@ void open_file::download_range(std::size_t start_chunk, std::size_t end_chunk,
|
||||
}
|
||||
|
||||
auto open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(rw_mtx_);
|
||||
return read_state_;
|
||||
}
|
||||
|
||||
auto open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
return read_state_[chunk];
|
||||
return get_read_state()[chunk];
|
||||
}
|
||||
|
||||
auto open_file::is_complete() const -> bool {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(rw_mtx_);
|
||||
return read_state_.all();
|
||||
}
|
||||
|
||||
auto open_file::native_operation(
|
||||
i_open_file::native_operation_callback callback) -> api_error {
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
if (stop_requested_) {
|
||||
return api_error::download_stopped;
|
||||
}
|
||||
file_lock.unlock();
|
||||
|
||||
unique_recur_mutex_lock file_lock(rw_mtx_);
|
||||
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
|
||||
}
|
||||
|
||||
@ -352,11 +352,9 @@ auto open_file::native_operation(
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
if (stop_requested_) {
|
||||
return api_error::download_stopped;
|
||||
}
|
||||
file_lock.unlock();
|
||||
|
||||
auto is_empty_file = new_file_size == 0U;
|
||||
auto last_chunk = is_empty_file
|
||||
@ -365,7 +363,7 @@ auto open_file::native_operation(
|
||||
new_file_size, chunk_size_)) -
|
||||
1U;
|
||||
|
||||
file_lock.lock();
|
||||
unique_recur_mutex_lock file_lock(rw_mtx_);
|
||||
if (not is_empty_file && (last_chunk < read_state_.size())) {
|
||||
file_lock.unlock();
|
||||
update_background_reader(0U);
|
||||
@ -465,12 +463,10 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
});
|
||||
};
|
||||
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
if (read_state_.all()) {
|
||||
reset_timeout();
|
||||
return read_from_source();
|
||||
}
|
||||
file_lock.unlock();
|
||||
|
||||
auto start_chunk = static_cast<std::size_t>(read_offset / chunk_size_);
|
||||
auto end_chunk =
|
||||
@ -483,14 +479,15 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
return get_api_error();
|
||||
}
|
||||
|
||||
file_lock.lock();
|
||||
unique_recur_mutex_lock file_lock(rw_mtx_);
|
||||
return get_api_error() == api_error::success ? read_from_source()
|
||||
: get_api_error();
|
||||
}
|
||||
|
||||
void open_file::remove(std::uint64_t handle) {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
open_file_base::remove(handle);
|
||||
|
||||
recur_mutex_lock file_lock(rw_mtx_);
|
||||
if (modified_ && read_state_.all() &&
|
||||
(get_api_error() == api_error::success)) {
|
||||
mgr_.queue_upload(*this);
|
||||
@ -503,9 +500,9 @@ void open_file::remove(std::uint64_t handle) {
|
||||
}
|
||||
|
||||
void open_file::remove_all() {
|
||||
recur_mutex_lock file_lock(file_mtx_);
|
||||
open_file_base::remove_all();
|
||||
|
||||
recur_mutex_lock file_lock(rw_mtx_);
|
||||
modified_ = false;
|
||||
removed_ = true;
|
||||
|
||||
@ -524,7 +521,8 @@ auto open_file::resize(std::uint64_t new_file_size) -> api_error {
|
||||
}
|
||||
|
||||
if (new_file_size > fsi_.size) {
|
||||
auto res = cache_size_mgr::instance().expand(new_file_size - fsi_.size);
|
||||
auto res =
|
||||
cache_size_mgr::instance().expand(new_file_size - fsi_.size, true);
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
}
|
||||
@ -555,7 +553,7 @@ void open_file::set_modified() {
|
||||
}
|
||||
|
||||
void open_file::update_background_reader(std::size_t read_chunk) {
|
||||
recur_mutex_lock reader_lock(file_mtx_);
|
||||
recur_mutex_lock file_lock(rw_mtx_);
|
||||
read_chunk_ = read_chunk;
|
||||
|
||||
if (reader_thread_ || stop_requested_) {
|
||||
@ -565,9 +563,9 @@ void open_file::update_background_reader(std::size_t read_chunk) {
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() {
|
||||
std::size_t next_chunk{};
|
||||
while (not stop_requested_) {
|
||||
unique_recur_mutex_lock file_lock(file_mtx_);
|
||||
unique_recur_mutex_lock lock(rw_mtx_);
|
||||
if ((fsi_.size == 0U) || read_state_.all()) {
|
||||
file_lock.unlock();
|
||||
lock.unlock();
|
||||
|
||||
unique_mutex_lock io_lock(io_thread_mtx_);
|
||||
if (not stop_requested_ && io_thread_queue_.empty()) {
|
||||
@ -584,7 +582,7 @@ void open_file::update_background_reader(std::size_t read_chunk) {
|
||||
} while ((next_chunk != 0U) &&
|
||||
(active_downloads_.find(next_chunk) != active_downloads_.end()));
|
||||
|
||||
file_lock.unlock();
|
||||
lock.unlock();
|
||||
download_chunk(next_chunk, true, false);
|
||||
}
|
||||
});
|
||||
@ -604,11 +602,9 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
unique_recur_mutex_lock write_lock(file_mtx_);
|
||||
if (stop_requested_) {
|
||||
return api_error::download_stopped;
|
||||
}
|
||||
write_lock.unlock();
|
||||
|
||||
auto start_chunk = static_cast<std::size_t>(write_offset / chunk_size_);
|
||||
auto end_chunk =
|
||||
@ -622,7 +618,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
|
||||
return get_api_error();
|
||||
}
|
||||
|
||||
write_lock.lock();
|
||||
unique_recur_mutex_lock file_lock(rw_mtx_);
|
||||
if ((write_offset + data.size()) > fsi_.size) {
|
||||
auto res = resize(write_offset + data.size());
|
||||
if (res != api_error::success) {
|
||||
|
@ -494,7 +494,8 @@ TEST_F(open_file_test, resize_file_to_0_bytes) {
|
||||
fsi.size = test_chunk_size * 4U;
|
||||
fsi.source_path = source_path;
|
||||
|
||||
EXPECT_EQ(api_error::success, cache_size_mgr::instance().expand(fsi.size));
|
||||
EXPECT_EQ(api_error::success,
|
||||
cache_size_mgr::instance().expand(fsi.size, false));
|
||||
|
||||
open_file file(test_chunk_size, 0U, fsi, provider, upload_mgr);
|
||||
test_closeable_open_file(file, false, api_error::success, fsi.size,
|
||||
@ -547,7 +548,8 @@ TEST_F(open_file_test, resize_file_by_full_chunk) {
|
||||
fsi.size = test_chunk_size * 4U;
|
||||
fsi.source_path = source_path;
|
||||
|
||||
EXPECT_EQ(api_error::success, cache_size_mgr::instance().expand(fsi.size));
|
||||
EXPECT_EQ(api_error::success,
|
||||
cache_size_mgr::instance().expand(fsi.size, false));
|
||||
|
||||
EXPECT_CALL(upload_mgr, store_resume)
|
||||
.WillOnce([&fsi](const i_open_file &file) {
|
||||
|
Reference in New Issue
Block a user