15 Commits

Author SHA1 Message Date
934d400cb4 unit tests and fixes
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good
2024-12-26 13:06:10 -06:00
4fcc2ec076 unit tests and fixes 2024-12-26 12:59:56 -06:00
54bfaf70c4 unit tests and fixes 2024-12-26 12:28:21 -06:00
2632388b91 refactor 2024-12-26 11:22:22 -06:00
0ef364b85f unit tests and fixes 2024-12-26 11:21:37 -06:00
05b61410ca unit tests and fixes 2024-12-26 11:17:39 -06:00
d3e2e768c6 unit test fixes 2024-12-26 10:56:51 -06:00
ac1183589c fixes 2024-12-26 09:56:33 -06:00
89daf1d688 fix 2024-12-26 09:24:53 -06:00
f402f2ef9a fix 2024-12-26 09:18:30 -06:00
4260aa9d9b refactor 2024-12-26 09:00:09 -06:00
ee68904585 fixes 2024-12-26 08:58:49 -06:00
6444b407c1 fix 2024-12-26 08:09:47 -06:00
daabab3a1b refactor 2024-12-26 08:06:22 -06:00
bc0e216b75 refactor direct_open_file 2024-12-26 07:57:59 -06:00
13 changed files with 445 additions and 267 deletions

View File

@ -19,8 +19,8 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_
#include "file_manager/open_file_base.hpp"
@ -45,46 +45,55 @@ public:
auto operator=(const direct_open_file &) noexcept
-> direct_open_file & = delete;
public:
static constexpr const auto ring_size{5U};
private:
std::atomic<std::uint64_t> last_progress_{0U};
boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_;
private:
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::mutex read_mtx_;
std::unique_ptr<std::thread> reader_thread_;
std::size_t ring_begin_{};
std::array<data_buffer, ring_size> ring_data_;
std::size_t ring_end_{};
std::size_t ring_pos_{};
stop_type stop_requested_{false};
protected:
[[nodiscard]] auto is_download_complete() const -> bool override {
return false;
}
private:
void background_reader_thread();
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
public:
auto close() -> bool override;
[[nodiscard]] auto is_complete() const -> bool override { return true; }
void forward(std::size_t count);
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return false; }
[[nodiscard]] auto is_write_supported() const -> bool override {
return false;
}
[[nodiscard]] auto get_read_state() const
-> boost::dynamic_bitset<> override {
return {};
}
[[nodiscard]] auto get_read_state(std::size_t /* chunk */) const
-> bool override {
return false;
}
[[nodiscard]] auto get_total_chunks() const -> std::uint64_t {
return total_chunks_;
}
[[nodiscard]] auto native_operation(native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /*callback*/)
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
@ -92,15 +101,21 @@ public:
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t /*size*/) -> api_error override {
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
void reverse(std::size_t count);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t /* write_offset */,
const data_buffer & /* data */,
std::size_t & /* bytes_written */)
-> api_error override {
return api_error::not_supported;
}
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#endif // REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE2_HPP_

View File

