This commit is contained in:
Scott E. Graves 2024-10-18 14:22:24 -05:00
parent 48a1bef1ae
commit ad79c5daf5
17 changed files with 3263 additions and 3191 deletions

View File

@ -27,6 +27,7 @@
#include "file_manager/i_file_manager.hpp" #include "file_manager/i_file_manager.hpp"
#include "file_manager/i_open_file.hpp" #include "file_manager/i_open_file.hpp"
#include "file_manager/i_upload_manager.hpp" #include "file_manager/i_upload_manager.hpp"
#include "file_manager/upload.hpp"
#include "platform/platform.hpp" #include "platform/platform.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "utils/db/sqlite/db_common.hpp" #include "utils/db/sqlite/db_common.hpp"
@ -39,408 +40,6 @@ class i_provider;
class file_manager final : public i_file_manager, public i_upload_manager { class file_manager final : public i_file_manager, public i_upload_manager {
E_CONSUMER(); E_CONSUMER();
public:
class open_file_base : public i_closeable_open_file {
public:
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider);
~open_file_base() override = default;
public:
open_file_base() = delete;
open_file_base(const open_file_base &) noexcept = delete;
open_file_base(open_file_base &&) noexcept = delete;
auto operator=(open_file_base &&) noexcept -> open_file_base & = delete;
auto operator=(const open_file_base &) noexcept
-> open_file_base & = delete;
public:
class download final {
public:
download() = default;
~download() = default;
public:
download(const download &) noexcept = delete;
download(download &&) noexcept = delete;
auto operator=(download &&) noexcept -> download & = delete;
auto operator=(const download &) noexcept -> download & = delete;
private:
bool complete_{false};
api_error error_{api_error::success};
std::mutex mtx_;
std::condition_variable notify_;
public:
void notify(const api_error &err);
auto wait() -> api_error;
};
class io_item final {
public:
io_item(std::function<api_error()> action) : action_(std::move(action)) {}
~io_item() = default;
public:
io_item() = delete;
io_item(const io_item &) noexcept = delete;
io_item(io_item &&) noexcept = delete;
auto operator=(io_item &&) noexcept -> io_item & = delete;
auto operator=(const io_item &) noexcept -> io_item & = delete;
private:
std::function<api_error()> action_;
std::mutex mtx_;
std::condition_variable notify_;
std::optional<api_error> result_;
public:
void action();
[[nodiscard]] auto get_result() -> api_error;
};
protected:
std::uint64_t chunk_size_;
std::uint8_t chunk_timeout_;
filesystem_item fsi_;
std::size_t last_chunk_size_;
std::map<std::uint64_t, open_file_data> open_data_;
i_provider &provider_;
private:
api_error error_{api_error::success};
mutable std::mutex error_mtx_;
stop_type io_stop_requested_{false};
std::unique_ptr<std::thread> io_thread_;
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>>
active_downloads_;
mutable std::recursive_mutex file_mtx_;
std::atomic<std::chrono::system_clock::time_point> last_access_{
std::chrono::system_clock::now()};
bool modified_{false};
std::unique_ptr<utils::file::i_file> nf_;
mutable std::mutex io_thread_mtx_;
std::condition_variable io_thread_notify_;
std::deque<std::shared_ptr<io_item>> io_thread_queue_;
bool removed_{false};
private:
void file_io_thread();
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
virtual auto is_download_complete() const -> bool = 0;
void reset_timeout();
auto set_api_error(const api_error &e) -> api_error;
public:
void add(std::uint64_t handle, open_file_data ofd) override;
[[nodiscard]] auto can_close() const -> bool override;
auto close() -> bool override;
[[nodiscard]] auto get_api_error() const -> api_error;
[[nodiscard]] auto get_api_path() const -> std::string override;
[[nodiscard]] auto get_chunk_size() const -> std::size_t override {
return chunk_size_;
}
[[nodiscard]] auto get_file_size() const -> std::uint64_t override;
[[nodiscard]] auto get_filesystem_item() const -> filesystem_item override;
[[nodiscard]] auto get_handles() const
-> std::vector<std::uint64_t> override;
[[nodiscard]] auto get_open_data()
-> std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle)
-> open_file_data & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle) const
-> const open_file_data & override;
[[nodiscard]] auto get_open_file_count() const -> std::size_t override;
[[nodiscard]] auto get_source_path() const -> std::string override {
return fsi_.source_path;
}
[[nodiscard]] auto has_handle(std::uint64_t handle) const -> bool override {
return open_data_.find(handle) != open_data_.end();
}
[[nodiscard]] auto is_directory() const -> bool override {
return fsi_.directory;
}
[[nodiscard]] auto is_modified() const -> bool override;
void remove(std::uint64_t handle) override;
void set_api_path(const std::string &api_path) override;
};
class open_file final : public open_file_base {
public:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
private:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
public:
open_file() = delete;
open_file(const open_file &) noexcept = delete;
open_file(open_file &&) noexcept = delete;
auto operator=(open_file &&) noexcept -> open_file & = delete;
auto operator=(const open_file &) noexcept -> open_file & = delete;
public:
~open_file() override;
private:
i_upload_manager &mgr_;
private:
bool notified_ = false;
std::size_t read_chunk_index_{};
boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
stop_type stop_requested_ = false;
private:
void download_chunk(std::size_t chunk, bool skip_active, bool should_reset);
void download_range(std::size_t start_chunk_index,
std::size_t end_chunk_index_inclusive,
bool should_reset);
void set_modified();
void update_background_reader(std::size_t read_chunk);
protected:
auto is_download_complete() const -> bool override {
return read_state_.all();
}
public:
auto close() -> bool override;
[[nodiscard]] auto get_read_state() const
-> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto is_complete() const -> bool override;
auto is_write_supported() const -> bool override { return true; }
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t new_file_size,
native_operation_callback callback)
-> api_error override;
void remove(std::uint64_t handle) override;
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t new_file_size)
-> api_error override;
[[nodiscard]] auto write(std::uint64_t write_offset,
const data_buffer &data,
std::size_t &bytes_written) -> api_error override;
};
class ring_buffer_open_file final : public open_file_base {
public:
ring_buffer_open_file(std::string buffer_directory,
std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
ring_buffer_open_file(std::string buffer_directory,
std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size);
~ring_buffer_open_file() override;
public:
ring_buffer_open_file() = delete;
ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete;
ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete;
auto operator=(ring_buffer_open_file &&) noexcept
-> ring_buffer_open_file & = delete;
auto operator=(const ring_buffer_open_file &) noexcept
-> ring_buffer_open_file & = delete;
private:
boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_;
private:
std::unique_ptr<std::thread> chunk_forward_thread_;
std::unique_ptr<std::thread> chunk_reverse_thread_;
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::size_t current_chunk_{};
std::size_t first_chunk_{};
std::size_t last_chunk_;
private:
auto download_chunk(std::size_t chunk) -> api_error;
void forward_reader_thread(std::size_t count);
void reverse_reader_thread(std::size_t count);
protected:
auto is_download_complete() const -> bool override;
public:
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return current_chunk_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return first_chunk_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t {
return last_chunk_;
}
[[nodiscard]] auto get_read_state() const
-> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return true; }
auto is_write_supported() const -> bool override { return false; }
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t,
native_operation_callback)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
-> api_error override {
return api_error::not_supported;
}
};
class upload final {
public:
upload(filesystem_item fsi, i_provider &provider);
~upload();
public:
upload() = delete;
upload(const upload &) noexcept = delete;
upload(upload &&) noexcept = delete;
auto operator=(upload &&) noexcept -> upload & = delete;
auto operator=(const upload &) noexcept -> upload & = delete;
private:
filesystem_item fsi_;
i_provider &provider_;
private:
bool cancelled_{false};
api_error error_{api_error::success};
std::unique_ptr<std::thread> thread_;
stop_type stop_requested_{false};
private:
void upload_thread();
public:
void cancel();
[[nodiscard]] auto get_api_error() const -> api_error { return error_; }
[[nodiscard]] auto get_api_path() const -> std::string {
return fsi_.api_path;
}
[[nodiscard]] auto get_source_path() const -> std::string {
return fsi_.source_path;
}
[[nodiscard]] auto is_cancelled() const -> bool { return cancelled_; }
void stop();
};
public: public:
file_manager(app_config &config, i_provider &provider); file_manager(app_config &config, i_provider &provider);

