12 Commits

Author SHA1 Message Date
c72dec6369 fix
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good
2024-10-18 15:15:27 -05:00
c0b0c5d397 fixes 2024-10-18 14:56:44 -05:00
d34ccc424d refactor 2024-10-18 14:53:31 -05:00
663c89ac53 refactor 2024-10-18 14:30:13 -05:00
414f18a518 fix 2024-10-18 14:28:22 -05:00
9bfdece859 fixes 2024-10-18 14:27:50 -05:00
ad79c5daf5 refactor 2024-10-18 14:22:24 -05:00
48a1bef1ae fix 2024-10-18 13:28:21 -05:00
09cb5d8f19 updated build system 2024-10-18 11:46:59 -05:00
c216df9b73 refactor 2024-10-18 11:38:27 -05:00
a0a5ca3390 refactor 2024-10-18 07:36:52 -05:00
ae0a921ba8 updated build system 2024-10-18 06:50:09 -05:00
35 changed files with 5145 additions and 5197 deletions

View File

@ -7,7 +7,6 @@ on Windows.
* Optimized for [Plex Media Server](https://www.plex.tv/)
* Single application to mount AWS S3 and/or Sia
* Only 1 Sia mount and 1 S3 mount (per bucket) per user is supported.
* Remote mounting of Repertory instances on Linux ~~, OS X~~ and Windows
* Securely share your mounts over TCP/IP (`XChaCha20-Poly1305` stream cipher)
* Cross-platform support (Linux 64-bit, Linux arm64/aarch64, ~~OS X,~~ Windows 64-bit)
@ -29,26 +28,69 @@ on Windows.
* ~~OS X Mojave and above~~
* Windows 64-bit 10, 11
## Usage
### Notable Options
* `-dc`
* Display mount configuration.
* For Sia, `--name` is optional
* For S3, the `-s3` option is required along with `--name`
* `--help`
* Display all mount utility options.
* `--name, -na [name]`
* The `--name` option can be set to any valid value allowed as a file name for your filesystem.
* For Sia, the bucket name will be set to the same value if it is empty in the configuration file.
* If the `--name` option is not specified, `default` will be used.
* For S3, the `--name` option is required and does not affect the bucket name.
* `-set SiaConfig.Bucket`
* Set Sia bucket name for the mount.
* Can be used in combination with `--name` to target a unique configuration.
* `-set S3Config.Bucket`
* S3 bucket name for the mount.
* Must be used in combination with `--name` to target a unique configuration.
* Must be used in combination with `-s3`.
### Sia
* Linux
* `repertory /mnt/location`
* `repertory --name default /mnt/location`
* Windows
* `repertory.exe t:`
* `repertory.exe --name default t:`
### S3
* Linux
* `repertory --name storj -s3 /mnt/location`
* Windows
* `repertory.exe --name storj -s3 t:`
## Compiling
* Successful compilation will result in all required files being placed in the `dist/` directory
* Linux
* Ensure `docker` is installed
* For x86_64:
* Release: `scripts/make_unix.sh x86_64`
* Debug: `scripts/make_unix.sh x86_64 debug`
* RelWithDebInfo: `scripts/make_unix.sh`
* Release: `scripts/make_unix.sh x86_64 Release`
* Debug: `scripts/make_unix.sh x86_64 Debug`
* For aarch64:
* Release: `scripts/make_unix.sh aarch64`
* Debug: `scripts/make_unix.sh aarch64 debug`
* RelWithDebInfo: `scripts/make_unix.sh aarch64`
* Release: `scripts/make_unix.sh aarch64 Release`
* Debug: `scripts/make_unix.sh aarch64 Debug`
* Windows
* RECOMMENDED: Cross-compiling on Linux
* OFFICIAL: Cross-compiling on Linux
* Ensure `docker` is installed
* Release: `scripts/make_win32.sh`
* Debug: `scripts/make_win32.sh debug`
* Compiling on Windows
* RelWithDebInfo: `scripts/make_win32.sh`
* Release: `scripts/make_win32.sh x86_64 Release`
* Debug: `scripts/make_win32.sh x86_64 Debug`
* UNOFFICIAL: Compiling on Windows
* Ensure latest [MSYS2](https://www.msys2.org/) is installed
* Release: `scripts/make_win32.cmd`
* Debug: `scripts/make_win32.cmd debug`
* RelWithDebInfo: `scripts\make_win32.cmd`
* Release: `scripts\make_win32.cmd x86_64 Release`
* Debug: `scripts\make_win32.cmd x86_64 Debug`
## Credits

View File

@ -1,41 +1,19 @@
set(BINUTILS_HASH ae9a5789e23459e59606e6714723f2d3ffc31c03174191ef0d015bdf06007450)
set(BOOST2_HASH 7bd7ddceec1a1dfdcbdb3e609b60d01739c38390a5f956385a12f3122049f0ca)
set(BOOST_HASH be0d91732d5b0cc6fbb275c7939974457e79b54d6f07ce2e3dfdd68bef883b0b)
set(CLI11_HASH f2d893a65c3b1324c50d4e682c0cdc021dd0477ae2c048544f39eed6654b699a)
set(CPP_HTTPLIB_HASH c1742fc7179aaae2a67ad9bba0740b7e9ffaf4f5e62feef53101ecdef1478716)
set(CURL_HASH d714818f6ac41ae9154850158fed44b7a87650a6d52f83d3bcb9aa527be354d7)
set(CXXOPTS_HASH 841f49f2e045b9c6365997c2a8fbf76e6f215042dda4511a5bb04bc5ebc7f88a)
set(EXPAT_HASH fbd032683370d761ba68dba2566d3280a154f5290634172d60a79b24d366d9dc)
set(FLAC_HASH 0a4bb82a30609b606650d538a804a7b40205366ce8fc98871b0ecf3fbb0611ee)
set(FMT_HASH 6cb1e6d37bdcb756dbbe59be438790db409cdb4868c66e888d5df9f13f7c027f)
set(FONTCONFIG_HASH f5f359d6332861bd497570848fcb42520964a9e83d5e3abe397b6b6db9bcaaf4)
set(FREETYPE2_HASH 5c3a8e78f7b24c20b25b54ee575d6daa40007a5f4eea2845861c3409b3021747)
set(GCC_HASH 7d376d445f93126dc545e2c0086d0f647c3094aae081cdb78f42ce2bc25e7293)
set(GTEST_HASH 7b42b4d6ed48810c5362c265a17faebe90dc2373c885e5216439d37927f02926)
set(ICU_HASH 925e6b4b8cf8856e0ac214f6f34e30dee63b7bb7a50460ab4603950eff48f89e)
set(JSON_HASH 0d8ef5af7f9794e3263480193c491549b2ba6cc74bb018906202ada498a79406)
set(LIBDSM_HASH 747c4563d6291303d9b085c9e7dc96ac44f91871dcac3e20480fdcc066eee88a)
set(LIBEVENT_HASH 7180a979aaa7000e1264da484f712d403fcf7679b1e9212c4e3d09f5c93efc24)
set(LIBICONV_HASH 8f74213b56238c85a50a5329f77e06198771e70dd9a739779f4c02f65d971313)
set(LIBJPEG_TURBO_HASH a649205a90e39a548863a3614a9576a3fb4465f8e8e66d54999f127957c25b21)
set(LIBPNG_HASH fecc95b46cf05e8e3fc8a414750e0ba5aad00d89e9fdf175e94ff041caf1a03a)
set(LIBSODIUM_HASH 8e5aeca07a723a27bbecc3beef14b0068d37e7fc0e97f51b3f1c82d2a58005c1)
set(LIBTASN_HASH 1613f0ac1cf484d6ec0ce3b8c06d56263cc7242f1c23b30d82d23de345a63f7a)
set(MINGW_HASH 3f66bce069ee8bed7439a1a13da7cb91a5e67ea6170f21317ac7f5794625ee10)
set(NANA_HASH 56f7b1ed006c750fccf8ef15ab1e83f96751f2dfdcb68d93e5f712a6c9b58bcb)
set(NUSPELL_HASH 5d4baa1daf833a18dc06ae0af0571d9574cc849d47daff6b9ce11dac0a5ded6a)
set(OGG_HASH 0eb4b4b9420a0f51db142ba3f9c64b333f826532dc0f48c6410ae51f4799b664)
set(OPENAL_HASH dfddf3a1f61059853c625b7bb03de8433b455f2f79f89548cbcbd5edca3d4a4a)
set(OPENSSL_HASH 777cd596284c883375a2a7a11bf5d2786fc5413255efab20c50d6ffe6d020b7e)
set(PKG_CONFIG_HASH 6fc69c01688c9458a57eb9a1664c9aba372ccda420a02bf4429fe610e7e7d591)
set(PUGIXML_HASH 2f10e276870c64b1db6809050a75e11a897a8d7456c4be5c6b2e35a11168a015)
set(ROCKSDB_HASH b20780586d3df4a3c5bcbde341a2c1946b03d18237960bda5bc5e9538f42af40)
set(SDL_HASH 254a767aa486fa6308d4473159c1f23c794610be775d63e98084111d96814b85)
set(SECP256K1_HASH 61583939f1f25b92e6401e5b819e399da02562de663873df3056993b40148701)
set(SFML_HASH 82535db9e57105d4f3a8aedabd138631defaedc593cab589c924b7d7a11ffb9d)
set(SPDLOG_HASH 1586508029a7d0670dfcb2d97575dcdc242d3868a259742b69f100801ab4e16b)
set(SQLITE_HASH 77823cb110929c2bcb0f5d48e4833b5c59a8a6e40cdea3936b99e199dbbe5784)
set(STDUUID_HASH b1176597e789531c38481acbbed2a6894ad419aab0979c10410d59eb0ebf40d3)
set(VORBIS_HASH 270c76933d0934e42c5ee0a54a36280e2d87af1de3cc3e584806357e237afd13)
set(WXWIDGETS_HASH 0ad86a3ad3e2e519b6a705248fc9226e3a09bbf069c6c692a02acf7c2d1c6b51)
set(ZLIB_HASH 17e88863f3600672ab49182f217281b6fc4d3c762bde361935e436a95214d05c)

View File

@ -1,15 +1,15 @@
set(BINUTILS_VERSION 2.41)
set(BOOST2_MAJOR_VERSION 1)
set(BOOST2_MINOR_VERSION 76)
set(BOOST2_PATCH_VERSION 0)
set(BOOST_MAJOR_VERSION 1)
set(BOOST_MINOR_VERSION 85)
set(BOOST_PATCH_VERSION 0)
set(BOOST2_MAJOR_VERSION 1)
set(BOOST2_MINOR_VERSION 76)
set(BOOST2_PATCH_VERSION 0)
set(CPP_HTTPLIB_VERSION 0.16.3)
set(CURL2_VERSION 8_9_1)
set(CURL_VERSION 8.9.1)
set(EXPAT2_VERSION 2_6_2)
set(CURL2_VERSION 8_9_1)
set(EXPAT_VERSION 2.6.2)
set(EXPAT2_VERSION 2_6_2)
set(GCC_VERSION 14.2.0)
set(GTEST_VERSION 1.15.2)
set(ICU_VERSION 75-1)
@ -21,7 +21,7 @@ set(OPENSSL_VERSION 3.3.1)
set(PKG_CONFIG_VERSION 0.29.2)
set(PUGIXML_VERSION 1.14)
set(SPDLOG_VERSION 1.14.1)
set(SQLITE2_VERSION 3.46.1)
set(SQLITE_VERSION 3460100)
set(SQLITE2_VERSION 3.46.1)
set(STDUUID_VERSION 1.2.3)
set(ZLIB_VERSION 1.3.1)

View File

@ -221,44 +221,25 @@ using WCHAR = wchar_t;
#define MAX_PATH 260
#define STATUS_SUCCESS \
std::uint32_t { 0U }
#define STATUS_ACCESS_DENIED \
std::uint32_t { 0xC0000022L }
#define STATUS_DEVICE_BUSY \
std::uint32_t { 0x80000011L }
#define STATUS_DEVICE_INSUFFICIENT_RESOURCES \
std::uint32_t { 0xC0000468L }
#define STATUS_DIRECTORY_NOT_EMPTY \
std::uint32_t { 0xC0000101L }
#define STATUS_FILE_IS_A_DIRECTORY \
std::uint32_t { 0xC00000BAL }
#define STATUS_FILE_TOO_LARGE \
std::uint32_t { 0xC0000904L }
#define STATUS_INSUFFICIENT_RESOURCES \
std::uint32_t { 0xC000009AL }
#define STATUS_INTERNAL_ERROR \
std::uint32_t { 0xC00000E5L }
#define STATUS_INVALID_ADDRESS \
std::uint32_t { 0xC0000141L }
#define STATUS_INVALID_HANDLE \
std::uint32_t { 0xC0000006L }
#define STATUS_INVALID_IMAGE_FORMAT \
std::uint32_t { 0xC000007BL }
#define STATUS_INVALID_PARAMETER \
std::uint32_t { 0xC000000DL }
#define STATUS_NO_MEMORY \
std::uint32_t { 0xC0000017L }
#define STATUS_NOT_IMPLEMENTED \
std::uint32_t { 0xC0000002L }
#define STATUS_OBJECT_NAME_EXISTS \
std::uint32_t { 0x40000000L }
#define STATUS_OBJECT_NAME_NOT_FOUND \
std::uint32_t { 0xC0000034L }
#define STATUS_OBJECT_PATH_INVALID \
std::uint32_t { 0xC0000039L }
#define STATUS_UNEXPECTED_IO_ERROR \
std::uint32_t { 0xC00000E9L }
#define STATUS_SUCCESS std::uint32_t{0U}
#define STATUS_ACCESS_DENIED std::uint32_t{0xC0000022L}
#define STATUS_DEVICE_BUSY std::uint32_t{0x80000011L}
#define STATUS_DEVICE_INSUFFICIENT_RESOURCES std::uint32_t{0xC0000468L}
#define STATUS_DIRECTORY_NOT_EMPTY std::uint32_t{0xC0000101L}
#define STATUS_FILE_IS_A_DIRECTORY std::uint32_t{0xC00000BAL}
#define STATUS_FILE_TOO_LARGE std::uint32_t{0xC0000904L}
#define STATUS_INSUFFICIENT_RESOURCES std::uint32_t{0xC000009AL}
#define STATUS_INTERNAL_ERROR std::uint32_t{0xC00000E5L}
#define STATUS_INVALID_ADDRESS std::uint32_t{0xC0000141L}
#define STATUS_INVALID_HANDLE std::uint32_t{0xC0000006L}
#define STATUS_INVALID_IMAGE_FORMAT std::uint32_t{0xC000007BL}
#define STATUS_INVALID_PARAMETER std::uint32_t{0xC000000DL}
#define STATUS_NO_MEMORY std::uint32_t{0xC0000017L}
#define STATUS_NOT_IMPLEMENTED std::uint32_t{0xC0000002L}
#define STATUS_OBJECT_NAME_EXISTS std::uint32_t{0x40000000L}
#define STATUS_OBJECT_NAME_NOT_FOUND std::uint32_t{0xC0000034L}
#define STATUS_OBJECT_PATH_INVALID std::uint32_t{0xC0000039L}
#define STATUS_UNEXPECTED_IO_ERROR std::uint32_t{0xC00000E9L}
#define CONVERT_STATUS_NOT_IMPLEMENTED(e) \
((std::uint32_t(e) == STATUS_NOT_IMPLEMENTED) ? -ENOTSUP : e)
@ -296,8 +277,8 @@ using namespace Fsp;
#define INTERFACE_SETUP(name) \
public: \
name(const name &) noexcept = delete; \
name &operator=(const name &) noexcept = delete; \
name &operator=(name &&) noexcept = delete; \
auto operator=(const name &) noexcept -> name & = delete; \
auto operator=(name &&) noexcept -> name & = delete; \
\
protected: \
name() = default; \

View File

@ -27,6 +27,7 @@
#include "file_manager/i_file_manager.hpp"
#include "file_manager/i_open_file.hpp"
#include "file_manager/i_upload_manager.hpp"
#include "file_manager/upload.hpp"
#include "platform/platform.hpp"
#include "types/repertory.hpp"
#include "utils/db/sqlite/db_common.hpp"
@ -39,408 +40,6 @@ class i_provider;
class file_manager final : public i_file_manager, public i_upload_manager {
E_CONSUMER();
public:
class open_file_base : public i_closeable_open_file {
public:
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider);
~open_file_base() override = default;
public:
open_file_base() = delete;
open_file_base(const open_file_base &) noexcept = delete;
open_file_base(open_file_base &&) noexcept = delete;
auto operator=(open_file_base &&) noexcept -> open_file_base & = delete;
auto
operator=(const open_file_base &) noexcept -> open_file_base & = delete;
public:
class download final {
public:
download() = default;
~download() = default;
public:
download(const download &) noexcept = delete;
download(download &&) noexcept = delete;
auto operator=(download &&) noexcept -> download & = delete;
auto operator=(const download &) noexcept -> download & = delete;
private:
bool complete_{false};
api_error error_{api_error::success};
std::mutex mtx_;
std::condition_variable notify_;
public:
void notify(const api_error &err);
auto wait() -> api_error;
};
class io_item final {
public:
io_item(std::function<api_error()> action) : action_(std::move(action)) {}
~io_item() = default;
public:
io_item() = delete;
io_item(const io_item &) noexcept = delete;
io_item(io_item &&) noexcept = delete;
auto operator=(io_item &&) noexcept -> io_item & = delete;
auto operator=(const io_item &) noexcept -> io_item & = delete;
private:
std::function<api_error()> action_;
std::mutex mtx_;
std::condition_variable notify_;
std::optional<api_error> result_;
public:
void action();
[[nodiscard]] auto get_result() -> api_error;
};
protected:
std::uint64_t chunk_size_;
std::uint8_t chunk_timeout_;
filesystem_item fsi_;
std::size_t last_chunk_size_;
std::map<std::uint64_t, open_file_data> open_data_;
i_provider &provider_;
private:
api_error error_{api_error::success};
mutable std::mutex error_mtx_;
stop_type io_stop_requested_{false};
std::unique_ptr<std::thread> io_thread_;
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>>
active_downloads_;
mutable std::recursive_mutex file_mtx_;
std::atomic<std::chrono::system_clock::time_point> last_access_{
std::chrono::system_clock::now()};
bool modified_{false};
std::unique_ptr<utils::file::i_file> nf_;
mutable std::mutex io_thread_mtx_;
std::condition_variable io_thread_notify_;
std::deque<std::shared_ptr<io_item>> io_thread_queue_;
bool removed_{false};
private:
void file_io_thread();
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
virtual auto is_download_complete() const -> bool = 0;
void reset_timeout();
auto set_api_error(const api_error &e) -> api_error;
public:
void add(std::uint64_t handle, open_file_data ofd) override;
[[nodiscard]] auto can_close() const -> bool override;
auto close() -> bool override;
[[nodiscard]] auto get_api_error() const -> api_error;
[[nodiscard]] auto get_api_path() const -> std::string override;
[[nodiscard]] auto get_chunk_size() const -> std::size_t override {
return chunk_size_;
}
[[nodiscard]] auto get_file_size() const -> std::uint64_t override;
[[nodiscard]] auto get_filesystem_item() const -> filesystem_item override;
[[nodiscard]] auto
get_handles() const -> std::vector<std::uint64_t> override;
[[nodiscard]] auto
get_open_data() -> std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto
get_open_data(std::uint64_t handle) -> open_file_data & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle) const
-> const open_file_data & override;
[[nodiscard]] auto get_open_file_count() const -> std::size_t override;
[[nodiscard]] auto get_source_path() const -> std::string override {
return fsi_.source_path;
}
[[nodiscard]] auto has_handle(std::uint64_t handle) const -> bool override {
return open_data_.find(handle) != open_data_.end();
}
[[nodiscard]] auto is_directory() const -> bool override {
return fsi_.directory;
}
[[nodiscard]] auto is_modified() const -> bool override;
void remove(std::uint64_t handle) override;
void set_api_path(const std::string &api_path) override;
};
class open_file final : public open_file_base {
public:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
private:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
public:
open_file() = delete;
open_file(const open_file &) noexcept = delete;
open_file(open_file &&) noexcept = delete;
auto operator=(open_file &&) noexcept -> open_file & = delete;
auto operator=(const open_file &) noexcept -> open_file & = delete;
public:
~open_file() override;
private:
i_upload_manager &mgr_;
private:
bool notified_ = false;
std::size_t read_chunk_index_{};
boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
stop_type stop_requested_ = false;
private:
void download_chunk(std::size_t chunk, bool skip_active, bool should_reset);
void download_range(std::size_t start_chunk_index,
std::size_t end_chunk_index_inclusive,
bool should_reset);
void set_modified();
void update_background_reader(std::size_t read_chunk);
protected:
auto is_download_complete() const -> bool override {
return read_state_.all();
}
public:
auto close() -> bool override;
[[nodiscard]] auto
get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto is_complete() const -> bool override;
auto is_write_supported() const -> bool override { return true; }
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto
native_operation(std::uint64_t new_file_size,
native_operation_callback callback) -> api_error override;
void remove(std::uint64_t handle) override;
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto
resize(std::uint64_t new_file_size) -> api_error override;
[[nodiscard]] auto write(std::uint64_t write_offset,
const data_buffer &data,
std::size_t &bytes_written) -> api_error override;
};
class ring_buffer_open_file final : public open_file_base {
public:
ring_buffer_open_file(std::string buffer_directory,
std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
ring_buffer_open_file(std::string buffer_directory,
std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size);
~ring_buffer_open_file() override;
public:
ring_buffer_open_file() = delete;
ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete;
ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete;
auto operator=(ring_buffer_open_file &&) noexcept
-> ring_buffer_open_file & = delete;
auto operator=(const ring_buffer_open_file &) noexcept
-> ring_buffer_open_file & = delete;
private:
boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_;
private:
std::unique_ptr<std::thread> chunk_forward_thread_;
std::unique_ptr<std::thread> chunk_reverse_thread_;
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::size_t current_chunk_{};
std::size_t first_chunk_{};
std::size_t last_chunk_;
private:
auto download_chunk(std::size_t chunk) -> api_error;
void forward_reader_thread(std::size_t count);
void reverse_reader_thread(std::size_t count);
protected:
auto is_download_complete() const -> bool override;
public:
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return current_chunk_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return first_chunk_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t {
return last_chunk_;
}
[[nodiscard]] auto
get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return true; }
auto is_write_supported() const -> bool override { return false; }
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
[[nodiscard]] auto
native_operation(std::uint64_t,
native_operation_callback) -> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &,
std::size_t &) -> api_error override {
return api_error::not_supported;
}
};
class upload final {
public:
upload(filesystem_item fsi, i_provider &provider);
~upload();
public:
upload() = delete;
upload(const upload &) noexcept = delete;
upload(upload &&) noexcept = delete;
auto operator=(upload &&) noexcept -> upload & = delete;
auto operator=(const upload &) noexcept -> upload & = delete;
private:
filesystem_item fsi_;
i_provider &provider_;
private:
bool cancelled_{false};
api_error error_{api_error::success};
std::unique_ptr<std::thread> thread_;
stop_type stop_requested_{false};
private:
void upload_thread();
public:
void cancel();
[[nodiscard]] auto get_api_error() const -> api_error { return error_; }
[[nodiscard]] auto get_api_path() const -> std::string {
return fsi_.api_path;
}
[[nodiscard]] auto get_source_path() const -> std::string {
return fsi_.source_path;
}
[[nodiscard]] auto is_cancelled() const -> bool { return cancelled_; }
void stop();
};
public:
file_manager(app_config &config, i_provider &provider);
@ -458,8 +57,7 @@ private:
i_provider &provider_;
private:
utils::db::sqlite::db3_t db_;
std::uint64_t next_handle_{0U};
std::atomic<std::uint64_t> next_handle_{0U};
mutable std::recursive_mutex open_file_mtx_;
std::unordered_map<std::string, std::shared_ptr<i_closeable_open_file>>
open_file_lookup_;
@ -470,6 +68,8 @@ private:
std::unique_ptr<std::thread> upload_thread_;
private:
[[nodiscard]] auto create_db() const -> utils::db::sqlite::db3_t;
void close_timed_out_files();
auto get_open_file_by_handle(std::uint64_t handle) const
@ -483,12 +83,22 @@ private:
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error;
void queue_upload(const std::string &api_path, const std::string &source_path,
bool no_lock);
bool no_lock, sqlite3 *db);
void remove_upload(const std::string &api_path, bool no_lock);
void remove_resume(const std::string &api_path,
const std::string &source_path, sqlite3 *db);
void remove_upload(const std::string &api_path, bool no_lock, sqlite3 *db);
[[nodiscard]] auto rename_directory(const std::string &from_api_path,
const std::string &to_api_path,
sqlite3 *db) -> api_error;
[[nodiscard]] auto rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite,
sqlite3 *db) -> api_error;
void swap_renamed_items(std::string from_api_path, std::string to_api_path,
bool directory);
bool directory, sqlite3 *db);
void upload_completed(const file_upload_completed &evt);
@ -498,7 +108,8 @@ public:
[[nodiscard]] auto get_next_handle() -> std::uint64_t;
auto handle_file_rename(const std::string &from_api_path,
const std::string &to_api_path) -> api_error;
const std::string &to_api_path, sqlite3 *db)
-> api_error;
void queue_upload(const i_open_file &file) override;
@ -537,8 +148,8 @@ public:
[[nodiscard]] auto has_no_open_file_handles() const -> bool override;
[[nodiscard]] auto
is_processing(const std::string &api_path) const -> bool override;
[[nodiscard]] auto is_processing(const std::string &api_path) const
-> bool override;
#if defined(PROJECT_TESTING)
[[nodiscard]] auto open(std::shared_ptr<i_closeable_open_file> of,
@ -551,13 +162,13 @@ public:
[[nodiscard]] auto remove_file(const std::string &api_path) -> api_error;
[[nodiscard]] auto
rename_directory(const std::string &from_api_path,
const std::string &to_api_path) -> api_error;
[[nodiscard]] auto rename_directory(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error;
[[nodiscard]] auto rename_file(const std::string &from_api_path,
const std::string &to_api_path,
bool overwrite) -> api_error;
const std::string &to_api_path, bool overwrite)
-> api_error;
void start();

View File

@ -0,0 +1,122 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_HPP_
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class open_file final : public open_file_base {
public:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
private:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
public:
open_file() = delete;
open_file(const open_file &) noexcept = delete;
open_file(open_file &&) noexcept = delete;
auto operator=(open_file &&) noexcept -> open_file & = delete;
auto operator=(const open_file &) noexcept -> open_file & = delete;
public:
~open_file() override;
private:
i_upload_manager &mgr_;
private:
bool notified_ = false;
std::size_t read_chunk_index_{};
boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
stop_type stop_requested_ = false;
private:
void download_chunk(std::size_t chunk, bool skip_active, bool should_reset);
void download_range(std::size_t start_chunk_index,
std::size_t end_chunk_index_inclusive, bool should_reset);
void set_modified();
void update_background_reader(std::size_t read_chunk);
protected:
auto is_download_complete() const -> bool override {
return read_state_.all();
}
public:
auto close() -> bool override;
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto is_complete() const -> bool override;
auto is_write_supported() const -> bool override { return true; }
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t new_file_size,
native_operation_callback callback)
-> api_error override;
void remove(std::uint64_t handle) override;
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t new_file_size) -> api_error override;
[[nodiscard]] auto write(std::uint64_t write_offset, const data_buffer &data,
std::size_t &bytes_written) -> api_error override;
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_HPP_

