This commit is contained in:
@ -42,8 +42,8 @@ public:
|
||||
direct_open_file(const direct_open_file &) noexcept = delete;
|
||||
direct_open_file(direct_open_file &&) noexcept = delete;
|
||||
auto operator=(direct_open_file &&) noexcept -> direct_open_file & = delete;
|
||||
auto operator=(const direct_open_file &) noexcept
|
||||
-> direct_open_file & = delete;
|
||||
auto
|
||||
operator=(const direct_open_file &) noexcept -> direct_open_file & = delete;
|
||||
|
||||
public:
|
||||
static constexpr const auto ring_size{5U};
|
||||
@ -64,10 +64,12 @@ private:
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
private:
|
||||
void background_reader_thread();
|
||||
[[nodiscard]] auto check_start() -> api_error;
|
||||
|
||||
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
|
||||
|
||||
void reader_thread();
|
||||
|
||||
public:
|
||||
auto close() -> bool override;
|
||||
|
||||
@ -109,10 +111,9 @@ public:
|
||||
|
||||
void set_api_path(const std::string &api_path) override;
|
||||
|
||||
[[nodiscard]] auto write(std::uint64_t /* write_offset */,
|
||||
const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */)
|
||||
-> api_error override {
|
||||
[[nodiscard]] auto
|
||||
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
};
|
||||
|
@ -42,18 +42,17 @@ public:
|
||||
ring_buffer_open_file() = delete;
|
||||
ring_buffer_open_file(const ring_buffer_open_file &) noexcept = delete;
|
||||
ring_buffer_open_file(ring_buffer_open_file &&) noexcept = delete;
|
||||
auto operator=(ring_buffer_open_file &&) noexcept
|
||||
-> ring_buffer_open_file & = delete;
|
||||
auto operator=(ring_buffer_open_file &&) noexcept -> ring_buffer_open_file & =
|
||||
delete;
|
||||
auto operator=(const ring_buffer_open_file &) noexcept
|
||||
-> ring_buffer_open_file & = delete;
|
||||
|
||||
private:
|
||||
boost::dynamic_bitset<> ring_state_;
|
||||
std::string source_path_;
|
||||
std::size_t total_chunks_;
|
||||
|
||||
private:
|
||||
std::unique_ptr<std::thread> chunk_forward_thread_;
|
||||
std::unique_ptr<std::thread> chunk_reverse_thread_;
|
||||
std::condition_variable chunk_notify_;
|
||||
mutable std::mutex chunk_mtx_;
|
||||
std::mutex read_mtx_;
|
||||
@ -61,16 +60,15 @@ private:
|
||||
std::size_t ring_begin_{};
|
||||
std::size_t ring_end_{};
|
||||
std::size_t ring_pos_{};
|
||||
std::string source_path_;
|
||||
stop_type stop_requested_{false};
|
||||
|
||||
private:
|
||||
void background_reader_thread();
|
||||
|
||||
[[nodiscard]] auto check_allocation() -> api_error;
|
||||
|
||||
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
|
||||
|
||||
void reader_thread();
|
||||
|
||||
public:
|
||||
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
@ -104,8 +102,8 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
[[nodiscard]] auto native_operation(native_operation_callback callback)
|
||||
-> api_error override;
|
||||
[[nodiscard]] auto
|
||||
native_operation(native_operation_callback callback) -> api_error override;
|
||||
|
||||
[[nodiscard]] auto native_operation(std::uint64_t /* new_file_size */,
|
||||
native_operation_callback /* callback */)
|
||||
@ -126,10 +124,9 @@ public:
|
||||
|
||||
void set_api_path(const std::string &api_path) override;
|
||||
|
||||
[[nodiscard]] auto write(std::uint64_t /* write_offset */,
|
||||
const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */)
|
||||
-> api_error override {
|
||||
[[nodiscard]] auto
|
||||
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
|
||||
std::size_t & /* bytes_written */) -> api_error override {
|
||||
return api_error::not_supported;
|
||||
}
|
||||
};
|
||||
|
@ -42,12 +42,7 @@ direct_open_file::direct_open_file(std::uint64_t chunk_size,
|
||||
ring_end_ =
|
||||
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
|
||||
ring_state_.set(0U, ring_state_.size(), true);
|
||||
|
||||
reader_thread_ =
|
||||
std::make_unique<std::thread>([this]() { background_reader_thread(); });
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path, "direct");
|
||||
}
|
||||
|
||||
direct_open_file::~direct_open_file() {
|
||||
@ -61,48 +56,14 @@ direct_open_file::~direct_open_file() {
|
||||
}
|
||||
}
|
||||
|
||||
void direct_open_file::background_reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (not ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
auto direct_open_file::check_start() -> api_error {
|
||||
if (fsi_.size == 0U || reader_thread_) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(fsi_.api_path, "direct",
|
||||
api_error::download_stopped);
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path, "direct");
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto direct_open_file::close() -> bool {
|
||||
@ -115,8 +76,8 @@ auto direct_open_file::close() -> bool {
|
||||
return open_file_base::close();
|
||||
}
|
||||
|
||||
auto direct_open_file::download_chunk(std::size_t chunk, bool skip_active)
|
||||
-> api_error {
|
||||
auto direct_open_file::download_chunk(std::size_t chunk,
|
||||
bool skip_active) -> api_error {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
const auto unlock_and_notify = [this, &chunk_lock]() {
|
||||
chunk_notify_.notify_all();
|
||||
@ -266,9 +227,12 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
|
||||
read_offset = read_offset - (begin_chunk * chunk_size_);
|
||||
|
||||
auto res{api_error::success};
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_start();
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
}
|
||||
|
||||
for (std::size_t chunk = begin_chunk;
|
||||
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
|
||||
++chunk) {
|
||||
@ -313,6 +277,50 @@ auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
|
||||
return stop_requested_ ? api_error::download_stopped : res;
|
||||
}
|
||||
|
||||
void direct_open_file::reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (not ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(fsi_.api_path, "direct",
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
void direct_open_file::set_api_path(const std::string &api_path) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
open_file_base::set_api_path(api_path);
|
||||
|
@ -21,7 +21,6 @@
|
||||
*/
|
||||
#include "file_manager/ring_buffer_open_file.hpp"
|
||||
|
||||
#include "app_config.hpp"
|
||||
#include "events/event_system.hpp"
|
||||
#include "file_manager/events.hpp"
|
||||
#include "file_manager/open_file_base.hpp"
|
||||
@ -43,6 +42,8 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
std::size_t ring_size)
|
||||
: open_file_base(chunk_size, chunk_timeout, fsi, provider),
|
||||
ring_state_(ring_size),
|
||||
source_path_(utils::path::combine(buffer_directory,
|
||||
{utils::create_uuid_string()})),
|
||||
total_chunks_(static_cast<std::size_t>(
|
||||
utils::divide_with_ceiling(fsi_.size, chunk_size))) {
|
||||
if (ring_size < 5U) {
|
||||
@ -55,31 +56,6 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
|
||||
|
||||
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
|
||||
ring_state_.set(0U, ring_size, true);
|
||||
|
||||
buffer_directory = utils::path::absolute(buffer_directory);
|
||||
if (not utils::file::directory(buffer_directory).create_directory()) {
|
||||
throw std::runtime_error(
|
||||
fmt::format("failed to create buffer directory|path|{}|err|{}",
|
||||
buffer_directory, utils::get_last_error_code()));
|
||||
}
|
||||
|
||||
source_path_ =
|
||||
utils::path::combine(buffer_directory, {utils::create_uuid_string()});
|
||||
nf_ = utils::file::file::open_or_create_file(source_path_);
|
||||
if (not*nf_) {
|
||||
throw std::runtime_error(fmt::format("failed to create buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
}
|
||||
|
||||
if (not nf_->truncate(ring_size * chunk_size)) {
|
||||
nf_->close();
|
||||
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
}
|
||||
|
||||
reader_thread_ =
|
||||
std::make_unique<std::thread>([this]() { background_reader_thread(); });
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path, source_path_);
|
||||
}
|
||||
|
||||
ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
@ -89,6 +65,8 @@ ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
|
||||
if (nf_) {
|
||||
nf_->close();
|
||||
nf_.reset();
|
||||
|
||||
if (not utils::file::file(source_path_).remove()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
@ -102,50 +80,6 @@ ring_buffer_open_file::~ring_buffer_open_file() {
|
||||
}
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::background_reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (not ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(fsi_.api_path, source_path_,
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
|
||||
std::size_t chunk_size,
|
||||
std::size_t ring_size) -> bool {
|
||||
@ -153,7 +87,44 @@ auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::check_allocation() -> api_error {
|
||||
REPERTORY_USES_FUNCTION_NAME();
|
||||
|
||||
if (nf_) {
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto buffer_directory{utils::path::get_parent_path(source_path_)};
|
||||
if (not utils::file::directory(buffer_directory).create_directory()) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
fmt::format("failed to create buffer directory|path|{}|err|{}",
|
||||
buffer_directory, utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
nf_ = utils::file::file::open_or_create_file(source_path_);
|
||||
if (not nf_ || not *nf_) {
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
fmt::format("failed to create buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
if (not nf_->truncate(ring_state_.size() * chunk_size_)) {
|
||||
nf_->close();
|
||||
nf_.reset();
|
||||
|
||||
utils::error::raise_api_path_error(
|
||||
function_name, fsi_.api_path, source_path_,
|
||||
fmt::format("failed to resize buffer file|err|{}",
|
||||
utils::get_last_error_code()));
|
||||
return api_error::os_error;
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_begin>(fsi_.api_path, source_path_);
|
||||
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
|
||||
return api_error::success;
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::close() -> bool {
|
||||
@ -166,8 +137,8 @@ auto ring_buffer_open_file::close() -> bool {
|
||||
return open_file_base::close();
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::download_chunk(std::size_t chunk, bool skip_active)
|
||||
-> api_error {
|
||||
auto ring_buffer_open_file::download_chunk(std::size_t chunk,
|
||||
bool skip_active) -> api_error {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
const auto unlock_and_notify = [this, &chunk_lock]() {
|
||||
chunk_notify_.notify_all();
|
||||
@ -316,8 +287,8 @@ void ring_buffer_open_file::reverse(std::size_t count) {
|
||||
}
|
||||
|
||||
auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
std::uint64_t read_offset, data_buffer &data)
|
||||
-> api_error {
|
||||
std::uint64_t read_offset,
|
||||
data_buffer &data) -> api_error {
|
||||
if (fsi_.directory) {
|
||||
return api_error::invalid_operation;
|
||||
}
|
||||
@ -332,9 +303,12 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
auto begin_chunk{static_cast<std::size_t>(read_offset / chunk_size_)};
|
||||
read_offset = read_offset - (begin_chunk * chunk_size_);
|
||||
|
||||
auto res{api_error::success};
|
||||
|
||||
unique_mutex_lock read_lock(read_mtx_);
|
||||
auto res = check_allocation();
|
||||
if (res != api_error::success) {
|
||||
return res;
|
||||
}
|
||||
|
||||
for (std::size_t chunk = begin_chunk;
|
||||
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
|
||||
++chunk) {
|
||||
@ -395,6 +369,50 @@ auto ring_buffer_open_file::read(std::size_t read_size,
|
||||
return stop_requested_ ? api_error::download_stopped : res;
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::reader_thread() {
|
||||
unique_mutex_lock chunk_lock(chunk_mtx_);
|
||||
auto next_chunk = ring_pos_;
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
while (not stop_requested_) {
|
||||
chunk_lock.lock();
|
||||
|
||||
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
|
||||
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
|
||||
if (stop_requested_) {
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
return;
|
||||
}
|
||||
|
||||
if (get_read_state().all()) {
|
||||
chunk_notify_.wait(chunk_lock);
|
||||
next_chunk = ring_pos_;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
};
|
||||
|
||||
if (not ring_state_[next_chunk % ring_state_.size()]) {
|
||||
check_and_wait();
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk_notify_.notify_all();
|
||||
chunk_lock.unlock();
|
||||
|
||||
download_chunk(next_chunk, true);
|
||||
|
||||
chunk_lock.lock();
|
||||
check_and_wait();
|
||||
}
|
||||
|
||||
event_system::instance().raise<download_end>(fsi_.api_path, source_path_,
|
||||
api_error::download_stopped);
|
||||
}
|
||||
|
||||
void ring_buffer_open_file::set(std::size_t first_chunk,
|
||||
std::size_t current_chunk) {
|
||||
mutex_lock chunk_lock(chunk_mtx_);
|
||||
|
Reference in New Issue
Block a user