View File

@ -0,0 +1,122 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_HPP_
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class open_file final : public open_file_base {
public:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider, i_upload_manager &mgr);
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
private:
open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider,
std::optional<boost::dynamic_bitset<>> read_state,
i_upload_manager &mgr);
public:
open_file() = delete;
open_file(const open_file &) noexcept = delete;
open_file(open_file &&) noexcept = delete;
auto operator=(open_file &&) noexcept -> open_file & = delete;
auto operator=(const open_file &) noexcept -> open_file & = delete;
public:
~open_file() override;
private:
i_upload_manager &mgr_;
private:
bool notified_ = false;
std::size_t read_chunk_index_{};
boost::dynamic_bitset<> read_state_;
std::unique_ptr<std::thread> reader_thread_;
std::unique_ptr<std::thread> download_thread_;
stop_type stop_requested_ = false;
private:
void download_chunk(std::size_t chunk, bool skip_active, bool should_reset);
void download_range(std::size_t start_chunk_index,
std::size_t end_chunk_index_inclusive, bool should_reset);
void set_modified();
void update_background_reader(std::size_t read_chunk);
protected:
auto is_download_complete() const -> bool override {
return read_state_.all();
}
public:
auto close() -> bool override;
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto is_complete() const -> bool override;
auto is_write_supported() const -> bool override { return true; }
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t new_file_size,
native_operation_callback callback)
-> api_error override;
void remove(std::uint64_t handle) override;
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t new_file_size) -> api_error override;
[[nodiscard]] auto write(std::uint64_t write_offset, const data_buffer &data,
std::size_t &bytes_written) -> api_error override;
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_HPP_

View File

@ -0,0 +1,194 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_BASE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_BASE_HPP_
#include "file_manager/i_open_file.hpp"
#include "utils/types/file/i_file.hpp"
namespace repertory {
class i_provider;
class open_file_base : public i_closeable_open_file {
public:
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
open_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data,
i_provider &provider);
~open_file_base() override = default;
public:
open_file_base() = delete;
open_file_base(const open_file_base &) noexcept = delete;
open_file_base(open_file_base &&) noexcept = delete;
auto operator=(open_file_base &&) noexcept -> open_file_base & = delete;
auto operator=(const open_file_base &) noexcept -> open_file_base & = delete;
public:
class download final {
public:
download() = default;
~download() = default;
public:
download(const download &) noexcept = delete;
download(download &&) noexcept = delete;
auto operator=(download &&) noexcept -> download & = delete;
auto operator=(const download &) noexcept -> download & = delete;
private:
bool complete_{false};
api_error error_{api_error::success};
std::mutex mtx_;
std::condition_variable notify_;
public:
void notify(const api_error &err);
auto wait() -> api_error;
};
class io_item final {
public:
io_item(std::function<api_error()> action) : action_(std::move(action)) {}
~io_item() = default;
public:
io_item() = delete;
io_item(const io_item &) noexcept = delete;
io_item(io_item &&) noexcept = delete;
auto operator=(io_item &&) noexcept -> io_item & = delete;
auto operator=(const io_item &) noexcept -> io_item & = delete;
private:
std::function<api_error()> action_;
std::mutex mtx_;
std::condition_variable notify_;
std::optional<api_error> result_;
public:
void action();
[[nodiscard]] auto get_result() -> api_error;
};
protected:
std::uint64_t chunk_size_;
std::uint8_t chunk_timeout_;
filesystem_item fsi_;
std::size_t last_chunk_size_;
std::map<std::uint64_t, open_file_data> open_data_;
i_provider &provider_;
private:
api_error error_{api_error::success};
mutable std::mutex error_mtx_;
stop_type io_stop_requested_{false};
std::unique_ptr<std::thread> io_thread_;
protected:
std::unordered_map<std::size_t, std::shared_ptr<download>> active_downloads_;
mutable std::recursive_mutex file_mtx_;
std::atomic<std::chrono::system_clock::time_point> last_access_{
std::chrono::system_clock::now()};
bool modified_{false};
std::unique_ptr<utils::file::i_file> nf_;
mutable std::mutex io_thread_mtx_;
std::condition_variable io_thread_notify_;
std::deque<std::shared_ptr<io_item>> io_thread_queue_;
bool removed_{false};
private:
void file_io_thread();
protected:
[[nodiscard]] auto do_io(std::function<api_error()> action) -> api_error;
virtual auto is_download_complete() const -> bool = 0;
void reset_timeout();
auto set_api_error(const api_error &e) -> api_error;
public:
void add(std::uint64_t handle, open_file_data ofd) override;
[[nodiscard]] auto can_close() const -> bool override;
auto close() -> bool override;
[[nodiscard]] auto get_api_error() const -> api_error;
[[nodiscard]] auto get_api_path() const -> std::string override;
[[nodiscard]] auto get_chunk_size() const -> std::size_t override {
return chunk_size_;
}
[[nodiscard]] auto get_file_size() const -> std::uint64_t override;
[[nodiscard]] auto get_filesystem_item() const -> filesystem_item override;
[[nodiscard]] auto get_handles() const -> std::vector<std::uint64_t> override;
[[nodiscard]] auto get_open_data()
-> std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle)
-> open_file_data & override;
[[nodiscard]] auto get_open_data(std::uint64_t handle) const
-> const open_file_data & override;
[[nodiscard]] auto get_open_file_count() const -> std::size_t override;
[[nodiscard]] auto get_source_path() const -> std::string override {
return fsi_.source_path;
}
[[nodiscard]] auto has_handle(std::uint64_t handle) const -> bool override {
return open_data_.find(handle) != open_data_.end();
}
[[nodiscard]] auto is_directory() const -> bool override {
return fsi_.directory;
}
[[nodiscard]] auto is_modified() const -> bool override;
void remove(std::uint64_t handle) override;
void set_api_path(const std::string &api_path) override;
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_OPEN_FILE_BASE_HPP_

View File

@ -0,0 +1,132 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class ring_buffer_open_file final : public open_file_base {
public:
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider);
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
i_provider &provider, std::size_t ring_size);
~ring_buffer_open_file() override;
public:
ring_buffer_open_file() = delete;
ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete;
ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete;
auto operator=(ring_buffer_open_file &&) noexcept
-> ring_buffer_open_file & = delete;
auto operator=(const ring_buffer_open_file &) noexcept
-> ring_buffer_open_file & = delete;
private:
boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_;
private:
std::unique_ptr<std::thread> chunk_forward_thread_;
std::unique_ptr<std::thread> chunk_reverse_thread_;
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::size_t current_chunk_{};
std::size_t first_chunk_{};
std::size_t last_chunk_;
private:
auto download_chunk(std::size_t chunk) -> api_error;
void forward_reader_thread(std::size_t count);
void reverse_reader_thread(std::size_t count);
protected:
auto is_download_complete() const -> bool override;
public:
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return current_chunk_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return first_chunk_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t {
return last_chunk_;
}
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return true; }
auto is_write_supported() const -> bool override { return false; }
[[nodiscard]] auto native_operation(native_operation_callback callback)
-> api_error override;
[[nodiscard]] auto native_operation(std::uint64_t, native_operation_callback)
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto write(std::uint64_t, const data_buffer &, std::size_t &)
-> api_error override {
return api_error::not_supported;
}
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_

