1 Commits

Author SHA1 Message Date
8dd46b8ad8 v2.0.2-rc (#27)
Some checks reported errors
BlockStorage/repertory/pipeline/head Something is wrong with the build of this commit
## v2.0.2-rc

### BREAKING CHANGES

* Refactored `config.json` - will need to verify configuration settings prior to mounting

### Issues

* \#12 \[Unit Test\] Complete all providers unit tests
* \#14 \[Unit Test\] SQLite mini-ORM unit tests and cleanup
* \#16 Add support for bucket name in Sia provider
* \#17 Update to common c++ build system
  * A single 64-bit Linux Jenkins server is used to build all Linux and Windows versions
  * All dependency sources are now included
  * MSVC is no longer supported
  * MSYS2 is required for building Windows binaries on Windows
  * OS X support is temporarily disabled
* \#19 \[bug\] Rename file is broken for files that are existing
* \#23 \[bug\] Incorrect file size displayed while upload is pending
* \#24 RocksDB implementations should be transactional
* \#25 Writes should block when maximum cache size is reached
* \#26 Complete ring buffer and direct download support

### Changes from v2.0.1-rc

* Ability to choose between RocksDB and SQLite databases
* Added direct reads and implemented download fallback
* Corrected file times on S3 and Sia providers
* Corrected handling of `chown()` and `chmod()`
* Fixed erroneous download of chunks after resize

Reviewed-on: #27
2024-12-28 15:56:40 -06:00
15 changed files with 371 additions and 319 deletions

View File

@ -27,7 +27,6 @@
* Ability to choose between RocksDB and SQLite databases * Ability to choose between RocksDB and SQLite databases
* Added direct reads and implemented download fallback * Added direct reads and implemented download fallback
* Comprehensive WinFSP and FUSE unit tests, including remote testing
* Corrected file times on S3 and Sia providers * Corrected file times on S3 and Sia providers
* Corrected handling of `chown()` and `chmod()` * Corrected handling of `chown()` and `chmod()`
* Fixed erroneous download of chunks after resize * Fixed erroneous download of chunks after resize

View File

@ -22,7 +22,7 @@
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#include "file_manager/ring_file_base.hpp" #include "file_manager/ring_buffer_base.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
@ -30,7 +30,7 @@ namespace repertory {
class i_provider; class i_provider;
class i_upload_manager; class i_upload_manager;
class direct_open_file final : public ring_file_base { class direct_open_file final : public ring_buffer_base {
public: public:
direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider); filesystem_item fsi, i_provider &provider);
@ -49,20 +49,33 @@ private:
std::array<data_buffer, min_ring_size> ring_data_; std::array<data_buffer, min_ring_size> ring_data_;
protected: protected:
[[nodiscard]] auto handle_read_buffer(
std::size_t chunk,
std::function<api_error(data_buffer &buffer)> func) -> api_error override;
[[nodiscard]] auto on_check_start() -> bool override; [[nodiscard]] auto on_check_start() -> bool override;
[[nodiscard]] auto [[nodiscard]] auto
use_buffer(std::size_t chunk, on_chunk_downloaded(std::size_t /* chunk */,
std::function<api_error(const data_buffer &buffer)> func) const data_buffer & /* buffer */) -> api_error override {
return api_error::success;
}
[[nodiscard]] auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error override;
[[nodiscard]] auto use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error override; -> api_error override;
public: public:
[[nodiscard]] auto get_source_path() const -> std::string override { [[nodiscard]] auto native_operation(native_operation_callback /* callback */)
return "direct"; -> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
} }
}; };
} // namespace repertory } // namespace repertory

View File

