This commit is contained in:
@ -22,7 +22,7 @@
|
||||
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
|
||||
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
|
||||
|
||||
#include "file_manager/ring_file_base.hpp"
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
|
||||
#include "types/repertory.hpp"
|
||||
|
||||
@ -30,7 +30,7 @@ namespace repertory {
|
||||
class i_provider;
|
||||
class i_upload_manager;
|
||||
|
||||
class direct_open_file final : public ring_file_base {
|
||||
class direct_open_file final : public open_file_base {
|
||||
public:
|
||||
direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
filesystem_item fsi, i_provider &provider);
|
||||
@ -45,24 +45,48 @@ public:
|
||||
auto
|
||||
operator=(const direct_open_file &) noexcept -> direct_open_file & = delete;
|
||||
|
||||
public:
|
||||
static constexpr const auto ring_size{5U};
|
||||
|
||||
private:
|
||||
std::array<data_buffer, min_ring_size> ring_data_;
|
||||
std::size_t total_chunks_;
|
||||
|
||||
protected:
|
||||
[[nodiscard]] auto handle_read_buffer(
|
||||
std::size_t chunk,
|
||||
std::function<api_error(data_buffer &buffer)> func) -> api_error override;
|
||||
private:
|
||||
mutable std::mutex chunk_mtx_;
|
||||
std::condition_variable chunk_notify_;
|
||||
std::mutex read_mtx_;
|
||||
std::unique_ptr<std::thread> reader_thread_;
|
||||
std::size_t ring_begin_{};
|
||||
std::array<data_buffer, ring_size> ring_data_;
|
||||
std::size_t ring_end_{};
|
||||
std::size_t ring_pos_{};
|
||||
boost::dynamic_bitset<> ring_state_{ring_size};
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
[[nodiscard]] auto on_check_start() -> bool override;
|
||||
private:
|
||||
[[nodiscard]] auto check_start() -> api_error;
|
||||
|
||||
[[nodiscard]] auto
|
||||
use_buffer(std::size_t chunk,
|
||||
std::function<api_error(const data_buffer &buffer)> func)
|
||||
-> api_error override;
|
||||
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
|
||||
|
||||
void reader_thread();
|
||||
|
||||
public:
|
||||
[[nodiscard]] auto get_source_path() const -> std::string override {
|
||||
return "direct";
|
||||
auto close() -> bool override;
|
||||
|
||||
void forward(std::size_t count);
|
||||
|
||||
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
|
||||
|
||||
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
|
||||
|
||||
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
|
||||
return total_chunks_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto is_complete() const -> bool override { return false; }
|
||||
|
||||
[[nodiscard]] auto is_write_supported() const -> bool override {
|
||||
return false;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto native_operation(native_operation_callback /* callback */)
|
||||
@ -75,6 +99,23 @@ public:
|
||||
-> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error override;
|
||||
|
||||
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
|
||||
void reverse(std::size_t count);
|
||||
|
||||
void set_api_path(const std::string &api_path) override;
|
||||
|
||||
[[nodiscard]] auto
|
||||
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
};
|
||||
} // namespace repertory
|
||||
|
||||
|
@ -22,16 +22,16 @@
|
||||
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
|
||||
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
|
||||
|
||||
#include "file_manager/ring_file_base.hpp"
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/types/file/i_file.hpp"
|
||||
#include "utils/file.hpp"
|
||||
|
||||
namespace repertory {
|
||||
class i_provider;
|
||||
class i_upload_manager;
|
||||
|
||||
class ring_buffer_open_file final : public ring_file_base {
|
||||
class ring_buffer_open_file final : public open_file_base {
|
||||
public:
|
||||
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout, filesystem_item fsi,
|
||||
@ -49,39 +49,87 @@ public:
|
||||
-> ring_buffer_open_file & = delete;
|
||||
|
||||
private:
|
||||
boost::dynamic_bitset<> ring_state_;
|
||||
std::string source_path_;
|
||||
std::size_t total_chunks_;
|
||||
|
||||
private:
|
||||
std::condition_variable chunk_notify_;
|
||||
mutable std::mutex chunk_mtx_;
|
||||
std::unique_ptr<utils::file::i_file> nf_;
|
||||
std::mutex read_mtx_;
|
||||
std::unique_ptr<std::thread> reader_thread_;
|
||||
std::size_t ring_begin_{};
|
||||
std::size_t ring_end_{};
|
||||
std::size_t ring_pos_{};
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
protected:
|
||||
[[nodiscard]] auto handle_read_buffer(
|
||||
std::size_t chunk,
|
||||
std::function<api_error(data_buffer &buffer)> func) -> api_error override;
|
||||
private:
|
||||
[[nodiscard]] auto check_start() -> api_error;
|
||||
|
||||
[[nodiscard]] auto on_check_start() -> bool override;
|
||||
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
|
||||
|
||||
[[nodiscard]] auto
|
||||
on_chunk_downloaded(std::size_t chunk,
|
||||
const data_buffer &buffer) -> api_error override;
|
||||
|
||||
[[nodiscard]] auto
|
||||
use_buffer(std::size_t chunk,
|
||||
std::function<api_error(const data_buffer &buffer)> func)
|
||||
-> api_error override;
|
||||
void reader_thread();
|
||||
|
||||
public:
|
||||
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
std::size_t ring_size) -> bool;
|
||||
|
||||
auto close() -> bool override;
|
||||
|
||||
void forward(std::size_t count);
|
||||
|
||||
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
|
||||
return ring_pos_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
|
||||
return ring_begin_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; }
|
||||
|
||||
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
|
||||
|
||||
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
|
||||
|
||||
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
|
||||
return total_chunks_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto is_complete() const -> bool override { return false; }
|
||||
|
||||
[[nodiscard]] auto is_write_supported() const -> bool override {
|
||||
return false;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto
|
||||
native_operation(native_operation_callback callback) -> api_error override;
|
||||
|
||||
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
|
||||
native_operation_callback /* callback */)
|
||||
-> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto
|
||||
native_operation(native_operation_callback callback) -> api_error override;
|
||||
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error override;
|
||||
|
||||
[[nodiscard]] auto get_source_path() const -> std::string override {
|
||||
return source_path_;
|
||||
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
|
||||
void reverse(std::size_t count);
|
||||
|
||||
void set(std::size_t first_chunk, std::size_t current_chunk);
|
||||
|
||||
void set_api_path(const std::string &api_path) override;
|
||||
|
||||
[[nodiscard]] auto
|
||||
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
};
|
||||
} // namespace repertory
|
||||
|
@ -1,152 +0,0 @@
|
||||
/*
|
||||
Copyright <2018-2024> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_
|
||||
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_
|
||||
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
|
||||
#include "types/repertory.hpp"
|
||||
|
||||
namespace repertory {
|
||||
class i_provider;
|
||||
class i_upload_manager;
|
||||
|
||||
class ring_file_base : public open_file_base {
|
||||
public:
|
||||
ring_file_base(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
|
||||
filesystem_item fsi, i_provider &provider,
|
||||
std::size_t ring_size, bool disable_io);
|
||||
|
||||
~ring_file_base() override = default;
|
||||
|
||||
public:
|
||||
static constexpr const auto min_ring_size{5U};
|
||||
|
||||
public:
|
||||
ring_file_base() = delete;
|
||||
ring_file_base(const ring_file_base &) noexcept = delete;
|
||||
ring_file_base(ring_file_base &&) noexcept = delete;
|
||||
auto operator=(ring_file_base &&) noexcept -> ring_file_base & = delete;
|
||||
auto operator=(const ring_file_base &) noexcept -> ring_file_base & = delete;
|
||||
|
||||
private:
|
||||
boost::dynamic_bitset<> ring_state_;
|
||||
std::size_t total_chunks_;
|
||||
|
||||
private:
|
||||
std::condition_variable chunk_notify_;
|
||||
mutable std::mutex chunk_mtx_;
|
||||
std::mutex read_mtx_;
|
||||
std::unique_ptr<std::thread> reader_thread_;
|
||||
std::size_t ring_begin_{};
|
||||
std::size_t ring_end_{};
|
||||
std::size_t ring_pos_{};
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
private:
|
||||
[[nodiscard]] auto check_start() -> api_error;
|
||||
|
||||
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
|
||||
|
||||
void reader_thread();
|
||||
|
||||
void update_position(std::size_t count, bool is_forward);
|
||||
|
||||
protected:
|
||||
[[nodiscard]] auto get_read_state_size() const -> std::size_t;
|
||||
|
||||
[[nodiscard]] virtual auto handle_read_buffer(
|
||||
std::size_t chunk,
|
||||
std::function<api_error(data_buffer &buffer)> func) -> api_error = 0;
|
||||
|
||||
[[nodiscard]] auto has_reader_thread() -> bool {
|
||||
return reader_thread_ != nullptr;
|
||||
}
|
||||
|
||||
[[nodiscard]] virtual auto on_check_start() -> bool = 0;
|
||||
|
||||
[[nodiscard]] virtual auto
|
||||
on_chunk_downloaded(std::size_t /* chunk */,
|
||||
const data_buffer & /* buffer */) -> api_error {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
[[nodiscard]] virtual auto
|
||||
use_buffer(std::size_t chunk,
|
||||
std::function<api_error(const data_buffer &buffer)> func)
|
||||
-> api_error = 0;
|
||||
|
||||
public:
|
||||
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
std::size_t ring_size) -> bool;
|
||||
|
||||
auto close() -> bool override;
|
||||
|
||||
void forward(std::size_t count);
|
||||
|
||||
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
|
||||
return ring_pos_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
|
||||
return ring_begin_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; }
|
||||
|
||||
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
|
||||
|
||||
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
|
||||
|
||||
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
|
||||
return total_chunks_;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto is_complete() const -> bool override { return false; }
|
||||
|
||||
[[nodiscard]] auto is_write_supported() const -> bool override {
|
||||
return false;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error override;
|
||||
|
||||
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
|
||||
void reverse(std::size_t count);
|
||||
|
||||
void set(std::size_t first_chunk, std::size_t current_chunk);
|
||||
|
||||
void set_api_path(const std::string &api_path) override;
|
||||
|
||||
[[nodiscard]] auto
|
||||
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
};
|
||||
} // namespace repertory
|
||||
|
||||
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_FILE_BASE_HPP_
|
@ -21,28 +21,303 @@
|
||||
*/
|
||||
#include "file_manager/direct_open_file.hpp"
|
||||
|
||||
#include "events/event_system.hpp"
|
||||
#include "file_manager/events.hpp"
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
#include "providers/i_provider.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/common.hpp"
|
||||
|
||||
namespace repertory {
|
||||
direct_open_file::direct_open_file(std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout,
|
||||
filesystem_item fsi, i_provider &provider)
|
||||
: ring_file_base(chunk_size, chunk_timeout, fsi, provider, min_ring_size,
|
||||
true) {}
|
||||
: open_file_base(chunk_size, chunk_timeout, fsi, provider, true),
|
||||
total_chunks_(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi.size, chunk_size))) {
|
||||
if (fsi.size > 0U) {
|
||||
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
|
||||
|
||||
direct_open_file::~direct_open_file() { close(); }
|
||||
|
||||
auto direct_open_file::handle_read_buffer(
|
||||
std::size_t chunk,
|
||||
std::function<api_error(data_buffer &data)> func) -> api_error {
|
||||
return func(ring_data_.at(chunk % get_read_state_size()));
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
}
|
||||
}
|
||||
|
||||
auto direct_open_file::on_check_start() -> bool {
|
||||
return (get_file_size() == 0U || has_reader_thread());
|
||||
direct_open_file::~direct_open_file() {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
close();
|
||||
|
||||
if (reader_thread_) {
|
||||
reader_thread_->join();
|
||||
reader_thread_.reset();
|
||||
}
|
||||
}
|
||||
|
||||
auto direct_open_file::use_buffer(
|
||||
std::size_t chunk,
|
||||
std::function<api_error(const data_buffer &data)> func) -> api_error {
|
||||
return func(ring_data_.at(chunk % get_read_state_size()));
|
||||
auto direct_open_file::check_start() -> api_error {
|
||||
if (get_file_size() == 0U || reader_thread_) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_begin>(get_api_path(), "direct");
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto direct_open_file::close() -> bool {
|
||||
stop_requested_ = true;
|
||||
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
return open_file_base::close();
|
||||
}
|
||||
|
||||
auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
bool skip_active) -> api_error {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
const auto unlock_and_notify = [this, &chunk_lock]() {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
const auto unlock_and_return =
|
||||
[&unlock_and_notify](api_error res) -> api_error {
|
||||
unlock_and_notify();
|
||||
return res;
|
||||
};
|
||||
|
||||
if (chunk < ring_begin_ || chunk > ring_end_) {
|
||||
return unlock_and_return(api_error::invalid_ring_buffer_position);
|
||||
}
|
||||
|
||||
if (active_downloads_.find(chunk) != active_downloads_.end()) {
|
||||
if (skip_active) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download = active_downloads_.at(chunk);
|
||||
unlock_and_notify();
|
||||
|
||||
return active_download->wait();
|
||||
}
|
||||
|
||||
if (ring_state_[chunk % ring_state_.size()]) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download{std::make_shared<download>()};
|
||||
active_downloads_[chunk] = active_download;
|
||||
|
||||
auto &buffer = ring_data_.at(chunk % ring_state_.size());
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
|
||||
auto res{
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
if (res == api_error::success) {
|
||||
ring_state_[chunk % ring_state_.size()] = true;
|
||||
auto progress =
|
||||
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(get_api_path(), "direct",
|
||||
progress);
|
||||
}
|
||||
|
||||
active_downloads_.erase(chunk);
|
||||
unlock_and_notify();
|
||||
|
||||
active_download->notify(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
void direct_open_file::forward(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
|
||||
if ((ring_pos_ + count) <= ring_end_) {
|
||||
ring_pos_ += count;
|
||||
} else {
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_;
|
||||
}
|
||||
|
||||
auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
void direct_open_file::reverse(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
count = std::min(ring_pos_, count);
|
||||
|
||||
if ((ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ -= count;
|
||||
} else {
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (is_directory()) {
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
read_size =
|
||||
utils::calculate_read_size(get_file_size(), read_size, read_offset);
|
||||
if (read_size == 0U) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
|
||||
read_offset = read_offset - (begin_chunk * get_chunk_size());
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_start();
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
}
|
||||
|
||||
for (std::size_t chunk = begin_chunk;
|
||||
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
|
||||
++chunk) {
|
||||
reset_timeout();
|
||||
|
||||
if (chunk > ring_pos_) {
|
||||
forward(chunk - ring_pos_);
|
||||
} else if (chunk < ring_pos_) {
|
||||
reverse(ring_pos_ - chunk);
|
||||
}
|
||||
|
||||
res = download_chunk(chunk, false);
|
||||
if (res != api_error::success) {
|
||||
if (not stop_requested_ &&
|
||||
res == api_error::invalid_ring_buffer_position) {
|
||||
read_lock.unlock();
|
||||
|
||||
// TODO limit retry
|
||||
return read(read_size, read_offset, data);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
auto to_read{
|
||||
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
|
||||
read_size),
|
||||
};
|
||||
|
||||
auto &buffer = ring_data_.at(chunk % ring_state_.size());
|
||||
auto begin =
|
||||
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
|
||||
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
|
||||
data.insert(data.end(), begin, end);
|
||||
|
||||
read_offset = 0U;
|
||||
read_size -= to_read;
|
||||
}
|
||||
|
||||
return stop_requested_ ? api_error::download_stopped : res;
|
||||
}
|
||||
|
||||
void direct_open_file::reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(get_api_path(), "direct",
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
void direct_open_file::set_api_path(const std::string &api_path) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
open_file_base::set_api_path(api_path);
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
} // namespace repertory
|
||||
|
@ -21,10 +21,16 @@
|
||||
*/
|
||||
#include "file_manager/ring_buffer_open_file.hpp"
|
||||
|
||||
#include "events/event_system.hpp"
|
||||
#include "file_manager/events.hpp"
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
#include "platform/platform.hpp"
|
||||
#include "providers/i_provider.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/common.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
#include "utils/file.hpp"
|
||||
#include "utils/file_utils.hpp"
|
||||
#include "utils/path.hpp"
|
||||
|
||||
namespace repertory {
|
||||
ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
@ -33,12 +39,25 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
filesystem_item fsi,
|
||||
i_provider &provider,
|
||||
std::size_t ring_size)
|
||||
: ring_file_base(chunk_size, chunk_timeout, fsi, provider, ring_size,
|
||||
false),
|
||||
: open_file_base(chunk_size, chunk_timeout, fsi, provider, false),
|
||||
ring_state_(ring_size),
|
||||
source_path_(utils::path::combine(buffer_directory,
|
||||
{
|
||||
utils::create_uuid_string(),
|
||||
})) {}
|
||||
})),
|
||||
total_chunks_(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi.size, chunk_size))) {
|
||||
if (ring_size < 5U) {
|
||||
throw std::runtime_error("ring size must be greater than or equal to 5");
|
||||
}
|
||||
|
||||
if (not can_handle_file(fsi.size, chunk_size, ring_size)) {
|
||||
throw std::runtime_error("file size is less than ring buffer size");
|
||||
}
|
||||
|
||||
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
|
||||
ring_state_.set(0U, ring_size, false);
|
||||
}
|
||||
|
||||
ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
@ -48,93 +67,379 @@ ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
if (nf_) {
|
||||
nf_->close();
|
||||
nf_.reset();
|
||||
|
||||
if (not utils::file::file(source_path_).remove()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, get_api_path(), source_path_,
|
||||
utils::get_last_error_code(), "failed to delete file");
|
||||
}
|
||||
}
|
||||
|
||||
if (utils::file::file(get_source_path()).remove()) {
|
||||
return;
|
||||
if (reader_thread_) {
|
||||
reader_thread_->join();
|
||||
reader_thread_.reset();
|
||||
}
|
||||
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, get_api_path(), get_source_path(),
|
||||
utils::get_last_error_code(), "failed to delete file");
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::handle_read_buffer(
|
||||
std::size_t /* chunk */,
|
||||
std::function<api_error(data_buffer &data)> func) -> api_error {
|
||||
data_buffer buffer;
|
||||
return func(buffer);
|
||||
auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
std::size_t ring_size) -> bool {
|
||||
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::on_check_start() -> bool {
|
||||
auto ring_buffer_open_file::check_start() -> api_error {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (nf_) {
|
||||
return true;
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto buffer_directory{utils::path::get_parent_path(get_source_path())};
|
||||
auto buffer_directory{utils::path::get_parent_path(source_path_)};
|
||||
if (not utils::file::directory(buffer_directory).create_directory()) {
|
||||
throw std::runtime_error(
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, get_api_path(), source_path_,
|
||||
fmt::format("failed to create buffer directory|path|{}|err|{}",
|
||||
buffer_directory, utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
nf_ = utils::file::file::open_or_create_file(get_source_path());
|
||||
nf_ = utils::file::file::open_or_create_file(source_path_);
|
||||
if (not nf_ || not *nf_) {
|
||||
throw std::runtime_error(fmt::format("failed to create buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, get_api_path(), source_path_,
|
||||
fmt::format("failed to create buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
if (not nf_->truncate(get_read_state_size() * get_chunk_size())) {
|
||||
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
|
||||
if (not nf_->truncate(ring_state_.size() * get_chunk_size())) {
|
||||
nf_->close();
|
||||
nf_.reset();
|
||||
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, get_api_path(), source_path_,
|
||||
fmt::format("failed to resize buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
return false;
|
||||
event_system::instance().raise<download_begin>(get_api_path(), source_path_);
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::on_chunk_downloaded(
|
||||
std::size_t chunk, const data_buffer &buffer) -> api_error {
|
||||
return do_io([&]() -> api_error {
|
||||
std::size_t bytes_written{};
|
||||
if (nf_->write(buffer, (chunk % get_read_state_size()) * get_chunk_size(),
|
||||
&bytes_written)) {
|
||||
return api_error::success;
|
||||
auto ring_buffer_open_file::close() -> bool {
|
||||
stop_requested_ = true;
|
||||
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
return open_file_base::close();
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
bool skip_active) -> api_error {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
const auto unlock_and_notify = [this, &chunk_lock]() {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
const auto unlock_and_return =
|
||||
[&unlock_and_notify](api_error res) -> api_error {
|
||||
unlock_and_notify();
|
||||
return res;
|
||||
};
|
||||
|
||||
if (chunk < ring_begin_ || chunk > ring_end_) {
|
||||
return unlock_and_return(api_error::invalid_ring_buffer_position);
|
||||
}
|
||||
|
||||
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
|
||||
if (skip_active) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
return api_error::os_error;
|
||||
});
|
||||
auto active_download = get_active_downloads().at(chunk);
|
||||
unlock_and_notify();
|
||||
|
||||
return active_download->wait();
|
||||
}
|
||||
|
||||
if (ring_state_[chunk % ring_state_.size()]) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download{std::make_shared<download>()};
|
||||
get_active_downloads()[chunk] = active_download;
|
||||
|
||||
data_buffer buffer;
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
|
||||
auto res{
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
if (res == api_error::success) {
|
||||
res = do_io([&]() -> api_error {
|
||||
std::size_t bytes_written{};
|
||||
if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(),
|
||||
&bytes_written)) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
return api_error::os_error;
|
||||
});
|
||||
|
||||
if (res == api_error::success) {
|
||||
ring_state_[chunk % ring_state_.size()] = true;
|
||||
auto progress = (static_cast<double>(chunk + 1U) /
|
||||
static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(get_api_path(),
|
||||
source_path_, progress);
|
||||
}
|
||||
}
|
||||
|
||||
get_active_downloads().erase(chunk);
|
||||
unlock_and_notify();
|
||||
|
||||
active_download->notify(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::native_operation(native_operation_callback callback)
|
||||
-> api_error {
|
||||
void ring_buffer_open_file::forward(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
|
||||
if ((ring_pos_ + count) <= ring_end_) {
|
||||
ring_pos_ += count;
|
||||
} else {
|
||||
auto added = count - (ring_end_ - ring_pos_);
|
||||
if (added >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += count;
|
||||
ring_begin_ += added;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < added; ++idx) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ += added;
|
||||
ring_pos_ += count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_;
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::native_operation(
|
||||
i_open_file::native_operation_callback callback) -> api_error {
|
||||
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::use_buffer(
|
||||
std::size_t chunk,
|
||||
std::function<api_error(const data_buffer &data)> func) -> api_error {
|
||||
data_buffer buffer;
|
||||
buffer.resize(get_chunk_size());
|
||||
auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (is_directory()) {
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
auto res = do_io([&]() -> api_error {
|
||||
std::size_t bytes_read{};
|
||||
auto result =
|
||||
nf_->read(buffer, (chunk % get_read_state_size()) * get_chunk_size(),
|
||||
&bytes_read)
|
||||
? api_error::success
|
||||
: api_error::os_error;
|
||||
buffer.resize(bytes_read);
|
||||
return result;
|
||||
});
|
||||
reset_timeout();
|
||||
|
||||
read_size =
|
||||
utils::calculate_read_size(get_file_size(), read_size, read_offset);
|
||||
if (read_size == 0U) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
|
||||
read_offset = read_offset - (begin_chunk * get_chunk_size());
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_start();
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
}
|
||||
|
||||
return func(buffer);
|
||||
for (std::size_t chunk = begin_chunk;
|
||||
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
|
||||
++chunk) {
|
||||
reset_timeout();
|
||||
|
||||
if (chunk > ring_pos_) {
|
||||
forward(chunk - ring_pos_);
|
||||
} else if (chunk < ring_pos_) {
|
||||
reverse(ring_pos_ - chunk);
|
||||
}
|
||||
|
||||
res = download_chunk(chunk, false);
|
||||
if (res != api_error::success) {
|
||||
if (not stop_requested_ &&
|
||||
res == api_error::invalid_ring_buffer_position) {
|
||||
read_lock.unlock();
|
||||
|
||||
// TODO limit retry
|
||||
return read(read_size, read_offset, data);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
auto to_read{
|
||||
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
|
||||
read_size),
|
||||
};
|
||||
|
||||
res = do_io([&]() -> api_error {
|
||||
data_buffer buffer(to_read);
|
||||
|
||||
std::size_t bytes_read{};
|
||||
auto result =
|
||||
nf_->read(
|
||||
buffer,
|
||||
(((chunk % ring_state_.size()) * get_chunk_size()) + read_offset),
|
||||
&bytes_read)
|
||||
? api_error::success
|
||||
: api_error::os_error;
|
||||
|
||||
if (result != api_error::success) {
|
||||
return result;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
data.insert(data.end(), buffer.begin(), buffer.end());
|
||||
read_size -= bytes_read;
|
||||
|
||||
return result;
|
||||
});
|
||||
|
||||
read_offset = 0U;
|
||||
}
|
||||
|
||||
return stop_requested_ ? api_error::download_stopped : res;
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(get_api_path(), source_path_,
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::reverse(std::size_t count) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
count = std::min(ring_pos_, count);
|
||||
|
||||
if ((ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ -= count;
|
||||
} else {
|
||||
auto removed = count - (ring_pos_ - ring_begin_);
|
||||
if (removed >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ -= count;
|
||||
ring_begin_ = ring_pos_;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < removed; ++idx) {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
ring_begin_ -= removed;
|
||||
ring_pos_ -= count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::set(std::size_t first_chunk,
|
||||
std::size_t current_chunk) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if (first_chunk >= total_chunks_) {
|
||||
chunk_notify_.notify_all();
|
||||
throw std::runtime_error("first chunk must be less than total chunks");
|
||||
}
|
||||
|
||||
ring_begin_ = first_chunk;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
|
||||
if (current_chunk > ring_end_) {
|
||||
chunk_notify_.notify_all();
|
||||
throw std::runtime_error(
|
||||
"current chunk must be less than or equal to last chunk");
|
||||
}
|
||||
|
||||
ring_pos_ = current_chunk;
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::set_api_path(const std::string &api_path) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
open_file_base::set_api_path(api_path);
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
} // namespace repertory
|
||||
|
@ -1,386 +0,0 @@
|
||||
/*
|
||||
Copyright <2018-2024> <scott.e.graves@protonmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
*/
|
||||
#include "file_manager/ring_file_base.hpp"
|
||||
|
||||
#include "events/event_system.hpp"
|
||||
#include "file_manager/events.hpp"
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
#include "platform/platform.hpp"
|
||||
#include "providers/i_provider.hpp"
|
||||
#include "types/repertory.hpp"
|
||||
#include "utils/common.hpp"
|
||||
#include "utils/error_utils.hpp"
|
||||
|
||||
namespace repertory {
|
||||
ring_file_base::ring_file_base(std::uint64_t chunk_size,
|
||||
std::uint8_t chunk_timeout, filesystem_item fsi,
|
||||
i_provider &provider, std::size_t ring_size,
|
||||
bool disable_io)
|
||||
: open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io),
|
||||
ring_state_(ring_size),
|
||||
total_chunks_(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi.size, chunk_size))) {
|
||||
if (disable_io) {
|
||||
if (fsi.size > 0U) {
|
||||
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
|
||||
}
|
||||
} else {
|
||||
if (ring_state_.size() < min_ring_size) {
|
||||
throw std::runtime_error(fmt::format(
|
||||
"ring size must be greater than or equal to {}", min_ring_size));
|
||||
}
|
||||
|
||||
if (not can_handle_file(fsi.size, chunk_size, get_read_state_size())) {
|
||||
throw std::runtime_error("file size is less than ring buffer size");
|
||||
}
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
}
|
||||
|
||||
auto ring_file_base::can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
std::size_t ring_size) -> bool {
|
||||
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
|
||||
}
|
||||
|
||||
auto ring_file_base::check_start() -> api_error {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
try {
|
||||
if (on_check_start()) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_begin>(get_api_path(),
|
||||
get_source_path());
|
||||
reader_thread_ =
|
||||
std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
} catch (const std::exception &ex) {
|
||||
utils::error::raise_api_path_error(function_name, get_api_path(),
|
||||
get_source_path(), ex,
|
||||
"failed to start");
|
||||
}
|
||||
|
||||
return api_error::error;
|
||||
}
|
||||
|
||||
auto ring_file_base::close() -> bool {
|
||||
stop_requested_ = true;
|
||||
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
auto ret = open_file_base::close();
|
||||
|
||||
if (reader_thread_) {
|
||||
reader_thread_->join();
|
||||
reader_thread_.reset();
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
auto ring_file_base::download_chunk(std::size_t chunk,
|
||||
bool skip_active) -> api_error {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
const auto unlock_and_notify = [this, &chunk_lock]() {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
const auto unlock_and_return =
|
||||
[&unlock_and_notify](api_error res) -> api_error {
|
||||
unlock_and_notify();
|
||||
return res;
|
||||
};
|
||||
|
||||
if (chunk < ring_begin_ || chunk > ring_end_) {
|
||||
return unlock_and_return(api_error::invalid_ring_buffer_position);
|
||||
}
|
||||
|
||||
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
|
||||
if (skip_active) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download = get_active_downloads().at(chunk);
|
||||
unlock_and_notify();
|
||||
|
||||
return active_download->wait();
|
||||
}
|
||||
|
||||
if (ring_state_[chunk % ring_state_.size()]) {
|
||||
return unlock_and_return(api_error::success);
|
||||
}
|
||||
|
||||
auto active_download{std::make_shared<download>()};
|
||||
get_active_downloads()[chunk] = active_download;
|
||||
|
||||
auto data_offset{chunk * get_chunk_size()};
|
||||
auto data_size{
|
||||
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
|
||||
};
|
||||
|
||||
auto res = handle_read_buffer(chunk, [&](auto &&buffer) {
|
||||
auto result{
|
||||
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
|
||||
buffer, stop_requested_),
|
||||
};
|
||||
|
||||
if (result != api_error::success) {
|
||||
return result;
|
||||
}
|
||||
result = on_chunk_downloaded(chunk, buffer);
|
||||
if (result != api_error::success) {
|
||||
return result;
|
||||
}
|
||||
|
||||
ring_state_[chunk % ring_state_.size()] = true;
|
||||
auto progress =
|
||||
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
|
||||
100.0;
|
||||
event_system::instance().raise<download_progress>(
|
||||
get_api_path(), get_source_path(), progress);
|
||||
return api_error::success;
|
||||
});
|
||||
|
||||
get_active_downloads().erase(chunk);
|
||||
unlock_and_notify();
|
||||
|
||||
active_download->notify(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
void ring_file_base::forward(std::size_t count) {
|
||||
update_position(count, true);
|
||||
}
|
||||
|
||||
auto ring_file_base::get_read_state() const -> boost::dynamic_bitset<> {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_;
|
||||
}
|
||||
|
||||
auto ring_file_base::get_read_state(std::size_t chunk) const -> bool {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_[chunk % ring_state_.size()];
|
||||
}
|
||||
|
||||
auto ring_file_base::get_read_state_size() const -> std::size_t {
|
||||
recur_mutex_lock file_lock(get_mutex());
|
||||
return ring_state_.size();
|
||||
}
|
||||
|
||||
auto ring_file_base::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (is_directory()) {
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
read_size =
|
||||
utils::calculate_read_size(get_file_size(), read_size, read_offset);
|
||||
if (read_size == 0U) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
|
||||
read_offset = read_offset - (begin_chunk * get_chunk_size());
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_start();
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
}
|
||||
|
||||
for (std::size_t chunk = begin_chunk;
|
||||
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
|
||||
++chunk) {
|
||||
reset_timeout();
|
||||
|
||||
if (chunk > ring_pos_) {
|
||||
forward(chunk - ring_pos_);
|
||||
} else if (chunk < ring_pos_) {
|
||||
reverse(ring_pos_ - chunk);
|
||||
}
|
||||
|
||||
res = download_chunk(chunk, false);
|
||||
if (res != api_error::success) {
|
||||
if (not stop_requested_ &&
|
||||
res == api_error::invalid_ring_buffer_position) {
|
||||
read_lock.unlock();
|
||||
|
||||
// TODO limit retry
|
||||
return read(read_size, read_offset, data);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
reset_timeout();
|
||||
|
||||
auto to_read{
|
||||
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
|
||||
read_size),
|
||||
};
|
||||
|
||||
res = use_buffer(
|
||||
chunk, [&data, &read_offset, &to_read](auto &&buffer) -> api_error {
|
||||
auto begin =
|
||||
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
|
||||
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
|
||||
data.insert(data.end(), begin, end);
|
||||
return api_error::success;
|
||||
});
|
||||
|
||||
reset_timeout();
|
||||
read_size -= to_read;
|
||||
read_offset = 0U;
|
||||
}
|
||||
|
||||
return stop_requested_ ? api_error::download_stopped : res;
|
||||
}
|
||||
|
||||
void ring_file_base::reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(
|
||||
get_api_path(), get_source_path(), api_error::download_stopped);
|
||||
}
|
||||
|
||||
void ring_file_base::reverse(std::size_t count) {
|
||||
update_position(count, false);
|
||||
}
|
||||
|
||||
void ring_file_base::set(std::size_t first_chunk, std::size_t current_chunk) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
if (first_chunk >= total_chunks_) {
|
||||
chunk_notify_.notify_all();
|
||||
throw std::runtime_error("first chunk must be less than total chunks");
|
||||
}
|
||||
|
||||
ring_begin_ = first_chunk;
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
|
||||
if (current_chunk > ring_end_) {
|
||||
chunk_notify_.notify_all();
|
||||
throw std::runtime_error(
|
||||
"current chunk must be less than or equal to last chunk");
|
||||
}
|
||||
|
||||
ring_pos_ = current_chunk;
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
void ring_file_base::set_api_path(const std::string &api_path) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
open_file_base::set_api_path(api_path);
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
|
||||
void ring_file_base::update_position(std::size_t count, bool is_forward) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
|
||||
if (is_forward) {
|
||||
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
|
||||
count = (total_chunks_ - 1U) - ring_pos_;
|
||||
}
|
||||
} else {
|
||||
count = std::min(ring_pos_, count);
|
||||
}
|
||||
|
||||
if (is_forward ? (ring_pos_ + count) <= ring_end_
|
||||
: (ring_pos_ - count) >= ring_begin_) {
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
} else {
|
||||
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
|
||||
: count - (ring_pos_ - ring_begin_);
|
||||
|
||||
if (delta >= ring_state_.size()) {
|
||||
ring_state_.set(0U, ring_state_.size(), false);
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
ring_begin_ += is_forward ? delta : -delta;
|
||||
} else {
|
||||
for (std::size_t idx = 0U; idx < delta; ++idx) {
|
||||
if (is_forward) {
|
||||
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
|
||||
} else {
|
||||
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
|
||||
}
|
||||
}
|
||||
|
||||
ring_begin_ += is_forward ? delta : -delta;
|
||||
ring_pos_ += is_forward ? count : -count;
|
||||
}
|
||||
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
}
|
||||
} // namespace repertory
|
Reference in New Issue
Block a user