View File

@ -1,41 +1,75 @@
/* /*
Copyright <2018-2024> <scott.e.graves@protonmail.com> Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include "file_manager/file_manager.hpp" #ifndef REPERTORY_INCLUDE_FILE_MANAGER_UPLOAD_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_UPLOAD_HPP_
namespace repertory {
void file_manager::open_file_base::io_item::action() { #include "types/repertory.hpp"
result_ = action_();
namespace repertory {
mutex_lock lock(mtx_); class i_provider;
notify_.notify_all();
} class upload final {
public:
auto file_manager::open_file_base::io_item::get_result() -> api_error { upload(filesystem_item fsi, i_provider &provider);
unique_mutex_lock lock(mtx_);
if (result_.has_value()) { ~upload();
return result_.value();
} public:
upload() = delete;
notify_.wait(lock); upload(const upload &) noexcept = delete;
return result_.value_or(api_error::error); upload(upload &&) noexcept = delete;
} auto operator=(upload &&) noexcept -> upload & = delete;
} // namespace repertory auto operator=(const upload &) noexcept -> upload & = delete;
private:
filesystem_item fsi_;
i_provider &provider_;
private:
bool cancelled_{false};
api_error error_{api_error::success};
std::unique_ptr<std::thread> thread_;
stop_type stop_requested_{false};
private:
void upload_thread();
public:
void cancel();
[[nodiscard]] auto get_api_error() const -> api_error { return error_; }
[[nodiscard]] auto get_api_path() const -> std::string {
return fsi_.api_path;
}
[[nodiscard]] auto get_source_path() const -> std::string {
return fsi_.source_path;
}
[[nodiscard]] auto is_cancelled() const -> bool { return cancelled_; }
void stop();
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_UPLOAD_HPP_

View File

@ -23,6 +23,10 @@
#include "app_config.hpp" #include "app_config.hpp"
#include "file_manager/events.hpp" #include "file_manager/events.hpp"
#include "file_manager/open_file.hpp"
#include "file_manager/open_file_base.hpp"
#include "file_manager/ring_buffer_open_file.hpp"
#include "file_manager/upload.hpp"
#include "providers/i_provider.hpp" #include "providers/i_provider.hpp"
#include "types/repertory.hpp" #include "types/repertory.hpp"
#include "utils/common.hpp" #include "utils/common.hpp"

View File

@ -1,43 +0,0 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/file_manager.hpp"
namespace repertory {
void file_manager::open_file_base::download::notify(const api_error &err) {
complete_ = true;
error_ = err;
unique_mutex_lock lock(mtx_);
notify_.notify_all();
}
auto file_manager::open_file_base::download::wait() -> api_error {
if (not complete_) {
unique_mutex_lock lock(mtx_);
if (not complete_) {
notify_.wait(lock);
}
notify_.notify_all();
}
return error_;
}
} // namespace repertory

View File

@ -1,261 +1,291 @@
/* /*
Copyright <2018-2024> <scott.e.graves@protonmail.com> Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include "file_manager/file_manager.hpp" #include "file_manager/open_file_base.hpp"
#include "providers/i_provider.hpp" #include "file_manager/events.hpp"
#include "utils/path.hpp" #include "providers/i_provider.hpp"
#include "utils/path.hpp"
namespace repertory {
file_manager::open_file_base::open_file_base(std::uint64_t chunk_size, namespace repertory {
std::uint8_t chunk_timeout, void open_file_base::download::notify(const api_error &err) {
filesystem_item fsi, complete_ = true;
i_provider &provider) error_ = err;
: open_file_base(chunk_size, chunk_timeout, fsi, {}, provider) {} unique_mutex_lock lock(mtx_);
notify_.notify_all();
file_manager::open_file_base::open_file_base( }
std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
std::map<std::uint64_t, open_file_data> open_data, i_provider &provider) auto open_file_base::download::wait() -> api_error {
: chunk_size_(chunk_size), if (not complete_) {
chunk_timeout_(chunk_timeout), unique_mutex_lock lock(mtx_);
fsi_(std::move(fsi)), if (not complete_) {
last_chunk_size_(static_cast<std::size_t>( notify_.wait(lock);
fsi.size <= chunk_size ? fsi.size }
: (fsi.size % chunk_size) == 0U ? chunk_size notify_.notify_all();
: fsi.size % chunk_size)), }
open_data_(std::move(open_data)),
provider_(provider) { return error_;
if (not fsi.directory) { }
io_thread_ = std::make_unique<std::thread>([this] { file_io_thread(); });
} void open_file_base::io_item::action() {
} result_ = action_();
void file_manager::open_file_base::add(std::uint64_t handle, mutex_lock lock(mtx_);
open_file_data ofd) { notify_.notify_all();
recur_mutex_lock file_lock(file_mtx_); }
open_data_[handle] = ofd;
if (open_data_.size() == 1U) { auto open_file_base::io_item::get_result() -> api_error {
event_system::instance().raise<filesystem_item_opened>( unique_mutex_lock lock(mtx_);
fsi_.api_path, fsi_.source_path, fsi_.directory); if (result_.has_value()) {
} return result_.value();
}
event_system::instance().raise<filesystem_item_handle_opened>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory); notify_.wait(lock);
} return result_.value_or(api_error::error);
}
auto file_manager::open_file_base::can_close() const -> bool {
recur_mutex_lock file_lock(file_mtx_); open_file_base::open_file_base(std::uint64_t chunk_size,
if (fsi_.directory) { std::uint8_t chunk_timeout, filesystem_item fsi,
return true; i_provider &provider)
} : open_file_base(chunk_size, chunk_timeout, fsi, {}, provider) {}
if (not open_data_.empty()) { open_file_base::open_file_base(
return false; std::uint64_t chunk_size, std::uint8_t chunk_timeout, filesystem_item fsi,
} std::map<std::uint64_t, open_file_data> open_data, i_provider &provider)
: chunk_size_(chunk_size),
if (modified_) { chunk_timeout_(chunk_timeout),
return false; fsi_(std::move(fsi)),
} last_chunk_size_(static_cast<std::size_t>(
fsi.size <= chunk_size ? fsi.size
if (get_api_error() != api_error::success) { : (fsi.size % chunk_size) == 0U ? chunk_size
return true; : fsi.size % chunk_size)),
} open_data_(std::move(open_data)),
provider_(provider) {
if (is_download_complete()) { if (not fsi.directory) {
return true; io_thread_ = std::make_unique<std::thread>([this] { file_io_thread(); });
} }
}
if (provider_.is_direct_only()) {
return true; void open_file_base::add(std::uint64_t handle, open_file_data ofd) {
} recur_mutex_lock file_lock(file_mtx_);
open_data_[handle] = ofd;
const std::chrono::system_clock::time_point last_access = last_access_; if (open_data_.size() == 1U) {
const auto duration = std::chrono::duration_cast<std::chrono::seconds>( event_system::instance().raise<filesystem_item_opened>(
std::chrono::system_clock::now() - last_access); fsi_.api_path, fsi_.source_path, fsi_.directory);
return (duration.count() >= chunk_timeout_); }
}
event_system::instance().raise<filesystem_item_handle_opened>(
auto file_manager::open_file_base::do_io(std::function<api_error()> action) fsi_.api_path, handle, fsi_.source_path, fsi_.directory);
-> api_error { }
unique_mutex_lock io_lock(io_thread_mtx_);
auto item = std::make_shared<io_item>(action); auto open_file_base::can_close() const -> bool {
io_thread_queue_.emplace_back(item); recur_mutex_lock file_lock(file_mtx_);
io_thread_notify_.notify_all(); if (fsi_.directory) {
io_lock.unlock(); return true;
}
return item->get_result();
} if (not open_data_.empty()) {
return false;
void file_manager::open_file_base::file_io_thread() { }
unique_mutex_lock io_lock(io_thread_mtx_);
io_thread_notify_.notify_all(); if (modified_) {
io_lock.unlock(); return false;
}
const auto process_queue = [&]() {
io_lock.lock(); if (get_api_error() != api_error::success) {
if (not io_stop_requested_ && io_thread_queue_.empty()) { return true;
io_thread_notify_.wait(io_lock); }
}
if (is_download_complete()) {
while (not io_thread_queue_.empty()) { return true;
auto *item = io_thread_queue_.front().get(); }
io_thread_notify_.notify_all();
io_lock.unlock(); if (provider_.is_direct_only()) {
return true;
item->action(); }
io_lock.lock(); const std::chrono::system_clock::time_point last_access = last_access_;
io_thread_queue_.pop_front(); const auto duration = std::chrono::duration_cast<std::chrono::seconds>(
} std::chrono::system_clock::now() - last_access);
return (duration.count() >= chunk_timeout_);
io_thread_notify_.notify_all(); }
io_lock.unlock();
}; auto open_file_base::do_io(std::function<api_error()> action) -> api_error {
unique_mutex_lock io_lock(io_thread_mtx_);
while (not io_stop_requested_) { auto item = std::make_shared<io_item>(action);
process_queue(); io_thread_queue_.emplace_back(item);
} io_thread_notify_.notify_all();
io_lock.unlock();
process_queue();
} return item->get_result();
}
auto file_manager::open_file_base::get_api_error() const -> api_error {
mutex_lock error_lock(error_mtx_); void open_file_base::file_io_thread() {
return error_; unique_mutex_lock io_lock(io_thread_mtx_);
} io_thread_notify_.notify_all();
io_lock.unlock();
auto file_manager::open_file_base::get_api_path() const -> std::string {
recur_mutex_lock file_lock(file_mtx_); const auto process_queue = [&]() {
return fsi_.api_path; io_lock.lock();
} if (not io_stop_requested_ && io_thread_queue_.empty()) {
io_thread_notify_.wait(io_lock);
auto file_manager::open_file_base::get_file_size() const -> std::uint64_t { }
recur_mutex_lock file_lock(file_mtx_);
return fsi_.size; while (not io_thread_queue_.empty()) {
} auto *item = io_thread_queue_.front().get();
io_thread_notify_.notify_all();
auto file_manager::open_file_base::get_filesystem_item() const io_lock.unlock();
-> filesystem_item {
recur_mutex_lock file_lock(file_mtx_); item->action();
return fsi_;
} io_lock.lock();
io_thread_queue_.pop_front();
auto file_manager::open_file_base::get_handles() const }
-> std::vector<std::uint64_t> {
recur_mutex_lock file_lock(file_mtx_); io_thread_notify_.notify_all();
std::vector<std::uint64_t> ret; io_lock.unlock();
for (auto &&item : open_data_) { };
ret.emplace_back(item.first);
} while (not io_stop_requested_) {
process_queue();
return ret; }
}
process_queue();
auto file_manager::open_file_base::get_open_data() }
-> std::map<std::uint64_t, open_file_data> & {
recur_mutex_lock file_lock(file_mtx_); auto open_file_base::get_api_error() const -> api_error {
return open_data_; mutex_lock error_lock(error_mtx_);
} return error_;
}
auto file_manager::open_file_base::get_open_data() const
-> const std::map<std::uint64_t, open_file_data> & { auto open_file_base::get_api_path() const -> std::string {
recur_mutex_lock file_lock(file_mtx_); recur_mutex_lock file_lock(file_mtx_);
return open_data_; return fsi_.api_path;
} }
auto file_manager::open_file_base::get_open_data(std::uint64_t handle) auto open_file_base::get_file_size() const -> std::uint64_t {
-> open_file_data & { recur_mutex_lock file_lock(file_mtx_);
recur_mutex_lock file_lock(file_mtx_); return fsi_.size;
return open_data_.at(handle); }
}
auto open_file_base::get_filesystem_item() const -> filesystem_item {
auto file_manager::open_file_base::get_open_data(std::uint64_t handle) const recur_mutex_lock file_lock(file_mtx_);
-> const open_file_data & { return fsi_;
recur_mutex_lock file_lock(file_mtx_); }
return open_data_.at(handle);
} auto open_file_base::get_handles() const -> std::vector<std::uint64_t> {
recur_mutex_lock file_lock(file_mtx_);
auto file_manager::open_file_base::get_open_file_count() const -> std::size_t { std::vector<std::uint64_t> ret;
recur_mutex_lock file_lock(file_mtx_); for (auto &&item : open_data_) {
return open_data_.size(); ret.emplace_back(item.first);
} }
auto file_manager::open_file_base::is_modified() const -> bool { return ret;
recur_mutex_lock file_lock(file_mtx_); }
return modified_;
} auto open_file_base::get_open_data()
-> std::map<std::uint64_t, open_file_data> & {
void file_manager::open_file_base::remove(std::uint64_t handle) { recur_mutex_lock file_lock(file_mtx_);
recur_mutex_lock file_lock(file_mtx_); return open_data_;
open_data_.erase(handle); }
event_system::instance().raise<filesystem_item_handle_closed>(
fsi_.api_path, handle, fsi_.source_path, fsi_.directory, modified_); auto open_file_base::get_open_data() const
if (open_data_.empty()) { -> const std::map<std::uint64_t, open_file_data> & {
event_system::instance().raise<filesystem_item_closed>( recur_mutex_lock file_lock(file_mtx_);
fsi_.api_path, fsi_.source_path, fsi_.directory, modified_); return open_data_;
} }
}
auto open_file_base::get_open_data(std::uint64_t handle) -> open_file_data & {
void file_manager::open_file_base::reset_timeout() { recur_mutex_lock file_lock(file_mtx_);
last_access_ = std::chrono::system_clock::now(); return open_data_.at(handle);
} }
auto file_manager::open_file_base::set_api_error(const api_error &err) auto open_file_base::get_open_data(std::uint64_t handle) const
-> api_error { -> const open_file_data & {
mutex_lock error_lock(error_mtx_); recur_mutex_lock file_lock(file_mtx_);
if (error_ != err) { return open_data_.at(handle);
return ((error_ = (error_ == api_error::success || }
error_ == api_error::download_incomplete ||
error_ == api_error::download_stopped auto open_file_base::get_open_file_count() const -> std::size_t {
? err recur_mutex_lock file_lock(file_mtx_);
: error_))); return open_data_.size();
} }
return error_; auto open_file_base::is_modified() const -> bool {
} recur_mutex_lock file_lock(file_mtx_);
return modified_;
void file_manager::open_file_base::set_api_path(const std::string &api_path) { }
recur_mutex_lock file_lock(file_mtx_);
fsi_.api_path = api_path; void open_file_base::remove(std::uint64_t handle) {
fsi_.api_parent = utils::path::get_parent_api_path(api_path); recur_mutex_lock file_lock(file_mtx_);
} open_data_.erase(handle);
event_system::instance().raise<filesystem_item_handle_closed>(
auto file_manager::open_file_base::close() -> bool { fsi_.api_path, handle, fsi_.source_path, fsi_.directory, modified_);
unique_mutex_lock io_lock(io_thread_mtx_); if (open_data_.empty()) {
if (not fsi_.directory && not io_stop_requested_) { event_system::instance().raise<filesystem_item_closed>(
io_stop_requested_ = true; fsi_.api_path, fsi_.source_path, fsi_.directory, modified_);
io_thread_notify_.notify_all(); }
io_lock.unlock(); }
if (io_thread_) { void open_file_base::reset_timeout() {
io_thread_->join(); last_access_ = std::chrono::system_clock::now();
io_thread_.reset(); }
return true;
} auto open_file_base::set_api_error(const api_error &err) -> api_error {
mutex_lock error_lock(error_mtx_);
return false; if (error_ != err) {
} return ((error_ = (error_ == api_error::success ||
error_ == api_error::download_incomplete ||
io_thread_notify_.notify_all(); error_ == api_error::download_stopped
io_lock.unlock(); ? err
return false; : error_)));
} }
} // namespace repertory
return error_;
}
void open_file_base::set_api_path(const std::string &api_path) {
recur_mutex_lock file_lock(file_mtx_);
fsi_.api_path = api_path;
fsi_.api_parent = utils::path::get_parent_api_path(api_path);
}
auto open_file_base::close() -> bool {
unique_mutex_lock io_lock(io_thread_mtx_);
if (not fsi_.directory && not io_stop_requested_) {
io_stop_requested_ = true;
io_thread_notify_.notify_all();
io_lock.unlock();
if (io_thread_) {
io_thread_->join();
io_thread_.reset();
return true;
}
return false;
}
io_thread_notify_.notify_all();
io_lock.unlock();
return false;
}
} // namespace repertory

View File

@ -1,322 +1,323 @@
/* /*
Copyright <2018-2024> <scott.e.graves@protonmail.com> Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include "file_manager/file_manager.hpp" #include "file_manager/ring_buffer_open_file.hpp"
#include "app_config.hpp" #include "app_config.hpp"
#include "file_manager/events.hpp" #include "file_manager/events.hpp"
#include "platform/platform.hpp" #include "file_manager/open_file_base.hpp"
#include "providers/i_provider.hpp" #include "platform/platform.hpp"
#include "types/repertory.hpp" #include "providers/i_provider.hpp"
#include "utils/common.hpp" #include "types/repertory.hpp"
#include "utils/encrypting_reader.hpp" #include "utils/common.hpp"
#include "utils/file_utils.hpp" #include "utils/encrypting_reader.hpp"
#include "utils/path.hpp" #include "utils/file_utils.hpp"
#include "utils/utils.hpp" #include "utils/path.hpp"
#include "utils/utils.hpp"
namespace repertory {
file_manager::ring_buffer_open_file::ring_buffer_open_file( namespace repertory {
std::string buffer_directory, std::uint64_t chunk_size, ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider) std::uint64_t chunk_size,
: ring_buffer_open_file(std::move(buffer_directory), chunk_size, std::uint8_t chunk_timeout,
chunk_timeout, std::move(fsi), provider, filesystem_item fsi,
(1024ULL * 1024ULL * 1024ULL) / chunk_size) {} i_provider &provider)
: ring_buffer_open_file(std::move(buffer_directory), chunk_size,
file_manager::ring_buffer_open_file::ring_buffer_open_file( chunk_timeout, std::move(fsi), provider,
std::string buffer_directory, std::uint64_t chunk_size, (1024ULL * 1024ULL * 1024ULL) / chunk_size) {}
std::uint8_t chunk_timeout, filesystem_item fsi, i_provider &provider,
std::size_t ring_size) ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
: open_file_base(chunk_size, chunk_timeout, fsi, provider), std::uint64_t chunk_size,
ring_state_(ring_size), std::uint8_t chunk_timeout,
total_chunks_(static_cast<std::size_t>( filesystem_item fsi,
utils::divide_with_ceiling(fsi.size, chunk_size_))) { i_provider &provider,
if ((ring_size % 2U) != 0U) { std::size_t ring_size)
throw std::runtime_error("ring size must be a multiple of 2"); : open_file_base(chunk_size, chunk_timeout, fsi, provider),
} ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
if (ring_size < 4U) { utils::divide_with_ceiling(fsi.size, chunk_size_))) {
throw std::runtime_error("ring size must be greater than or equal to 4"); if ((ring_size % 2U) != 0U) {
} throw std::runtime_error("ring size must be a multiple of 2");
}
if (fsi.size < (ring_state_.size() * chunk_size)) {
throw std::runtime_error("file size is less than ring buffer size"); if (ring_size < 4U) {
} throw std::runtime_error("ring size must be greater than or equal to 4");
}
last_chunk_ = ring_state_.size() - 1U;
ring_state_.set(0U, ring_state_.size(), true); if (fsi.size < (ring_state_.size() * chunk_size)) {
throw std::runtime_error("file size is less than ring buffer size");
buffer_directory = utils::path::absolute(buffer_directory); }
if (not utils::file::directory(buffer_directory).create_directory()) {
throw std::runtime_error("failed to create buffer directory|path|" + last_chunk_ = ring_state_.size() - 1U;
buffer_directory + "|err|" + ring_state_.set(0U, ring_state_.size(), true);
std::to_string(utils::get_last_error_code()));
} buffer_directory = utils::path::absolute(buffer_directory);
if (not utils::file::directory(buffer_directory).create_directory()) {
fsi_.source_path = throw std::runtime_error("failed to create buffer directory|path|" +
utils::path::combine(buffer_directory, {utils::create_uuid_string()}); buffer_directory + "|err|" +
nf_ = utils::file::file::open_or_create_file(fsi_.source_path); std::to_string(utils::get_last_error_code()));
if (not*nf_) { }
throw std::runtime_error("failed to create buffer file|err|" +
std::to_string(utils::get_last_error_code())); fsi_.source_path =
} utils::path::combine(buffer_directory, {utils::create_uuid_string()});
nf_ = utils::file::file::open_or_create_file(fsi_.source_path);
if (not nf_->truncate(ring_state_.size() * chunk_size)) { if (not*nf_) {
nf_->close(); throw std::runtime_error("failed to create buffer file|err|" +
throw std::runtime_error("failed to resize buffer file|err|" + std::to_string(utils::get_last_error_code()));
std::to_string(utils::get_last_error_code())); }
}
} if (not nf_->truncate(ring_state_.size() * chunk_size)) {
nf_->close();
file_manager::ring_buffer_open_file::~ring_buffer_open_file() { throw std::runtime_error("failed to resize buffer file|err|" +
REPERTORY_USES_FUNCTION_NAME(); std::to_string(utils::get_last_error_code()));
}
close(); }
nf_->close(); ring_buffer_open_file::~ring_buffer_open_file() {
if (not utils::file::file(fsi_.source_path).remove()) { REPERTORY_USES_FUNCTION_NAME();
utils::error::raise_api_path_error(
function_name, fsi_.api_path, fsi_.source_path, close();
utils::get_last_error_code(), "failed to delete file");
} nf_->close();
} if (not utils::file::file(fsi_.source_path).remove()) {
utils::error::raise_api_path_error(
auto file_manager::file_manager::ring_buffer_open_file::download_chunk( function_name, fsi_.api_path, fsi_.source_path,
std::size_t chunk) -> api_error { utils::get_last_error_code(), "failed to delete file");
unique_mutex_lock chunk_lock(chunk_mtx_); }
if (active_downloads_.find(chunk) != active_downloads_.end()) { }
auto active_download = active_downloads_.at(chunk);
chunk_notify_.notify_all(); auto r::ring_buffer_open_file::download_chunk(std::size_t chunk) -> api_error {
chunk_lock.unlock(); unique_mutex_lock chunk_lock(chunk_mtx_);
if (active_downloads_.find(chunk) != active_downloads_.end()) {
return active_download->wait(); auto active_download = active_downloads_.at(chunk);
} chunk_notify_.notify_all();
chunk_lock.unlock();
if (ring_state_[chunk % ring_state_.size()]) {
auto active_download = std::make_shared<download>(); return active_download->wait();
active_downloads_[chunk] = active_download; }
ring_state_[chunk % ring_state_.size()] = false;
chunk_notify_.notify_all(); if (ring_state_[chunk % ring_state_.size()]) {
chunk_lock.unlock(); auto active_download = std::make_shared<download>();
active_downloads_[chunk] = active_download;
data_buffer buffer((chunk == (total_chunks_ - 1U)) ? last_chunk_size_ ring_state_[chunk % ring_state_.size()] = false;
: chunk_size_); chunk_notify_.notify_all();
chunk_lock.unlock();
stop_type stop_requested = !!ring_state_[chunk % ring_state_.size()];
auto res = data_buffer buffer((chunk == (total_chunks_ - 1U)) ? last_chunk_size_
provider_.read_file_bytes(fsi_.api_path, buffer.size(), : chunk_size_);
chunk * chunk_size_, buffer, stop_requested);
if (res == api_error::success) { stop_type stop_requested = !!ring_state_[chunk % ring_state_.size()];
res = do_io([&]() -> api_error { auto res =
std::size_t bytes_written{}; provider_.read_file_bytes(fsi_.api_path, buffer.size(),
if (not nf_->write(buffer, (chunk % ring_state_.size()) * chunk_size_, chunk * chunk_size_, buffer, stop_requested);
&bytes_written)) { if (res == api_error::success) {
return api_error::os_error; res = do_io([&]() -> api_error {
} std::size_t bytes_written{};
if (not nf_->write(buffer, (chunk % ring_state_.size()) * chunk_size_,
return api_error::success; &bytes_written)) {
}); return api_error::os_error;
} }
active_download->notify(res); return api_error::success;
});
chunk_lock.lock(); }
active_downloads_.erase(chunk);
chunk_notify_.notify_all(); active_download->notify(res);
return res;
} chunk_lock.lock();
active_downloads_.erase(chunk);
chunk_notify_.notify_all(); chunk_notify_.notify_all();
chunk_lock.unlock(); return res;
}
return api_error::success;
} chunk_notify_.notify_all();
chunk_lock.unlock();
void file_manager::ring_buffer_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_); return api_error::success;
if ((current_chunk_ + count) > (total_chunks_ - 1U)) { }
count = (total_chunks_ - 1U) - current_chunk_;
} void ring_buffer_open_file::forward(std::size_t count) {
mutex_lock chunk_lock(chunk_mtx_);
if ((current_chunk_ + count) <= last_chunk_) { if ((current_chunk_ + count) > (total_chunks_ - 1U)) {
current_chunk_ += count; count = (total_chunks_ - 1U) - current_chunk_;
} else { }
const auto added = count - (last_chunk_ - current_chunk_);
if (added >= ring_state_.size()) { if ((current_chunk_ + count) <= last_chunk_) {
ring_state_.set(0U, ring_state_.size(), true); current_chunk_ += count;
current_chunk_ += count; } else {
first_chunk_ += added; const auto added = count - (last_chunk_ - current_chunk_);
last_chunk_ = if (added >= ring_state_.size()) {
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); ring_state_.set(0U, ring_state_.size(), true);
} else { current_chunk_ += count;
for (std::size_t idx = 0U; idx < added; ++idx) { first_chunk_ += added;
ring_state_[(first_chunk_ + idx) % ring_state_.size()] = true; last_chunk_ =
} std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
first_chunk_ += added; } else {
current_chunk_ += count; for (std::size_t idx = 0U; idx < added; ++idx) {
last_chunk_ = ring_state_[(first_chunk_ + idx) % ring_state_.size()] = true;
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); }
} first_chunk_ += added;
} current_chunk_ += count;
last_chunk_ =
chunk_notify_.notify_all(); std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
} }
}
auto file_manager::ring_buffer_open_file::get_read_state() const
-> boost::dynamic_bitset<> { chunk_notify_.notify_all();
recur_mutex_lock file_lock(file_mtx_); }
auto read_state = ring_state_;
return read_state.flip(); auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
} recur_mutex_lock file_lock(file_mtx_);
auto read_state = ring_state_;
auto file_manager::ring_buffer_open_file::get_read_state( return read_state.flip();
std::size_t chunk) const -> bool { }
recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()]; auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool {
} recur_mutex_lock file_lock(file_mtx_);
return not ring_state_[chunk % ring_state_.size()];
auto file_manager::ring_buffer_open_file::is_download_complete() const -> bool { }
return false;
} auto ring_buffer_open_file::is_download_complete() const -> bool {
return false;
auto file_manager::ring_buffer_open_file::native_operation( }
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); }); auto ring_buffer_open_file::native_operation(
} i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
void file_manager::ring_buffer_open_file::reverse(std::size_t count) { }
mutex_lock chunk_lock(chunk_mtx_);
if (current_chunk_ < count) { void ring_buffer_open_file::reverse(std::size_t count) {
count = current_chunk_; mutex_lock chunk_lock(chunk_mtx_);
} if (current_chunk_ < count) {
count = current_chunk_;
if ((current_chunk_ - count) >= first_chunk_) { }
current_chunk_ -= count;
} else { if ((current_chunk_ - count) >= first_chunk_) {
const auto removed = count - (current_chunk_ - first_chunk_); current_chunk_ -= count;
if (removed >= ring_state_.size()) { } else {
ring_state_.set(0U, ring_state_.size(), true); const auto removed = count - (current_chunk_ - first_chunk_);
current_chunk_ -= count; if (removed >= ring_state_.size()) {
first_chunk_ = current_chunk_; ring_state_.set(0U, ring_state_.size(), true);
last_chunk_ = current_chunk_ -= count;
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); first_chunk_ = current_chunk_;
} else { last_chunk_ =
for (std::size_t idx = 0U; idx < removed; ++idx) { std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
ring_state_[(last_chunk_ - idx) % ring_state_.size()] = true; } else {
} for (std::size_t idx = 0U; idx < removed; ++idx) {
first_chunk_ -= removed; ring_state_[(last_chunk_ - idx) % ring_state_.size()] = true;
current_chunk_ -= count; }
last_chunk_ = first_chunk_ -= removed;
std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U); current_chunk_ -= count;
} last_chunk_ =
} std::min(total_chunks_ - 1U, first_chunk_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all(); }
}
chunk_notify_.notify_all();
auto file_manager::ring_buffer_open_file::read(std::size_t read_size, }
std::uint64_t read_offset,
data_buffer &data) -> api_error { auto ring_buffer_open_file::read(std::size_t read_size,
if (fsi_.directory) { std::uint64_t read_offset, data_buffer &data)
return api_error::invalid_operation; -> api_error {
} if (fsi_.directory) {
return api_error::invalid_operation;
reset_timeout(); }
read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset); reset_timeout();
if (read_size == 0U) {
return api_error::success; read_size = utils::calculate_read_size(fsi_.size, read_size, read_offset);
} if (read_size == 0U) {
return api_error::success;
const auto start_chunk_index = }
static_cast<std::size_t>(read_offset / chunk_size_);
read_offset = read_offset - (start_chunk_index * chunk_size_); const auto start_chunk_index =
data_buffer buffer(chunk_size_); static_cast<std::size_t>(read_offset / chunk_size_);
read_offset = read_offset - (start_chunk_index * chunk_size_);
auto res = api_error::success; data_buffer buffer(chunk_size_);
for (std::size_t chunk = start_chunk_index;
(res == api_error::success) && (read_size > 0U); ++chunk) { auto res = api_error::success;
if (chunk > current_chunk_) { for (std::size_t chunk = start_chunk_index;
forward(chunk - current_chunk_); (res == api_error::success) && (read_size > 0U); ++chunk) {
} else if (chunk < current_chunk_) { if (chunk > current_chunk_) {
reverse(current_chunk_ - chunk); forward(chunk - current_chunk_);
} } else if (chunk < current_chunk_) {
reverse(current_chunk_ - chunk);
reset_timeout(); }
res = download_chunk(chunk);
if (res == api_error::success) { reset_timeout();
const auto to_read = std::min( res = download_chunk(chunk);
static_cast<std::size_t>(chunk_size_ - read_offset), read_size); if (res == api_error::success) {
res = do_io([this, &buffer, &chunk, &data, read_offset, const auto to_read = std::min(
&to_read]() -> api_error { static_cast<std::size_t>(chunk_size_ - read_offset), read_size);
std::size_t bytes_read{}; res = do_io([this, &buffer, &chunk, &data, read_offset,
auto ret = &to_read]() -> api_error {
nf_->read(buffer, ((chunk % ring_state_.size()) * chunk_size_), std::size_t bytes_read{};
&bytes_read) auto ret =
? api_error::success nf_->read(buffer, ((chunk % ring_state_.size()) * chunk_size_),
: api_error::os_error; &bytes_read)
if (ret == api_error::success) { ? api_error::success
data.insert(data.end(), : api_error::os_error;
buffer.begin() + static_cast<std::int64_t>(read_offset), if (ret == api_error::success) {
buffer.begin() + data.insert(data.end(),
static_cast<std::int64_t>(read_offset + to_read)); buffer.begin() + static_cast<std::int64_t>(read_offset),
reset_timeout(); buffer.begin() +
} static_cast<std::int64_t>(read_offset + to_read));
reset_timeout();
return ret; }
});
read_offset = 0U; return ret;
read_size -= to_read; });
} read_offset = 0U;
} read_size -= to_read;
}
return res; }
}
return res;
void file_manager::ring_buffer_open_file::set(std::size_t first_chunk, }
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_); void ring_buffer_open_file::set(std::size_t first_chunk,
if (first_chunk >= total_chunks_) { std::size_t current_chunk) {
chunk_notify_.notify_all(); mutex_lock chunk_lock(chunk_mtx_);
throw std::runtime_error("first chunk must be less than total chunks"); if (first_chunk >= total_chunks_) {
} chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
first_chunk_ = first_chunk; }
last_chunk_ = first_chunk_ + ring_state_.size() - 1U;
first_chunk_ = first_chunk;
if (current_chunk > last_chunk_) { last_chunk_ = first_chunk_ + ring_state_.size() - 1U;
chunk_notify_.notify_all();
throw std::runtime_error( if (current_chunk > last_chunk_) {
"current chunk must be less than or equal to last chunk"); chunk_notify_.notify_all();
} throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
current_chunk_ = current_chunk; }
ring_state_.set(0U, ring_state_.size(), false);
current_chunk_ = current_chunk;
chunk_notify_.notify_all(); ring_state_.set(0U, ring_state_.size(), false);
}
chunk_notify_.notify_all();
void file_manager::ring_buffer_open_file::set_api_path( }
const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_); void ring_buffer_open_file::set_api_path(const std::string &api_path) {
open_file_base::set_api_path(api_path); mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all(); open_file_base::set_api_path(api_path);
} chunk_notify_.notify_all();
} // namespace repertory }
} // namespace repertory

View File

@ -1,65 +1,65 @@
/* /*
Copyright <2018-2024> <scott.e.graves@protonmail.com> Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include "file_manager/file_manager.hpp" #include "file_manager/upload.hpp"
#include "platform/platform.hpp" #include "platform/platform.hpp"
#include "providers/i_provider.hpp" #include "providers/i_provider.hpp"
#include "utils/error_utils.hpp" #include "utils/error_utils.hpp"
#include "utils/file_utils.hpp" #include "utils/file_utils.hpp"
namespace repertory { namespace repertory {
using std::bind; using std::bind;
file_manager::upload::upload(filesystem_item fsi, i_provider &provider) file_manager::upload::upload(filesystem_item fsi, i_provider &provider)
: fsi_(std::move(fsi)), provider_(provider) { : fsi_(std::move(fsi)), provider_(provider) {
thread_ = std::make_unique<std::thread>([this] { upload_thread(); }); thread_ = std::make_unique<std::thread>([this] { upload_thread(); });
} }
file_manager::upload::~upload() { file_manager::upload::~upload() {
stop(); stop();
thread_->join(); thread_->join();
thread_.reset(); thread_.reset();
} }
void file_manager::upload::cancel() { void file_manager::upload::cancel() {
cancelled_ = true; cancelled_ = true;
stop(); stop();
} }
void file_manager::upload::stop() { stop_requested_ = true; } void file_manager::upload::stop() { stop_requested_ = true; }
void file_manager::upload::upload_thread() { void file_manager::upload::upload_thread() {
REPERTORY_USES_FUNCTION_NAME(); REPERTORY_USES_FUNCTION_NAME();
error_ = error_ =
provider_.upload_file(fsi_.api_path, fsi_.source_path, stop_requested_); provider_.upload_file(fsi_.api_path, fsi_.source_path, stop_requested_);
if (not utils::file::reset_modified_time(fsi_.source_path)) { if (not utils::file::reset_modified_time(fsi_.source_path)) {
utils::error::raise_api_path_error( utils::error::raise_api_path_error(
function_name, fsi_.api_path, fsi_.source_path, function_name, fsi_.api_path, fsi_.source_path,
utils::get_last_error_code(), "failed to reset modified time"); utils::get_last_error_code(), "failed to reset modified time");
} }
event_system::instance().raise<file_upload_completed>( event_system::instance().raise<file_upload_completed>(
get_api_path(), get_source_path(), get_api_error(), cancelled_); get_api_path(), get_source_path(), get_api_error(), cancelled_);
} }
} // namespace repertory } // namespace repertory

View File

@ -1,182 +1,182 @@
/* /*
Copyright <2018-2024> <scott.e.graves@protonmail.com> Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
*/ */
#include "test_common.hpp" #include "test_common.hpp"
#include "file_manager/file_manager.hpp" #include "file_manager/file_manager.hpp"
#include "mocks/mock_provider.hpp" #include "mocks/mock_provider.hpp"
#include "utils/event_capture.hpp" #include "utils/event_capture.hpp"
namespace repertory { namespace repertory {
static constexpr const std::size_t test_chunk_size{1024U}; static constexpr const std::size_t test_chunk_size{1024U};
TEST(upload, can_upload_a_valid_file) { TEST(upload, can_upload_a_valid_file) {
console_consumer con; console_consumer con;
event_system::instance().start(); event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test"); const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_prov; mock_provider mock_prov;
EXPECT_CALL(mock_prov, is_direct_only()).WillRepeatedly(Return(false)); EXPECT_CALL(mock_prov, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi; filesystem_item fsi;
fsi.api_path = "/test.txt"; fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U; fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path; fsi.source_path = source_path;
event_consumer evt_com("file_upload_completed", [&fsi](const event &evt) { event_consumer evt_com("file_upload_completed", [&fsi](const event &evt) {
const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt); const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(), EXPECT_STREQ(fsi.api_path.c_str(),
comp_evt.get_api_path().get<std::string>().c_str()); comp_evt.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(), EXPECT_STREQ(fsi.source_path.c_str(),
comp_evt.get_source().get<std::string>().c_str()); comp_evt.get_source().get<std::string>().c_str());
EXPECT_STREQ("success", comp_evt.get_result().get<std::string>().c_str()); EXPECT_STREQ("success", comp_evt.get_result().get<std::string>().c_str());
EXPECT_STREQ("0", comp_evt.get_cancelled().get<std::string>().c_str()); EXPECT_STREQ("0", comp_evt.get_cancelled().get<std::string>().c_str());
}); });
EXPECT_CALL(mock_prov, upload_file(fsi.api_path, fsi.source_path, _)) EXPECT_CALL(mock_prov, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([](const std::string &, const std::string &, .WillOnce([](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
return api_error::success; return api_error::success;
}); });
file_manager::upload upload(fsi, mock_prov); file_manager::upload upload(fsi, mock_prov);
event_capture evt_cap({"file_upload_completed"}); event_capture evt_cap({"file_upload_completed"});
evt_cap.wait_for_empty(); evt_cap.wait_for_empty();
EXPECT_EQ(api_error::success, upload.get_api_error()); EXPECT_EQ(api_error::success, upload.get_api_error());
EXPECT_FALSE(upload.is_cancelled()); EXPECT_FALSE(upload.is_cancelled());
event_system::instance().stop(); event_system::instance().stop();
} }
TEST(upload, can_cancel_upload) { TEST(upload, can_cancel_upload) {
console_consumer con; console_consumer con;
event_system::instance().start(); event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test"); const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_provider; mock_provider mock_provider;
EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false)); EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi; filesystem_item fsi;
fsi.api_path = "/test.txt"; fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U; fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path; fsi.source_path = source_path;
event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) { event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) {
const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt); const auto &comp_evt = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(), EXPECT_STREQ(fsi.api_path.c_str(),
comp_evt.get_api_path().get<std::string>().c_str()); comp_evt.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(), EXPECT_STREQ(fsi.source_path.c_str(),
comp_evt.get_source().get<std::string>().c_str()); comp_evt.get_source().get<std::string>().c_str());
EXPECT_STREQ("comm_error", EXPECT_STREQ("comm_error",
comp_evt.get_result().get<std::string>().c_str()); comp_evt.get_result().get<std::string>().c_str());
EXPECT_STREQ("1", comp_evt.get_cancelled().get<std::string>().c_str()); EXPECT_STREQ("1", comp_evt.get_cancelled().get<std::string>().c_str());
}); });
std::mutex mtx; std::mutex mtx;
std::condition_variable notify; std::condition_variable notify;
EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _)) EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([&notify, &mtx](const std::string &, const std::string &, .WillOnce([&notify, &mtx](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
EXPECT_FALSE(stop_requested); EXPECT_FALSE(stop_requested);
unique_mutex_lock lock(mtx); unique_mutex_lock lock(mtx);
notify.notify_one(); notify.notify_one();
lock.unlock(); lock.unlock();
lock.lock(); lock.lock();
notify.wait(lock); notify.wait(lock);
lock.unlock(); lock.unlock();
EXPECT_TRUE(stop_requested); EXPECT_TRUE(stop_requested);
return api_error::comm_error; return api_error::comm_error;
}); });
unique_mutex_lock lock(mtx); unique_mutex_lock lock(mtx);
file_manager::upload upload(fsi, mock_provider); file_manager::upload upload(fsi, mock_provider);
notify.wait(lock); notify.wait(lock);
upload.cancel(); upload.cancel();
notify.notify_one(); notify.notify_one();
lock.unlock(); lock.unlock();
event_capture evt_cap({"file_upload_completed"}); event_capture evt_cap({"file_upload_completed"});
evt_cap.wait_for_empty(); evt_cap.wait_for_empty();
EXPECT_EQ(api_error::comm_error, upload.get_api_error()); EXPECT_EQ(api_error::comm_error, upload.get_api_error());
EXPECT_TRUE(upload.is_cancelled()); EXPECT_TRUE(upload.is_cancelled());
event_system::instance().stop(); event_system::instance().stop();
} }
TEST(upload, can_stop_upload) { TEST(upload, can_stop_upload) {
console_consumer con; console_consumer con;
event_system::instance().start(); event_system::instance().start();
const auto source_path = test::generate_test_file_name("upload_test"); const auto source_path = test::generate_test_file_name("upload_test");
mock_provider mock_provider; mock_provider mock_provider;
EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false)); EXPECT_CALL(mock_provider, is_direct_only()).WillRepeatedly(Return(false));
filesystem_item fsi; filesystem_item fsi;
fsi.api_path = "/test.txt"; fsi.api_path = "/test.txt";
fsi.size = test_chunk_size * 4U; fsi.size = test_chunk_size * 4U;
fsi.source_path = source_path; fsi.source_path = source_path;
event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) { event_consumer evt_con("file_upload_completed", [&fsi](const event &evt) {
const auto &evt_com = dynamic_cast<const file_upload_completed &>(evt); const auto &evt_com = dynamic_cast<const file_upload_completed &>(evt);
EXPECT_STREQ(fsi.api_path.c_str(), EXPECT_STREQ(fsi.api_path.c_str(),
evt_com.get_api_path().get<std::string>().c_str()); evt_com.get_api_path().get<std::string>().c_str());
EXPECT_STREQ(fsi.source_path.c_str(), EXPECT_STREQ(fsi.source_path.c_str(),
evt_com.get_source().get<std::string>().c_str()); evt_com.get_source().get<std::string>().c_str());
EXPECT_STREQ("comm_error", evt_com.get_result().get<std::string>().c_str()); EXPECT_STREQ("comm_error", evt_com.get_result().get<std::string>().c_str());
EXPECT_STREQ("0", evt_com.get_cancelled().get<std::string>().c_str()); EXPECT_STREQ("0", evt_com.get_cancelled().get<std::string>().c_str());
}); });
EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _)) EXPECT_CALL(mock_provider, upload_file(fsi.api_path, fsi.source_path, _))
.WillOnce([](const std::string &, const std::string &, .WillOnce([](const std::string &, const std::string &,
stop_type &stop_requested) -> api_error { stop_type &stop_requested) -> api_error {
std::this_thread::sleep_for(3s); std::this_thread::sleep_for(3s);
EXPECT_TRUE(stop_requested); EXPECT_TRUE(stop_requested);
return api_error::comm_error; return api_error::comm_error;
}); });
event_capture evt_cap({"file_upload_completed"}); event_capture evt_cap({"file_upload_completed"});
{ file_manager::upload upload(fsi, mock_provider); } { file_manager::upload upload(fsi, mock_provider); }
evt_cap.wait_for_empty(); evt_cap.wait_for_empty();
event_system::instance().stop(); event_system::instance().stop();
} }
} // namespace repertory } // namespace repertory