View File

@ -0,0 +1,194 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_BASE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_BASE_HPP_
#include "file_manager/i_open_file.hpp"
#include "utils/types/file/i_file.hpp"
namespace repertory {
class i_provider;
class open_file_base : public i_closeable_open_file {
public:
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider);
~open_file_base() override = default;
public:
open_file_base() = delete;
open_file_base(const open_file_base &) noexcept = delete;
open_file_base(open_file_base &&) noexcept = delete;
auto operator=(open_file_base &&) noexcept -> open_file_base & = delete;
auto operator=(const open_file_base &) noexcept -> open_file_base & = delete;
public:
class download final {
public:
download() = default;
~download() = default;
public:
download(const download &) noexcept = delete;
download(download &&) noexcept = delete;
auto operator=(download &&) noexcept -> download & = delete;
auto operator=(const download &) noexcept -> download & = delete;
private:
bool complete_{false};
api_error error_{api_error::success};
std::mutex mtx_;
std::condition_variable notify_;
public:
void notify(const api_error &err);
auto wait() -> api_error;
};
class io_item final {
public:
io_item(std::function<api_error()> action) : action_(std::move(action)) {}
~io_item() = default;
public:
io_item() = delete;
io_item(const io_item &) noexcept = delete;
io_item(io_item &&) noexcept = delete;
auto operator=(io_item &&) noexcept -> io_item & = delete;
auto operator=(const io_item &) noexcept -> io_item & = delete;
private:
std::function<api_error()> action_;
std::mutex mtx_;
std::condition_variable notify_;
std::optional<api_error> result_;
public:
void action();
[[nodiscard]] auto get_result() -> api_error;
};
protected:
std::uint64_t chunk_size_;
std::uint8_t chunk_timeout_;
filesystem_item fsi_;
std::size_t last_chunk_size_;
std::map<std::uint64_t, open_file_data> open_data_;
i_provider &provider_;
private:
api_error error_{api_error::success};
mutable std::mutex error_mtx_;
stop_type io_stop_requested_{false};
std::unique_ptr<std::thread> io_thread_;
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
mutable std::recursive_mutex file_mtx_;
std::atomic<std::chrono::system_clock::time_point> last_access_{
std::chrono::system_clock::now()};
bool modified_{false};
std::unique_ptr<utils::file::i_file> nf_;
mutable std::mutex io_thread_mtx_;
std::condition_variable io_thread_notify_;
std::deque<std::shared_ptr<io_item>> io_thread_queue_;
bool removed_{false};
private:
void file_io_thread();
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
virtual auto is_download_complete() const -> bool = 0;
void reset_timeout();
auto set_api_error(const api_error &e) -> api_error;
public:
void add(std::uint64_t handle, open_file_data ofd) override;
[[nodiscard]] auto can_close() const -> bool override;
auto close() -> bool override;
[[nodiscard]] auto get_api_error() const -> api_error;
[[nodiscard]] auto get_api_path() const -> std::string override;
[[nodiscard]] auto get_chunk_size() const -> std::size_t override {
return chunk_size_;
}
[[nodiscard]] auto get_file_size() const -> std::uint64_t override;
[[nodiscard]] auto get_filesystem_item() const -> filesystem_item override;
[[nodiscard]] auto get_handles() const -> std::vector<std::uint64_t> override;
[[nodiscard]] auto get_open_data()
-> std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle)
-> open_file_data & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle) const
-> const open_file_data & override;
[[nodiscard]] auto get_open_file_count() const -> std::size_t override;
[[nodiscard]] auto get_source_path() const -> std::string override {
return fsi_.source_path;
}
[[nodiscard]] auto has_handle(std::uint64_t handle) const -> bool override {
return open_data_.find(handle) != open_data_.end();
}
[[nodiscard]] auto is_directory() const -> bool override {
return fsi_.directory;
}
[[nodiscard]] auto is_modified() const -> bool override;
void remove(std::uint64_t handle) override;
void set_api_path(const std::string &api_path) override;
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_BASE_HPP_