@ -25,6 +25,7 @@
#include "file_manager/open_file_base.hpp" #include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "utils/types/file/i_file.hpp"
namespace repertory { namespace repertory {
class i_provider; class i_provider;
@ -68,11 +69,11 @@ private:
private: private:
bool allocated{false}; bool allocated{false};
std::unique_ptr<utils::file::i_file> nf_;
bool notified_{false}; bool notified_{false};
std::size_t read_chunk_{}; std::size_t read_chunk_{};
boost::dynamic_bitset<> read_state_; boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_; std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
mutable std::recursive_mutex rw_mtx_; mutable std::recursive_mutex rw_mtx_;
stop_type stop_requested_{false}; stop_type stop_requested_{false};
@ -93,7 +94,7 @@ private:
void set_read_state(boost::dynamic_bitset<> read_state); void set_read_state(boost::dynamic_bitset<> read_state);
void update_background_reader(std::size_t read_chunk); void update_reader(std::size_t chunk);
public: public:
auto close() -> bool override; auto close() -> bool override;

View File

@ -24,8 +24,6 @@
#include "file_manager/i_open_file.hpp" #include "file_manager/i_open_file.hpp"
#include "utils/types/file/i_file.hpp"
namespace repertory { namespace repertory {
class i_provider; class i_provider;
@ -107,6 +105,7 @@ private:
i_provider &provider_; i_provider &provider_;
private: private:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
api_error error_{api_error::success}; api_error error_{api_error::success};
mutable std::mutex error_mtx_; mutable std::mutex error_mtx_;
mutable std::recursive_mutex file_mtx_; mutable std::recursive_mutex file_mtx_;
@ -121,16 +120,17 @@ private:
bool modified_{false}; bool modified_{false};
bool removed_{false}; bool removed_{false};
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
std::unique_ptr<utils::file::i_file> nf_;
private: private:
void file_io_thread(); void file_io_thread();
protected: protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error; [[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
[[nodiscard]] auto get_active_downloads()
-> std::unordered_map<std::size_t, std::shared_ptr<download>> & {
return active_downloads_;
}
[[nodiscard]] auto get_mutex() const -> std::recursive_mutex & { [[nodiscard]] auto get_mutex() const -> std::recursive_mutex & {
return file_mtx_; return file_mtx_;
} }

View File

@ -19,37 +19,39 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_BASE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_BASE_HPP_
#include "file_manager/open_file_base.hpp" #include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "utils/file.hpp"
namespace repertory { namespace repertory {
class i_provider; class i_provider;
class i_upload_manager; class i_upload_manager;
class ring_file_base : public open_file_base { class ring_buffer_base : public open_file_base {
public: public:
ring_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout, ring_buffer_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider, filesystem_item fsi, i_provider &provider,
std::size_t ring_size, bool disable_io); std::size_t ring_size, bool disable_io);
~ring_file_base() override = default; ~ring_buffer_base() override = default;
public:
ring_buffer_base() = delete;
ring_buffer_base(const ring_buffer_base &) noexcept = delete;
ring_buffer_base(ring_buffer_base &&) noexcept = delete;
auto operator=(ring_buffer_base &&) noexcept -> ring_buffer_base & = delete;
auto
operator=(const ring_buffer_base &) noexcept -> ring_buffer_base & = delete;
public: public:
static constexpr const auto min_ring_size{5U}; static constexpr const auto min_ring_size{5U};
public:
ring_file_base() = delete;
ring_file_base(const ring_file_base &) noexcept = delete;
ring_file_base(ring_file_base &&) noexcept = delete;
auto operator=(ring_file_base &&) noexcept -> ring_file_base & = delete;
auto operator=(const ring_file_base &) noexcept -> ring_file_base & = delete;
private: private:
boost::dynamic_bitset<> ring_state_; boost::dynamic_bitset<> read_state_;
std::size_t total_chunks_; std::size_t total_chunks_;
private: private:
@ -72,34 +74,30 @@ private:
void update_position(std::size_t count, bool is_forward); void update_position(std::size_t count, bool is_forward);
protected: protected:
[[nodiscard]] auto get_read_state_size() const -> std::size_t; [[nodiscard]] auto has_reader_thread() const -> bool {
[[nodiscard]] virtual auto handle_read_buffer(
std::size_t chunk,
std::function<api_error(data_buffer &buffer)> func) -> api_error = 0;
[[nodiscard]] auto has_reader_thread() -> bool {
return reader_thread_ != nullptr; return reader_thread_ != nullptr;
} }
[[nodiscard]] auto get_ring_size() const -> std::size_t {
return read_state_.size();
}
[[nodiscard]] virtual auto on_check_start() -> bool = 0; [[nodiscard]] virtual auto on_check_start() -> bool = 0;
[[nodiscard]] virtual auto [[nodiscard]] virtual auto
on_chunk_downloaded(std::size_t /* chunk */, on_chunk_downloaded(std::size_t chunk,
const data_buffer & /* buffer */) -> api_error { const data_buffer &buffer) -> api_error = 0;
return api_error::success;
} [[nodiscard]] virtual auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error = 0;
[[nodiscard]] virtual auto [[nodiscard]] virtual auto
use_buffer(std::size_t chunk, use_buffer(std::size_t chunk,
std::function<api_error(const data_buffer &buffer)> func) std::function<api_error(data_buffer &)> func) -> api_error = 0;
-> api_error = 0;
public: public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool;
auto close() -> bool override; auto close() -> bool override;
void forward(std::size_t count); void forward(std::size_t count);
@ -128,15 +126,6 @@ public:
return false; return false;
} }
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset, [[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override; data_buffer &data) -> api_error override;
@ -158,4 +147,4 @@ public:
}; };
} // namespace repertory } // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_ #endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_BASE_HPP_

View File

@ -22,15 +22,16 @@
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_ #ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_ #define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#include "file_manager/ring_file_base.hpp" #include "file_manager/ring_buffer_base.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "utils/file.hpp"
namespace repertory { namespace repertory {
class i_provider; class i_provider;
class i_upload_manager; class i_upload_manager;
class ring_buffer_open_file final : public ring_file_base { class ring_buffer_open_file final : public ring_buffer_base {
public: public:
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size, ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi, std::uint8_t chunk_timeout, filesystem_item fsi,
@ -50,11 +51,10 @@ public:
private: private:
std::string source_path_; std::string source_path_;
protected: private:
[[nodiscard]] auto handle_read_buffer( std::unique_ptr<utils::file::i_file> nf_;
std::size_t chunk,
std::function<api_error(data_buffer &buffer)> func) -> api_error override;
protected:
[[nodiscard]] auto on_check_start() -> bool override; [[nodiscard]] auto on_check_start() -> bool override;
[[nodiscard]] auto [[nodiscard]] auto
@ -62,11 +62,28 @@ protected:
const data_buffer &buffer) -> api_error override; const data_buffer &buffer) -> api_error override;
[[nodiscard]] auto [[nodiscard]] auto
use_buffer(std::size_t chunk, on_read_chunk(std::size_t chunk, std::size_t read_size,
std::function<api_error(const data_buffer &buffer)> func) std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error override;
[[nodiscard]] auto use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error override; -> api_error override;
public: public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool;
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto get_source_path() const -> std::string override { [[nodiscard]] auto get_source_path() const -> std::string override {
return source_path_; return source_path_;
} }

View File

@ -81,8 +81,8 @@ auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid,
struct fuse_file_info * /*file_info*/) struct fuse_file_info * /*file_info*/)
-> api_error { -> api_error {
#else #else
auto fuse_drive::chown_impl(std::string api_path, uid_t uid, gid_t gid) auto fuse_drive::chown_impl(std::string api_path, uid_t uid,
-> api_error { gid_t gid) -> api_error {
#endif #endif
return check_and_perform( return check_and_perform(
api_path, X_OK, [&](api_meta_map &meta) -> api_error { api_path, X_OK, [&](api_meta_map &meta) -> api_error {
@ -481,8 +481,8 @@ auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st,
struct fuse_file_info * /*file_info*/) struct fuse_file_info * /*file_info*/)
-> api_error { -> api_error {
#else #else
auto fuse_drive::getattr_impl(std::string api_path, struct stat *unix_st) auto fuse_drive::getattr_impl(std::string api_path,
-> api_error { struct stat *unix_st) -> api_error {
#endif #endif
auto parent = utils::path::get_parent_api_path(api_path); auto parent = utils::path::get_parent_api_path(api_path);
@ -565,8 +565,8 @@ auto fuse_drive::getxtimes_impl(std::string api_path, struct timespec *bkuptime,
#endif // __APPLE__ #endif // __APPLE__
#if FUSE_USE_VERSION >= 30 #if FUSE_USE_VERSION >= 30
auto fuse_drive::init_impl(struct fuse_conn_info *conn, struct fuse_config *cfg) auto fuse_drive::init_impl(struct fuse_conn_info *conn,
-> void * { struct fuse_config *cfg) -> void * {
#else #else
void *fuse_drive::init_impl(struct fuse_conn_info *conn) { void *fuse_drive::init_impl(struct fuse_conn_info *conn) {
#endif #endif
@ -800,9 +800,8 @@ auto fuse_drive::release_impl(std::string /*api_path*/,
return api_error::success; return api_error::success;
} }
auto fuse_drive::releasedir_impl(std::string /*api_path*/, auto fuse_drive::releasedir_impl(
struct fuse_file_info *file_info) std::string /*api_path*/, struct fuse_file_info *file_info) -> api_error {
-> api_error {
auto iter = directory_cache_->get_directory(file_info->fh); auto iter = directory_cache_->get_directory(file_info->fh);
if (iter == nullptr) { if (iter == nullptr) {
return api_error::invalid_handle; return api_error::invalid_handle;
@ -820,8 +819,8 @@ auto fuse_drive::rename_directory(const std::string &from_api_path,
} }
auto fuse_drive::rename_file(const std::string &from_api_path, auto fuse_drive::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite) const std::string &to_api_path,
-> int { bool overwrite) -> int {
auto res = fm_->rename_file(from_api_path, to_api_path, overwrite); auto res = fm_->rename_file(from_api_path, to_api_path, overwrite);
errno = std::abs(utils::from_api_error(res)); errno = std::abs(utils::from_api_error(res));
return (res == api_error::success) ? 0 : -1; return (res == api_error::success) ? 0 : -1;
@ -831,8 +830,8 @@ auto fuse_drive::rename_file(const std::string &from_api_path,
auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path, auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path,
unsigned int /*flags*/) -> api_error { unsigned int /*flags*/) -> api_error {
#else #else
auto fuse_drive::rename_impl(std::string from_api_path, std::string to_api_path) auto fuse_drive::rename_impl(std::string from_api_path,
-> api_error { std::string to_api_path) -> api_error {
#endif #endif
auto res = check_parent_access(to_api_path, W_OK | X_OK); auto res = check_parent_access(to_api_path, W_OK | X_OK);
if (res != api_error::success) { if (res != api_error::success) {
@ -946,15 +945,15 @@ auto fuse_drive::getxattr_impl(std::string api_path, const char *name,
} }
#else // __APPLE__ #else // __APPLE__
auto fuse_drive::getxattr_impl(std::string api_path, const char *name, auto fuse_drive::getxattr_impl(std::string api_path, const char *name,
char *value, size_t size, int &attribute_size) char *value, size_t size,
-> api_error { int &attribute_size) -> api_error {
return getxattr_common(api_path, name, value, size, attribute_size, nullptr); return getxattr_common(api_path, name, value, size, attribute_size, nullptr);
} }
#endif // __APPLE__ #endif // __APPLE__
auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size, auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size,
int &required_size, bool &return_size) int &required_size,
-> api_error { bool &return_size) -> api_error {
auto check_size = (size == 0); auto check_size = (size == 0);
auto res = check_parent_access(api_path, X_OK); auto res = check_parent_access(api_path, X_OK);
@ -994,8 +993,8 @@ auto fuse_drive::listxattr_impl(std::string api_path, char *buffer, size_t size,
return res; return res;
} }
auto fuse_drive::removexattr_impl(std::string api_path, const char *name) auto fuse_drive::removexattr_impl(std::string api_path,
-> api_error { const char *name) -> api_error {
std::string attribute_name; std::string attribute_name;
#if defined(__APPLE__) #if defined(__APPLE__)
auto res = parse_xattr_parameters(name, 0, attribute_name, api_path); auto res = parse_xattr_parameters(name, 0, attribute_name, api_path);
@ -1023,8 +1022,8 @@ auto fuse_drive::setxattr_impl(std::string api_path, const char *name,
uint32_t position) -> api_error { uint32_t position) -> api_error {
#else // __APPLE__ #else // __APPLE__
auto fuse_drive::setxattr_impl(std::string api_path, const char *name, auto fuse_drive::setxattr_impl(std::string api_path, const char *name,
const char *value, size_t size, int flags) const char *value, size_t size,
-> api_error { int flags) -> api_error {
#endif #endif
std::string attribute_name; std::string attribute_name;
#if defined(__APPLE__) #if defined(__APPLE__)
@ -1102,8 +1101,8 @@ void fuse_drive::set_item_meta(const std::string &api_path,
} }
#if defined(__APPLE__) #if defined(__APPLE__)
auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr) auto fuse_drive::setattr_x_impl(std::string api_path,
-> api_error { struct setattr_x *attr) -> api_error {
bool exists{}; bool exists{};
auto res = provider_.is_file(api_path, exists); auto res = provider_.is_file(api_path, exists);
if (res != api_error::success) { if (res != api_error::success) {
@ -1157,7 +1156,7 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
ts[0].tv_sec = attr->acctime.tv_sec; ts[0].tv_sec = attr->acctime.tv_sec;
ts[0].tv_nsec = attr->acctime.tv_nsec; ts[0].tv_nsec = attr->acctime.tv_nsec;
} else { } else {
struct timeval tv{}; struct timeval tv {};
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
ts[0].tv_sec = tv.tv_sec; ts[0].tv_sec = tv.tv_sec;
ts[0].tv_nsec = tv.tv_usec * 1000; ts[0].tv_nsec = tv.tv_usec * 1000;
@ -1202,9 +1201,8 @@ auto fuse_drive::setattr_x_impl(std::string api_path, struct setattr_x *attr)
return api_error::success; return api_error::success;
} }
auto fuse_drive::setbkuptime_impl(std::string api_path, auto fuse_drive::setbkuptime_impl(
const struct timespec *bkuptime) std::string api_path, const struct timespec *bkuptime) -> api_error {
-> api_error {
return check_and_perform( return check_and_perform(
api_path, X_OK, [&](api_meta_map &meta) -> api_error { api_path, X_OK, [&](api_meta_map &meta) -> api_error {
auto nanos = bkuptime->tv_nsec + auto nanos = bkuptime->tv_nsec +
@ -1240,8 +1238,8 @@ auto fuse_drive::setvolname_impl(const char * /*volname*/) -> api_error {
return api_error::success; return api_error::success;
} }
auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf) auto fuse_drive::statfs_x_impl(std::string /*api_path*/,
-> api_error { struct statfs *stbuf) -> api_error {
if (statfs(&config_.get_cache_directory()[0], stbuf) != 0) { if (statfs(&config_.get_cache_directory()[0], stbuf) != 0) {
return api_error::os_error; return api_error::os_error;
} }
@ -1266,8 +1264,8 @@ auto fuse_drive::statfs_x_impl(std::string /*api_path*/, struct statfs *stbuf)
return api_error::success; return api_error::success;
} }
#else // __APPLE__ #else // __APPLE__
auto fuse_drive::statfs_impl(std::string /*api_path*/, struct statvfs *stbuf) auto fuse_drive::statfs_impl(std::string /*api_path*/,
-> api_error { struct statvfs *stbuf) -> api_error {
if (statvfs(config_.get_cache_directory().data(), stbuf) != 0) { if (statvfs(config_.get_cache_directory().data(), stbuf) != 0) {
return api_error::os_error; return api_error::os_error;
} }
@ -1317,6 +1315,10 @@ auto fuse_drive::truncate_impl(std::string api_path, off_t size) -> api_error {
return res; return res;
} }
if (not fm_->get_open_file(handle, true, open_file)) {
return api_error::invalid_handle;
}
res = open_file->resize(static_cast<std::uint64_t>(size)); res = open_file->resize(static_cast<std::uint64_t>(size));
} }
@ -1347,8 +1349,8 @@ auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2],
struct fuse_file_info * /*file_info*/) struct fuse_file_info * /*file_info*/)
-> api_error { -> api_error {
#else #else
auto fuse_drive::utimens_impl(std::string api_path, const struct timespec tv[2]) auto fuse_drive::utimens_impl(std::string api_path,
-> api_error { const struct timespec tv[2]) -> api_error {
#endif #endif
api_meta_map meta; api_meta_map meta;
auto res = provider_.get_item_meta(api_path, meta); auto res = provider_.get_item_meta(api_path, meta);

View File

@ -21,28 +21,43 @@
*/ */
#include "file_manager/direct_open_file.hpp" #include "file_manager/direct_open_file.hpp"
#include "file_manager/open_file_base.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
namespace repertory { namespace repertory {
direct_open_file::direct_open_file(std::uint64_t chunk_size, direct_open_file::direct_open_file(std::uint64_t chunk_size,
std::uint8_t chunk_timeout, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider) filesystem_item fsi, i_provider &provider)
: ring_file_base(chunk_size, chunk_timeout, fsi, provider, min_ring_size, : ring_buffer_base(chunk_size, chunk_timeout, fsi, provider,
true) {} min_ring_size, true) {}
direct_open_file::~direct_open_file() { close(); } direct_open_file::~direct_open_file() {
REPERTORY_USES_FUNCTION_NAME();
auto direct_open_file::handle_read_buffer( close();
std::size_t chunk,
std::function<api_error(data_buffer &data)> func) -> api_error {
return func(ring_data_.at(chunk % get_read_state_size()));
} }
auto direct_open_file::on_check_start() -> bool { auto direct_open_file::on_check_start() -> bool {
return (get_file_size() == 0U || has_reader_thread()); return (get_file_size() == 0U || has_reader_thread());
} }
auto direct_open_file::use_buffer( auto direct_open_file::on_read_chunk(std::size_t chunk, std::size_t read_size,
std::size_t chunk, std::uint64_t read_offset,
std::function<api_error(const data_buffer &data)> func) -> api_error { data_buffer &data,
return func(ring_data_.at(chunk % get_read_state_size())); std::size_t &bytes_read) -> api_error {
auto &buffer = ring_data_.at(chunk % get_ring_size());
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(read_size));
data.insert(data.end(), begin, end);
bytes_read = read_size;
return api_error::success;
}
auto direct_open_file::use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error {
return func(ring_data_.at(chunk % get_ring_size()));
} }
} // namespace repertory } // namespace repertory

View File

@ -39,7 +39,6 @@
#include "utils/file.hpp" #include "utils/file.hpp"
#include "utils/path.hpp" #include "utils/path.hpp"
#include "utils/polling.hpp" #include "utils/polling.hpp"
#include "utils/time.hpp"
namespace repertory { namespace repertory {
file_manager::file_manager(app_config &config, i_provider &provider) file_manager::file_manager(app_config &config, i_provider &provider)
@ -220,7 +219,7 @@ auto file_manager::get_open_file_by_handle(std::uint64_t handle) const
-> std::shared_ptr<i_closeable_open_file> { -> std::shared_ptr<i_closeable_open_file> {
auto file_iter = auto file_iter =
std::find_if(open_file_lookup_.begin(), open_file_lookup_.end(), std::find_if(open_file_lookup_.begin(), open_file_lookup_.end(),
[&handle](const auto &item) -> bool { [&handle](auto &&item) -> bool {
return item.second->has_handle(handle); return item.second->has_handle(handle);
}); });
return (file_iter == open_file_lookup_.end()) ? nullptr : file_iter->second; return (file_iter == open_file_lookup_.end()) ? nullptr : file_iter->second;
@ -381,11 +380,10 @@ auto file_manager::open(const std::string &api_path, bool directory,
return open(api_path, directory, ofd, handle, file, nullptr); return open(api_path, directory, ofd, handle, file, nullptr);
} }
auto file_manager::open(const std::string &api_path, bool directory, auto file_manager::open(
const open_file_data &ofd, std::uint64_t &handle, const std::string &api_path, bool directory, const open_file_data &ofd,
std::shared_ptr<i_open_file> &file, std::uint64_t &handle, std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file) std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error {
-> api_error {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
const auto create_and_add_handle = const auto create_and_add_handle =
@ -437,7 +435,7 @@ auto file_manager::open(const std::string &api_path, bool directory,
auto ring_size{ring_buffer_file_size / chunk_size}; auto ring_size{ring_buffer_file_size / chunk_size};
const auto get_download_type = [&](download_type type) -> download_type { const auto get_download_type = [&](download_type type) -> download_type {
if (directory || fsi.size == 0U) { if (directory || fsi.size == 0U || is_processing(api_path)) {
return download_type::default_; return download_type::default_;
} }
@ -749,8 +747,8 @@ auto file_manager::rename_directory(const std::string &from_api_path,
} }
auto file_manager::rename_file(const std::string &from_api_path, auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite) const std::string &to_api_path,
-> api_error { bool overwrite) -> api_error {
if (not provider_.is_rename_supported()) { if (not provider_.is_rename_supported()) {
return api_error::not_implemented; return api_error::not_implemented;
} }

View File

@ -28,13 +28,10 @@
#include "platform/platform.hpp" #include "platform/platform.hpp"
#include "providers/i_provider.hpp" #include "providers/i_provider.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "types/startup_exception.hpp"
#include "utils/common.hpp" #include "utils/common.hpp"
#include "utils/error_utils.hpp" #include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/path.hpp" #include "utils/path.hpp"
#include "utils/time.hpp" #include "utils/time.hpp"
#include "utils/utils.hpp"
namespace repertory { namespace repertory {
open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout, open_file::open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
@ -281,12 +278,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto read_state = get_read_state(); auto read_state = get_read_state();
if ((get_api_error() == api_error::success) && (chunk < read_state.size()) && if ((get_api_error() == api_error::success) && (chunk < read_state.size()) &&
not read_state[chunk]) { not read_state[chunk]) {
if (active_downloads_.find(chunk) != active_downloads_.end()) { if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) { if (skip_active) {
return; return;
} }
auto active_download = active_downloads_.at(chunk); auto active_download = get_active_downloads().at(chunk);
rw_lock.unlock(); rw_lock.unlock();
active_download->wait(); active_download->wait();
@ -296,12 +293,12 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto data_offset = chunk * get_chunk_size(); auto data_offset = chunk * get_chunk_size();
auto data_size = (chunk == read_state.size() - 1U) ? get_last_chunk_size() auto data_size = (chunk == read_state.size() - 1U) ? get_last_chunk_size()
: get_chunk_size(); : get_chunk_size();
if (active_downloads_.empty() && (read_state.count() == 0U)) { if (get_active_downloads().empty() && (read_state.count() == 0U)) {
event_system::instance().raise<download_begin>(get_api_path(), event_system::instance().raise<download_begin>(get_api_path(),
get_source_path()); get_source_path());
} }
active_downloads_[chunk] = std::make_shared<download>(); get_active_downloads()[chunk] = std::make_shared<download>();
rw_lock.unlock(); rw_lock.unlock();
if (should_reset) { if (should_reset) {
@ -314,8 +311,8 @@ void open_file::download_chunk(std::size_t chunk, bool skip_active,
auto state = get_read_state(); auto state = get_read_state();
unique_recur_mutex_lock lock(rw_mtx_); unique_recur_mutex_lock lock(rw_mtx_);
auto active_download = active_downloads_.at(chunk); auto active_download = get_active_downloads().at(chunk);
active_downloads_.erase(chunk); get_active_downloads().erase(chunk);
if (get_api_error() == api_error::success) { if (get_api_error() == api_error::success) {
auto progress = (static_cast<double>(state.count()) / auto progress = (static_cast<double>(state.count()) /
static_cast<double>(state.size())) * static_cast<double>(state.size())) *
@ -452,7 +449,7 @@ auto open_file::native_operation(
auto read_state = get_read_state(); auto read_state = get_read_state();
if (not is_empty_file && (last_chunk < read_state.size())) { if (not is_empty_file && (last_chunk < read_state.size())) {
rw_lock.unlock(); rw_lock.unlock();
update_background_reader(0U); update_reader(0U);
download_chunk(last_chunk, false, true); download_chunk(last_chunk, false, true);
if (get_api_error() != api_error::success) { if (get_api_error() != api_error::success) {
@ -578,7 +575,7 @@ auto open_file::read(std::size_t read_size, std::uint64_t read_offset,
auto end_chunk = auto end_chunk =
static_cast<std::size_t>((read_size + read_offset) / get_chunk_size()); static_cast<std::size_t>((read_size + read_offset) / get_chunk_size());
update_background_reader(begin_chunk); update_reader(begin_chunk);
download_range(begin_chunk, end_chunk, true); download_range(begin_chunk, end_chunk, true);
if (get_api_error() != api_error::success) { if (get_api_error() != api_error::success) {
@ -655,18 +652,23 @@ void open_file::set_read_state(boost::dynamic_bitset<> read_state) {
read_state_ = std::move(read_state); read_state_ = std::move(read_state);
} }
void open_file::update_background_reader(std::size_t read_chunk) { void open_file::update_reader(std::size_t chunk) {
recur_mutex_lock rw_lock(rw_mtx_); recur_mutex_lock rw_lock(rw_mtx_);
read_chunk_ = read_chunk; read_chunk_ = chunk;
if (reader_thread_ || stop_requested_) { if (reader_thread_ || stop_requested_) {
return; return;
} }
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread_ = std::make_unique<std::thread>([this]() {
std::size_t next_chunk{}; unique_recur_mutex_lock lock(rw_mtx_);
auto next_chunk{read_chunk_};
auto read_chunk{read_chunk_};
lock.unlock();
while (not stop_requested_) { while (not stop_requested_) {
unique_recur_mutex_lock lock(rw_mtx_); lock.lock();
auto read_state = get_read_state(); auto read_state = get_read_state();
if ((get_file_size() == 0U) || read_state.all()) { if ((get_file_size() == 0U) || read_state.all()) {
lock.unlock(); lock.unlock();
@ -674,12 +676,11 @@ void open_file::update_background_reader(std::size_t read_chunk) {
continue; continue;
} }
do { if (read_chunk != read_chunk_) {
next_chunk = read_chunk_ = next_chunk = read_chunk = read_chunk_;
((read_chunk_ + 1U) >= read_state.size()) ? 0U : read_chunk_ + 1U; }
} while ((next_chunk != 0U) &&
(active_downloads_.find(next_chunk) != active_downloads_.end()));
next_chunk = next_chunk + 1U >= read_state.size() ? 0U : next_chunk + 1U;
lock.unlock(); lock.unlock();
download_chunk(next_chunk, true, false); download_chunk(next_chunk, true, false);
@ -714,7 +715,7 @@ auto open_file::write(std::uint64_t write_offset, const data_buffer &data,
auto end_chunk = auto end_chunk =
static_cast<std::size_t>((write_offset + data.size()) / get_chunk_size()); static_cast<std::size_t>((write_offset + data.size()) / get_chunk_size());
update_background_reader(begin_chunk); update_reader(begin_chunk);
download_range(begin_chunk, std::min(get_read_state().size() - 1U, end_chunk), download_range(begin_chunk, std::min(get_read_state().size() - 1U, end_chunk),
true); true);

View File

@ -19,7 +19,7 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include "file_manager/ring_file_base.hpp" #include "file_manager/ring_buffer_base.hpp"
#include "events/event_system.hpp" #include "events/event_system.hpp"
#include "file_manager/events.hpp" #include "file_manager/events.hpp"
@ -31,41 +31,33 @@
#include "utils/error_utils.hpp" #include "utils/error_utils.hpp"
namespace repertory { namespace repertory {
ring_file_base::ring_file_base(std::uint64_t chunk_size, ring_buffer_base::ring_buffer_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi, std::uint8_t chunk_timeout,
i_provider &provider, std::size_t ring_size, filesystem_item fsi, i_provider &provider,
bool disable_io) std::size_t ring_size, bool disable_io)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io), : open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io),
ring_state_(ring_size), read_state_(ring_size),
total_chunks_(static_cast<std::size_t>( total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) { utils::divide_with_ceiling(fsi.size, chunk_size))) {
if (disable_io) { if (disable_io) {
if (fsi.size > 0U) { if (fsi.size > 0U) {
ring_state_.resize(std::min(total_chunks_, ring_state_.size())); read_state_.resize(std::min(total_chunks_, read_state_.size()));
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
read_state_.set(0U, read_state_.size(), false);
} }
} else { } else {
if (ring_state_.size() < min_ring_size) { if (ring_size < min_ring_size) {
throw std::runtime_error(fmt::format( throw std::runtime_error("ring size must be greater than or equal to 5");
"ring size must be greater than or equal to {}", min_ring_size));
} }
if (not can_handle_file(fsi.size, chunk_size, get_read_state_size())) { ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
throw std::runtime_error("file size is less than ring buffer size"); read_state_.set(0U, ring_size, false);
}
} }
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), false);
} }
auto ring_file_base::can_handle_file(std::uint64_t file_size, auto ring_buffer_base::check_start() -> api_error {
std::size_t chunk_size,
std::size_t ring_size) -> bool {
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
}
auto ring_file_base::check_start() -> api_error {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
try { try {
@ -82,30 +74,29 @@ auto ring_file_base::check_start() -> api_error {
utils::error::raise_api_path_error(function_name, get_api_path(), utils::error::raise_api_path_error(function_name, get_api_path(),
get_source_path(), ex, get_source_path(), ex,
"failed to start"); "failed to start");
return api_error::error;
} }
return api_error::error;
} }
auto ring_file_base::close() -> bool { auto ring_buffer_base::close() -> bool {
stop_requested_ = true; stop_requested_ = true;
unique_mutex_lock chunk_lock(chunk_mtx_); unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all(); chunk_notify_.notify_all();
chunk_lock.unlock(); chunk_lock.unlock();
auto ret = open_file_base::close(); auto res = open_file_base::close();
if (reader_thread_) { if (reader_thread_) {
reader_thread_->join(); reader_thread_->join();
reader_thread_.reset(); reader_thread_.reset();
} }
return ret; return res;
} }
auto ring_file_base::download_chunk(std::size_t chunk, auto ring_buffer_base::download_chunk(std::size_t chunk,
bool skip_active) -> api_error { bool skip_active) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_); unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() { const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all(); chunk_notify_.notify_all();
@ -122,86 +113,78 @@ auto ring_file_base::download_chunk(std::size_t chunk,
return unlock_and_return(api_error::invalid_ring_buffer_position); return unlock_and_return(api_error::invalid_ring_buffer_position);
} }
if (active_downloads_.find(chunk) != active_downloads_.end()) { if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) { if (skip_active) {
return unlock_and_return(api_error::success); return unlock_and_return(api_error::success);
} }
auto active_download = active_downloads_.at(chunk); auto active_download = get_active_downloads().at(chunk);
unlock_and_notify(); unlock_and_notify();
return active_download->wait(); return active_download->wait();
} }
if (ring_state_[chunk % ring_state_.size()]) { if (read_state_[chunk % read_state_.size()]) {
return unlock_and_return(api_error::success); return unlock_and_return(api_error::success);
} }
auto active_download{std::make_shared<download>()}; auto active_download{std::make_shared<download>()};
active_downloads_[chunk] = active_download; get_active_downloads()[chunk] = active_download;
auto res = handle_read_buffer(chunk, [&](auto &&buffer) { return use_buffer(chunk, [&](data_buffer &buffer) -> api_error {
auto data_offset{chunk * get_chunk_size()}; auto data_offset{chunk * get_chunk_size()};
auto data_size{ auto data_size{
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() chunk == (total_chunks_ - 1U) ? get_last_chunk_size()
: get_chunk_size(), : get_chunk_size(),
}; };
unlock_and_notify();
auto result{ auto result{
get_provider().read_file_bytes(get_api_path(), data_size, data_offset, get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
buffer, stop_requested_), buffer, stop_requested_),
}; };
if (result != api_error::success) { chunk_lock.lock();
return result; if (chunk < ring_begin_ || chunk > ring_end_) {
} result = api_error::invalid_ring_buffer_position;
result = on_chunk_downloaded(chunk, buffer);
if (result != api_error::success) {
return result;
} }
ring_state_[chunk % ring_state_.size()] = true; if (result == api_error::success) {
auto progress = result = on_chunk_downloaded(chunk, buffer);
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) * if (result == api_error::success) {
100.0; read_state_[chunk % read_state_.size()] = true;
event_system::instance().raise<download_progress>( auto progress = (static_cast<double>(chunk + 1U) /
get_api_path(), get_source_path(), progress); static_cast<double>(total_chunks_)) *
return api_error::success; 100.0;
event_system::instance().raise<download_progress>(
get_api_path(), get_source_path(), progress);
}
}
get_active_downloads().erase(chunk);
unlock_and_notify();
active_download->notify(result);
return result;
}); });
active_downloads_.erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
} }
void ring_file_base::forward(std::size_t count) { void ring_buffer_base::forward(std::size_t count) {
update_position(count, true); update_position(count, true);
} }
auto ring_file_base::get_read_state() const -> boost::dynamic_bitset<> { auto ring_buffer_base::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex()); recur_mutex_lock file_lock(get_mutex());
return ring_state_; return read_state_;
} }
auto ring_file_base::get_read_state(std::size_t chunk) const -> bool { auto ring_buffer_base::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(get_mutex()); recur_mutex_lock file_lock(get_mutex());
return ring_state_[chunk % ring_state_.size()]; return read_state_[chunk % read_state_.size()];
} }
auto ring_file_base::get_read_state_size() const -> std::size_t { auto ring_buffer_base::read(std::size_t read_size, std::uint64_t read_offset,
recur_mutex_lock file_lock(get_mutex()); data_buffer &data) -> api_error {
return ring_state_.size();
}
auto ring_file_base::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (is_directory()) { if (is_directory()) {
return api_error::invalid_operation; return api_error::invalid_operation;
} }
@ -236,8 +219,7 @@ auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
res = download_chunk(chunk, false); res = download_chunk(chunk, false);
if (res != api_error::success) { if (res != api_error::success) {
if (not stop_requested_ && if (res == api_error::invalid_ring_buffer_position) {
res == api_error::invalid_ring_buffer_position) {
read_lock.unlock(); read_lock.unlock();
// TODO limit retry // TODO limit retry
@ -249,31 +231,28 @@ auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
reset_timeout(); reset_timeout();
auto to_read{ std::size_t bytes_read{};
res = on_read_chunk(
chunk,
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset), std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
read_size), read_size),
}; read_offset, data, bytes_read);
if (res != api_error::success) {
res = use_buffer( return res;
chunk, [&data, &read_offset, &to_read](auto &&buffer) -> api_error { }
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
data.insert(data.end(), begin, end);
return api_error::success;
});
reset_timeout(); reset_timeout();
read_size -= to_read;
read_size -= bytes_read;
read_offset = 0U; read_offset = 0U;
} }
return stop_requested_ ? api_error::download_stopped : res; return stop_requested_ ? api_error::download_stopped : res;
} }
void ring_file_base::reader_thread() { void ring_buffer_base::reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_); unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_; auto next_chunk{ring_pos_};
chunk_notify_.notify_all(); chunk_notify_.notify_all();
chunk_lock.unlock(); chunk_lock.unlock();
@ -297,7 +276,7 @@ void ring_file_base::reader_thread() {
chunk_lock.unlock(); chunk_lock.unlock();
}; };
if (ring_state_[next_chunk % ring_state_.size()]) { if (read_state_[next_chunk % read_state_.size()]) {
check_and_wait(); check_and_wait();
continue; continue;
} }
@ -306,20 +285,17 @@ void ring_file_base::reader_thread() {
chunk_lock.unlock(); chunk_lock.unlock();
download_chunk(next_chunk, true); download_chunk(next_chunk, true);
chunk_lock.lock();
check_and_wait();
} }
event_system::instance().raise<download_end>( event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped); get_api_path(), get_source_path(), api_error::download_stopped);
} }
void ring_file_base::reverse(std::size_t count) { void ring_buffer_base::reverse(std::size_t count) {
update_position(count, false); update_position(count, false);
} }
void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) { void ring_buffer_base::set(std::size_t first_chunk, std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_); mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) { if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all(); chunk_notify_.notify_all();
@ -328,7 +304,7 @@ void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) {
ring_begin_ = first_chunk; ring_begin_ = first_chunk;
ring_end_ = ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
if (current_chunk > ring_end_) { if (current_chunk > ring_end_) {
chunk_notify_.notify_all(); chunk_notify_.notify_all();
@ -337,18 +313,18 @@ void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) {
} }
ring_pos_ = current_chunk; ring_pos_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), true); read_state_.set(0U, read_state_.size(), true);
chunk_notify_.notify_all(); chunk_notify_.notify_all();
} }
void ring_file_base::set_api_path(const std::string &api_path) { void ring_buffer_base::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_); mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path); open_file_base::set_api_path(api_path);
chunk_notify_.notify_all(); chunk_notify_.notify_all();
} }
void ring_file_base::update_position(std::size_t count, bool is_forward) { void ring_buffer_base::update_position(std::size_t count, bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_); mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) { if (is_forward) {
@ -366,25 +342,24 @@ void ring_file_base::update_position(std::size_t count, bool is_forward) {
auto delta = is_forward ? count - (ring_end_ - ring_pos_) auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_); : count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) { if (delta >= read_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false); read_state_.set(0U, read_state_.size(), false);
ring_pos_ += is_forward ? count : -count; ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta; ring_begin_ += is_forward ? delta : -delta;
} else { } else {
for (std::size_t idx = 0U; idx < delta; ++idx) { for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) { if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false; read_state_[(ring_begin_ + idx) % read_state_.size()] = false;
} else { } else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false; read_state_[(ring_end_ - idx) % read_state_.size()] = false;
} }
} }
ring_begin_ += is_forward ? delta : -delta; ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count; ring_pos_ += is_forward ? count : -count;
} }
ring_end_ = ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U); std::min(total_chunks_ - 1U, ring_begin_ + read_state_.size() - 1U);
} }
chunk_notify_.notify_all(); chunk_notify_.notify_all();

View File

@ -21,10 +21,13 @@
*/ */
#include "file_manager/ring_buffer_open_file.hpp" #include "file_manager/ring_buffer_open_file.hpp"
#include "file_manager/open_file_base.hpp"
#include "platform/platform.hpp" #include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp" #include "utils/common.hpp"
#include "utils/error_utils.hpp" #include "utils/error_utils.hpp"
#include "utils/file.hpp" #include "utils/path.hpp"
namespace repertory { namespace repertory {
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory, ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
@ -33,37 +36,45 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
filesystem_item fsi, filesystem_item fsi,
i_provider &provider, i_provider &provider,
std::size_t ring_size) std::size_t ring_size)
: ring_file_base(chunk_size, chunk_timeout, fsi, provider, ring_size, : ring_buffer_base(chunk_size, chunk_timeout, fsi, provider, ring_size,
false), false),
source_path_(utils::path::combine(buffer_directory, source_path_(utils::path::combine(buffer_directory,
{ {
utils::create_uuid_string(), utils::create_uuid_string(),
})) {} })) {
if (not can_handle_file(fsi.size, chunk_size, ring_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
}
ring_buffer_open_file::~ring_buffer_open_file() { ring_buffer_open_file::~ring_buffer_open_file() {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
close(); close();
if (nf_) { if (not nf_) {
nf_->close();
nf_.reset();
}
if (utils::file::file(get_source_path()).remove()) {
return; return;
} }
utils::error::raise_api_path_error( nf_->close();
function_name, get_api_path(), get_source_path(), nf_.reset();
utils::get_last_error_code(), "failed to delete file");
if (not utils::file::file(source_path_).remove()) {
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
utils::get_last_error_code(), "failed to delete file");
}
} }
auto ring_buffer_open_file::handle_read_buffer( auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
std::size_t /* chunk */, std::size_t chunk_size,
std::function<api_error(data_buffer &data)> func) -> api_error { std::size_t ring_size) -> bool {
data_buffer buffer; return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
return func(buffer); }
auto ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
} }
auto ring_buffer_open_file::on_check_start() -> bool { auto ring_buffer_open_file::on_check_start() -> bool {
@ -73,25 +84,25 @@ auto ring_buffer_open_file::on_check_start() -> bool {
return true; return true;
} }
auto buffer_directory{utils::path::get_parent_path(get_source_path())}; auto buffer_directory{utils::path::get_parent_path(source_path_)};
if (not utils::file::directory(buffer_directory).create_directory()) { if (not utils::file::directory(buffer_directory).create_directory()) {
throw std::runtime_error( throw std::runtime_error(
fmt::format("failed to create buffer directory|path|{}|err|{}", fmt::format("failed to create buffer directory|path|{}|err|{}",
buffer_directory, utils::get_last_error_code())); buffer_directory, utils::get_last_error_code()));
} }
nf_ = utils::file::file::open_or_create_file(get_source_path()); nf_ = utils::file::file::open_or_create_file(source_path_);
if (not nf_ || not *nf_) { if (not nf_ || not *nf_) {
throw std::runtime_error(fmt::format("failed to create buffer file|err|{}", throw std::runtime_error(fmt::format("failed to create buffer file|err|{}",
utils::get_last_error_code())); utils::get_last_error_code()));
} }
if (not nf_->truncate(get_read_state_size() * get_chunk_size())) { if (not nf_->truncate(get_ring_size() * get_chunk_size())) {
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
nf_->close(); nf_->close();
nf_.reset(); nf_.reset();
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
} }
return false; return false;
@ -101,7 +112,7 @@ auto ring_buffer_open_file::on_chunk_downloaded(
std::size_t chunk, const data_buffer &buffer) -> api_error { std::size_t chunk, const data_buffer &buffer) -> api_error {
return do_io([&]() -> api_error { return do_io([&]() -> api_error {
std::size_t bytes_written{}; std::size_t bytes_written{};
if (nf_->write(buffer, (chunk % get_read_state_size()) * get_chunk_size(), if (nf_->write(buffer, (chunk % get_ring_size()) * get_chunk_size(),
&bytes_written)) { &bytes_written)) {
return api_error::success; return api_error::success;
} }
@ -110,26 +121,31 @@ auto ring_buffer_open_file::on_chunk_downloaded(
}); });
} }
auto ring_buffer_open_file::use_buffer( auto ring_buffer_open_file::on_read_chunk(
std::size_t chunk, std::size_t chunk, std::size_t read_size, std::uint64_t read_offset,
std::function<api_error(const data_buffer &data)> func) -> api_error { data_buffer &data, std::size_t &bytes_read) -> api_error {
data_buffer buffer; data_buffer buffer(read_size);
buffer.resize(get_chunk_size());
auto res = do_io([&]() -> api_error { auto res = do_io([&]() -> api_error {
std::size_t bytes_read{}; return nf_->read(
auto result = buffer,
nf_->read(buffer, (chunk % get_read_state_size()) * get_chunk_size(), (((chunk % get_ring_size()) * get_chunk_size()) + read_offset),
&bytes_read) &bytes_read)
? api_error::success ? api_error::success
: api_error::os_error; : api_error::os_error;
buffer.resize(bytes_read);
return result;
}); });
if (res != api_error::success) { if (res != api_error::success) {
return res; return res;
} }
data.insert(data.end(), buffer.begin(), buffer.end());
return api_error::success;
}
auto ring_buffer_open_file::use_buffer(
std::size_t /* chunk */,
std::function<api_error(data_buffer &)> func) -> api_error {
data_buffer buffer;
return func(buffer); return func(buffer);
} }
} // namespace repertory } // namespace repertory

View File

@ -52,6 +52,8 @@ auto from_api_error(const api_error &err) -> int {
return -EEXIST; return -EEXIST;
case api_error::file_in_use: case api_error::file_in_use:
return -EBUSY; return -EBUSY;
case api_error::invalid_handle:
return -EBADF;
case api_error::invalid_operation: case api_error::invalid_operation:
return -EINVAL; return -EINVAL;
case api_error::item_not_found: case api_error::item_not_found:

View File

@ -52,11 +52,14 @@ TEST_F(direct_open_file_test, read_full_file) {
fsi.directory = false; fsi.directory = false;
fsi.size = test_chunk_size * 32U; fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes) EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &source_file](
std::size_t size, std::uint64_t offset, const std::string & /* api_path */, std::size_t size,
data_buffer &data, std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);
@ -111,11 +114,14 @@ TEST_F(direct_open_file_test, read_full_file_in_reverse) {
fsi.directory = false; fsi.directory = false;
fsi.size = test_chunk_size * 32U; fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes) EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &source_file](
std::size_t size, std::uint64_t offset, const std::string & /* api_path */, std::size_t size,
data_buffer &data, std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);
@ -170,11 +176,14 @@ TEST_F(direct_open_file_test, read_full_file_in_partial_chunks) {
fsi.api_path = "/test.txt"; fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 32U; fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes) EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &source_file](
std::size_t size, std::uint64_t offset, const std::string & /* api_path */, std::size_t size,
data_buffer &data, std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);
@ -226,11 +235,14 @@ TEST_F(direct_open_file_test, read_full_file_in_partial_chunks_in_reverse) {
fsi.api_path = "/test.txt"; fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 32U; fsi.size = test_chunk_size * 32U;
std::mutex read_mtx;
EXPECT_CALL(provider, read_file_bytes) EXPECT_CALL(provider, read_file_bytes)
.WillRepeatedly([&source_file](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &source_file](
std::size_t size, std::uint64_t offset, const std::string & /* api_path */, std::size_t size,
data_buffer &data, std::uint64_t offset, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);

View File

@ -308,11 +308,14 @@ TEST_F(ring_buffer_open_file_test, read_full_file) {
fsi.size = test_chunk_size * 33u + 11u; fsi.size = test_chunk_size * 33u + 11u;
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file"); fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes) EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset, std::size_t size, std::uint64_t offset,
data_buffer &data, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);
@ -371,11 +374,14 @@ TEST_F(ring_buffer_open_file_test, read_full_file_in_reverse) {
fsi.size = test_chunk_size * 32u; fsi.size = test_chunk_size * 32u;
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file"); fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes) EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset, std::size_t size, std::uint64_t offset,
data_buffer &data, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);
@ -434,11 +440,14 @@ TEST_F(ring_buffer_open_file_test, read_full_file_in_partial_chunks) {
fsi.size = test_chunk_size * 32u; fsi.size = test_chunk_size * 32u;
fsi.source_path = test::generate_test_file_name("test"); fsi.source_path = test::generate_test_file_name("test");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes) EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset, std::size_t size, std::uint64_t offset,
data_buffer &data, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);
@ -499,11 +508,14 @@ TEST_F(ring_buffer_open_file_test,
fsi.size = test_chunk_size * 32u; fsi.size = test_chunk_size * 32u;
fsi.source_path = test::generate_test_file_name("ring_buffer_open_file"); fsi.source_path = test::generate_test_file_name("ring_buffer_open_file");
std::mutex read_mtx;
EXPECT_CALL(mp, read_file_bytes) EXPECT_CALL(mp, read_file_bytes)
.WillRepeatedly([&nf](const std::string & /* api_path */, .WillRepeatedly([&read_mtx, &nf](const std::string & /* api_path */,
std::size_t size, std::uint64_t offset, std::size_t size, std::uint64_t offset,
data_buffer &data, data_buffer &data,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
mutex_lock lock(read_mtx);
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
std::size_t bytes_read{}; std::size_t bytes_read{};
data.resize(size); data.resize(size);