refactor
All checks were successful
BlockStorage/repertory/pipeline/head This commit looks good

This commit is contained in:
2024-12-28 11:25:15 -06:00
parent d81be88d8d
commit 827d0b5371
6 changed files with 633 additions and 746 deletions

View File

@ -22,7 +22,7 @@
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_DIRECT_OPEN_FILE_HPP_
#include "file_manager/open_file_base.hpp"
#include "file_manager/ring_buffer_common.hpp"
#include "types/repertory.hpp"
@ -30,7 +30,7 @@ namespace repertory {
class i_provider;
class i_upload_manager;
class direct_open_file final : public open_file_base {
class direct_open_file final : public ring_buffer_common {
public:
direct_open_file(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider);
@ -45,52 +45,28 @@ public:
auto
operator=(const direct_open_file &) noexcept -> direct_open_file & = delete;
public:
static constexpr const auto ring_size{5U};
private:
std::size_t total_chunks_;
std::array<data_buffer, min_ring_size> ring_data_;
private:
mutable std::mutex chunk_mtx_;
std::condition_variable chunk_notify_;
std::mutex read_mtx_;
std::unique_ptr<std::thread> reader_thread_;
std::size_t ring_begin_{};
std::array<data_buffer, ring_size> ring_data_;
std::size_t ring_end_{};
std::size_t ring_pos_{};
boost::dynamic_bitset<> ring_state_{ring_size};
stop_type stop_requested_{false};
protected:
[[nodiscard]] auto on_check_start() -> bool override;
private:
[[nodiscard]] auto check_start() -> api_error;
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
void reader_thread();
void update_position(std::size_t count, bool is_forward);
public:
auto close() -> bool override;
void forward(std::size_t count);
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
[[nodiscard]] auto
on_chunk_downloaded(std::size_t /* chunk */,
const data_buffer & /* buffer */) -> api_error override {
return api_error::success;
}
[[nodiscard]] auto is_complete() const -> bool override { return false; }
[[nodiscard]] auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error override;
[[nodiscard]] auto is_write_supported() const -> bool override {
return false;
}
[[nodiscard]] auto use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error override;
public:
[[nodiscard]] auto native_operation(native_operation_callback /* callback */)
-> api_error override {
return api_error::not_supported;
@ -101,23 +77,6 @@ public:
-> api_error override {
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
std::size_t & /* bytes_written */) -> api_error override {
return api_error::not_supported;
}
};
} // namespace repertory

View File

@ -0,0 +1,151 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_COMMON_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_COMMON_HPP_
#include "file_manager/open_file_base.hpp"
#include "types/repertory.hpp"
#include "utils/file.hpp"
namespace repertory {
class i_provider;
class i_upload_manager;
class ring_buffer_common : public open_file_base {
public:
ring_buffer_common(std::uint64_t chunk_size, std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider,
std::size_t ring_size, bool disable_io);
~ring_buffer_common() override = default;
public:
ring_buffer_common() = delete;
ring_buffer_common(const ring_buffer_common &) noexcept = delete;
ring_buffer_common(ring_buffer_common &&) noexcept = delete;
auto
operator=(ring_buffer_common &&) noexcept -> ring_buffer_common & = delete;
auto operator=(const ring_buffer_common &) noexcept -> ring_buffer_common & =
delete;
public:
static constexpr const auto min_ring_size{5U};
private:
boost::dynamic_bitset<> ring_state_;
std::size_t total_chunks_;
private:
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::mutex read_mtx_;
std::unique_ptr<std::thread> reader_thread_;
std::size_t ring_begin_{};
std::size_t ring_end_{};
std::size_t ring_pos_{};
stop_type stop_requested_{false};
private:
[[nodiscard]] auto check_start() -> api_error;
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
void reader_thread();
void update_position(std::size_t count, bool is_forward);
protected:
[[nodiscard]] auto has_reader_thread() const -> bool {
return reader_thread_ != nullptr;
}
[[nodiscard]] auto get_ring_size() const -> std::size_t {
return ring_state_.size();
}
[[nodiscard]] virtual auto on_check_start() -> bool = 0;
[[nodiscard]] virtual auto
on_chunk_downloaded(std::size_t chunk,
const data_buffer &buffer) -> api_error = 0;
[[nodiscard]] virtual auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error = 0;
[[nodiscard]] virtual auto
use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func) -> api_error = 0;
public:
auto close() -> bool override;
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return ring_pos_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return ring_begin_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; }
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return false; }
[[nodiscard]] auto is_write_supported() const -> bool override {
return false;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
std::size_t & /* bytes_written */) -> api_error override {
return api_error::not_supported;
}
};
} // namespace repertory
#endif // REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_COMMON_HPP_