View File

@ -0,0 +1,132 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class ring_buffer_open_file final : public open_file_base {
public:
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider);
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider, std::size_t ring_size);
~ring_buffer_open_file() override;
public:
ring_buffer_open_file() = delete;
ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete;
ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete;
auto operator=(ring_buffer_open_file &&) noexcept
-> ring_buffer_open_file & = delete;
auto operator=(const ring_buffer_open_file &) noexcept
-> ring_buffer_open_file & = delete;
private:
boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_;
private:
std::unique_ptr<std::thread> chunk_forward_thread_;
std::unique_ptr<std::thread> chunk_reverse_thread_;
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::size_t current_chunk_{};
std::size_t first_chunk_{};
std::size_t last_chunk_;
private:
auto download_chunk(std::size_t chunk) -> api_error;
void forward_reader_thread(std::size_t count);
void reverse_reader_thread(std::size_t count);
protected:
auto is_download_complete() const -> bool override;
public:
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return current_chunk_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return first_chunk_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t {
return last_chunk_;
}
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return true; }
auto is_write_supported() const -> bool override { return false; }
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t, native_operation_callback)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
-> api_error override {
return api_error::not_supported;
}
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_

View File

@ -1,41 +1,75 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
namespace repertory {
void file_manager::open_file_base::io_item::action() {
result_ = action_();
mutex_lock lock(mtx_);
notify_.notify_all();
}
auto file_manager::open_file_base::io_item::get_result() -> api_error {
unique_mutex_lock lock(mtx_);
if (result_.has_value()) {
return result_.value();
}
notify_.wait(lock);
return result_.value_or(api_error::error);
}
} // namespace repertory
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_UPLOAD_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_UPLOAD_HPP_
#include "types/repertory.hpp"
namespace repertory {
class i_provider;
class upload final {
public:
upload(filesystem_item fsi, i_provider &provider);
~upload();
public:
upload() = delete;
upload(const upload &) noexcept = delete;
upload(upload &&) noexcept = delete;
auto operator=(upload &&) noexcept -> upload & = delete;
auto operator=(const upload &) noexcept -> upload & = delete;
private:
filesystem_item fsi_;
i_provider &provider_;
private:
bool cancelled_{false};
api_error error_{api_error::success};
std::unique_ptr<std::thread> thread_;
stop_type stop_requested_{false};
private:
void upload_thread();
public:
void cancel();
[[nodiscard]] auto get_api_error() const -> api_error { return error_; }
[[nodiscard]] auto get_api_path() const -> std::string {
return fsi_.api_path;
}
[[nodiscard]] auto get_source_path() const -> std::string {
return fsi_.source_path;
}
[[nodiscard]] auto is_cancelled() const -> bool { return cancelled_; }
void stop();
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_UPLOAD_HPP_

View File

@ -22,11 +22,14 @@
#ifndef REPERTORY_INCLUDE_PLATFORM_PLATFORM_HPP_
#define REPERTORY_INCLUDE_PLATFORM_PLATFORM_HPP_
#include "platform/unix_platform.hpp"
#if defined(_WIN32)
#include "platform/win32_platform.hpp"
#include "utils/unix.hpp"
#include "utils/unix/unix_utils.hpp"
#include "utils/windows.hpp"
#include "utils/windows/windows_utils.hpp"
#else // !defined(_WIN32)
#include "platform/unix_platform.hpp"
#include "utils/unix.hpp"
#include "utils/unix/unix_utils.hpp"
#endif // defined(_WIN32)
#endif // REPERTORY_INCLUDE_PLATFORM_PLATFORM_HPP_

View File

@ -23,9 +23,12 @@
#include "app_config.hpp"
#include "file_manager/events.hpp"
#include "file_manager/open_file.hpp"
#include "file_manager/open_file_base.hpp"
#include "file_manager/ring_buffer_open_file.hpp"
#include "file_manager/upload.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "types/startup_exception.hpp"
#include "utils/common.hpp"
#include "utils/db/sqlite/db_common.hpp"
#include "utils/db/sqlite/db_delete.hpp"
@ -40,8 +43,8 @@
#include "utils/time.hpp"
namespace {
[[nodiscard]] auto
create_resume_entry(const repertory::i_open_file &file) -> json {
[[nodiscard]] auto create_resume_entry(const repertory::i_open_file &file)
-> json {
return {
{"chunk_size", file.get_chunk_size()},
{"path", file.get_api_path()},
@ -104,32 +107,6 @@ namespace repertory {
file_manager::file_manager(app_config &config, i_provider &provider)
: config_(config), provider_(provider) {
if (not provider_.is_direct_only()) {
auto db_path =
utils::path::combine(config.get_data_directory(), {"file_manager.db"});
sqlite3 *db3{nullptr};
auto res =
sqlite3_open_v2(db_path.c_str(), &db3,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
if (res != SQLITE_OK) {
throw startup_exception("failed to open db|" + db_path + '|' +
std::to_string(res) + '|' + sqlite3_errstr(res));
}
db_ = utils::db::sqlite::db3_t{
db3,
utils::db::sqlite::sqlite3_deleter(),
};
for (auto &&create_item : sql_create_tables) {
std::string err;
if (not utils::db::sqlite::execute_sql(*db_, create_item.second, err)) {
db_.reset();
throw startup_exception(err);
}
}
utils::db::sqlite::set_journal_mode(*db_);
E_SUBSCRIBE_EXACT(file_upload_completed,
[this](const file_upload_completed &completed) {
this->upload_completed(completed);
@ -139,7 +116,6 @@ file_manager::file_manager(app_config &config, i_provider &provider)
file_manager::~file_manager() {
stop();
db_.reset();
E_CONSUMER_RELEASE();
}
@ -194,6 +170,38 @@ void file_manager::close_timed_out_files() {
closeable_list.clear();
}
auto file_manager::create_db() const -> utils::db::sqlite::db3_t {
auto db_path =
utils::path::combine(config_.get_data_directory(), {"file_manager.db"});
sqlite3 *db3{nullptr};
auto db_res =
sqlite3_open_v2(db_path.c_str(), &db3,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, nullptr);
if (db_res != SQLITE_OK) {
throw std::runtime_error("failed to open db|" + db_path + '|' +
std::to_string(db_res) + '|' +
sqlite3_errstr(db_res));
}
auto db = utils::db::sqlite::db3_t{
db3,
utils::db::sqlite::sqlite3_deleter(),
};
for (auto &&create_item : sql_create_tables) {
std::string err;
if (not utils::db::sqlite::execute_sql(*db, create_item.second, err)) {
db.reset();
throw std::runtime_error(err);
}
}
utils::db::sqlite::set_journal_mode(*db);
return db;
}
auto file_manager::create(const std::string &api_path, api_meta_map &meta,
open_file_data ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &file) -> api_error {
@ -271,7 +279,7 @@ auto file_manager::get_directory_items(const std::string &api_path) const
auto file_manager::get_next_handle() -> std::uint64_t {
if (++next_handle_ == 0U) {
next_handle_++;
++next_handle_;
}
return next_handle_;
@ -349,23 +357,27 @@ auto file_manager::get_open_handle_count() const -> std::size_t {
auto file_manager::get_stored_downloads() const -> std::vector<json> {
REPERTORY_USES_FUNCTION_NAME();
std::vector<json> ret;
if (not provider_.is_direct_only()) {
auto result = utils::db::sqlite::db_select{*db_.get(), resume_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not result.get_row(row)) {
continue;
}
if (not row.has_value()) {
continue;
}
if (provider_.is_direct_only()) {
return {};
}
ret.push_back(row.value().get_column("data").get_value_as_json());
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
auto db = create_db();
std::vector<json> ret;
auto result = utils::db::sqlite::db_select{*db.get(), resume_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not result.get_row(row)) {
continue;
}
if (not row.has_value()) {
continue;
}
ret.push_back(row.value().get_column("data").get_value_as_json());
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
}
}
@ -373,8 +385,8 @@ auto file_manager::get_stored_downloads() const -> std::vector<json> {
}
auto file_manager::handle_file_rename(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
const std::string &to_api_path,
sqlite3 *db) -> api_error {
std::string source_path{};
auto file_iter = open_file_lookup_.find(from_api_path);
if (file_iter != open_file_lookup_.end()) {
@ -387,7 +399,7 @@ auto file_manager::handle_file_rename(const std::string &from_api_path,
source_path = upload_lookup_.at(from_api_path)->get_source_path();
}
} else {
auto result = utils::db::sqlite::db_select{*db_.get(), upload_table}
auto result = utils::db::sqlite::db_select{*db, upload_table}
.column("source_path")
.where("api_path")
.equals(from_api_path)
@ -399,22 +411,22 @@ auto file_manager::handle_file_rename(const std::string &from_api_path,
}
}
remove_upload(from_api_path);
remove_upload(from_api_path, true, db);
auto ret = provider_.rename_file(from_api_path, to_api_path);
if (ret != api_error::success) {
queue_upload(from_api_path, source_path, false);
queue_upload(from_api_path, source_path, false, db);
return ret;
}
swap_renamed_items(from_api_path, to_api_path, false);
swap_renamed_items(from_api_path, to_api_path, false, db);
ret = source_path.empty()
? api_error::success
: provider_.set_item_meta(to_api_path, META_SOURCE, source_path);
if (should_upload) {
queue_upload(to_api_path, source_path, false);
queue_upload(to_api_path, source_path, false, db);
}
return ret;
@ -435,7 +447,9 @@ auto file_manager::is_processing(const std::string &api_path) const -> bool {
}
upload_lock.unlock();
utils::db::sqlite::db_select query{*db_.get(), upload_table};
auto db = create_db();
utils::db::sqlite::db_select query{*db.get(), upload_table};
if (query.where("api_path").equals(api_path).go().has_row()) {
return true;
};
@ -455,10 +469,11 @@ auto file_manager::open(const std::string &api_path, bool directory,
return open(api_path, directory, ofd, handle, file, nullptr);
}
auto file_manager::open(
const std::string &api_path, bool directory, const open_file_data &ofd,
std::uint64_t &handle, std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file) -> api_error {
auto file_manager::open(const std::string &api_path, bool directory,
const open_file_data &ofd, std::uint64_t &handle,
std::shared_ptr<i_open_file> &file,
std::shared_ptr<i_closeable_open_file> closeable_file)
-> api_error {
const auto create_and_add_handle =
[&](std::shared_ptr<i_closeable_open_file> cur_file) {
handle = get_next_handle();
@ -502,11 +517,14 @@ auto file_manager::open(
}
void file_manager::queue_upload(const i_open_file &file) {
return queue_upload(file.get_api_path(), file.get_source_path(), false);
auto db = create_db();
return queue_upload(file.get_api_path(), file.get_source_path(), false,
db.get());
}
void file_manager::queue_upload(const std::string &api_path,
const std::string &source_path, bool no_lock) {
const std::string &source_path, bool no_lock,
sqlite3 *db) {
if (provider_.is_direct_only()) {
return;
}
@ -515,10 +533,10 @@ void file_manager::queue_upload(const std::string &api_path,
if (not no_lock) {
lock = std::make_unique<mutex_lock>(upload_mtx_);
}
remove_upload(api_path, true);
remove_upload(api_path, true, db);
auto result =
utils::db::sqlite::db_insert{*db_.get(), upload_table}
utils::db::sqlite::db_insert{*db, upload_table}
.or_replace()
.column_value("api_path", api_path)
.column_value("date_time",
@ -526,7 +544,7 @@ void file_manager::queue_upload(const std::string &api_path,
.column_value("source_path", source_path)
.go();
if (result.ok()) {
remove_resume(api_path, source_path);
remove_resume(api_path, source_path, db);
event_system::instance().raise<file_upload_queued>(api_path, source_path);
} else {
event_system::instance().raise<file_upload_failed>(
@ -552,9 +570,10 @@ auto file_manager::remove_file(const std::string &api_path) -> api_error {
close_all(api_path);
remove_upload(api_path);
auto db = create_db();
remove_upload(api_path, true, db.get());
auto result = utils::db::sqlite::db_delete{*db_.get(), resume_table}
auto result = utils::db::sqlite::db_delete{*db.get(), resume_table}
.where("api_path")
.equals(api_path)
.go();
@ -580,7 +599,13 @@ auto file_manager::remove_file(const std::string &api_path) -> api_error {
void file_manager::remove_resume(const std::string &api_path,
const std::string &source_path) {
auto result = utils::db::sqlite::db_delete{*db_.get(), resume_table}
auto db = create_db();
return remove_resume(api_path, source_path, db.get());
}
void file_manager::remove_resume(const std::string &api_path,
const std::string &source_path, sqlite3 *db) {
auto result = utils::db::sqlite::db_delete{*db, resume_table}
.where("api_path")
.equals(api_path)
.go();
@ -591,10 +616,12 @@ void file_manager::remove_resume(const std::string &api_path,
}
void file_manager::remove_upload(const std::string &api_path) {
remove_upload(api_path, false);
auto db = create_db();
remove_upload(api_path, false, db.get());
}
void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
void file_manager::remove_upload(const std::string &api_path, bool no_lock,
sqlite3 *db) {
REPERTORY_USES_FUNCTION_NAME();
if (provider_.is_direct_only()) {
@ -606,7 +633,7 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
lock = std::make_unique<mutex_lock>(upload_mtx_);
}
auto result = utils::db::sqlite::db_delete{*db_.get(), upload_table}
auto result = utils::db::sqlite::db_delete{*db, upload_table}
.where("api_path")
.equals(api_path)
.go();
@ -616,7 +643,7 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
"failed to remove from upload table");
}
result = utils::db::sqlite::db_delete{*db_.get(), upload_active_table}
result = utils::db::sqlite::db_delete{*db, upload_active_table}
.where("api_path")
.equals(api_path)
.go();
@ -643,6 +670,13 @@ void file_manager::remove_upload(const std::string &api_path, bool no_lock) {
auto file_manager::rename_directory(const std::string &from_api_path,
const std::string &to_api_path)
-> api_error {
auto db = create_db();
return rename_directory(from_api_path, to_api_path, db.get());
}
auto file_manager::rename_directory(const std::string &from_api_path,
const std::string &to_api_path, sqlite3 *db)
-> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}
@ -698,8 +732,9 @@ auto file_manager::rename_directory(const std::string &from_api_path,
auto old_api_path = api_path;
auto new_api_path = utils::path::create_api_path(utils::path::combine(
to_api_path, {old_api_path.substr(from_api_path.size())}));
res = list[i].directory ? rename_directory(old_api_path, new_api_path)
: rename_file(old_api_path, new_api_path, false);
res = list[i].directory
? rename_directory(old_api_path, new_api_path, db)
: rename_file(old_api_path, new_api_path, false, db);
}
}
@ -712,13 +747,20 @@ auto file_manager::rename_directory(const std::string &from_api_path,
return res;
}
swap_renamed_items(from_api_path, to_api_path, true);
swap_renamed_items(from_api_path, to_api_path, true, db);
return api_error::success;
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path,
bool overwrite) -> api_error {
const std::string &to_api_path, bool overwrite)
-> api_error {
auto db = create_db();
return rename_file(from_api_path, to_api_path, overwrite, db.get());
}
auto file_manager::rename_file(const std::string &from_api_path,
const std::string &to_api_path, bool overwrite,
sqlite3 *db) -> api_error {
if (not provider_.is_rename_supported()) {
return api_error::not_implemented;
}
@ -786,12 +828,18 @@ auto file_manager::rename_file(const std::string &from_api_path,
}
}
return handle_file_rename(from_api_path, to_api_path);
return handle_file_rename(from_api_path, to_api_path, db);
}
void file_manager::start() {
REPERTORY_USES_FUNCTION_NAME();
if (upload_thread_) {
return;
}
stop_requested_ = false;
polling::instance().set_callback(
{"timed_out_close", polling::frequency::second,
[this]() { this->close_timed_out_files(); }});
@ -801,106 +849,104 @@ void file_manager::start() {
return;
}
if (not upload_thread_) {
stop_requested_ = false;
auto db = create_db();
struct active_item final {
std::string api_path;
std::string source_path;
};
std::vector<active_item> active_items{};
auto result =
utils::db::sqlite::db_select{*db.get(), upload_active_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (result.get_row(row) && row.has_value()) {
active_items.emplace_back(active_item{
row->get_column("api_path").get_value<std::string>(),
row->get_column("source_path").get_value<std::string>(),
});
}
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
}
}
for (auto &&active_item : active_items) {
queue_upload(active_item.api_path, active_item.source_path, false,
db.get());
}
active_items.clear();
result = utils::db::sqlite::db_select{*db.get(), resume_table}.go();
if (not result.ok()) {
return;
}
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not(result.get_row(row) && row.has_value())) {
return;
}
auto resume_entry = row.value().get_column("data").get_value_as_json();
struct active_item final {
std::string api_path;
std::string source_path;
};
std::size_t chunk_size{};
boost::dynamic_bitset<> read_state;
restore_resume_entry(resume_entry, api_path, chunk_size, read_state,
source_path);
std::vector<active_item> active_items{};
auto result =
utils::db::sqlite::db_select{*db_.get(), upload_active_table}.go();
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (result.get_row(row) && row.has_value()) {
active_items.emplace_back(active_item{
row->get_column("api_path").get_value<std::string>(),
row->get_column("source_path").get_value<std::string>(),
});
}
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
std::abort();
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
if (res != api_error::success) {
event_system::instance().raise<download_restore_failed>(
api_path, source_path,
"failed to get filesystem item|" + api_error_to_string(res));
continue;
}
}
for (auto &&active_item : active_items) {
queue_upload(active_item.api_path, active_item.source_path, false);
}
active_items.clear();
result = utils::db::sqlite::db_select{*db_.get(), resume_table}.go();
if (not result.ok()) {
return;
}
while (result.has_row()) {
try {
std::optional<utils::db::sqlite::db_select::row> row;
if (not(result.get_row(row) && row.has_value())) {
return;
}
auto resume_entry = row.value().get_column("data").get_value_as_json();
std::string api_path;
std::string source_path;
std::size_t chunk_size{};
boost::dynamic_bitset<> read_state;
restore_resume_entry(resume_entry, api_path, chunk_size, read_state,
source_path);
filesystem_item fsi{};
auto res = provider_.get_filesystem_item(api_path, false, fsi);
if (res != api_error::success) {
event_system::instance().raise<download_restore_failed>(
api_path, source_path,
"failed to get filesystem item|" + api_error_to_string(res));
continue;
}
if (source_path != fsi.source_path) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"source path mismatch|expected|" + source_path + "|actual|" +
fsi.source_path);
continue;
}
auto opt_size = utils::file::file{fsi.source_path}.size();
if (not opt_size.has_value()) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"failed to get file size: " +
std::to_string(utils::get_last_error_code()));
continue;
}
auto file_size{opt_size.value()};
if (file_size != fsi.size) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"file size mismatch|expected|" + std::to_string(fsi.size) +
"|actual|" + std::to_string(file_size));
continue;
}
auto closeable_file = std::make_shared<open_file>(
chunk_size,
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
fsi, provider_, read_state, *this);
open_file_lookup_[api_path] = closeable_file;
event_system::instance().raise<download_restored>(fsi.api_path,
fsi.source_path);
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
if (source_path != fsi.source_path) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"source path mismatch|expected|" + source_path + "|actual|" +
fsi.source_path);
continue;
}
auto opt_size = utils::file::file{fsi.source_path}.size();
if (not opt_size.has_value()) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"failed to get file size: " +
std::to_string(utils::get_last_error_code()));
continue;
}
auto file_size{opt_size.value()};
if (file_size != fsi.size) {
event_system::instance().raise<download_restore_failed>(
fsi.api_path, fsi.source_path,
"file size mismatch|expected|" + std::to_string(fsi.size) +
"|actual|" + std::to_string(file_size));
continue;
}
auto closeable_file = std::make_shared<open_file>(
chunk_size,
config_.get_enable_chunk_download_timeout()
? config_.get_chunk_downloader_timeout_secs()
: 0U,
fsi, provider_, read_state, *this);
open_file_lookup_[api_path] = closeable_file;
event_system::instance().raise<download_restored>(fsi.api_path,
fsi.source_path);
} catch (const std::exception &ex) {
utils::error::raise_error(function_name, ex, "query error");
}
}
@ -909,40 +955,43 @@ void file_manager::start() {
}
void file_manager::stop() {
if (not stop_requested_) {
event_system::instance().raise<service_shutdown_begin>("file_manager");
polling::instance().remove_callback("timed_out_close");
stop_requested_ = true;
unique_mutex_lock upload_lock(upload_mtx_);
upload_notify_.notify_all();
upload_lock.unlock();
if (upload_thread_) {
upload_thread_->join();
upload_thread_.reset();
}
open_file_lookup_.clear();
upload_lock.lock();
for (auto &&item : upload_lookup_) {
item.second->stop();
}
upload_notify_.notify_all();
upload_lock.unlock();
while (not upload_lookup_.empty()) {
upload_lock.lock();
if (not upload_lookup_.empty()) {
upload_notify_.wait_for(upload_lock, 1ms);
}
upload_notify_.notify_all();
upload_lock.unlock();
}
event_system::instance().raise<service_shutdown_end>("file_manager");
if (stop_requested_) {
return;
}
event_system::instance().raise<service_shutdown_begin>("file_manager");
polling::instance().remove_callback("timed_out_close");
stop_requested_ = true;
unique_mutex_lock upload_lock(upload_mtx_);
upload_notify_.notify_all();
upload_lock.unlock();
if (upload_thread_) {
upload_thread_->join();
}
open_file_lookup_.clear();
upload_lock.lock();
for (auto &&item : upload_lookup_) {
item.second->stop();
}
upload_notify_.notify_all();
upload_lock.unlock();
while (not upload_lookup_.empty()) {
upload_lock.lock();
if (not upload_lookup_.empty()) {
upload_notify_.wait_for(upload_lock, 1ms);
}
upload_notify_.notify_all();
upload_lock.unlock();
}
upload_thread_.reset();
event_system::instance().raise<service_shutdown_end>("file_manager");
}
void file_manager::store_resume(const i_open_file &file) {
@ -950,7 +999,8 @@ void file_manager::store_resume(const i_open_file &file) {
return;
}
auto result = utils::db::sqlite::db_insert{*db_.get(), resume_table}
auto db = create_db();
auto result = utils::db::sqlite::db_insert{*db.get(), resume_table}
.or_replace()
.column_value("api_path", file.get_api_path())
.column_value("data", create_resume_entry(file).dump())
@ -968,7 +1018,8 @@ void file_manager::store_resume(const i_open_file &file) {
}
void file_manager::swap_renamed_items(std::string from_api_path,
std::string to_api_path, bool directory) {
std::string to_api_path, bool directory,
sqlite3 *db) {
REPERTORY_USES_FUNCTION_NAME();
auto file_iter = open_file_lookup_.find(from_api_path);
@ -983,7 +1034,7 @@ void file_manager::swap_renamed_items(std::string from_api_path,
return;
}
auto result = utils::db::sqlite::db_update{*db_.get(), resume_table}
auto result = utils::db::sqlite::db_update{*db, resume_table}
.column_value("api_path", to_api_path)
.where("api_path")
.equals(from_api_path)
@ -1001,13 +1052,14 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
unique_mutex_lock upload_lock(upload_mtx_);
if (not utils::string::to_bool(evt.get_cancelled().get<std::string>())) {
auto db = create_db();
auto err = api_error_from_string(evt.get_result().get<std::string>());
if (err == api_error::success) {
auto result =
utils::db::sqlite::db_delete{*db_.get(), upload_active_table}
.where("api_path")
.equals(evt.get_api_path().get<std::string>())
.go();
auto result = utils::db::sqlite::db_delete{*db.get(), upload_active_table}
.where("api_path")
.equals(evt.get_api_path().get<std::string>())
.go();
if (not result.ok()) {
utils::error::raise_api_path_error(
function_name, evt.get_api_path().get<std::string>(),
@ -1023,12 +1075,12 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
not utils::file::file(evt.get_source().get<std::string>()).exists()) {
event_system::instance().raise<file_upload_not_found>(
evt.get_api_path(), evt.get_source());
remove_upload(evt.get_api_path(), true);
remove_upload(evt.get_api_path(), true, db.get());
} else {
event_system::instance().raise<file_upload_retry>(
evt.get_api_path(), evt.get_source(), err);
queue_upload(evt.get_api_path(), evt.get_source(), true);
queue_upload(evt.get_api_path(), evt.get_source(), true, db.get());
upload_notify_.wait_for(upload_lock, 5s);
}
}
@ -1040,6 +1092,8 @@ void file_manager::upload_completed(const file_upload_completed &evt) {
void file_manager::upload_handler() {
REPERTORY_USES_FUNCTION_NAME();
auto db = create_db();
while (not stop_requested_) {
auto should_wait{true};
unique_mutex_lock upload_lock(upload_mtx_);
@ -1049,7 +1103,7 @@ void file_manager::upload_handler() {
}
if (upload_lookup_.size() < config_.get_max_upload_count()) {
auto result = utils::db::sqlite::db_select{*db_.get(), upload_table}
auto result = utils::db::sqlite::db_select{*db.get(), upload_table}
.order_by("api_path", true)
.limit(1)
.go();
@ -1067,7 +1121,7 @@ void file_manager::upload_handler() {
should_wait = false;
event_system::instance().raise<file_upload_not_found>(api_path,
source_path);
remove_upload(api_path, true);
remove_upload(api_path, true, db.get());
} break;
case api_error::success: {
@ -1075,14 +1129,13 @@ void file_manager::upload_handler() {
upload_lookup_[fsi.api_path] =
std::make_unique<upload>(fsi, provider_);
auto del_res =
utils::db::sqlite::db_delete{*db_.get(), upload_table}
.where("api_path")
.equals(api_path)
.go();
auto del_res = utils::db::sqlite::db_delete{*db.get(), upload_table}
.where("api_path")
.equals(api_path)
.go();
if (del_res.ok()) {
auto ins_res =
utils::db::sqlite::db_insert{*db_.get(), upload_active_table}
utils::db::sqlite::db_insert{*db.get(), upload_active_table}
.column_value("api_path", api_path)
.column_value("source_path", source_path)
.go();
@ -1097,7 +1150,7 @@ void file_manager::upload_handler() {
default: {
event_system::instance().raise<file_upload_retry>(api_path,
source_path, res);
queue_upload(api_path, source_path, true);
queue_upload(api_path, source_path, true, db.get());
} break;
}
}

View File

@ -1,43 +0,0 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
namespace repertory {
void file_manager::open_file_base::download::notify(const api_error &err) {
complete_ = true;
error_ = err;
unique_mutex_lock lock(mtx_);
notify_.notify_all();
}
auto file_manager::open_file_base::download::wait() -> api_error {
if (not complete_) {
unique_mutex_lock lock(mtx_);
if (not complete_) {
notify_.wait(lock);
}
notify_.notify_all();
}
return error_;
}
} // namespace repertory

View File

@ -1,261 +1,292 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "providers/i_provider.hpp"
#include "utils/path.hpp"
namespace repertory {
file_manager::open_file_base::open_file_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi,
i_provider &provider)
: open_file_base(chunk_size, chunk_timeout, fsi, {}, provider) {}
file_manager::open_file_base::open_file_base(
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data, i_provider &provider)
: chunk_size_(chunk_size),
chunk_timeout_(chunk_timeout),
fsi_(std::move(fsi)),
last_chunk_size_(static_cast<std::size_t>(
fsi.size <= chunk_size ? fsi.size
: (fsi.size % chunk_size) == 0U ? chunk_size
: fsi.size % chunk_size)),
open_data_(std::move(open_data)),
provider_(provider) {
if (not fsi.directory) {
io_thread_ = std::make_unique<std::thread>([this] { file_io_thread(); });
}
}
void file_manager::open_file_base::add(std::uint64_t handle,
open_file_data ofd) {
recur_mutex_lock file_lock(file_mtx_);
open_data_[handle] = ofd;
if (open_data_.size() == 1U) {
event_system::instance().raise<filesystem_item_opened>(
fsi_.api_path, fsi_.source_path, fsi_.directory);
}
event_system::instance().raise<filesystem_item_handle_opened>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory);
}
auto file_manager::open_file_base::can_close() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
if (fsi_.directory) {
return true;
}
if (not open_data_.empty()) {
return false;
}
if (modified_) {
return false;
}
if (get_api_error() != api_error::success) {
return true;
}
if (is_download_complete()) {
return true;
}
if (provider_.is_direct_only()) {
return true;
}
const std::chrono::system_clock::time_point last_access = last_access_;
const auto duration = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - last_access);
return (duration.count() >= chunk_timeout_);
}
auto file_manager::open_file_base::do_io(std::function<api_error()> action)
-> api_error {
unique_mutex_lock io_lock(io_thread_mtx_);
auto item = std::make_shared<io_item>(action);
io_thread_queue_.emplace_back(item);
io_thread_notify_.notify_all();
io_lock.unlock();
return item->get_result();
}
void file_manager::open_file_base::file_io_thread() {
unique_mutex_lock io_lock(io_thread_mtx_);
io_thread_notify_.notify_all();
io_lock.unlock();
const auto process_queue = [&]() {
io_lock.lock();
if (not io_stop_requested_ && io_thread_queue_.empty()) {
io_thread_notify_.wait(io_lock);
}
while (not io_thread_queue_.empty()) {
auto *item = io_thread_queue_.front().get();
io_thread_notify_.notify_all();
io_lock.unlock();
item->action();
io_lock.lock();
io_thread_queue_.pop_front();
}
io_thread_notify_.notify_all();
io_lock.unlock();
};
while (not io_stop_requested_) {
process_queue();
}
process_queue();
}
auto file_manager::open_file_base::get_api_error() const -> api_error {
mutex_lock error_lock(error_mtx_);
return error_;
}
auto file_manager::open_file_base::get_api_path() const -> std::string {
recur_mutex_lock file_lock(file_mtx_);
return fsi_.api_path;
}
auto file_manager::open_file_base::get_file_size() const -> std::uint64_t {
recur_mutex_lock file_lock(file_mtx_);
return fsi_.size;
}
auto file_manager::open_file_base::get_filesystem_item() const
-> filesystem_item {
recur_mutex_lock file_lock(file_mtx_);
return fsi_;
}
auto file_manager::open_file_base::get_handles() const
-> std::vector<std::uint64_t> {
recur_mutex_lock file_lock(file_mtx_);
std::vector<std::uint64_t> ret;
for (auto &&item : open_data_) {
ret.emplace_back(item.first);
}
return ret;
}
auto file_manager::open_file_base::get_open_data()
-> std::map<std::uint64_t, open_file_data> & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_;
}
auto file_manager::open_file_base::get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_;
}
auto file_manager::open_file_base::get_open_data(std::uint64_t handle)
-> open_file_data & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.at(handle);
}
auto file_manager::open_file_base::get_open_data(std::uint64_t handle) const
-> const open_file_data & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.at(handle);
}
auto file_manager::open_file_base::get_open_file_count() const -> std::size_t {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.size();
}
auto file_manager::open_file_base::is_modified() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return modified_;
}
void file_manager::open_file_base::remove(std::uint64_t handle) {
recur_mutex_lock file_lock(file_mtx_);
open_data_.erase(handle);
event_system::instance().raise<filesystem_item_handle_closed>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory, modified_);
if (open_data_.empty()) {
event_system::instance().raise<filesystem_item_closed>(
fsi_.api_path, fsi_.source_path, fsi_.directory, modified_);
}
}
void file_manager::open_file_base::reset_timeout() {
last_access_ = std::chrono::system_clock::now();
}
auto file_manager::open_file_base::set_api_error(const api_error &err)
-> api_error {
mutex_lock error_lock(error_mtx_);
if (error_ != err) {
return ((error_ = (error_ == api_error::success ||
error_ == api_error::download_incomplete ||
error_ == api_error::download_stopped
? err
: error_)));
}
return error_;
}
void file_manager::open_file_base::set_api_path(const std::string &api_path) {
recur_mutex_lock file_lock(file_mtx_);
fsi_.api_path = api_path;
fsi_.api_parent = utils::path::get_parent_api_path(api_path);
}
auto file_manager::open_file_base::close() -> bool {
unique_mutex_lock io_lock(io_thread_mtx_);
if (not fsi_.directory && not io_stop_requested_) {
io_stop_requested_ = true;
io_thread_notify_.notify_all();
io_lock.unlock();
if (io_thread_) {
io_thread_->join();
io_thread_.reset();
return true;
}
return false;
}
io_thread_notify_.notify_all();
io_lock.unlock();
return false;
}
} // namespace repertory
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/open_file_base.hpp"
#include "events/event_system.hpp"
#include "file_manager/events.hpp"
#include "providers/i_provider.hpp"
#include "utils/path.hpp"
namespace repertory {
void open_file_base::download::notify(const api_error &err) {
complete_ = true;
error_ = err;
unique_mutex_lock lock(mtx_);
notify_.notify_all();
}
auto open_file_base::download::wait() -> api_error {
if (not complete_) {
unique_mutex_lock lock(mtx_);
if (not complete_) {
notify_.wait(lock);
}
notify_.notify_all();
}
return error_;
}
void open_file_base::io_item::action() {
result_ = action_();
mutex_lock lock(mtx_);
notify_.notify_all();
}
auto open_file_base::io_item::get_result() -> api_error {
unique_mutex_lock lock(mtx_);
if (result_.has_value()) {
return result_.value();
}
notify_.wait(lock);
return result_.value_or(api_error::error);
}
open_file_base::open_file_base(std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider)
: open_file_base(chunk_size, chunk_timeout, fsi, {}, provider) {}
open_file_base::open_file_base(
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data, i_provider &provider)
: chunk_size_(chunk_size),
chunk_timeout_(chunk_timeout),
fsi_(std::move(fsi)),
last_chunk_size_(static_cast<std::size_t>(
fsi.size <= chunk_size ? fsi.size
: (fsi.size % chunk_size) == 0U ? chunk_size
: fsi.size % chunk_size)),
open_data_(std::move(open_data)),
provider_(provider) {
if (not fsi.directory) {
io_thread_ = std::make_unique<std::thread>([this] { file_io_thread(); });
}
}
void open_file_base::add(std::uint64_t handle, open_file_data ofd) {
recur_mutex_lock file_lock(file_mtx_);
open_data_[handle] = ofd;
if (open_data_.size() == 1U) {
event_system::instance().raise<filesystem_item_opened>(
fsi_.api_path, fsi_.source_path, fsi_.directory);
}
event_system::instance().raise<filesystem_item_handle_opened>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory);
}
auto open_file_base::can_close() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
if (fsi_.directory) {
return true;
}
if (not open_data_.empty()) {
return false;
}
if (modified_) {
return false;
}
if (get_api_error() != api_error::success) {
return true;
}
if (is_download_complete()) {
return true;
}
if (provider_.is_direct_only()) {
return true;
}
const std::chrono::system_clock::time_point last_access = last_access_;
const auto duration = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now() - last_access);
return (duration.count() >= chunk_timeout_);
}
auto open_file_base::do_io(std::function<api_error()> action) -> api_error {
unique_mutex_lock io_lock(io_thread_mtx_);
auto item = std::make_shared<io_item>(action);
io_thread_queue_.emplace_back(item);
io_thread_notify_.notify_all();
io_lock.unlock();
return item->get_result();
}
void open_file_base::file_io_thread() {
unique_mutex_lock io_lock(io_thread_mtx_);
io_thread_notify_.notify_all();
io_lock.unlock();
const auto process_queue = [&]() {
io_lock.lock();
if (not io_stop_requested_ && io_thread_queue_.empty()) {
io_thread_notify_.wait(io_lock);
}
while (not io_thread_queue_.empty()) {
auto *item = io_thread_queue_.front().get();
io_thread_notify_.notify_all();
io_lock.unlock();
item->action();
io_lock.lock();
io_thread_queue_.pop_front();
}
io_thread_notify_.notify_all();
io_lock.unlock();
};
while (not io_stop_requested_) {
process_queue();
}
process_queue();
}
auto open_file_base::get_api_error() const -> api_error {
mutex_lock error_lock(error_mtx_);
return error_;
}
auto open_file_base::get_api_path() const -> std::string {
recur_mutex_lock file_lock(file_mtx_);
return fsi_.api_path;
}
auto open_file_base::get_file_size() const -> std::uint64_t {
recur_mutex_lock file_lock(file_mtx_);
return fsi_.size;
}
auto open_file_base::get_filesystem_item() const -> filesystem_item {
recur_mutex_lock file_lock(file_mtx_);
return fsi_;
}
auto open_file_base::get_handles() const -> std::vector<std::uint64_t> {
recur_mutex_lock file_lock(file_mtx_);
std::vector<std::uint64_t> ret;
for (auto &&item : open_data_) {
ret.emplace_back(item.first);
}
return ret;
}
auto open_file_base::get_open_data()
-> std::map<std::uint64_t, open_file_data> & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_;
}
auto open_file_base::get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_;
}
auto open_file_base::get_open_data(std::uint64_t handle) -> open_file_data & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.at(handle);
}
auto open_file_base::get_open_data(std::uint64_t handle) const
-> const open_file_data & {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.at(handle);
}
auto open_file_base::get_open_file_count() const -> std::size_t {
recur_mutex_lock file_lock(file_mtx_);
return open_data_.size();
}
auto open_file_base::is_modified() const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return modified_;
}
void open_file_base::remove(std::uint64_t handle) {
recur_mutex_lock file_lock(file_mtx_);
open_data_.erase(handle);
event_system::instance().raise<filesystem_item_handle_closed>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory, modified_);
if (open_data_.empty()) {
event_system::instance().raise<filesystem_item_closed>(
fsi_.api_path, fsi_.source_path, fsi_.directory, modified_);
}
}
void open_file_base::reset_timeout() {
last_access_ = std::chrono::system_clock::now();
}
auto open_file_base::set_api_error(const api_error &err) -> api_error {
mutex_lock error_lock(error_mtx_);
if (error_ != err) {
return ((error_ = (error_ == api_error::success ||
error_ == api_error::download_incomplete ||
error_ == api_error::download_stopped
? err
: error_)));
}
return error_;
}
void open_file_base::set_api_path(const std::string &api_path) {
recur_mutex_lock file_lock(file_mtx_);
fsi_.api_path = api_path;
fsi_.api_parent = utils::path::get_parent_api_path(api_path);
}
auto open_file_base::close() -> bool {
unique_mutex_lock io_lock(io_thread_mtx_);
if (not fsi_.directory && not io_stop_requested_) {
io_stop_requested_ = true;
io_thread_notify_.notify_all();
io_lock.unlock();
if (io_thread_) {
io_thread_->join();
io_thread_.reset();
return true;
}
return false;
}
io_thread_notify_.notify_all();
io_lock.unlock();
return false;
}
} // namespace repertory

View File

@ -1,322 +1,323 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "app_config.hpp"
#include "file_manager/events.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
#include "utils/encrypting_reader.hpp"
#include "utils/file_utils.hpp"
#include "utils/path.hpp"
#include "utils/utils.hpp"
namespace repertory {
file_manager::ring_buffer_open_file::ring_buffer_open_file(
std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider)
: ring_buffer_open_file(std::move(buffer_directory), chunk_size,
chunk_timeout, std::move(fsi), provider,
(1024ULL * 1024ULL * 1024ULL) / chunk_size) {}
file_manager::ring_buffer_open_file::ring_buffer_open_file(
std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider,
std::size_t ring_size)
: open_file_base(chunk_size, chunk_timeout, fsi, provider),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size_))) {
if ((ring_size % 2U) != 0U) {
throw std::runtime_error("ring size must be a multiple of 2");
}
if (ring_size < 4U) {
throw std::runtime_error("ring size must be greater than or equal to 4");
}
if (fsi.size < (ring_state_.size() * chunk_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
last_chunk_ = ring_state_.size() - 1U;
ring_state_.set(0U, ring_state_.size(), true);
buffer_directory = utils::path::absolute(buffer_directory);
if (not utils::file::directory(buffer_directory).create_directory()) {
throw std::runtime_error("failed to create buffer directory|path|" +
buffer_directory + "|err|" +
std::to_string(utils::get_last_error_code()));
}
fsi_.source_path =
utils::path::combine(buffer_directory, {utils::create_uuid_string()});
nf_ = utils::file::file::open_or_create_file(fsi_.source_path);
if (not*nf_) {
throw std::runtime_error("failed to create buffer file|err|" +
std::to_string(utils::get_last_error_code()));
}
if (not nf_->truncate(ring_state_.size() * chunk_size)) {
nf_->close();
throw std::runtime_error("failed to resize buffer file|err|" +
std::to_string(utils::get_last_error_code()));
}
}
file_manager::ring_buffer_open_file::~ring_buffer_open_file() {
REPERTORY_USES_FUNCTION_NAME();
close();
nf_->close();
if (not utils::file::file(fsi_.source_path).remove()) {
utils::error::raise_api_path_error(
function_name, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to delete file");
}
}
auto file_manager::file_manager::ring_buffer_open_file::download_chunk(
std::size_t chunk) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
if (active_downloads_.find(chunk) != active_downloads_.end()) {
auto active_download = active_downloads_.at(chunk);
chunk_notify_.notify_all();
chunk_lock.unlock();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
auto active_download = std::make_shared<download>();
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
chunk_notify_.notify_all();
chunk_lock.unlock();
data_buffer buffer((chunk == (total_chunks_ - 1U)) ? last_chunk_size_
: chunk_size_);
stop_type stop_requested = !!ring_state_[chunk % ring_state_.size()];
auto res =
provider_.read_file_bytes(fsi_.api_path, buffer.size(),
chunk * chunk_size_, buffer, stop_requested);
if (res == api_error::success) {
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
if (not nf_->write(buffer, (chunk % ring_state_.size()) * chunk_size_,
&bytes_written)) {
return api_error::os_error;
}
return api_error::success;
});
}
active_download->notify(res);
chunk_lock.lock();
active_downloads_.erase(chunk);
chunk_notify_.notify_all();
return res;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
return api_error::success;
}
void file_manager::ring_buffer_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if ((current_chunk_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - current_chunk_;
}
if ((current_chunk_ + count) <= last_chunk_) {
current_chunk_ += count;
} else {
const auto added = count - (last_chunk_ - current_chunk_);
if (added >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
current_chunk_ += count;
first_chunk_ += added;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
} else {
for (std::size_t idx = 0U; idx < added; ++idx) {
ring_state_[(first_chunk_ + idx) % ring_state_.size()] = true;
}
first_chunk_ += added;
current_chunk_ += count;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
}
}
chunk_notify_.notify_all();
}
auto file_manager::ring_buffer_open_file::get_read_state() const
-> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(file_mtx_);
auto read_state = ring_state_;
return read_state.flip();
}
auto file_manager::ring_buffer_open_file::get_read_state(
std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()];
}
auto file_manager::ring_buffer_open_file::is_download_complete() const -> bool {
return false;
}
auto file_manager::ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
void file_manager::ring_buffer_open_file::reverse(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if (current_chunk_ < count) {
count = current_chunk_;
}
if ((current_chunk_ - count) >= first_chunk_) {
current_chunk_ -= count;
} else {
const auto removed = count - (current_chunk_ - first_chunk_);
if (removed >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
current_chunk_ -= count;
first_chunk_ = current_chunk_;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
} else {
for (std::size_t idx = 0U; idx < removed; ++idx) {
ring_state_[(last_chunk_ - idx) % ring_state_.size()] = true;
}
first_chunk_ -= removed;
current_chunk_ -= count;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
}
}
chunk_notify_.notify_all();
}
auto file_manager::ring_buffer_open_file::read(std::size_t read_size,
std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (fsi_.directory) {
return api_error::invalid_operation;
}
reset_timeout();
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
if (read_size == 0U) {
return api_error::success;
}
const auto start_chunk_index =
static_cast<std::size_t>(read_offset / chunk_size_);
read_offset = read_offset - (start_chunk_index * chunk_size_);
data_buffer buffer(chunk_size_);
auto res = api_error::success;
for (std::size_t chunk = start_chunk_index;
(res == api_error::success) && (read_size > 0U); ++chunk) {
if (chunk > current_chunk_) {
forward(chunk - current_chunk_);
} else if (chunk < current_chunk_) {
reverse(current_chunk_ - chunk);
}
reset_timeout();
res = download_chunk(chunk);
if (res == api_error::success) {
const auto to_read = std::min(
static_cast<std::size_t>(chunk_size_ - read_offset), read_size);
res = do_io([this, &buffer, &chunk, &data, read_offset,
&to_read]() -> api_error {
std::size_t bytes_read{};
auto ret =
nf_->read(buffer, ((chunk % ring_state_.size()) * chunk_size_),
&bytes_read)
? api_error::success
: api_error::os_error;
if (ret == api_error::success) {
data.insert(data.end(),
buffer.begin() + static_cast<std::int64_t>(read_offset),
buffer.begin() +
static_cast<std::int64_t>(read_offset + to_read));
reset_timeout();
}
return ret;
});
read_offset = 0U;
read_size -= to_read;
}
}
return res;
}
void file_manager::ring_buffer_open_file::set(std::size_t first_chunk,
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
}
first_chunk_ = first_chunk;
last_chunk_ = first_chunk_ + ring_state_.size() - 1U;
if (current_chunk > last_chunk_) {
chunk_notify_.notify_all();
throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
}
current_chunk_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), false);
chunk_notify_.notify_all();
}
void file_manager::ring_buffer_open_file::set_api_path(
const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
} // namespace repertory
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/ring_buffer_open_file.hpp"
#include "app_config.hpp"
#include "file_manager/events.hpp"
#include "file_manager/open_file_base.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
#include "utils/encrypting_reader.hpp"
#include "utils/file_utils.hpp"
#include "utils/path.hpp"
#include "utils/utils.hpp"
namespace repertory {
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi,
i_provider &provider)
: ring_buffer_open_file(std::move(buffer_directory), chunk_size,
chunk_timeout, std::move(fsi), provider,
(1024ULL * 1024ULL * 1024ULL) / chunk_size) {}
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi,
i_provider &provider,
std::size_t ring_size)
: open_file_base(chunk_size, chunk_timeout, fsi, provider),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size_))) {
if ((ring_size % 2U) != 0U) {
throw std::runtime_error("ring size must be a multiple of 2");
}
if (ring_size < 4U) {
throw std::runtime_error("ring size must be greater than or equal to 4");
}
if (fsi.size < (ring_state_.size() * chunk_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
last_chunk_ = ring_state_.size() - 1U;
ring_state_.set(0U, ring_state_.size(), true);
buffer_directory = utils::path::absolute(buffer_directory);
if (not utils::file::directory(buffer_directory).create_directory()) {
throw std::runtime_error("failed to create buffer directory|path|" +
buffer_directory + "|err|" +
std::to_string(utils::get_last_error_code()));
}
fsi_.source_path =
utils::path::combine(buffer_directory, {utils::create_uuid_string()});
nf_ = utils::file::file::open_or_create_file(fsi_.source_path);
if (not*nf_) {
throw std::runtime_error("failed to create buffer file|err|" +
std::to_string(utils::get_last_error_code()));
}
if (not nf_->truncate(ring_state_.size() * chunk_size)) {
nf_->close();
throw std::runtime_error("failed to resize buffer file|err|" +
std::to_string(utils::get_last_error_code()));
}
}
ring_buffer_open_file::~ring_buffer_open_file() {
REPERTORY_USES_FUNCTION_NAME();
close();
nf_->close();
if (not utils::file::file(fsi_.source_path).remove()) {
utils::error::raise_api_path_error(
function_name, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to delete file");
}
}
auto ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
if (active_downloads_.find(chunk) != active_downloads_.end()) {
auto active_download = active_downloads_.at(chunk);
chunk_notify_.notify_all();
chunk_lock.unlock();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
auto active_download = std::make_shared<download>();
active_downloads_[chunk] = active_download;
ring_state_[chunk % ring_state_.size()] = false;
chunk_notify_.notify_all();
chunk_lock.unlock();
data_buffer buffer((chunk == (total_chunks_ - 1U)) ? last_chunk_size_
: chunk_size_);
stop_type stop_requested = !!ring_state_[chunk % ring_state_.size()];
auto res =
provider_.read_file_bytes(fsi_.api_path, buffer.size(),
chunk * chunk_size_, buffer, stop_requested);
if (res == api_error::success) {
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
if (not nf_->write(buffer, (chunk % ring_state_.size()) * chunk_size_,
&bytes_written)) {
return api_error::os_error;
}
return api_error::success;
});
}
active_download->notify(res);
chunk_lock.lock();
active_downloads_.erase(chunk);
chunk_notify_.notify_all();
return res;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
return api_error::success;
}
void ring_buffer_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if ((current_chunk_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - current_chunk_;
}
if ((current_chunk_ + count) <= last_chunk_) {
current_chunk_ += count;
} else {
const auto added = count - (last_chunk_ - current_chunk_);
if (added >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
current_chunk_ += count;
first_chunk_ += added;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
} else {
for (std::size_t idx = 0U; idx < added; ++idx) {
ring_state_[(first_chunk_ + idx) % ring_state_.size()] = true;
}
first_chunk_ += added;
current_chunk_ += count;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
}
}
chunk_notify_.notify_all();
}
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(file_mtx_);
auto read_state = ring_state_;
return read_state.flip();
}
auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()];
}
auto ring_buffer_open_file::is_download_complete() const -> bool {
return false;
}
auto ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
void ring_buffer_open_file::reverse(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if (current_chunk_ < count) {
count = current_chunk_;
}
if ((current_chunk_ - count) >= first_chunk_) {
current_chunk_ -= count;
} else {
const auto removed = count - (current_chunk_ - first_chunk_);
if (removed >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), true);
current_chunk_ -= count;
first_chunk_ = current_chunk_;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
} else {
for (std::size_t idx = 0U; idx < removed; ++idx) {
ring_state_[(last_chunk_ - idx) % ring_state_.size()] = true;
}
first_chunk_ -= removed;
current_chunk_ -= count;
last_chunk_ =
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
}
}
chunk_notify_.notify_all();
}
auto ring_buffer_open_file::read(std::size_t read_size,
std::uint64_t read_offset, data_buffer &data)
-> api_error {
if (fsi_.directory) {
return api_error::invalid_operation;
}
reset_timeout();
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
if (read_size == 0U) {
return api_error::success;
}
const auto start_chunk_index =
static_cast<std::size_t>(read_offset / chunk_size_);
read_offset = read_offset - (start_chunk_index * chunk_size_);
data_buffer buffer(chunk_size_);
auto res = api_error::success;
for (std::size_t chunk = start_chunk_index;
(res == api_error::success) && (read_size > 0U); ++chunk) {
if (chunk > current_chunk_) {
forward(chunk - current_chunk_);
} else if (chunk < current_chunk_) {
reverse(current_chunk_ - chunk);
}
reset_timeout();
res = download_chunk(chunk);
if (res == api_error::success) {
const auto to_read = std::min(
static_cast<std::size_t>(chunk_size_ - read_offset), read_size);
res = do_io([this, &buffer, &chunk, &data, read_offset,
&to_read]() -> api_error {
std::size_t bytes_read{};
auto ret =
nf_->read(buffer, ((chunk % ring_state_.size()) * chunk_size_),
&bytes_read)
? api_error::success
: api_error::os_error;
if (ret == api_error::success) {
data.insert(data.end(),
buffer.begin() + static_cast<std::int64_t>(read_offset),
buffer.begin() +
static_cast<std::int64_t>(read_offset + to_read));
reset_timeout();
}
return ret;
});
read_offset = 0U;
read_size -= to_read;
}
}
return res;
}
void ring_buffer_open_file::set(std::size_t first_chunk,
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
}
first_chunk_ = first_chunk;
last_chunk_ = first_chunk_ + ring_state_.size() - 1U;
if (current_chunk > last_chunk_) {
chunk_notify_.notify_all();
throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
}
current_chunk_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), false);
chunk_notify_.notify_all();
}
void ring_buffer_open_file::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
} // namespace repertory