@ -32,23 +32,6 @@ E_SIMPLE2(download_begin, info, true,
std::string, dest_path, dest, E_FROM_STRING
);
E_SIMPLE5(download_chunk_begin, debug, true,
std::string, api_path, ap, E_FROM_STRING,
std::string, dest_path, dest, E_FROM_STRING,
std::size_t, chunk, chunk, E_FROM_SIZE_T,
std::size_t, total, total, E_FROM_SIZE_T,
std::size_t, complete, complete, E_FROM_SIZE_T
);
E_SIMPLE6(download_chunk_end, debug, true,
std::string, api_path, ap, E_FROM_STRING,
std::string, dest_path, dest, E_FROM_STRING,
std::size_t, chunk, chunk, E_FROM_SIZE_T,
std::size_t, total, total, E_FROM_SIZE_T,
std::size_t, complete, complete, E_FROM_SIZE_T,
api_error, result, result, E_FROM_API_FILE_ERROR
);
E_SIMPLE3(download_end, info, true,
std::string, api_path, ap, E_FROM_STRING,
std::string, dest_path, dest, E_FROM_STRING,

View File

@ -62,8 +62,12 @@ public:
[[nodiscard]] virtual auto get_source_path() const -> std::string = 0;
[[nodiscard]] virtual auto is_complete() const -> bool = 0;
[[nodiscard]] virtual auto is_directory() const -> bool = 0;
[[nodiscard]] virtual auto is_write_supported() const -> bool = 0;
[[nodiscard]] virtual auto has_handle(std::uint64_t handle) const -> bool = 0;
[[nodiscard]] virtual auto
@ -82,9 +86,6 @@ public:
virtual void set_api_path(const std::string &api_path) = 0;
virtual void
set_open_data(std::map<std::uint64_t, open_file_data> open_data) = 0;
[[nodiscard]] virtual auto write(std::uint64_t write_offset,
const data_buffer &data,
std::size_t &bytes_written) -> api_error = 0;
@ -105,12 +106,8 @@ public:
[[nodiscard]] virtual auto get_handles() const
-> std::vector<std::uint64_t> = 0;
[[nodiscard]] virtual auto is_complete() const -> bool = 0;
[[nodiscard]] virtual auto is_modified() const -> bool = 0;
[[nodiscard]] virtual auto is_write_supported() const -> bool = 0;
virtual void remove(std::uint64_t handle) = 0;
virtual void remove_all() = 0;

View File

@ -95,9 +95,6 @@ private:
void update_background_reader(std::size_t read_chunk);
protected:
[[nodiscard]] auto is_download_complete() const -> bool override;
public:
auto close() -> bool override;

View File

@ -131,8 +131,6 @@ private:
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
[[nodiscard]] virtual auto is_download_complete() const -> bool = 0;
void reset_timeout();
auto set_api_error(const api_error &e) -> api_error;
@ -191,9 +189,6 @@ public:
void remove_all() override;
void set_api_path(const std::string &api_path) override;
void
set_open_data(std::map<std::uint64_t, open_file_data> open_data) override;
};
} // namespace repertory

View File

@ -65,14 +65,9 @@ private:
stop_type stop_requested_{false};
private:
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
void background_reader_thread();
protected:
[[nodiscard]] auto is_download_complete() const -> bool override {
return false;
}
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
@ -101,7 +96,7 @@ public:
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return true; }
[[nodiscard]] auto is_complete() const -> bool override { return false; }
[[nodiscard]] auto is_write_supported() const -> bool override {
return false;
@ -110,7 +105,8 @@ public:
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t, native_operation_callback)
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
@ -128,7 +124,9 @@ public:
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
[[nodiscard]] auto write(std::uint64_t /* write_offset */,
const data_buffer & /* data */,
std::size_t & /* bytes_written */)
-> api_error override {
return api_error::not_supported;
}

View File