View File

@ -22,7 +22,7 @@
#ifndef REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#define REPERTORY_INCLUDE_FILE_MANAGER_RING_BUFFER_OPEN_FILE_HPP_
#include "file_manager/open_file_base.hpp"
#include "file_manager/ring_buffer_common.hpp"
#include "types/repertory.hpp"
#include "utils/file.hpp"
@ -31,7 +31,7 @@ namespace repertory {
class i_provider;
class i_upload_manager;
class ring_buffer_open_file final : public open_file_base {
class ring_buffer_open_file final : public ring_buffer_common {
public:
ring_buffer_open_file(std::string buffer_directory, std::uint64_t chunk_size,
std::uint8_t chunk_timeout, filesystem_item fsi,
@ -51,61 +51,31 @@ public:
private:
boost::dynamic_bitset<> ring_state_;
std::string source_path_;
std::size_t total_chunks_;
private:
std::condition_variable chunk_notify_;
mutable std::mutex chunk_mtx_;
std::unique_ptr<utils::file::i_file> nf_;
std::mutex read_mtx_;
std::unique_ptr<std::thread> reader_thread_;
std::size_t ring_begin_{};
std::size_t ring_end_{};
std::size_t ring_pos_{};
stop_type stop_requested_{false};
private:
[[nodiscard]] auto check_start() -> api_error;
protected:
[[nodiscard]] auto on_check_start() -> bool override;
auto download_chunk(std::size_t chunk, bool skip_active) -> api_error;
[[nodiscard]] auto
on_chunk_downloaded(std::size_t chunk,
const data_buffer &buffer) -> api_error override;
void reader_thread();
[[nodiscard]] auto
on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset, data_buffer &data,
std::size_t &bytes_read) -> api_error override;
void update_position(std::size_t count, bool is_forward);
[[nodiscard]] auto use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error override;
public:
[[nodiscard]] static auto can_handle_file(std::uint64_t file_size,
std::size_t chunk_size,
std::size_t ring_size) -> bool;
auto close() -> bool override;
void forward(std::size_t count);
[[nodiscard]] auto get_current_chunk() const -> std::size_t {
return ring_pos_;
}
[[nodiscard]] auto get_first_chunk() const -> std::size_t {
return ring_begin_;
}
[[nodiscard]] auto get_last_chunk() const -> std::size_t { return ring_end_; }
[[nodiscard]] auto get_read_state() const -> boost::dynamic_bitset<> override;
[[nodiscard]] auto get_read_state(std::size_t chunk) const -> bool override;
[[nodiscard]] auto get_total_chunks() const -> std::size_t {
return total_chunks_;
}
[[nodiscard]] auto is_complete() const -> bool override { return false; }
[[nodiscard]] auto is_write_supported() const -> bool override {
return false;
}
[[nodiscard]] auto
native_operation(native_operation_callback callback) -> api_error override;
@ -115,23 +85,8 @@ public:
return api_error::not_supported;
}
[[nodiscard]] auto read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error override;
[[nodiscard]] auto resize(std::uint64_t /* size */) -> api_error override {
return api_error::not_supported;
}
void reverse(std::size_t count);
void set(std::size_t first_chunk, std::size_t current_chunk);
void set_api_path(const std::string &api_path) override;
[[nodiscard]] auto
write(std::uint64_t /* write_offset */, const data_buffer & /* data */,
std::size_t & /* bytes_written */) -> api_error override {
return api_error::not_supported;
[[nodiscard]] auto get_source_path() const -> std::string override {
return source_path_;
}
};
} // namespace repertory

View File

@ -21,301 +21,43 @@
*/
#include "file_manager/direct_open_file.hpp"
#include "events/event_system.hpp"
#include "file_manager/events.hpp"
#include "file_manager/open_file_base.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
namespace repertory {
direct_open_file::direct_open_file(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi, i_provider &provider)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, true),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
if (fsi.size > 0U) {
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), false);
}
}
: ring_buffer_common(chunk_size, chunk_timeout, fsi, provider,
min_ring_size, true) {}
direct_open_file::~direct_open_file() {
REPERTORY_USES_FUNCTION_NAME();
close();
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
}
}
auto direct_open_file::check_start() -> api_error {
if (get_file_size() == 0U || reader_thread_) {
return api_error::success;
}
auto direct_open_file::on_check_start() -> bool {
return (get_file_size() == 0U || has_reader_thread());
}
event_system::instance().raise<download_begin>(get_api_path(), "direct");
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
auto direct_open_file::on_read_chunk(std::size_t chunk, std::size_t read_size,
std::uint64_t read_offset,
data_buffer &data,
std::size_t &bytes_read) -> api_error {
auto &buffer = ring_data_.at(chunk % get_ring_size());
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(read_size));
data.insert(data.end(), begin, end);
bytes_read = read_size;
return api_error::success;
}
auto direct_open_file::close() -> bool {
stop_requested_ = true;
unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all();
chunk_lock.unlock();
return open_file_base::close();
}
auto direct_open_file::download_chunk(std::size_t chunk,
bool skip_active) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
const auto unlock_and_return =
[&unlock_and_notify](api_error res) -> api_error {
unlock_and_notify();
return res;
};
if (chunk < ring_begin_ || chunk > ring_end_) {
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = get_active_downloads().at(chunk);
unlock_and_notify();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
return unlock_and_return(api_error::success);
}
auto active_download{std::make_shared<download>()};
get_active_downloads()[chunk] = active_download;
auto &buffer = ring_data_.at(chunk % ring_state_.size());
auto data_offset{chunk * get_chunk_size()};
auto data_size{
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
};
unlock_and_notify();
auto res{
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
buffer, stop_requested_),
};
chunk_lock.lock();
if (chunk < ring_begin_ || chunk > ring_end_) {
res = api_error::invalid_ring_buffer_position;
unlock_and_notify();
active_download->notify(res);
return res;
}
if (res == api_error::success) {
ring_state_[chunk % ring_state_.size()] = true;
auto progress =
(static_cast<double>(chunk + 1U) / static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(get_api_path(), "direct",
progress);
}
get_active_downloads().erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
}
void direct_open_file::forward(std::size_t count) {
return update_position(count, true);
}
auto direct_open_file::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex());
return ring_state_;
}
auto direct_open_file::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(get_mutex());
return ring_state_[chunk % ring_state_.size()];
}
void direct_open_file::reverse(std::size_t count) {
return update_position(count, false);
}
auto direct_open_file::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (is_directory()) {
return api_error::invalid_operation;
}
reset_timeout();
read_size =
utils::calculate_read_size(get_file_size(), read_size, read_offset);
if (read_size == 0U) {
return api_error::success;
}
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
read_offset = read_offset - (begin_chunk * get_chunk_size());
unique_mutex_lock read_lock(read_mtx_);
auto res = check_start();
if (res != api_error::success) {
return res;
}
for (std::size_t chunk = begin_chunk;
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
++chunk) {
reset_timeout();
if (chunk > ring_pos_) {
forward(chunk - ring_pos_);
} else if (chunk < ring_pos_) {
reverse(ring_pos_ - chunk);
}
res = download_chunk(chunk, false);
if (res != api_error::success) {
if (res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
return read(read_size, read_offset, data);
}
return res;
}
reset_timeout();
auto to_read{
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
read_size),
};
auto &buffer = ring_data_.at(chunk % ring_state_.size());
auto begin =
std::next(buffer.begin(), static_cast<std::int64_t>(read_offset));
auto end = std::next(begin, static_cast<std::int64_t>(to_read));
data.insert(data.end(), begin, end);
read_offset = 0U;
read_size -= to_read;
}
return stop_requested_ ? api_error::download_stopped : res;
}
void direct_open_file::reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_;
chunk_notify_.notify_all();
chunk_lock.unlock();
while (not stop_requested_) {
chunk_lock.lock();
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
if (stop_requested_) {
chunk_notify_.notify_all();
chunk_lock.unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
next_chunk = ring_pos_;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
};
if (ring_state_[next_chunk % ring_state_.size()]) {
check_and_wait();
continue;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
download_chunk(next_chunk, true);
}
event_system::instance().raise<download_end>(get_api_path(), "direct",
api_error::download_stopped);
}
void direct_open_file::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
void direct_open_file::update_position(std::size_t count, bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) {
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
} else {
count = std::min(ring_pos_, count);
}
if (is_forward ? (ring_pos_ + count) <= ring_end_
: (ring_pos_ - count) >= ring_begin_) {
ring_pos_ += is_forward ? count : -count;
} else {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
auto direct_open_file::use_buffer(std::size_t chunk,
std::function<api_error(data_buffer &)> func)
-> api_error {
return func(ring_data_.at(chunk % get_ring_size()));
}
} // namespace repertory