View File

@ -1,65 +1,65 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
namespace repertory {
using std::bind;
file_manager::upload::upload(filesystem_item fsi, i_provider &provider)
: fsi_(std::move(fsi)), provider_(provider) {
thread_ = std::make_unique<std::thread>([this] { upload_thread(); });
}
file_manager::upload::~upload() {
stop();
thread_->join();
thread_.reset();
}
void file_manager::upload::cancel() {
cancelled_ = true;
stop();
}
void file_manager::upload::stop() { stop_requested_ = true; }
void file_manager::upload::upload_thread() {
REPERTORY_USES_FUNCTION_NAME();
error_ =
provider_.upload_file(fsi_.api_path, fsi_.source_path, stop_requested_);
if (not utils::file::reset_modified_time(fsi_.source_path)) {
utils::error::raise_api_path_error(
function_name, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to reset modified time");
}
event_system::instance().raise<file_upload_completed>(
get_api_path(), get_source_path(), get_api_error(), cancelled_);
}
} // namespace repertory
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/upload.hpp"
#include "events/event_system.hpp"
#include "file_manager/events.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
namespace repertory {
upload::upload(filesystem_item fsi, i_provider &provider)
: fsi_(std::move(fsi)), provider_(provider) {
thread_ = std::make_unique<std::thread>([this] { upload_thread(); });
}
upload::~upload() {
stop();
thread_->join();
thread_.reset();
}
void upload::cancel() {
cancelled_ = true;
stop();
}
void upload::stop() { stop_requested_ = true; }
void upload::upload_thread() {
REPERTORY_USES_FUNCTION_NAME();
error_ =
provider_.upload_file(fsi_.api_path, fsi_.source_path, stop_requested_);
if (not utils::file::reset_modified_time(fsi_.source_path)) {
utils::error::raise_api_path_error(
function_name, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to reset modified time");
}
event_system::instance().raise<file_upload_completed>(
get_api_path(), get_source_path(), get_api_error(), cancelled_);
}
} // namespace repertory

File diff suppressed because it is too large Load Diff

View File

@ -1,182 +1,184 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "test_common.hpp"
#include "file_manager/file_manager.hpp"
#include "mocks/mock_provider.hpp"
#include "utils/event_capture.hpp"
namespace repertory {
static constexpr const std::size_t test_chunk_size{1024U};
TEST(upload, can_upload_a_valid_file) {
console_consumer con;
event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_prov;
EXPECT_CALL(mock_prov, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi;
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path;
event_consumer evt_com("file_upload_completed", [&fsi](const event &evt) {
const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(),
comp_evt.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(),
comp_evt.get_source().get<std::string>().c_str());
EXPECT_STREQ("success", comp_evt.get_result().get<std::string>().c_str());
EXPECT_STREQ("0", comp_evt.get_cancelled().get<std::string>().c_str());
});
EXPECT_CALL(mock_prov, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error {
EXPECT_FALSE(stop_requested);
return api_error::success;
});
file_manager::upload upload(fsi, mock_prov);
event_capture evt_cap({"file_upload_completed"});
evt_cap.wait_for_empty();
EXPECT_EQ(api_error::success, upload.get_api_error());
EXPECT_FALSE(upload.is_cancelled());
event_system::instance().stop();
}
TEST(upload, can_cancel_upload) {
console_consumer con;
event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_provider;
EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi;
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path;
event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) {
const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(),
comp_evt.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(),
comp_evt.get_source().get<std::string>().c_str());
EXPECT_STREQ("comm_error",
comp_evt.get_result().get<std::string>().c_str());
EXPECT_STREQ("1", comp_evt.get_cancelled().get<std::string>().c_str());
});
std::mutex mtx;
std::condition_variable notify;
EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([&notify, &mtx](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error {
EXPECT_FALSE(stop_requested);
unique_mutex_lock lock(mtx);
notify.notify_one();
lock.unlock();
lock.lock();
notify.wait(lock);
lock.unlock();
EXPECT_TRUE(stop_requested);
return api_error::comm_error;
});
unique_mutex_lock lock(mtx);
file_manager::upload upload(fsi, mock_provider);
notify.wait(lock);
upload.cancel();
notify.notify_one();
lock.unlock();
event_capture evt_cap({"file_upload_completed"});
evt_cap.wait_for_empty();
EXPECT_EQ(api_error::comm_error, upload.get_api_error());
EXPECT_TRUE(upload.is_cancelled());
event_system::instance().stop();
}
TEST(upload, can_stop_upload) {
console_consumer con;
event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_provider;
EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi;
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path;
event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) {
const auto &evt_com = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(),
evt_com.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(),
evt_com.get_source().get<std::string>().c_str());
EXPECT_STREQ("comm_error", evt_com.get_result().get<std::string>().c_str());
EXPECT_STREQ("0", evt_com.get_cancelled().get<std::string>().c_str());
});
EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error {
std::this_thread::sleep_for(3s);
EXPECT_TRUE(stop_requested);
return api_error::comm_error;
});
event_capture evt_cap({"file_upload_completed"});
{ file_manager::upload upload(fsi, mock_provider); }
evt_cap.wait_for_empty();
event_system::instance().stop();
}
} // namespace repertory
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "test_common.hpp"
#include "file_manager/upload.hpp"
#include "mocks/mock_provider.hpp"
#include "utils/event_capture.hpp"
namespace repertory {
static constexpr const std::size_t test_chunk_size{1024U};
TEST(upload, can_upload_a_valid_file) {
console_consumer con;
event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_prov;
EXPECT_CALL(mock_prov, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi;
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path;
event_consumer evt_com("file_upload_completed", [&fsi](const event &evt) {
const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(),
comp_evt.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(),
comp_evt.get_source().get<std::string>().c_str());
EXPECT_STREQ("success", comp_evt.get_result().get<std::string>().c_str());
EXPECT_STREQ("0", comp_evt.get_cancelled().get<std::string>().c_str());
});
EXPECT_CALL(mock_prov, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error {
EXPECT_FALSE(stop_requested);
return api_error::success;
});
upload upload(fsi, mock_prov);
event_capture evt_cap({"file_upload_completed"});
evt_cap.wait_for_empty();
EXPECT_EQ(api_error::success, upload.get_api_error());
EXPECT_FALSE(upload.is_cancelled());
event_system::instance().stop();
}
TEST(upload, can_cancel_upload) {
console_consumer con;
event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_provider;
EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi;
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path;
event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) {
const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(),
comp_evt.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(),
comp_evt.get_source().get<std::string>().c_str());
EXPECT_STREQ("comm_error",
comp_evt.get_result().get<std::string>().c_str());
EXPECT_STREQ("1", comp_evt.get_cancelled().get<std::string>().c_str());
});
std::mutex mtx;
std::condition_variable notify;
EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([&notify, &mtx](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error {
EXPECT_FALSE(stop_requested);
unique_mutex_lock lock(mtx);
notify.notify_one();
lock.unlock();
lock.lock();
notify.wait(lock);
lock.unlock();
EXPECT_TRUE(stop_requested);
return api_error::comm_error;
});
unique_mutex_lock lock(mtx);
upload upload(fsi, mock_provider);
notify.wait(lock);
upload.cancel();
notify.notify_one();
lock.unlock();
event_capture evt_cap({"file_upload_completed"});
evt_cap.wait_for_empty();
EXPECT_EQ(api_error::comm_error, upload.get_api_error());
EXPECT_TRUE(upload.is_cancelled());
event_system::instance().stop();
}
TEST(upload, can_stop_upload) {
console_consumer con;
event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_provider;
EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi;
fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path;
event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) {
const auto &evt_com = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(),
evt_com.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(),
evt_com.get_source().get<std::string>().c_str());
EXPECT_STREQ("comm_error", evt_com.get_result().get<std::string>().c_str());
EXPECT_STREQ("0", evt_com.get_cancelled().get<std::string>().c_str());
});
EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error {
std::this_thread::sleep_for(3s);
EXPECT_TRUE(stop_requested);
return api_error::comm_error;
});
event_capture evt_cap({"file_upload_completed"});
{
upload upload(fsi, mock_provider);
}
evt_cap.wait_for_empty();
event_system::instance().stop();
}
} // namespace repertory

