1 Commits

Author SHA1 Message Date
8dd46b8ad8 v2.0.2-rc (#27)
Some checks reported errors
BlockStorage/repertory/pipeline/head Something is wrong with the build of this commit
## v2.0.2-rc

### BREAKING CHANGES

* Refactored `config.json` - will need to verify configuration settings prior to mounting

### Issues

* \#12 \[Unit Test\] Complete all providers unit tests
* \#14 \[Unit Test\] SQLite mini-ORM unit tests and cleanup
* \#16 Add support for bucket name in Sia provider
* \#17 Update to common c++ build system
  * A single 64-bit Linux Jenkins server is used to build all Linux and Windows versions
  * All dependency sources are now included
  * MSVC is no longer supported
  * MSYS2 is required for building Windows binaries on Windows
  * OS X support is temporarily disabled
* \#19 \[bug\] Rename file is broken for files that are existing
* \#23 \[bug\] Incorrect file size displayed while upload is pending
* \#24 RocksDB implementations should be transactional
* \#25 Writes should block when maximum cache size is reached
* \#26 Complete ring buffer and direct download support

### Changes from v2.0.1-rc

* Ability to choose between RocksDB and SQLite databases
* Added direct reads and implemented download fallback
* Corrected file times on S3 and Sia providers
* Corrected handling of `chown()` and `chmod()`
* Fixed erroneous download of chunks after resize

Reviewed-on: #27
2024-12-28 15:56:40 -06:00
15 changed files with 371 additions and 319 deletions

View File

@ -27,7 +27,6 @@
* Ability to choose between RocksDB and SQLite databases
* Added direct reads and implemented download fallback
* Comprehensive WinFSP and FUSE unit tests, including remote testing
* Corrected file times on S3 and Sia providers
* Corrected handling of `chown()` and `chmod()`
* Fixed erroneous download of chunks after resize

View File

@ -22,7 +22,7 @@
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#include "file_manager/ring_file_base.hpp"
#include "file_manager/ring_buffer_base.hpp"
#include "types/repertory.hpp"
@ -30,7 +30,7 @@ namespace repertory {
class i_provider;
class i_upload_manager;
class direct_open_file final : public ring_file_base {
class direct_open_file final : public ring_buffer_base {
public:
direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
@ -49,20 +49,33 @@ private:
std::array<data_buffer, min_ring_size> ring_data_;
protected:
[[nodiscard]] auto handle_read_buffer(
std::size_t chunk,
std::function<api_error(data_buffer &buffer)> func) -> api_error override;
[[nodiscard]] auto on_check_start() -> bool override;
[[nodiscard]] auto
use_buffer(std::size_t chunk,
std::function<api_error(const data_buffer &buffer)> func)
on_chunk_downloaded(std::size_t /* chunk */,
const data_buffer & /* buffer */) -> api_error override {
return api_error::success;
}
[[nodiscard]] auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error override;
[[nodiscard]] auto use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error override;
public:
[[nodiscard]] auto get_source_path() const -> std::string override {
return "direct";
[[nodiscard]] auto native_operation(native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
};
} // namespace repertory

View File

@ -25,6 +25,7 @@
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
#include "utils/types/file/i_file.hpp"
namespace repertory {
class i_provider;
@ -68,11 +69,11 @@ private:
private:
bool allocated{false};
std::unique_ptr<utils::file::i_file> nf_;
bool notified_{false};
std::size_t read_chunk_{};
boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
mutable std::recursive_mutex rw_mtx_;
stop_type stop_requested_{false};
@ -93,7 +94,7 @@ private:
void set_read_state(boost::dynamic_bitset<> read_state);
void update_background_reader(std::size_t read_chunk);
void update_reader(std::size_t chunk);
public:
auto close() -> bool override;

View File

@ -24,8 +24,6 @@
#include "file_manager/i_open_file.hpp"
#include "utils/types/file/i_file.hpp"
namespace repertory {
class i_provider;
@ -107,6 +105,7 @@ private:
i_provider &provider_;
private:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
api_error error_{api_error::success};
mutable std::mutex error_mtx_;
mutable std::recursive_mutex file_mtx_;
@ -121,16 +120,17 @@ private:
bool modified_{false};
bool removed_{false};
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
std::unique_ptr<utils::file::i_file> nf_;
private:
void file_io_thread();
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
[[nodiscard]] auto get_active_downloads()
-> std::unordered_map<std::size_t, std::shared_ptr<download>> & {
return active_downloads_;
}
[[nodiscard]] auto get_mutex() const -> std::recursive_mutex & {
return file_mtx_;
}

View File

@ -19,37 +19,39 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_BASE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_BASE_HPP_
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
#include "utils/file.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class ring_file_base : public open_file_base {
class ring_buffer_base : public open_file_base {
public:
ring_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size, bool disable_io);
ring_buffer_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size, bool disable_io);
~ring_file_base() override = default;
~ring_buffer_base() override = default;
public:
ring_buffer_base() = delete;
ring_buffer_base(const ring_buffer_base &) noexcept = delete;
ring_buffer_base(ring_buffer_base &&) noexcept = delete;
auto operator=(ring_buffer_base &&) noexcept -> ring_buffer_base & = delete;
auto
operator=(const ring_buffer_base &) noexcept -> ring_buffer_base & = delete;
public:
static constexpr const auto min_ring_size{5U};
public:
ring_file_base() = delete;
ring_file_base(const ring_file_base &) noexcept = delete;
ring_file_base(ring_file_base &&) noexcept = delete;
auto operator=(ring_file_base &&) noexcept -> ring_file_base & = delete;
auto operator=(const ring_file_base &) noexcept -> ring_file_base & = delete;
private:
boost::dynamic_bitset<> ring_state_;
boost::dynamic_bitset<> read_state_;
std::size_t total_chunks_;
private:
@ -72,34 +74,30 @@ private:
void update_position(std::size_t count, bool is_forward);
protected:
[[nodiscard]] auto get_read_state_size() const -> std::size_t;
[[nodiscard]] virtual auto handle_read_buffer(
std::size_t chunk,
std::function<api_error(data_buffer &buffer)> func) -> api_error = 0;
[[nodiscard]] auto has_reader_thread() -> bool {
[[nodiscard]] auto has_reader_thread() const -> bool {
return reader_thread_ != nullptr;
}
[[nodiscard]] auto get_ring_size() const -> std::size_t {
return read_state_.size();
}
[[nodiscard]] virtual auto on_check_start() -> bool = 0;
[[nodiscard]] virtual auto
on_chunk_downloaded(std::size_t /* chunk */,
const data_buffer & /* buffer */) -> api_error {
return api_error::success;
}
on_chunk_downloaded(std::size_t chunk,
const data_buffer &buffer) -> api_error = 0;
[[nodiscard]] virtual auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error = 0;
[[nodiscard]] virtual auto
use_buffer(std::size_t chunk,
std::function<api_error(const data_buffer &buffer)> func)
-> api_error = 0;
std::function<api_error(data_buffer &)> func) -> api_error = 0;
public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool;
auto close() -> bool override;
void forward(std::size_t count);
@ -128,15 +126,6 @@ public:
return false;
}
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
@ -158,4 +147,4 @@ public:
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_BASE_HPP_

View File

@ -22,15 +22,16 @@
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#include "file_manager/ring_file_base.hpp"
#include "file_manager/ring_buffer_base.hpp"
#include "types/repertory.hpp"
#include "utils/file.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class ring_buffer_open_file final : public ring_file_base {
class ring_buffer_open_file final : public ring_buffer_base {
public:
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
@ -50,11 +51,10 @@ public:
private:
std::string source_path_;
protected:
[[nodiscard]] auto handle_read_buffer(
std::size_t chunk,
std::function<api_error(data_buffer &buffer)> func) -> api_error override;
private:
std::unique_ptr<utils::file::i_file> nf_;
protected:
[[nodiscard]] auto on_check_start() -> bool override;
[[nodiscard]] auto
@ -62,11 +62,28 @@ protected:
const data_buffer &buffer) -> api_error override;
[[nodiscard]] auto
use_buffer(std::size_t chunk,
std::function<api_error(const data_buffer &buffer)> func)
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error override;
[[nodiscard]] auto use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error override;
public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool;
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto get_source_path() const -> std::string override {
return source_path_;
}

View File

@ -81,8 +81,8 @@ auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid,
struct fuse_file_info * /*file_info*/)
-> api_error {
#else
auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid)
-> api_error {
auto fuse_drive::chown_impl(std::string api_path, uid_t uid,
gid_t gid) -> api_error {
#endif
return check_and_perform(
api_path, X_OK, [&](api_meta_map &meta) -> api_error {
@ -481,8 +481,8 @@ auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st,
struct fuse_file_info * /*file_info*/)
-> api_error {
#else
auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st)
-> api_error {
auto fuse_drive::getattr_impl(std::string api_path,
struct stat *unix_st) -> api_error {
#endif
auto parent = utils::path::get_parent_api_path(api_path);
@ -565,8 +565,8 @@ auto fuse_drive::getxtimes_impl(std::string api_path, struct timespec *bkuptime,
#endif // __APPLE__
#if FUSE_USE_VERSION >= 30
auto fuse_drive::init_impl(struct fuse_conn_info *conn, struct fuse_config *cfg)
-> void * {
auto fuse_drive::init_impl(struct fuse_conn_info *conn,
struct fuse_config *cfg) -> void * {
#else
void *fuse_drive::init_impl(struct fuse_conn_info *conn) {
#endif
@ -800,9 +800,8 @@ auto fuse_drive::release_impl(std::string /*api_path*/,
return api_error::success;
}
auto fuse_drive::releasedir_impl(std::string /*api_path*/,
struct fuse_file_info *file_info)
-> api_error {
auto fuse_drive::releasedir_impl(
std::string /*api_path*/, struct fuse_file_info *file_info) -> api_error {
auto iter = directory_cache_->get_directory(file_info->fh);
if (iter == nullptr) {
return api_error::invalid_handle;
@ -820,8 +819,8 @@ auto fuse_drive::rename_directory(const std::string &from_api_path,
}
auto fuse_drive::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite)
-> int {
const std::string &to_api_path,
bool overwrite) -> int {
auto res = fm_->rename_file(from_api_path, to_api_path, overwrite);
errno = std::abs(utils::from_api_error(res));
return (res == api_error::success) ? 0 : -1;
@ -831,8 +830,8 @@ auto fuse_drive::rename_file(const std::string &from_api_path,
auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path,
unsigned int /*flags*/) -> api_error {
#else
auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path)
-> api_error {
auto fuse_drive::rename_impl(std::string from_api_path,
std::string to_api_path) -> api_error {
#endif
auto res = check_parent_access(to_api_path, W_OK | X_OK);
if (res != api_error::success) {
@ -946,15 +945,15 @@ auto fuse_drive::getxattr_impl(std::string api_path, const char *name,
}
#else // __APPLE__
auto fuse_drive::getxattr_impl(std::string api_path, const char *name,
char *value, size_t size, int &attribute_size)
-> api_error {
char *value, size_t size,
int &attribute_size) -> api_error {
return getxattr_common(api_path, name, value, size, attribute_size, nullptr);
}
#endif // __APPLE__
auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size,
int &required_size, bool &return_size)
-> api_error {
int &required_size,
bool &return_size) -> api_error {
auto check_size = (size == 0);
auto res = check_parent_access(api_path, X_OK);
@ -994,8 +993,8 @@ auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size,
return res;
}
auto fuse_drive::removexattr_impl(std::string api_path, const char *name)
-> api_error {
auto fuse_drive::removexattr_impl(std::string api_path,
const char *name) -> api_error {
std::string attribute_name;
#if defined(__APPLE__)
auto res = parse_xattr_parameters(name, 0, attribute_name, api_path);
@ -1023,8 +1022,8 @@ auto fuse_drive::setxattr_impl(std::string api_path, const char *name,
uint32_t position) -> api_error {
#else // __APPLE__
auto fuse_drive::setxattr_impl(std::string api_path, const char *name,
const char *value, size_t size, int flags)
-> api_error {
const char *value, size_t size,
int flags) -> api_error {
#endif
std::string attribute_name;
#if defined(__APPLE__)
@ -1102,8 +1101,8 @@ void fuse_drive::set_item_meta(const std::string &api_path,
}
#if defined(__APPLE__)
auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
-> api_error {
auto fuse_drive::setattr_x_impl(std::string api_path,
struct setattr_x *attr) -> api_error {
bool exists{};
auto res = provider_.is_file(api_path, exists);
if (res != api_error::success) {
@ -1157,7 +1156,7 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
ts[0].tv_sec = attr->acctime.tv_sec;
ts[0].tv_nsec = attr->acctime.tv_nsec;
} else {
struct timeval tv{};
struct timeval tv {};
gettimeofday(&tv, NULL);
ts[0].tv_sec = tv.tv_sec;
ts[0].tv_nsec = tv.tv_usec * 1000;
@ -1202,9 +1201,8 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
return api_error::success;
}
auto fuse_drive::setbkuptime_impl(std::string api_path,
const struct timespec *bkuptime)
-> api_error {
auto fuse_drive::setbkuptime_impl(
std::string api_path, const struct timespec *bkuptime) -> api_error {
return check_and_perform(
api_path, X_OK, [&](api_meta_map &meta) -> api_error {
auto nanos = bkuptime->tv_nsec +
@ -1240,8 +1238,8 @@ auto fuse_drive::setvolname_impl(const char * /*volname*/) -> api_error {
return api_error::success;
}
auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf)
-> api_error {
auto fuse_drive::statfs_x_impl(std::string /*api_path*/,
struct statfs *stbuf) -> api_error {
if (statfs(&config_.get_cache_directory()[0], stbuf) != 0) {
return api_error::os_error;
}
@ -1266,8 +1264,8 @@ auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf)
return api_error::success;
}
#else // __APPLE__
auto fuse_drive::statfs_impl(std::string /*api_path*/, struct statvfs *stbuf)
-> api_error {
auto fuse_drive::statfs_impl(std::string /*api_path*/,
struct statvfs *stbuf) -> api_error {
if (statvfs(config_.get_cache_directory().data(), stbuf) != 0) {
return api_error::os_error;
}
@ -1317,6 +1315,10 @@ auto fuse_drive::truncate_impl(std::string api_path, off_t size) -> api_error {
return res;
}
if (not fm_->get_open_file(handle, true, open_file)) {
return api_error::invalid_handle;
}
res = open_file->resize(static_cast<std::uint64_t>(size));
}
@ -1347,8 +1349,8 @@ auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2],
struct fuse_file_info * /*file_info*/)
-> api_error {
#else
auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2])
-> api_error {
auto fuse_drive::utimens_impl(std::string api_path,
const struct timespec tv[2]) -> api_error {
#endif
api_meta_map meta;
auto res = provider_.get_item_meta(api_path, meta);

View File

@ -21,28 +21,43 @@
*/
#include "file_manager/direct_open_file.hpp"
#include "file_manager/open_file_base.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
namespace repertory {
direct_open_file::direct_open_file(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider)
: ring_file_base(chunk_size, chunk_timeout, fsi, provider, min_ring_size,
true) {}
: ring_buffer_base(chunk_size, chunk_timeout, fsi, provider,
min_ring_size, true) {}
direct_open_file::~direct_open_file() { close(); }
direct_open_file::~direct_open_file() {
REPERTORY_USES_FUNCTION_NAME();
auto direct_open_file::handle_read_buffer(
std::size_t chunk,
std::function<api_error(data_buffer &data)> func) -> api_error {
return func(ring_data_.at(chunk % get_read_state_size()));
close();
}
auto direct_open_file::on_check_start() -> bool {
return (get_file_size() == 0U || has_reader_thread());
}
auto direct_open_file::use_buffer(
std::size_t chunk,
std::function<api_error(const data_buffer &data)> func) -> api_error {
return func(ring_data_.at(chunk % get_read_state_size()));
auto direct_open_file::on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset,
data_buffer &data,
std::size_t &bytes_read) -> api_error {
auto &buffer = ring_data_.at(chunk % get_ring_size());
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(read_size));
data.insert(data.end(), begin, end);
bytes_read = read_size;
return api_error::success;
}
auto direct_open_file::use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error {
return func(ring_data_.at(chunk % get_ring_size()));
}
} // namespace repertory

View File

@ -39,7 +39,6 @@
#include "utils/file.hpp"
#include "utils/path.hpp"
#include "utils/polling.hpp"
#include "utils/time.hpp"
namespace repertory {
file_manager::file_manager(app_config &config, i_provider &provider)
@ -220,7 +219,7 @@ auto file_manager::get_open_file_by_handle(std::uint64_t handle) const
-> std::shared_ptr<i_closeable_open_file> {
auto file_iter =
std::find_if(open_file_lookup_.begin(), open_file_lookup_.end(),
[&handle](const auto &item) -> bool {
[&handle](auto &&item) -> bool {
return item.second->has_handle(handle);
});
return (file_iter == open_file_lookup_.end()) ? nullptr : file_iter->second;
@ -381,11 +380,10 @@ auto file_manager::open(const std::string &api_path, bool directory,
return open(api_path, directory, ofd, handle, file, nullptr);
}
auto file_manager::open(const std::string &api_path, bool directory,
const open_file_data &ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file)
-> api_error {
auto file_manager::open(
const std::string &api_path, bool directory, const open_file_data &ofd,
std::uint64_t &handle, std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error {
REPERTORY_USES_FUNCTION_NAME();
const auto create_and_add_handle =
@ -437,7 +435,7 @@ auto file_manager::open(const std::string &api_path, bool directory,
auto ring_size{ring_buffer_file_size / chunk_size};
const auto get_download_type = [&](download_type type) -> download_type {
if (directory || fsi.size == 0U) {
if (directory || fsi.size == 0U || is_processing(api_path)) {
return download_type::default_;
}
@ -749,8 +747,8 @@ auto file_manager::rename_directory(const std::string &from_api_path,
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite)
-> api_error {
const std::string &to_api_path,
bool overwrite) -> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}

View File

@ -28,13 +28,10 @@
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "types/startup_exception.hpp"
#include "utils/common.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/path.hpp"
#include "utils/time.hpp"
#include "utils/utils.hpp"
namespace repertory {
open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
@ -281,12 +278,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto read_state = get_read_state();
if ((get_api_error() == api_error::success) && (chunk < read_state.size()) &&
not read_state[chunk]) {
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return;
}
auto active_download = active_downloads_.at(chunk);
auto active_download = get_active_downloads().at(chunk);
rw_lock.unlock();
active_download->wait();
@ -296,12 +293,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto data_offset = chunk * get_chunk_size();
auto data_size = (chunk == read_state.size() - 1U) ? get_last_chunk_size()
: get_chunk_size();
if (active_downloads_.empty() && (read_state.count() == 0U)) {
if (get_active_downloads().empty() && (read_state.count() == 0U)) {
event_system::instance().raise<download_begin>(get_api_path(),
get_source_path());
}
active_downloads_[chunk] = std::make_shared<download>();
get_active_downloads()[chunk] = std::make_shared<download>();
rw_lock.unlock();
if (should_reset) {
@ -314,8 +311,8 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto state = get_read_state();
unique_recur_mutex_lock lock(rw_mtx_);
auto active_download = active_downloads_.at(chunk);
active_downloads_.erase(chunk);
auto active_download = get_active_downloads().at(chunk);
get_active_downloads().erase(chunk);
if (get_api_error() == api_error::success) {
auto progress = (static_cast<double>(state.count()) /
static_cast<double>(state.size())) *
@ -452,7 +449,7 @@ auto open_file::native_operation(
auto read_state = get_read_state();
if (not is_empty_file && (last_chunk < read_state.size())) {
rw_lock.unlock();
update_background_reader(0U);
update_reader(0U);
download_chunk(last_chunk, false, true);
if (get_api_error() != api_error::success) {
@ -578,7 +575,7 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
auto end_chunk =
static_cast<std::size_t>((read_size + read_offset) / get_chunk_size());
update_background_reader(begin_chunk);
update_reader(begin_chunk);
download_range(begin_chunk, end_chunk, true);
if (get_api_error() != api_error::success) {
@ -655,18 +652,23 @@ void open_file::set_read_state(boost::dynamic_bitset<> read_state) {
read_state_ = std::move(read_state);
}
void open_file::update_background_reader(std::size_t read_chunk) {
void open_file::update_reader(std::size_t chunk) {
recur_mutex_lock rw_lock(rw_mtx_);
read_chunk_ = read_chunk;
read_chunk_ = chunk;
if (reader_thread_ || stop_requested_) {
return;
}
reader_thread_ = std::make_unique<std::thread>([this]() {
std::size_t next_chunk{};
unique_recur_mutex_lock lock(rw_mtx_);
auto next_chunk{read_chunk_};
auto read_chunk{read_chunk_};
lock.unlock();
while (not stop_requested_) {
unique_recur_mutex_lock lock(rw_mtx_);
lock.lock();
auto read_state = get_read_state();
if ((get_file_size() == 0U) || read_state.all()) {
lock.unlock();
@ -674,12 +676,11 @@ void open_file::update_background_reader(std::size_t read_chunk) {
continue;
}
do {
next_chunk = read_chunk_ =
((read_chunk_ + 1U) >= read_state.size()) ? 0U : read_chunk_ + 1U;
} while ((next_chunk != 0U) &&
(active_downloads_.find(next_chunk) != active_downloads_.end()));
if (read_chunk != read_chunk_) {
next_chunk = read_chunk = read_chunk_;
}
next_chunk = next_chunk + 1U >= read_state.size() ? 0U : next_chunk + 1U;
lock.unlock();
download_chunk(next_chunk, true, false);
@ -714,7 +715,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
auto end_chunk =
static_cast<std::size_t>((write_offset + data.size()) / get_chunk_size());
update_background_reader(begin_chunk);
update_reader(begin_chunk);
download_range(begin_chunk, std::min(get_read_state().size() - 1U, end_chunk),
true);

View File

@ -19,7 +19,7 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/ring_file_base.hpp"
#include "file_manager/ring_buffer_base.hpp"
#include "events/event_system.hpp"
#include "file_manager/events.hpp"
@ -31,41 +31,33 @@
#include "utils/error_utils.hpp"
namespace repertory {
ring_file_base::ring_file_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider, std::size_t ring_size,
bool disable_io)
ring_buffer_base::ring_buffer_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size, bool disable_io)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io),
ring_state_(ring_size),
read_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
if (disable_io) {
if (fsi.size > 0U) {
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
read_state_.resize(std::min(total_chunks_, read_state_.size()));
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
read_state_.set(0U, read_state_.size(), false);
}
} else {
if (ring_state_.size() < min_ring_size) {
throw std::runtime_error(fmt::format(
"ring size must be greater than or equal to {}", min_ring_size));
if (ring_size < min_ring_size) {
throw std::runtime_error("ring size must be greater than or equal to 5");
}
if (not can_handle_file(fsi.size, chunk_size, get_read_state_size())) {
throw std::runtime_error("file size is less than ring buffer size");
}
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
read_state_.set(0U, ring_size, false);
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), false);
}
auto ring_file_base::can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool {
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
}
auto ring_file_base::check_start() -> api_error {
auto ring_buffer_base::check_start() -> api_error {
REPERTORY_USES_FUNCTION_NAME();
try {
@ -82,30 +74,29 @@ auto ring_file_base::check_start() -> api_error {
utils::error::raise_api_path_error(function_name, get_api_path(),
get_source_path(), ex,
"failed to start");
return api_error::error;
}
return api_error::error;
}
auto ring_file_base::close() -> bool {
auto ring_buffer_base::close() -> bool {
stop_requested_ = true;
unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all();
chunk_lock.unlock();
auto ret = open_file_base::close();
auto res = open_file_base::close();
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
}
return ret;
return res;
}
auto ring_file_base::download_chunk(std::size_t chunk,
bool skip_active) -> api_error {
auto ring_buffer_base::download_chunk(std::size_t chunk,
bool skip_active) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
@ -122,86 +113,78 @@ auto ring_file_base::download_chunk(std::size_t chunk,
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (active_downloads_.find(chunk) != active_downloads_.end()) {
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = active_downloads_.at(chunk);
auto active_download = get_active_downloads().at(chunk);
unlock_and_notify();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
if (read_state_[chunk % read_state_.size()]) {
return unlock_and_return(api_error::success);
}
auto active_download{std::make_shared<download>()};
active_downloads_[chunk] = active_download;
get_active_downloads()[chunk] = active_download;
auto res = handle_read_buffer(chunk, [&](auto &&buffer) {
return use_buffer(chunk, [&](data_buffer &buffer) -> api_error {
auto data_offset{chunk * get_chunk_size()};
auto data_size{
chunk == (total_chunks_ - 1U) ? get_last_chunk_size()
: get_chunk_size(),
};
unlock_and_notify();
auto result{
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
buffer, stop_requested_),
};
if (result != api_error::success) {
return result;
}
result = on_chunk_downloaded(chunk, buffer);
if (result != api_error::success) {
return result;
chunk_lock.lock();
if (chunk < ring_begin_ || chunk > ring_end_) {
result = api_error::invalid_ring_buffer_position;
}
ring_state_[chunk % ring_state_.size()] = true;
auto progress =
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(
get_api_path(), get_source_path(), progress);
return api_error::success;
if (result == api_error::success) {
result = on_chunk_downloaded(chunk, buffer);
if (result == api_error::success) {
read_state_[chunk % read_state_.size()] = true;
auto progress = (static_cast<double>(chunk + 1U) /
static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(
get_api_path(), get_source_path(), progress);
}
}
get_active_downloads().erase(chunk);
unlock_and_notify();
active_download->notify(result);
return result;
});
active_downloads_.erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
}
void ring_file_base::forward(std::size_t count) {
void ring_buffer_base::forward(std::size_t count) {
update_position(count, true);
}
auto ring_file_base::get_read_state() const -> boost::dynamic_bitset<> {
auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex());
return ring_state_;
return read_state_;
}
auto ring_file_base::get_read_state(std::size_t chunk) const -> bool {
auto ring_buffer_base::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(get_mutex());
return ring_state_[chunk % ring_state_.size()];
return read_state_[chunk % read_state_.size()];
}
auto ring_file_base::get_read_state_size() const -> std::size_t {
recur_mutex_lock file_lock(get_mutex());
return ring_state_.size();
}
auto ring_file_base::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
auto ring_buffer_base::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (is_directory()) {
return api_error::invalid_operation;
}
@ -236,8 +219,7 @@ auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
res = download_chunk(chunk, false);
if (res != api_error::success) {
if (not stop_requested_ &&
res == api_error::invalid_ring_buffer_position) {
if (res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
@ -249,31 +231,28 @@ auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
reset_timeout();
auto to_read{
std::size_t bytes_read{};
res = on_read_chunk(
chunk,
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
read_size),
};
res = use_buffer(
chunk, [&data, &read_offset, &to_read](auto &&buffer) -> api_error {
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
data.insert(data.end(), begin, end);
return api_error::success;
});
read_offset, data, bytes_read);
if (res != api_error::success) {
return res;
}
reset_timeout();
read_size -= to_read;
read_size -= bytes_read;
read_offset = 0U;
}
return stop_requested_ ? api_error::download_stopped : res;
}
void ring_file_base::reader_thread() {
void ring_buffer_base::reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_;
auto next_chunk{ring_pos_};
chunk_notify_.notify_all();
chunk_lock.unlock();
@ -297,7 +276,7 @@ void ring_file_base::reader_thread() {
chunk_lock.unlock();
};
if (ring_state_[next_chunk % ring_state_.size()]) {
if (read_state_[next_chunk % read_state_.size()]) {
check_and_wait();
continue;
}
@ -306,20 +285,17 @@ void ring_file_base::reader_thread() {
chunk_lock.unlock();
download_chunk(next_chunk, true);
chunk_lock.lock();
check_and_wait();
}
event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped);
}
void ring_file_base::reverse(std::size_t count) {
void ring_buffer_base::reverse(std::size_t count) {
update_position(count, false);
}
void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) {
void ring_buffer_base::set(std::size_t first_chunk, std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
@ -328,7 +304,7 @@ void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) {
ring_begin_ = first_chunk;
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
if (current_chunk > ring_end_) {
chunk_notify_.notify_all();
@ -337,18 +313,18 @@ void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) {
}
ring_pos_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), true);
read_state_.set(0U, read_state_.size(), true);
chunk_notify_.notify_all();
}
void ring_file_base::set_api_path(const std::string &api_path) {
void ring_buffer_base::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
void ring_file_base::update_position(std::size_t count, bool is_forward) {
void ring_buffer_base::update_position(std::size_t count, bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) {
@ -366,25 +342,24 @@ void ring_file_base::update_position(std::size_t count, bool is_forward) {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
if (delta >= read_state_.size()) {
read_state_.set(0U, read_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
read_state_[(ring_begin_ + idx) % read_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
read_state_[(ring_end_ - idx) % read_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
}
chunk_notify_.notify_all();

View File

@ -21,10 +21,13 @@
*/
#include "file_manager/ring_buffer_open_file.hpp"
#include "file_manager/open_file_base.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
#include "utils/error_utils.hpp"
#include "utils/file.hpp"
#include "utils/path.hpp"
namespace repertory {
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
@ -33,37 +36,45 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
filesystem_item fsi,
i_provider &provider,
std::size_t ring_size)
: ring_file_base(chunk_size, chunk_timeout, fsi, provider, ring_size,
false),
: ring_buffer_base(chunk_size, chunk_timeout, fsi, provider, ring_size,
false),
source_path_(utils::path::combine(buffer_directory,
{
utils::create_uuid_string(),
})) {}
})) {
if (not can_handle_file(fsi.size, chunk_size, ring_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
}
ring_buffer_open_file::~ring_buffer_open_file() {
REPERTORY_USES_FUNCTION_NAME();
close();
if (nf_) {
nf_->close();
nf_.reset();
}
if (utils::file::file(get_source_path()).remove()) {
if (not nf_) {
return;
}
utils::error::raise_api_path_error(
function_name, get_api_path(), get_source_path(),
utils::get_last_error_code(), "failed to delete file");
nf_->close();
nf_.reset();
if (not utils::file::file(source_path_).remove()) {
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
utils::get_last_error_code(), "failed to delete file");
}
}
auto ring_buffer_open_file::handle_read_buffer(
std::size_t /* chunk */,
std::function<api_error(data_buffer &data)> func) -> api_error {
data_buffer buffer;
return func(buffer);
auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool {
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
}
auto ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_buffer_open_file::on_check_start() -> bool {
@ -73,25 +84,25 @@ auto ring_buffer_open_file::on_check_start() -> bool {
return true;
}
auto buffer_directory{utils::path::get_parent_path(get_source_path())};
auto buffer_directory{utils::path::get_parent_path(source_path_)};
if (not utils::file::directory(buffer_directory).create_directory()) {
throw std::runtime_error(
fmt::format("failed to create buffer directory|path|{}|err|{}",
buffer_directory, utils::get_last_error_code()));
}
nf_ = utils::file::file::open_or_create_file(get_source_path());
nf_ = utils::file::file::open_or_create_file(source_path_);
if (not nf_ || not *nf_) {
throw std::runtime_error(fmt::format("failed to create buffer file|err|{}",
utils::get_last_error_code()));
}
if (not nf_->truncate(get_read_state_size() * get_chunk_size())) {
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
if (not nf_->truncate(get_ring_size() * get_chunk_size())) {
nf_->close();
nf_.reset();
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
}
return false;
@ -101,7 +112,7 @@ auto ring_buffer_open_file::on_chunk_downloaded(
std::size_t chunk, const data_buffer &buffer) -> api_error {
return do_io([&]() -> api_error {
std::size_t bytes_written{};
if (nf_->write(buffer, (chunk % get_read_state_size()) * get_chunk_size(),
if (nf_->write(buffer, (chunk % get_ring_size()) * get_chunk_size(),
&bytes_written)) {
return api_error::success;
}
@ -110,26 +121,31 @@ auto ring_buffer_open_file::on_chunk_downloaded(
});
}
auto ring_buffer_open_file::use_buffer(
std::size_t chunk,
std::function<api_error(const data_buffer &data)> func) -> api_error {
data_buffer buffer;
buffer.resize(get_chunk_size());
auto ring_buffer_open_file::on_read_chunk(
std::size_t chunk, std::size_t read_size, std::uint64_t read_offset,
data_buffer &data, std::size_t &bytes_read) -> api_error {
data_buffer buffer(read_size);
auto res = do_io([&]() -> api_error {
std::size_t bytes_read{};
auto result =
nf_->read(buffer, (chunk % get_read_state_size()) * get_chunk_size(),
&bytes_read)
? api_error::success
: api_error::os_error;
buffer.resize(bytes_read);
return result;
return nf_->read(
buffer,
(((chunk % get_ring_size()) * get_chunk_size()) + read_offset),
&bytes_read)
? api_error::success
: api_error::os_error;
});
if (res != api_error::success) {
return res;
}
data.insert(data.end(), buffer.begin(), buffer.end());
return api_error::success;
}
auto ring_buffer_open_file::use_buffer(
std::size_t /* chunk */,
std::function<api_error(data_buffer &)> func) -> api_error {
data_buffer buffer;
return func(buffer);
}
} // namespace repertory

View File

@ -52,6 +52,8 @@ auto from_api_error(const api_error &err) -> int {
return -EEXIST;
case api_error::file_in_use:
return -EBUSY;
case api_error::invalid_handle:
return -EBADF;
case api_error::invalid_operation:
return -EINVAL;
case api_error::item_not_found:

View File

@ -52,11 +52,14 @@ TEST_F(direct_open_file_test, read_full_file) {
fsi.directory = false;
fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &source_file](
const std::string & /* api_path */, std::size_t size,
std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);
@ -111,11 +114,14 @@ TEST_F(direct_open_file_test, read_full_file_in_reverse) {
fsi.directory = false;
fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &source_file](
const std::string & /* api_path */, std::size_t size,
std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);
@ -170,11 +176,14 @@ TEST_F(direct_open_file_test, read_full_file_in_partial_chunks) {
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &source_file](
const std::string & /* api_path */, std::size_t size,
std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);
@ -226,11 +235,14 @@ TEST_F(direct_open_file_test, read_full_file_in_partial_chunks_in_reverse) {
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &source_file](
const std::string & /* api_path */, std::size_t size,
std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);

View File

@ -308,11 +308,14 @@ TEST_F(ring_buffer_open_file_test, read_full_file) {
fsi.size = test_chunk_size * 33u + 11u;
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);
@ -371,11 +374,14 @@ TEST_F(ring_buffer_open_file_test, read_full_file_in_reverse) {
fsi.size = test_chunk_size * 32u;
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);
@ -434,11 +440,14 @@ TEST_F(ring_buffer_open_file_test, read_full_file_in_partial_chunks) {
fsi.size = test_chunk_size * 32u;
fsi.source_path = test::generate_test_file_name("test");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);
@ -499,11 +508,14 @@ TEST_F(ring_buffer_open_file_test,
fsi.size = test_chunk_size * 32u;
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
.WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset,
data_buffer &data,
stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested);
std::size_t bytes_read{};
data.resize(size);