View File

@ -0,0 +1,372 @@
/*
Copyright <2018-2024> <scott.e.graves@protonmail.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "file_manager/ring_buffer_common.hpp"
#include "events/event_system.hpp"
#include "file_manager/events.hpp"
#include "file_manager/open_file_base.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
#include "utils/error_utils.hpp"
namespace repertory {
ring_buffer_common::ring_buffer_common(std::uint64_t chunk_size,
std::uint8_t chunk_timeout,
filesystem_item fsi,
i_provider &provider,
std::size_t ring_size, bool disable_io)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, disable_io),
ring_state_(ring_size),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
if (disable_io) {
if (fsi.size > 0U) {
ring_state_.resize(std::min(total_chunks_, ring_state_.size()));
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
ring_state_.set(0U, ring_state_.size(), false);
}
} else {
if (ring_size < min_ring_size) {
throw std::runtime_error("ring size must be greater than or equal to 5");
}
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
ring_state_.set(0U, ring_size, false);
}
}
auto ring_buffer_common::check_start() -> api_error {
REPERTORY_USES_FUNCTION_NAME();
try {
if (on_check_start()) {
return api_error::success;
}
event_system::instance().raise<download_begin>(get_api_path(),
get_source_path());
reader_thread_ =
std::make_unique<std::thread>([this]() { reader_thread(); });
return api_error::success;
} catch (const std::exception &ex) {
utils::error::raise_api_path_error(function_name, get_api_path(),
get_source_path(), ex,
"failed to start");
return api_error::error;
}
}
auto ring_buffer_common::close() -> bool {
stop_requested_ = true;
unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all();
chunk_lock.unlock();
auto res = open_file_base::close();
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
}
return res;
}
auto ring_buffer_common::download_chunk(std::size_t chunk,
bool skip_active) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
const auto unlock_and_return =
[&unlock_and_notify](api_error res) -> api_error {
unlock_and_notify();
return res;
};
if (chunk < ring_begin_ || chunk > ring_end_) {
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = get_active_downloads().at(chunk);
unlock_and_notify();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
return unlock_and_return(api_error::success);
}
auto active_download{std::make_shared<download>()};
get_active_downloads()[chunk] = active_download;
return use_buffer(chunk, [&](data_buffer &buffer) -> api_error {
auto data_offset{chunk * get_chunk_size()};
auto data_size{
chunk == (total_chunks_ - 1U) ? get_last_chunk_size()
: get_chunk_size(),
};
unlock_and_notify();
auto result{
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
buffer, stop_requested_),
};
chunk_lock.lock();
if (chunk < ring_begin_ || chunk > ring_end_) {
result = api_error::invalid_ring_buffer_position;
unlock_and_notify();
active_download->notify(result);
return result;
}
if (result == api_error::success) {
result = on_chunk_downloaded(chunk, buffer);
if (result == api_error::success) {
ring_state_[chunk % ring_state_.size()] = true;
auto progress = (static_cast<double>(chunk + 1U) /
static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(
get_api_path(), get_source_path(), progress);
}
}
get_active_downloads().erase(chunk);
unlock_and_notify();
active_download->notify(result);
return result;
});
}
void ring_buffer_common::forward(std::size_t count) {
update_position(count, true);
}
auto ring_buffer_common::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex());
return ring_state_;
}
auto ring_buffer_common::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(get_mutex());
return ring_state_[chunk % ring_state_.size()];
}
auto ring_buffer_common::read(std::size_t read_size, std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (is_directory()) {
return api_error::invalid_operation;
}
reset_timeout();
read_size =
utils::calculate_read_size(get_file_size(), read_size, read_offset);
if (read_size == 0U) {
return api_error::success;
}
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
read_offset = read_offset - (begin_chunk * get_chunk_size());
unique_mutex_lock read_lock(read_mtx_);
auto res = check_start();
if (res != api_error::success) {
return res;
}
for (std::size_t chunk = begin_chunk;
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
++chunk) {
reset_timeout();
if (chunk > ring_pos_) {
forward(chunk - ring_pos_);
} else if (chunk < ring_pos_) {
reverse(ring_pos_ - chunk);
}
res = download_chunk(chunk, false);
if (res != api_error::success) {
if (res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
return read(read_size, read_offset, data);
}
return res;
}
reset_timeout();
std::size_t bytes_read{};
res = on_read_chunk(
chunk,
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
read_size),
read_offset, data, bytes_read);
if (res != api_error::success) {
return res;
}
reset_timeout();
read_size -= bytes_read;
read_offset = 0U;
}
return stop_requested_ ? api_error::download_stopped : res;
}
void ring_buffer_common::reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_;
chunk_notify_.notify_all();
chunk_lock.unlock();
while (not stop_requested_) {
chunk_lock.lock();
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
if (stop_requested_) {
chunk_notify_.notify_all();
chunk_lock.unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
next_chunk = ring_pos_;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
};
if (ring_state_[next_chunk % ring_state_.size()]) {
check_and_wait();
continue;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
download_chunk(next_chunk, true);
}
event_system::instance().raise<download_end>(
get_api_path(), get_source_path(), api_error::download_stopped);
}
void ring_buffer_common::reverse(std::size_t count) {
update_position(count, false);
}
void ring_buffer_common::set(std::size_t first_chunk,
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
}
ring_begin_ = first_chunk;
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
if (current_chunk > ring_end_) {
chunk_notify_.notify_all();
throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
}
ring_pos_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), true);
chunk_notify_.notify_all();
}
void ring_buffer_common::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
void ring_buffer_common::update_position(std::size_t count, bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) {
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
} else {
count = std::min(ring_pos_, count);
}
if (is_forward ? (ring_pos_ + count) <= ring_end_
: (ring_pos_ - count) >= ring_begin_) {
ring_pos_ += is_forward ? count : -count;
} else {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
}
} // namespace repertory

View File

@ -21,15 +21,12 @@
*/
#include "file_manager/ring_buffer_open_file.hpp"
#include "events/event_system.hpp"
#include "file_manager/events.hpp"
#include "file_manager/open_file_base.hpp"
#include "platform/platform.hpp"
#include "providers/i_provider.hpp"
#include "types/repertory.hpp"
#include "utils/common.hpp"
#include "utils/error_utils.hpp"
#include "utils/file_utils.hpp"
#include "utils/path.hpp"
namespace repertory {
@ -39,24 +36,16 @@ ring_buffer_open_file::ring_buffer_open_file(std::string buffer_directory,
filesystem_item fsi,
i_provider &provider,
std::size_t ring_size)
: open_file_base(chunk_size, chunk_timeout, fsi, provider, false),
: ring_buffer_common(chunk_size, chunk_timeout, fsi, provider, ring_size,
false),
ring_state_(ring_size),
source_path_(utils::path::combine(buffer_directory,
{
utils::create_uuid_string(),
})),
total_chunks_(static_cast<std::size_t>(
utils::divide_with_ceiling(fsi.size, chunk_size))) {
if (ring_size < 5U) {
throw std::runtime_error("ring size must be greater than or equal to 5");
}
})) {
if (not can_handle_file(fsi.size, chunk_size, ring_size)) {
throw std::runtime_error("file size is less than ring buffer size");
}
ring_end_ = std::min(total_chunks_ - 1U, ring_begin_ + ring_size - 1U);
ring_state_.set(0U, ring_size, false);
}
ring_buffer_open_file::~ring_buffer_open_file() {
@ -64,20 +53,17 @@ ring_buffer_open_file::~ring_buffer_open_file() {
close();
if (nf_) {
nf_->close();
nf_.reset();
if (not utils::file::file(source_path_).remove()) {
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
utils::get_last_error_code(), "failed to delete file");
}
if (not nf_) {
return;
}
if (reader_thread_) {
reader_thread_->join();
reader_thread_.reset();
nf_->close();
nf_.reset();
if (not utils::file::file(source_path_).remove()) {
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
utils::get_last_error_code(), "failed to delete file");
}
}
@ -87,358 +73,80 @@ auto ring_buffer_open_file::can_handle_file(std::uint64_t file_size,
return file_size >= (static_cast<std::uint64_t>(ring_size) * chunk_size);
}
auto ring_buffer_open_file::check_start() -> api_error {
auto ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_buffer_open_file::on_check_start() -> bool {
REPERTORY_USES_FUNCTION_NAME();
if (nf_) {
return api_error::success;
return true;
}
auto buffer_directory{utils::path::get_parent_path(source_path_)};
if (not utils::file::directory(buffer_directory).create_directory()) {
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
throw std::runtime_error(
fmt::format("failed to create buffer directory|path|{}|err|{}",
buffer_directory, utils::get_last_error_code()));
return api_error::os_error;
}
nf_ = utils::file::file::open_or_create_file(source_path_);
if (not nf_ || not *nf_) {
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
fmt::format("failed to create buffer file|err|{}",
utils::get_last_error_code()));
return api_error::os_error;
throw std::runtime_error(fmt::format("failed to create buffer file|err|{}",
utils::get_last_error_code()));
}
if (not nf_->truncate(ring_state_.size() * get_chunk_size())) {
nf_->close();
nf_.reset();
utils::error::raise_api_path_error(
function_name, get_api_path(), source_path_,
fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
throw std::runtime_error(fmt::format("failed to resize buffer file|err|{}",
utils::get_last_error_code()));
}
return false;
}
auto ring_buffer_open_file::on_chunk_downloaded(
std::size_t chunk, const data_buffer &buffer) -> api_error {
return do_io([&]() -> api_error {
std::size_t bytes_written{};
if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(),
&bytes_written)) {
return api_error::success;
}
return api_error::os_error;
}
event_system::instance().raise<download_begin>(get_api_path(), source_path_);
reader_thread_ = std::make_unique<std::thread>([this]() { reader_thread(); });
return api_error::success;
});
}
auto ring_buffer_open_file::close() -> bool {
stop_requested_ = true;
auto ring_buffer_open_file::on_read_chunk(
std::size_t chunk, std::size_t read_size, std::uint64_t read_offset,
data_buffer &data, std::size_t &bytes_read) -> api_error {
data_buffer buffer(read_size);
auto res = do_io([&]() -> api_error {
return nf_->read(buffer,
(((chunk % ring_state_.size()) * get_chunk_size()) +
read_offset),
&bytes_read)
? api_error::success
: api_error::os_error;
});
unique_mutex_lock chunk_lock(chunk_mtx_);
chunk_notify_.notify_all();
chunk_lock.unlock();
return open_file_base::close();
}
auto ring_buffer_open_file::download_chunk(std::size_t chunk,
bool skip_active) -> api_error {
unique_mutex_lock chunk_lock(chunk_mtx_);
const auto unlock_and_notify = [this, &chunk_lock]() {
chunk_notify_.notify_all();
chunk_lock.unlock();
};
const auto unlock_and_return =
[&unlock_and_notify](api_error res) -> api_error {
unlock_and_notify();
return res;
};
if (chunk < ring_begin_ || chunk > ring_end_) {
return unlock_and_return(api_error::invalid_ring_buffer_position);
}
if (get_active_downloads().find(chunk) != get_active_downloads().end()) {
if (skip_active) {
return unlock_and_return(api_error::success);
}
auto active_download = get_active_downloads().at(chunk);
unlock_and_notify();
return active_download->wait();
}
if (ring_state_[chunk % ring_state_.size()]) {
return unlock_and_return(api_error::success);
}
auto active_download{std::make_shared<download>()};
get_active_downloads()[chunk] = active_download;
data_buffer buffer;
auto data_offset{chunk * get_chunk_size()};
auto data_size{
chunk == (total_chunks_ - 1U) ? get_last_chunk_size() : get_chunk_size(),
};
unlock_and_notify();
auto res{
get_provider().read_file_bytes(get_api_path(), data_size, data_offset,
buffer, stop_requested_),
};
chunk_lock.lock();
if (chunk < ring_begin_ || chunk > ring_end_) {
res = api_error::invalid_ring_buffer_position;
unlock_and_notify();
active_download->notify(res);
return res;
}
if (res == api_error::success) {
res = do_io([&]() -> api_error {
std::size_t bytes_written{};
if (nf_->write(buffer, (chunk % ring_state_.size()) * get_chunk_size(),
&bytes_written)) {
return api_error::success;
}
return api_error::os_error;
});
if (res == api_error::success) {
ring_state_[chunk % ring_state_.size()] = true;
auto progress = (static_cast<double>(chunk + 1U) /
static_cast<double>(total_chunks_)) *
100.0;
event_system::instance().raise<download_progress>(get_api_path(),
source_path_, progress);
}
}
get_active_downloads().erase(chunk);
unlock_and_notify();
active_download->notify(res);
return res;
}
void ring_buffer_open_file::forward(std::size_t count) {
update_position(count, true);
}
auto ring_buffer_open_file::get_read_state() const -> boost::dynamic_bitset<> {
recur_mutex_lock file_lock(get_mutex());
return ring_state_;
}
auto ring_buffer_open_file::get_read_state(std::size_t chunk) const -> bool {
recur_mutex_lock file_lock(get_mutex());
return ring_state_[chunk % ring_state_.size()];
}
auto ring_buffer_open_file::native_operation(
i_open_file::native_operation_callback callback) -> api_error {
return do_io([&]() -> api_error { return callback(nf_->get_handle()); });
}
auto ring_buffer_open_file::read(std::size_t read_size,
std::uint64_t read_offset,
data_buffer &data) -> api_error {
if (is_directory()) {
return api_error::invalid_operation;
}
reset_timeout();
read_size =
utils::calculate_read_size(get_file_size(), read_size, read_offset);
if (read_size == 0U) {
return api_error::success;
}
auto begin_chunk{static_cast<std::size_t>(read_offset / get_chunk_size())};
read_offset = read_offset - (begin_chunk * get_chunk_size());
unique_mutex_lock read_lock(read_mtx_);
auto res = check_start();
if (res != api_error::success) {
return res;
}
for (std::size_t chunk = begin_chunk;
not stop_requested_ && (res == api_error::success) && (read_size > 0U);
++chunk) {
reset_timeout();
if (chunk > ring_pos_) {
forward(chunk - ring_pos_);
} else if (chunk < ring_pos_) {
reverse(ring_pos_ - chunk);
}
res = download_chunk(chunk, false);
if (res != api_error::success) {
if (res == api_error::invalid_ring_buffer_position) {
read_lock.unlock();
// TODO limit retry
return read(read_size, read_offset, data);
}
return res;
}
reset_timeout();
auto to_read{
std::min(static_cast<std::size_t>(get_chunk_size() - read_offset),
read_size),
};
res = do_io([&]() -> api_error {
data_buffer buffer(to_read);
std::size_t bytes_read{};
auto result =
nf_->read(
buffer,
(((chunk % ring_state_.size()) * get_chunk_size()) + read_offset),
&bytes_read)
? api_error::success
: api_error::os_error;
if (result != api_error::success) {
return result;
}
reset_timeout();
data.insert(data.end(), buffer.begin(), buffer.end());
read_size -= bytes_read;
return result;
});
read_offset = 0U;
}
return stop_requested_ ? api_error::download_stopped : res;
data.insert(data.end(), buffer.begin(), buffer.end());
return api_error::success;
}
void ring_buffer_open_file::reader_thread() {
unique_mutex_lock chunk_lock(chunk_mtx_);
auto next_chunk = ring_pos_;
chunk_notify_.notify_all();
chunk_lock.unlock();
while (not stop_requested_) {
chunk_lock.lock();
next_chunk = next_chunk + 1U > ring_end_ ? ring_begin_ : next_chunk + 1U;
const auto check_and_wait = [this, &chunk_lock, &next_chunk]() {
if (stop_requested_) {
chunk_notify_.notify_all();
chunk_lock.unlock();
return;
}
if (get_read_state().all()) {
chunk_notify_.wait(chunk_lock);
next_chunk = ring_pos_;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
};
if (ring_state_[next_chunk % ring_state_.size()]) {
check_and_wait();
continue;
}
chunk_notify_.notify_all();
chunk_lock.unlock();
download_chunk(next_chunk, true);
}
event_system::instance().raise<download_end>(get_api_path(), source_path_,
api_error::download_stopped);
}
void ring_buffer_open_file::reverse(std::size_t count) {
update_position(count, false);
}
void ring_buffer_open_file::set(std::size_t first_chunk,
std::size_t current_chunk) {
mutex_lock chunk_lock(chunk_mtx_);
if (first_chunk >= total_chunks_) {
chunk_notify_.notify_all();
throw std::runtime_error("first chunk must be less than total chunks");
}
ring_begin_ = first_chunk;
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
if (current_chunk > ring_end_) {
chunk_notify_.notify_all();
throw std::runtime_error(
"current chunk must be less than or equal to last chunk");
}
ring_pos_ = current_chunk;
ring_state_.set(0U, ring_state_.size(), true);
chunk_notify_.notify_all();
}
void ring_buffer_open_file::set_api_path(const std::string &api_path) {
mutex_lock chunk_lock(chunk_mtx_);
open_file_base::set_api_path(api_path);
chunk_notify_.notify_all();
}
void ring_buffer_open_file::update_position(std::size_t count,
bool is_forward) {
mutex_lock chunk_lock(chunk_mtx_);
if (is_forward) {
if ((ring_pos_ + count) > (total_chunks_ - 1U)) {
count = (total_chunks_ - 1U) - ring_pos_;
}
} else {
count = std::min(ring_pos_, count);
}
if (is_forward ? (ring_pos_ + count) <= ring_end_
: (ring_pos_ - count) >= ring_begin_) {
ring_pos_ += is_forward ? count : -count;
} else {
auto delta = is_forward ? count - (ring_end_ - ring_pos_)
: count - (ring_pos_ - ring_begin_);
if (delta >= ring_state_.size()) {
ring_state_.set(0U, ring_state_.size(), false);
ring_pos_ += is_forward ? count : -count;
ring_begin_ += is_forward ? delta : -delta;
} else {
for (std::size_t idx = 0U; idx < delta; ++idx) {
if (is_forward) {
ring_state_[(ring_begin_ + idx) % ring_state_.size()] = false;
} else {
ring_state_[(ring_end_ - idx) % ring_state_.size()] = false;
}
}
ring_begin_ += is_forward ? delta : -delta;
ring_pos_ += is_forward ? count : -count;
}
ring_end_ =
std::min(total_chunks_ - 1U, ring_begin_ + ring_state_.size() - 1U);
}
chunk_notify_.notify_all();
auto ring_buffer_open_file::use_buffer(
std::size_t /* chunk */,
std::function<api_error(data_buffer &)> func) -> api_error {
data_buffer buffer;
return func(buffer);
}
} // namespace repertory