View File

@ -157,8 +157,8 @@ template <typename ctx_t, typename op_t> struct db_where_t final {
using action_t = std::variant<db_comp_data_t, n_t, db_where_t>; using action_t = std::variant<db_comp_data_t, n_t, db_where_t>;
[[nodiscard]] static auto dump(std::int32_t &idx, [[nodiscard]] static auto dump(std::int32_t &idx, auto &&actions)
auto &&actions) -> std::string { -> std::string {
std::stringstream stream; std::stringstream stream;
for (auto &&action : actions) { for (auto &&action : actions) {

View File

@ -27,13 +27,12 @@ std::atomic<const i_exception_handler *> exception_handler{
auto create_error_message(std::vector<std::string_view> items) -> std::string { auto create_error_message(std::vector<std::string_view> items) -> std::string {
std::stringstream stream{}; std::stringstream stream{};
stream << function_name;
for (std::size_t idx = 0U; idx < items.size(); ++idx) { for (std::size_t idx = 0U; idx < items.size(); ++idx) {
if (idx > 0) { if (idx > 0) {
stream << '|'; stream << '|';
} }
stream << item; stream << items.at(idx);
} }
return stream.str(); return stream.str();

View File

@ -397,6 +397,8 @@ auto file::sha256() -> std::optional<std::string> {
#endif // defined(PROJECT_ENABLE_LIBSODIUM) #endif // defined(PROJECT_ENABLE_LIBSODIUM)
auto file::remove() -> bool { auto file::remove() -> bool {
REPERTORY_USES_FUNCTION_NAME();
close(); close();
return utils::retry_action([this]() -> bool { return utils::retry_action([this]() -> bool {