View File

@ -28,17 +28,13 @@
namespace repertory::utils::db::sqlite {
using db_types_t = std::variant<std::int64_t, std::string>;
struct sqlite3_deleter {
void operator()(sqlite3 *db3) const {
if (db3 != nullptr) {
sqlite3_close_v2(db3);
}
}
struct sqlite3_deleter final {
void operator()(sqlite3 *db3) const;
};
using db3_t = std::unique_ptr<sqlite3, sqlite3_deleter>;
struct sqlite3_statement_deleter {
struct sqlite3_statement_deleter final {
void operator()(sqlite3_stmt *stmt) const {
if (stmt != nullptr) {
sqlite3_finalize(stmt);
@ -104,16 +100,7 @@ public:
}
#if defined(PROJECT_ENABLE_JSON)
[[nodiscard]] auto get_value_as_json() const -> nlohmann::json {
return std::visit(
overloaded{
[this](std::int64_t value) -> auto {
return nlohmann::json({{name_, value}});
},
[](auto &&value) -> auto { return nlohmann::json::parse(value); },
},
value_);
}
[[nodiscard]] auto get_value_as_json() const -> nlohmann::json;
#endif // defined(PROJECT_ENABLE_JSON)
};
@ -211,7 +198,8 @@ public:
[[nodiscard]] auto get_error() const -> std::int32_t { return res_; }
[[nodiscard]] auto get_error_str() const -> std::string {
return sqlite3_errstr(res_);
auto &&err_msg = sqlite3_errstr(res_);
return err_msg == nullptr ? std::to_string(res_) : err_msg;
}
[[nodiscard]] auto get_row(std::optional<db_row<ctx_t>> &row) const -> bool {

View File

@ -157,8 +157,8 @@ template <typename ctx_t, typename op_t> struct db_where_t final {
using action_t = std::variant<db_comp_data_t, n_t, db_where_t>;
[[nodiscard]] static auto dump(std::int32_t &idx,
auto &&actions) -> std::string {
[[nodiscard]] static auto dump(std::int32_t &idx, auto &&actions)
-> std::string {
std::stringstream stream;
for (auto &&action : actions) {

View File

@ -48,7 +48,7 @@ protected:
struct iostream_exception_handler final : i_exception_handler {
void handle_error(std::string_view function_name,
std::string_view msg) const override {
std::cerr << function_name << '|' << msg;
std::cerr << function_name << '|' << msg << std::endl;
}
void handle_exception(std::string_view function_name) const override {
@ -72,8 +72,7 @@ extern std::atomic<const i_exception_handler *> exception_handler;
}
#endif // defined(PROJECT_ENABLE_TESTING)
[[nodiscard]] auto create_error_message(std::string_view function_name,
std::vector<std::string_view> items)
[[nodiscard]] auto create_error_message(std::vector<std::string_view> items)
-> std::string;
void handle_error(std::string_view function_name, std::string_view msg);

View File

@ -19,13 +19,60 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/db/sqlite/db_common.hpp"
#if defined(PROJECT_ENABLE_SQLITE)
#include "utils/db/sqlite/db_common.hpp"
#include "utils/common.hpp"
#include "utils/error.hpp"
namespace repertory::utils::db::sqlite {
auto execute_sql(sqlite3 &db3, const std::string &sql,
std::string &err) -> bool {
void sqlite3_deleter::operator()(sqlite3 *db3) const {
REPERTORY_USES_FUNCTION_NAME();
if (db3 == nullptr) {
return;
}
utils::error::handle_error(function_name, "closing database handle");
if (not utils::retry_action(
[&db3]() -> bool {
auto res = sqlite3_close_v2(db3);
if (res == SQLITE_OK) {
return true;
}
auto &&err_str = sqlite3_errstr(res);
utils::error::handle_error(
function_name,
utils::error::create_error_message({
"failed to close database",
(err_str == nullptr ? std::to_string(res) : err_str),
}));
return false;
},
60U)) {
repertory::utils::error::handle_error(function_name,
"failed to close database");
}
}
#if defined(PROJECT_ENABLE_JSON)
auto db_column::get_value_as_json() const -> nlohmann::json {
return std::visit(
overloaded{
[this](std::int64_t value) -> auto {
return nlohmann::json({{name_, value}});
},
[](auto &&value) -> auto { return nlohmann::json::parse(value); },
},
value_);
}
#endif // defined(PROJECT_ENABLE_JSON)
auto execute_sql(sqlite3 &db3, const std::string &sql, std::string &err)
-> bool {
char *err_msg{nullptr};
auto res = sqlite3_exec(&db3, sql.c_str(), nullptr, nullptr, &err_msg);
if (err_msg != nullptr) {

View File

@ -19,10 +19,10 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/db/sqlite/db_delete.hpp"
#if defined(PROJECT_ENABLE_SQLITE)
#include "utils/db/sqlite/db_delete.hpp"
namespace repertory::utils::db::sqlite {
void db_delete::context::clear() { where_data.reset(); }

View File

@ -19,10 +19,10 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/db/sqlite/db_insert.hpp"
#if defined(PROJECT_ENABLE_SQLITE)
#include "utils/db/sqlite/db_insert.hpp"
namespace repertory::utils::db::sqlite {
void db_insert::context::clear() { values.clear(); }

View File

@ -19,10 +19,10 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/db/sqlite/db_select.hpp"
#if defined(PROJECT_ENABLE_SQLITE)
#include "utils/db/sqlite/db_select.hpp"
namespace repertory::utils::db::sqlite {
void db_select::context::clear() {
columns.clear();

View File

@ -19,10 +19,10 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/db/sqlite/db_update.hpp"
#if defined(PROJECT_ENABLE_SQLITE)
#include "utils/db/sqlite/db_update.hpp"
namespace repertory::utils::db::sqlite {
void db_update::context::clear() {
column_values.clear();

View File

@ -19,6 +19,8 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#if defined(PROJECT_ENABLE_LIBSODIUM) && defined(PROJECT_ENABLE_BOOST)
#include "utils/encrypting_reader.hpp"
#include "utils/collection.hpp"
@ -29,8 +31,6 @@
#include "utils/unix.hpp"
#include "utils/windows.hpp"
#if defined(PROJECT_ENABLE_LIBSODIUM) && defined(PROJECT_ENABLE_BOOST)
#if !defined(CURL_READFUNC_ABORT)
#define CURL_READFUNC_ABORT (-1)
#endif // !defined(CURL_READFUNC_ABORT)

View File

@ -19,13 +19,13 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#if defined(PROJECT_ENABLE_LIBSODIUM) && defined(PROJECT_ENABLE_BOOST)
#include "utils/encryption.hpp"
#include "utils/collection.hpp"
#include "utils/encrypting_reader.hpp"
#if defined(PROJECT_ENABLE_LIBSODIUM) && defined(PROJECT_ENABLE_BOOST)
namespace repertory::utils::encryption {
auto decrypt_file_path(std::string_view encryption_token,
std::string &file_path) -> bool {

View File

@ -25,12 +25,14 @@ namespace repertory::utils::error {
std::atomic<const i_exception_handler *> exception_handler{
&default_exception_handler};
auto create_error_message(std::string_view function_name,
std::vector<std::string_view> items) -> std::string {
auto create_error_message(std::vector<std::string_view> items) -> std::string {
std::stringstream stream{};
stream << function_name;
for (auto &&item : items) {
stream << '|' << item;
for (std::size_t idx = 0U; idx < items.size(); ++idx) {
if (idx > 0) {
stream << '|';
}
stream << items.at(idx);
}
return stream.str();

View File

@ -377,10 +377,16 @@ auto directory::remove() -> bool {
return utils::retry_action([this]() -> bool {
try {
#if defined(_WIN32)
return not exists() || (::RemoveDirectoryA(path_.c_str()) != 0);
auto ret = not exists() || (::RemoveDirectoryA(path_.c_str()) != 0);
#else // !defined(_WIN32)
return not exists() || (rmdir(path_.c_str()) == 0);
auto ret = not exists() || (rmdir(path_.c_str()) == 0);
#endif // defined(_WIN32)
if (not ret) {
utils::error::handle_error(function_name,
"failed to remove directory|" + path_);
}
return ret;
} catch (const std::exception &e) {
utils::error::handle_exception(function_name, e);
} catch (...) {

View File

@ -397,15 +397,31 @@ auto file::sha256() -> std::optional<std::string> {
#endif // defined(PROJECT_ENABLE_LIBSODIUM)
auto file::remove() -> bool {
REPERTORY_USES_FUNCTION_NAME();
close();
return utils::retry_action([this]() -> bool {
try {
#if defined(_WIN32)
return not exists() || (::DeleteFileA(path_.c_str()) != 0);
auto ret = not exists() || (::DeleteFileA(path_.c_str()) != 0);
#else // !defined(_WIN32)
std::error_code ec{};
return not exists() || std::filesystem::remove(path_, ec);
std::error_code ec{};
auto ret = not exists() || std::filesystem::remove(path_, ec);
#endif // defined(_WIN32)
if (not ret) {
utils::error::handle_error(function_name,
"failed to remove file|" + path_);
}
return ret;
} catch (const std::exception &e) {
utils::error::handle_exception(function_name, e);
} catch (...) {
utils::error::handle_exception(function_name);
}
return false;
});
}
@ -458,17 +474,18 @@ auto file::write(const unsigned char *data, std::size_t to_write,
std::size_t bytes_written{0U};
while (bytes_written != to_write) {
res = fwrite(reinterpret_cast<const char *>(&data[bytes_written]), 1U,
to_write - bytes_written, file_.get());
auto written =
fwrite(reinterpret_cast<const char *>(&data[bytes_written]), 1U,
to_write - bytes_written, file_.get());
if (not feof(file_.get()) && ferror(file_.get())) {
throw std::runtime_error("failed to write file bytes");
}
if (res == 0) {
if (written == 0U) {
break;
}
bytes_written += static_cast<std::size_t>(res);
bytes_written += written;
}
flush();

View File

@ -19,10 +19,10 @@
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "utils/hash.hpp"
#if defined(PROJECT_ENABLE_LIBSODIUM)
#include "utils/hash.hpp"
namespace repertory::utils::encryption {
auto create_hash_blake2b_256(std::string_view data) -> hash_256_t {
return create_hash_blake2b_t<hash_256_t>(

View File

@ -20,6 +20,7 @@
SOFTWARE.
*/
#if defined(_WIN32)
#include "utils/windows.hpp"
#include "utils/com_init_wrapper.hpp"
@ -27,7 +28,7 @@
namespace repertory::utils {
void create_console() {
if (AllocConsole() == 0) {
if (::AllocConsole() == 0) {
return;
}
@ -56,7 +57,7 @@ void create_console() {
}
void free_console() {
FreeConsole();
::FreeConsole();
}
auto get_last_error_code() -> DWORD { return ::GetLastError(); }