@ -27,28 +27,225 @@
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
#include "utils/time.hpp"
namespace repertory {
direct_open_file::direct_open_file(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, true),
: open_file_base(chunk_size, chunk_timeout, fsi, provider),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
event_system::instance().raise<download_begin>(fsi_.api_path, "");
utils::divide_with_ceiling(fsi_.size, chunk_size))) {
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), true);
if (fsi_.size > 0U) {
reader_thread_ =
std::make_unique<std::thread>([this]() { background_reader_thread(); });
}
event_system::instance().raise<download_begin>(fsi_.api_path, "direct");
}
direct_open_file::~direct_open_file() { close(); }
direct_open_file::~direct_open_file() {
REPERTORY_USES_FUNCTION_NAME();
close();
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
}
}
void direct_open_file::background_reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_;
chunk_notify_.notify_all();
chunk_lock.unlock();
while (not stop_requested_) {
chunk_lock.lock();
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
if (stop_requested_) {
chunk_notify_.notify_all();
chunk_lock.unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
next_chunk = ring_pos_;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
};
if (not ring_state_[next_chunk % ring_state_.size()]) {
check_and_wait();
continue;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
download_chunk(next_chunk, true);
chunk_lock.lock();
check_and_wait();
}
event_system::instance().raise<download_end>(fsi_.api_path, "direct",
api_error::download_stopped);
}
auto direct_open_file::close() -> bool {
stop_requested_ = true;
last_progress_ = 0U;
auto ret = open_file_base::close();
event_system::instance().raise<download_end>(fsi_.api_path, "",
api_error::download_stopped);
return ret;
unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all();
chunk_lock.unlock();
return open_file_base::close();
}
auto direct_open_file::download_chunk(std::size_t chunk, bool skip_active)
-> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
const auto unlock_and_return =
[&unlock_and_notify](api_error res) -> api_error {
unlock_and_notify();
return res;
};
if (chunk < ring_begin_ || chunk > ring_end_) {
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = active_downloads_.at(chunk);
unlock_and_notify();
return active_download->wait();
}
if (not ring_state_[chunk % ring_state_.size()]) {
return unlock_and_return(api_error::success);
}
auto active_download{std::make_shared<download>()};
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
auto &buffer = ring_data_.at(chunk % ring_state_.size());
auto data_offset{chunk * chunk_size_};
auto data_size{
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
};
unlock_and_notify();
auto res{
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
stop_requested_),
};
chunk_lock.lock();
if (res == api_error::success) {
auto progress =
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(fsi_.api_path, "direct",
progress);
res = (chunk >= ring_begin_ && chunk <= ring_end_)
? res
: api_error::invalid_ring_buffer_position;
}
active_downloads_.erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
}
void direct_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
if ((ring_pos_ + count) <= ring_end_) {
ring_pos_ += count;
} else {
auto added = count - (ring_end_ - ring_pos_);
if (added >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
ring_pos_ += count;
ring_begin_ += added;
} else {
for (std::size_t idx = 0U; idx < added; ++idx) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = true;
}
ring_begin_ += added;
ring_pos_ += count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(file_mtx_);
auto read_state = ring_state_;
return read_state.flip();
}
auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()];
}
void direct_open_file::reverse(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
count = std::min(ring_pos_, count);
if ((ring_pos_ - count) >= ring_begin_) {
ring_pos_ -= count;
} else {
auto removed = count - (ring_pos_ - ring_begin_);
if (removed >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
ring_pos_ -= count;
ring_begin_ = ring_pos_;
} else {
for (std::size_t idx = 0U; idx < removed; ++idx) {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = true;
}
ring_begin_ -= removed;
ring_pos_ -= count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
@ -64,23 +261,59 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
return api_error::success;
}
auto res = provider_.read_file_bytes(fsi_.api_path, read_size, read_offset,
data, stop_requested_);
if (res != api_error::success) {
return res;
auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
read_offset = read_offset - (begin_chunk * chunk_size_);
auto res{api_error::success};
unique_mutex_lock read_lock(read_mtx_);
for (std::size_t chunk = begin_chunk;
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
++chunk) {
reset_timeout();
if (chunk > ring_pos_) {
forward(chunk - ring_pos_);
} else if (chunk < ring_pos_) {
reverse(ring_pos_ - chunk);
}
res = download_chunk(chunk, false);
if (res != api_error::success) {
if (not stop_requested_ &&
res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
return read(read_size, read_offset, data);
}
return res;
}
reset_timeout();
auto to_read{
std::min(static_cast<std::size_t>(chunk_size_ - read_offset),
read_size),
};
auto &buffer = ring_data_.at(chunk % ring_state_.size());
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
data.insert(data.end(), begin, end);
read_offset = 0U;
read_size -= static_cast<std::uint64_t>(std::distance(begin, end));
}
reset_timeout();
if ((utils::time::get_time_now() - last_progress_.load()) >
(2U * utils::time::NANOS_PER_SECOND)) {
last_progress_ = utils::time::get_time_now();
auto progress = (static_cast<double>(read_offset + read_size) /
static_cast<double>(fsi_.size)) *
100.0;
event_system::instance().raise<download_progress>(fsi_.api_path, "",
progress);
}
return stop_requested_ ? api_error::download_stopped : res;
}
return res;
void direct_open_file::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
} // namespace repertory

View File

@ -158,6 +158,10 @@ auto file_manager::evict_file(const std::string &api_path) -> bool {
return false;
}
if (fsi.source_path.empty()) {
return false;
}
std::string pinned;
res = provider_.get_item_meta(api_path, META_PINNED, pinned);
if (res != api_error::success && res != api_error::item_not_found) {
@ -170,35 +174,22 @@ auto file_manager::evict_file(const std::string &api_path) -> bool {
return false;
}
std::string source_path{};
res = provider_.get_item_meta(api_path, META_SOURCE, source_path);
if (res != api_error::success) {
utils::error::raise_api_path_error(std::string{function_name}, api_path,
res, "failed to get source path");
return false;
}
if (source_path.empty()) {
return false;
}
std::shared_ptr<i_closeable_open_file> closeable_file;
if (open_file_lookup_.contains(api_path)) {
closeable_file = open_file_lookup_.at(api_path);
}
open_file_lookup_.erase(api_path);
auto allocated = closeable_file ? closeable_file->get_allocated() : true;
auto removed = remove_source_and_shrink_cache(api_path, source_path, fsi.size,
allocated);
open_lock.unlock();
auto allocated = closeable_file ? closeable_file->get_allocated() : true;
closeable_file.reset();
auto removed = remove_source_and_shrink_cache(api_path, fsi.source_path,
fsi.size, allocated);
if (removed) {
event_system::instance().raise<filesystem_item_evicted>(api_path,
source_path);
fsi.source_path);
}
return removed;
@ -377,7 +368,10 @@ auto file_manager::is_processing(const std::string &api_path) const -> bool {
auto closeable_file = file_iter->second;
open_lock.unlock();
return closeable_file->is_modified() || not closeable_file->is_complete();
return closeable_file->is_write_supported()
? closeable_file->is_modified() ||
not closeable_file->is_complete()
: false;
}
auto file_manager::open(const std::string &api_path, bool directory,
@ -387,10 +381,11 @@ auto file_manager::open(const std::string &api_path, bool directory,
return open(api_path, directory, ofd, handle, file, nullptr);
}
auto file_manager::open(
const std::string &api_path, bool directory, const open_file_data &ofd,
std::uint64_t &handle, std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error {
auto file_manager::open(const std::string &api_path, bool directory,
const open_file_data &ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file)
-> api_error {
REPERTORY_USES_FUNCTION_NAME();
const auto create_and_add_handle =
@ -754,8 +749,8 @@ auto file_manager::rename_directory(const std::string &from_api_path,
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path,
bool overwrite) -> api_error {
const std::string &to_api_path, bool overwrite)
-> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}

View File

@ -300,9 +300,6 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
event_system::instance().raise<download_begin>(fsi_.api_path,
fsi_.source_path);
}
event_system::instance().raise<download_chunk_begin>(
fsi_.api_path, fsi_.source_path, chunk, read_state.size(),
read_state.count());
active_downloads_[chunk] = std::make_shared<download>();
rw_lock.unlock();
@ -319,9 +316,6 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
unique_recur_mutex_lock lock(rw_mtx_);
auto active_download = active_downloads_.at(chunk);
active_downloads_.erase(chunk);
event_system::instance().raise<download_chunk_end>(
fsi_.api_path, fsi_.source_path, chunk, state.size(), state.count(),
get_api_error());
if (get_api_error() == api_error::success) {
auto progress = (static_cast<double>(state.count()) /
static_cast<double>(state.size())) *
@ -409,10 +403,6 @@ auto open_file::get_read_state(std::size_t chunk) const -> bool {
auto open_file::is_complete() const -> bool { return get_read_state().all(); }
auto open_file::is_download_complete() const -> bool {
return get_read_state().all();
}
auto open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
if (stop_requested_) {

View File

@ -119,7 +119,7 @@ auto open_file_base::can_close() const -> bool {
return true;
}
if (is_download_complete()) {
if (is_complete()) {
return true;
}
@ -303,12 +303,6 @@ void open_file_base::set_api_path(const std::string &api_path) {
fsi_.api_parent = utils::path::get_parent_api_path(api_path);
}
void open_file_base::set_open_data(
std::map<std::uint64_t, open_file_data> open_data) {
recur_mutex_lock file_lock(file_mtx_);
open_data_ = std::move(open_data);
}
auto open_file_base::close() -> bool {
unique_mutex_lock io_lock(io_thread_mtx_);
if (io_stop_requested_ || not io_thread_) {

View File

@ -45,12 +45,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi_.size, chunk_size))) {
if ((ring_size % 2U) != 0U) {
throw std::runtime_error("ring size must be a multiple of 2");
}
if (ring_size < 4U) {
throw std::runtime_error("ring size must be greater than or equal to 4");
if (ring_size < 5U) {
throw std::runtime_error("ring size must be greater than or equal to 5");
}
if (not can_handle_file(fsi_.size, chunk_size, ring_size)) {
@ -149,7 +145,7 @@ void ring_buffer_open_file::background_reader_thread() {
auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool {
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size * 2U);
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
}
auto ring_buffer_open_file::close() -> bool {
@ -206,10 +202,6 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk, bool skip_active)
chunk == (total_chunks_ - 1U) ? last_chunk_size_ : chunk_size_,
};
event_system::instance().raise<download_chunk_begin>(
fsi_.api_path, source_path_, chunk, get_read_state().size(),
get_read_state().count());
auto res{
provider_.read_file_bytes(fsi_.api_path, data_size, data_offset, buffer,
stop_requested_),
@ -236,10 +228,6 @@ auto ring_buffer_open_file::download_chunk(std::size_t chunk, bool skip_active)
: api_error::invalid_ring_buffer_position;
}
event_system::instance().raise<download_chunk_end>(
fsi_.api_path, source_path_, chunk, get_read_state().size(),
get_read_state().count(), res);
active_downloads_.erase(chunk);
unlock_and_notify();
@ -340,7 +328,10 @@ auto ring_buffer_open_file::read(std::size_t read_size,
unique_mutex_lock read_lock(read_mtx_);
for (std::size_t chunk = begin_chunk;
(res == api_error::success) && (read_size > 0U); ++chunk) {
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
++chunk) {
reset_timeout();
if (chunk > ring_pos_) {
forward(chunk - ring_pos_);
} else if (chunk < ring_pos_) {
@ -393,7 +384,7 @@ auto ring_buffer_open_file::read(std::size_t read_size,
read_offset = 0U;
}
return res;
return stop_requested_ ? api_error::download_stopped : res;
}
void ring_buffer_open_file::set(std::size_t first_chunk,

View File

@ -29,6 +29,13 @@
namespace repertory {
class mock_open_file : public virtual i_closeable_open_file {
public:
MOCK_METHOD(void, add, (std::uint64_t handle, open_file_data ofd),
(override));
MOCK_METHOD(bool, can_close, (), (const, override));
MOCK_METHOD(bool, close, (), (override));
MOCK_METHOD(std::string, get_api_path, (), (const, override));
MOCK_METHOD(std::size_t, get_chunk_size, (), (const, override));
@ -49,14 +56,28 @@ public:
MOCK_METHOD(bool, get_allocated, (), (const, override));
MOCK_METHOD(std::vector<std::uint64_t>, get_handles, (), (const, override));
MOCK_METHOD((std::map<std::uint64_t, open_file_data> &), get_open_data, (),
(override));
MOCK_METHOD((const std::map<std::uint64_t, open_file_data> &), get_open_data,
(), (const, override));
MOCK_METHOD(bool, get_read_state, (std::size_t chunk), (const, override));
MOCK_METHOD(std::string, get_source_path, (), (const, override));
MOCK_METHOD(bool, has_handle, (std::uint64_t handle), (const, override));
MOCK_METHOD(bool, is_complete, (), (const, override));
MOCK_METHOD(bool, is_directory, (), (const, override));
MOCK_METHOD(bool, is_modified, (), (const, override));
MOCK_METHOD(bool, is_write_supported, (), (const, override));
MOCK_METHOD(api_error, native_operation, (native_operation_callback callback),
(override));
@ -69,6 +90,10 @@ public:
data_buffer &data),
(override));
MOCK_METHOD(void, remove, (std::uint64_t handle), (override));
MOCK_METHOD(void, remove_all, (), (override));
MOCK_METHOD(api_error, resize, (std::uint64_t new_file_size), (override));
MOCK_METHOD(void, set_api_path, (const std::string &api_path), (override));
@ -77,35 +102,6 @@ public:
(std::uint64_t write_offset, const data_buffer &data,
std::size_t &bytes_written),
(override));
MOCK_METHOD(void, add, (std::uint64_t handle, open_file_data ofd),
(override));
MOCK_METHOD(bool, can_close, (), (const, override));
MOCK_METHOD(bool, close, (), (override));
MOCK_METHOD(std::vector<std::uint64_t>, get_handles, (), (const, override));
MOCK_METHOD((std::map<std::uint64_t, open_file_data> &), get_open_data, (),
(override));
MOCK_METHOD((const std::map<std::uint64_t, open_file_data> &), get_open_data,
(), (const, override));
MOCK_METHOD(bool, is_complete, (), (const, override));
MOCK_METHOD(bool, is_modified, (), (const, override));
MOCK_METHOD(bool, is_write_supported, (), (const, override));
MOCK_METHOD(void, remove, (std::uint64_t handle), (override));
MOCK_METHOD(void, remove_all, (), (override));
MOCK_METHOD(void, set_open_data,
((std::map<std::uint64_t, open_file_data> open_data)),
(override));
};
} // namespace repertory

View File

@ -431,16 +431,6 @@ TEST_F(file_manager_test,
return api_error::success;
});
std::uint64_t handle{};
std::shared_ptr<i_open_file> open_file;
#if defined(_WIN32)
EXPECT_EQ(api_error::success, mgr.open("/test_write_partial_download.txt",
false, {}, handle, open_file));
#else
EXPECT_EQ(api_error::success, mgr.open("/test_write_partial_download.txt",
false, O_RDWR, handle, open_file));
#endif
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&file](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
@ -465,6 +455,17 @@ TEST_F(file_manager_test,
return api_error::download_stopped;
});
std::uint64_t handle{};
std::shared_ptr<i_open_file> open_file;
#if defined(_WIN32)
EXPECT_EQ(api_error::success, mgr.open("/test_write_partial_download.txt",
false, {}, handle, open_file));
#else
EXPECT_EQ(api_error::success, mgr.open("/test_write_partial_download.txt",
false, O_RDWR, handle, open_file));
#endif
EXPECT_CALL(mp, set_item_meta("/test_write_partial_download.txt", _))
.WillOnce(
[](const std::string &, const api_meta_map &meta2) -> api_error {
@ -475,6 +476,10 @@ TEST_F(file_manager_test,
});
EXPECT_CALL(mp, upload_file).Times(0u);
if (not open_file->is_write_supported()) {
EXPECT_TRUE(mgr.get_open_file(handle, true, open_file));
}
std::size_t bytes_written{};
data_buffer data = {0, 1, 2};
EXPECT_EQ(api_error::success, open_file->write(0u, data, bytes_written));
@ -560,7 +565,6 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) {
EXPECT_STREQ(source_path.c_str(),
evt2.get_source().get<std::string>().c_str());
});
event_capture capture({"download_end"});
auto now = utils::time::get_time_now();
auto meta = create_meta_attributes(
@ -584,16 +588,6 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) {
return api_error::success;
});
std::uint64_t handle{};
std::shared_ptr<i_open_file> open_file;
#if defined(_WIN32)
EXPECT_EQ(api_error::success, mgr.open("/test_write_full_download.txt", false,
{}, handle, open_file));
#else
EXPECT_EQ(api_error::success, mgr.open("/test_write_full_download.txt", false,
O_RDWR, handle, open_file));
#endif
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&file](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
@ -606,6 +600,17 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) {
EXPECT_EQ(bytes_read, data.size());
return ret;
});
std::uint64_t handle{};
std::shared_ptr<i_open_file> open_file;
#if defined(_WIN32)
EXPECT_EQ(api_error::success, mgr.open("/test_write_full_download.txt", false,
{}, handle, open_file));
#else
EXPECT_EQ(api_error::success, mgr.open("/test_write_full_download.txt", false,
O_RDWR, handle, open_file));
#endif
EXPECT_CALL(mp, set_item_meta("/test_write_full_download.txt", _))
.WillOnce(
[](const std::string &, const api_meta_map &meta2) -> api_error {
@ -614,25 +619,33 @@ TEST_F(file_manager_test, upload_occurs_after_write_if_fully_downloaded) {
EXPECT_NO_THROW(EXPECT_FALSE(meta2.at(META_WRITTEN).empty()));
return api_error::success;
});
std::size_t bytes_written{};
data_buffer data = {0, 1, 2};
EXPECT_EQ(api_error::success, open_file->write(0u, data, bytes_written));
EXPECT_EQ(std::size_t(3u), bytes_written);
open_file.reset();
capture.wait_for_empty();
if (not open_file->is_write_supported()) {
EXPECT_TRUE(mgr.get_open_file(handle, true, open_file));
}
EXPECT_CALL(mp, upload_file("/test_write_full_download.txt", source_path, _))
.WillOnce(Return(api_error::success));
event_capture ec2({
event_capture capture({
"item_timeout",
"file_upload_queued",
"file_upload_completed",
});
EXPECT_CALL(mp, upload_file("/test_write_full_download.txt", source_path, _))
.WillOnce(Return(api_error::success));
std::size_t bytes_written{};
data_buffer data = {0, 1, 2};
EXPECT_EQ(api_error::success, open_file->write(0u, data, bytes_written));
EXPECT_EQ(std::size_t(3u), bytes_written);
while (not open_file->is_complete()) {
std::this_thread::sleep_for(10ms);
}
open_file.reset();
mgr.close(handle);
ec2.wait_for_empty();
capture.wait_for_empty();
EXPECT_EQ(std::size_t(0U), mgr.get_open_file_count());
EXPECT_EQ(std::size_t(0U), mgr.get_open_handle_count());
@ -697,6 +710,10 @@ TEST_F(file_manager_test, can_evict_file) {
.WillRepeatedly(Return(api_error::success));
EXPECT_CALL(mp, upload_file(_, _, _)).WillOnce(Return(api_error::success));
if (not open_file->is_write_supported()) {
EXPECT_TRUE(mgr.get_open_file(handle, true, open_file));
}
data_buffer data{{0, 1, 1}};
std::size_t bytes_written{};
auto res = open_file->write(0U, data, bytes_written);
@ -713,15 +730,6 @@ TEST_F(file_manager_test, can_evict_file) {
EXPECT_TRUE(utils::retry_action(
[&mgr]() -> bool { return not mgr.is_processing("/test_evict.txt"); }));
EXPECT_CALL(mp, get_item_meta(_, META_SOURCE, _))
.WillOnce([&source_path](const std::string &api_path,
const std::string &key,
std::string &value) -> api_error {
EXPECT_STREQ("/test_evict.txt", api_path.c_str());
EXPECT_STREQ(META_SOURCE.c_str(), key.c_str());
value = source_path;
return api_error::success;
});
EXPECT_CALL(mp, get_item_meta(_, META_PINNED, _))
.WillOnce([](const std::string &api_path, const std::string &key,
std::string &value) -> api_error {
@ -798,27 +806,14 @@ TEST_F(file_manager_test, evict_file_fails_if_file_is_open) {
mgr.close(handle);
}
TEST_F(file_manager_test,
evict_file_fails_if_unable_to_get_source_path_from_item_meta) {
TEST_F(file_manager_test, evict_file_fails_if_unable_to_get_filesystem_item) {
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
file_manager mgr(*cfg, mp);
EXPECT_CALL(mp, get_item_meta(_, META_SOURCE, _))
.WillOnce([](const std::string &api_path, const std::string &key,
std::string & /*value*/) -> api_error {
EXPECT_STREQ("/test_open.txt", api_path.c_str());
EXPECT_STREQ(META_SOURCE.c_str(), key.c_str());
return api_error::error;
});
EXPECT_CALL(mp, get_item_meta(_, META_PINNED, _))
.WillOnce([](const std::string &api_path, const std::string &key,
std::string &value) -> api_error {
EXPECT_STREQ("/test_open.txt", api_path.c_str());
EXPECT_STREQ(META_PINNED.c_str(), key.c_str());
value = "0";
return api_error::success;
});
EXPECT_CALL(mp, get_filesystem_item)
.WillRepeatedly(
[](const std::string &api_path, bool directory,
filesystem_item &fsi) -> api_error { return api_error::error; });
EXPECT_FALSE(mgr.evict_file("/test_open.txt"));
}
@ -827,20 +822,13 @@ TEST_F(file_manager_test, evict_file_fails_if_source_path_is_empty) {
EXPECT_CALL(mp, is_read_only()).WillRepeatedly(Return(false));
file_manager mgr(*cfg, mp);
EXPECT_CALL(mp, get_item_meta(_, META_SOURCE, _))
.WillOnce([](const std::string &api_path, const std::string &key,
std::string &value) -> api_error {
EXPECT_STREQ("/test_open.txt", api_path.c_str());
EXPECT_STREQ(META_SOURCE.c_str(), key.c_str());
value = "";
return api_error::success;
});
EXPECT_CALL(mp, get_item_meta(_, META_PINNED, _))
.WillOnce([](const std::string &api_path, const std::string &key,
std::string &value) -> api_error {
EXPECT_STREQ("/test_open.txt", api_path.c_str());
EXPECT_STREQ(META_PINNED.c_str(), key.c_str());
value = "0";
EXPECT_CALL(mp, get_filesystem_item)
.WillRepeatedly([](const std::string &api_path, bool directory,
filesystem_item &fsi) -> api_error {
fsi.api_path = api_path;
fsi.api_parent = utils::path::get_parent_api_path(api_path);
fsi.directory = directory;
fsi.size = 20U;
return api_error::success;
});
@ -908,6 +896,10 @@ TEST_F(file_manager_test, evict_file_fails_if_file_is_uploading) {
return api_error::success;
});
if (not open_file->is_write_supported()) {
EXPECT_TRUE(mgr.get_open_file(handle, true, open_file));
}
data_buffer data{{0, 1, 1}};
std::size_t bytes_written{};
EXPECT_EQ(api_error::success, open_file->write(0U, data, bytes_written));
@ -1461,26 +1453,16 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) {
event_capture capture({"item_timeout"});
std::uint64_t handle{};
std::shared_ptr<i_open_file> open_file;
#if defined(_WIN32)
EXPECT_EQ(api_error::success, mgr.open("/test_download_timeout.txt", false,
{}, handle, open_file));
#else
EXPECT_EQ(api_error::success, mgr.open("/test_download_timeout.txt", false,
O_RDWR, handle, open_file));
#endif
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([](const std::string & /* api_path */,
std::size_t /*size*/, std::uint64_t offset,
data_buffer & /*data*/,
.WillRepeatedly([](const std::string & /* api_path */, std::size_t size,
std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error {
if (stop_requested) {
return api_error::download_stopped;
}
if (offset == 0U) {
data.resize(size);
return api_error::success;
}
@ -1491,13 +1473,25 @@ TEST_F(file_manager_test, file_is_closed_after_download_timeout) {
return api_error::download_stopped;
});
std::uint64_t handle{};
std::shared_ptr<i_open_file> open_file;
#if defined(_WIN32)
EXPECT_EQ(api_error::success, mgr.open("/test_download_timeout.txt", false,
{}, handle, open_file));
#else
EXPECT_EQ(api_error::success, mgr.open("/test_download_timeout.txt", false,
O_RDWR, handle, open_file));
#endif
data_buffer data{};
EXPECT_EQ(api_error::success, open_file->read(1U, 0U, data));
mgr.close(handle);
EXPECT_CALL(mp, set_item_meta("/test_download_timeout.txt", META_SOURCE, _))
.WillOnce(Return(api_error::success));
if (open_file->is_write_supported()) {
EXPECT_CALL(mp, set_item_meta("/test_download_timeout.txt", META_SOURCE, _))
.WillOnce(Return(api_error::success));
}
EXPECT_EQ(std::size_t(1U), mgr.get_open_file_count());
capture.wait_for_empty();