1
0
This repository has been archived on 2025-07-27. You can view files and clone it, but cannot push or open issues or pull requests.
Files
siadrive/SiaDrive.Api/UploadManager.cpp
Scott E. Graves 6135c1ffc9 Comments
2017-02-24 14:28:50 -06:00

362 lines
12 KiB
C++

#include "stdafx.h"
#include "UploadManager.h"
#include "SiaDriveConfig.h"
#include "SiaApi.h"
#include "EventSystem.h"
using namespace Sia::Api;
// printf-style (wide) template for a "create table if not exists" statement:
// the first %s is the table name, the second %s is the column definition list.
#define TABLE_CREATE L"create table if not exists %s (%s);"
// Name of the table that tracks uploads.
#define UPLOAD_TABLE L"upload_table"
// Column definitions for the upload table; sia_path, file_path and siadrive_path are each unique and non-null.
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, siadrive_path text unique not null, status integer not null"
// The queries below are narrow strings. Each returns at most one row (id, sia_path, status):
// the matching row with the highest id.
// Match on a single status (@status).
#define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;"
// Match on either of two statuses (@status1, @status2).
#define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;"
// Match on Sia path (@sia_path) regardless of status.
#define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;"
// Match on Sia path (@sia_path) restricted to either of two statuses (@status1, @status2).
#define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, status from upload_table where sia_path=@sia_path and (status=@status1 or status=@status2) order by id desc limit 1;"
// Formats printf-style arguments into a String.
//
// fmt - format string handed to _sntprintf
// vs  - values consumed by the format specifiers; they are forwarded through a C varargs call,
//       so pass raw character pointers rather than String objects
//
// Returns the formatted text, or an empty String when the formatter reports an error or
// produces no output.
template <typename... Ts>
String fmt(const String &fmt, Ts... vs)
{
    // Sizing pass: with a null buffer and a zero count the CRT reports the number of characters
    // required. Keep the result signed - a negative value signals a formatting error and must not
    // be converted to size_t, where it would become an enormous length for resize().
    const int required = _sntprintf(nullptr, 0, fmt.c_str(), vs...);
    if (required <= 0)
    {
        return String();
    }

    String ret;
    ret.resize(static_cast<size_t>(required));
    // Exactly 'required' characters are written; the String supplies its own terminator.
    _sntprintf(&ret[0], static_cast<size_t>(required), fmt.c_str(), vs...);
    return ret;
}
// Creates the named table with the given column definitions unless it already exists.
//
// database  - open database the statement is executed against
// tableName - name substituted for the first placeholder of TABLE_CREATE
// columns   - column definition list substituted for the second placeholder
static void CreateTableIfNotFound(SQLite::Database* database, const String& tableName, const String& columns)
{
    // Build the wide statement text from the template
    const String createStatement = fmt(TABLE_CREATE, tableName.c_str(), columns.c_str());

    // The database is given a narrow copy of the statement
    CW2A narrowStatement(createStatement.c_str());
    database->exec(narrowStatement);
}
// Maps an UploadStatus value to its display text.
// Values without an explicit mapping yield "!!Not Defined!!".
String CUploadManager::UploadStatusToString(const UploadStatus& uploadStatus)
{
    // Start with the fallback text and overwrite it for every known status
    const wchar_t* label = L"!!Not Defined!!";

    switch (uploadStatus)
    {
    case UploadStatus::Complete:
        label = L"Complete";
        break;

    case UploadStatus::Copying:
        label = L"Copying";
        break;

    case UploadStatus::Error:
        label = L"Error";
        break;

    case UploadStatus::Modified:
        label = L"Modified";
        break;

    case UploadStatus::NotFound:
        label = L"Not Found";
        break;

    case UploadStatus::Queued:
        label = L"Queued";
        break;

    case UploadStatus::Uploading:
        label = L"Uploading";
        break;

    default:
        break;
    }

    return label;
}
// Opens (creating if necessary) the upload database, makes sure the upload table exists and
// starts both worker threads.
//
// siaCurl        - handed to the CAutoThread base and to the file-action thread
// siaDriveConfig - configuration; supplies the upload database file path
CUploadManager::CUploadManager(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) :
    CAutoThread(siaCurl, siaDriveConfig),
    _uploadDatabase(siaDriveConfig->GetRenter_UploadDbFilePath(), SQLite::OPEN_CREATE | SQLite::OPEN_READWRITE),
    _siaDriveConfig(siaDriveConfig),
    _fileThread(siaCurl, siaDriveConfig,
        [this](const CSiaCurl& curl, CSiaDriveConfig* config)
        {
            // The file thread forwards each invocation to this object's FileThreadCallback
            FileThreadCallback(curl, config);
        })
{
    // The table must exist before either thread issues a query
    CreateTableIfNotFound(&_uploadDatabase, UPLOAD_TABLE, UPLOAD_TABLE_COLUMNS);

    // Start this object's own auto thread first, then the file-action thread
    StartAutoThread();
    _fileThread.StartAutoThread();
}
// Stops both worker threads: the file-action thread first, then this object's own auto thread.
CUploadManager::~CUploadManager()
{
_fileThread.StopAutoThread();
StopAutoThread();
}
// Copies the source file to a temporary file and then renames that temporary file to its
// final '.siadrive' name, raising an event for every step and every failure.
// If either the copy or the rename fails, leftovers are removed and the file is requeued
// through AddOrUpdate.
//
// siaPath          - Sia path of the upload (reported in events, used when requeueing)
// filePath         - source file to copy
// tempSourcePath   - destination of the temporary copy
// siaDriveFilePath - final name the temporary copy is renamed to
void CUploadManager::FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
{
    // Best-effort removal of the temporary copy; a failure is only reported
    const auto removeTemporaryFile = [&]()
    {
        if (!RetryDeleteFileIfExists(tempSourcePath))
        {
            CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
        }
    };

    // Best-effort removal of the '.siadrive' file; a failure is only reported
    const auto removeSiaDriveFile = [&]()
    {
        if (!RetryDeleteFileIfExists(siaDriveFilePath))
        {
            CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath)));
        }
    };

    CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath)));
    if (RetryableAction(::CopyFile(filePath.c_str(), tempSourcePath.c_str(), FALSE), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
    {
        // Delete existing '.siadrive' file, if found
        // !!Should never come here. If so, there was a problem with startup clean-up
        removeSiaDriveFile();

        // Rename '.siadrive.temp' to '.siadrive'
        CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath, siaDriveFilePath)));
        if (RetryableAction(::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
        {
            // TODO Change status to 'Queued'
            return;
        }

        // Rename failed - clean up both files before requeueing
        CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath, siaDriveFilePath)));
        removeTemporaryFile();
        removeSiaDriveFile();
    }
    else
    {
        CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));

        // If temp copy fails, try to delete
        // If partial copy and file is unable to be deleted, log warning
        removeTemporaryFile();
    }

    // Every failure path ends here: requeue
    AddOrUpdate(siaPath, filePath);
}
// File-thread callback: takes at most one queued action off the front of the file queue
// and runs it. Neither parameter is used by this function.
void CUploadManager::FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
{
    std::function<void()> pendingAction;

    // Hold the queue mutex only while the queue itself is touched
    {
        std::lock_guard<std::mutex> queueLock(_fileQueueMutex);
        if (!_fileQueue.empty())
        {
            pendingAction = _fileQueue.front();
            _fileQueue.pop_front();
        }
    }

    // Nothing was queued
    if (!pendingAction)
    {
        return;
    }

    // Run the action after the queue mutex has been released
    pendingAction();
}
void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
{
bool processNext = true;
try
{
CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig));
json result;
if (ApiSuccess(siaCurl.Get(L"/renter/files", result)))
{
// Lock here - if file is modified again before a prior upload is complete, delete it and
// start again later
std::lock_guard<std::mutex> l(_uploadMutex);
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_2_STATUS);
query.bind("@status1", static_cast<unsigned>(UploadStatus::Uploading));
query.bind("@status2", static_cast<unsigned>(UploadStatus::Modified));
fileTree->BuildTree(result);
if (query.executeStep())
{
std::string tempSiaPath = query.getColumn(1);
String siaPath = CA2W(tempSiaPath.c_str()).m_psz;
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(2).getUInt());
auto fileList = fileTree->GetFileList();
auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr)
{
return ptr->GetSiaPath() == siaPath;
});
if (it == fileList.end())
{
// error condition - should always exist. delete from db and log warning, but continue processing
}
else if (uploadStatus == UploadStatus::Modified)
{
// delete existing, change status to queued
}
else if ((*it)->GetUploadProgress() >= 100)
{
// upload complete - change status
}
else
{
// upload still active
processNext = false;
}
}
}
else
{
// error condition - host down?
processNext = false;
}
}
catch (const SQLite::Exception& e)
{
// error condition - database not initialized (i.e. no table)?
std::string msg = e.what();
processNext = false;
}
if (processNext)
{
try
{
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
query.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
// Lock here - if file is modified again before a prior upload is complete, delete it and
// start again later
if (query.executeStep())
{
String siaPath = CA2W(query.getColumn(1)).m_psz;
String tempFilePath;
json response;
SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", tempFilePath} }, response);
if (ApiSuccess(cerror))
{
// TODO Update status in DB
}
}
}
catch (const SQLite::Exception& e)
{
// error condition
std::string msg = e.what();
}
}
}
// Looks up the status stored for a Sia path.
//
// siaPath - Sia path to search for
//
// Returns the status of the newest matching row, or UploadStatus::NotFound when no row matches.
UploadStatus CUploadManager::GetUploadStatus(const String& siaPath)
{
    SQLite::Statement lookup(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH);
    lookup.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);

    // No row for this path
    if (!lookup.executeStep())
    {
        return UploadStatus::NotFound;
    }

    // Column 2 holds the numeric status
    return static_cast<UploadStatus>(static_cast<unsigned>(lookup.getColumn(2)));
}
// The real source file is copied to a hidden file. The hidden filename is constructed by generating a SHA-256 hash of the
// source path (all lowercase). '.siadrive.temp' will be used as the extension. After copy is successful, the extension
// is renamed to '.siadrive'. '.siadrive' files will be hidden/system and used by the Dokan API for file i/o until Sia upload
// is complete. If a change occurs, the file will be deleted from Sia (cancelling the in-progress upload) and renamed to the
// real source path. The process will then start over again as if the file was new.
// If the file has been fully uploaded, the hidden file should not exist, so rename will not occur; however, the file
// will be deleted from Sia and treated as new.
// Uploads will always use the real file. User i/o will occur against the hidden file temporarily. Since upload will take
// longer to handle than a normal file copy, this seems to be the best compromise for performance.
//
// Error Scenarios:
// Crash before db update to status copying - file will be re-uploaded automatically, if complete; otherwise, deleted
// Crash before copy begins - on startup, check for copying status with no .siadrive
// Crash during copy - on startup, check for copying status and delete .siadrive
// Crash after copy but before db update - on startup, check for copying status and delete .siadrive
// Registers a source file for upload under the given Sia path.
//
// siaPath  - Sia path the file is uploaded to
// filePath - source file; a relative path is converted to an absolute one first
//
// Returns UploadError::Success when the file exists (whether a new copy action was queued or an
// existing Uploading/Modified entry was found), or UploadError::SourceFileNotFound when the path
// cannot be resolved or the file does not exist.
UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
{
    UploadError ret = UploadError::Success;

    // Relative to absolute
    if (::PathIsRelative(filePath.c_str()))
    {
        String absolutePath;
        absolutePath.resize(MAX_PATH + 1);
        // _wfullpath reports failure with a null pointer, which must never be assigned to a String
        if (_wfullpath(&absolutePath[0], filePath.c_str(), MAX_PATH) == nullptr)
        {
            CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(SourceFileNotFound(siaPath, filePath)));
            return UploadError::SourceFileNotFound;
        }
        // Copy up to the terminator only, dropping the unused remainder of the buffer
        filePath = absolutePath.c_str();
    }

    // Grab parent folder of source
    String rootPath;
    {
        String parentBuffer = filePath;
        // Truncates in place by writing a terminator; the String's length is not adjusted
        ::PathRemoveFileSpec(&parentBuffer[0]);
        rootPath = parentBuffer.c_str();
    }

    if (::PathFileExists(filePath.c_str()))
    {
        // Lock here - if file is modified again before a prior upload is complete, delete it and
        // start again later
        std::lock_guard<std::mutex> l(_uploadMutex);

        SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS);
        query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
        query.bind("@status1", static_cast<unsigned>(UploadStatus::Uploading));
        query.bind("@status2", static_cast<unsigned>(UploadStatus::Modified));

        // Check copying
        if (query.executeStep())
        {
            UploadStatus uploadStatus = static_cast<UploadStatus>(static_cast<unsigned>(query.getColumn(2)));
            CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ExistingUploadFound(siaPath, filePath, uploadStatus)));
            if (uploadStatus == UploadStatus::Uploading)
            {
                // set to modified
                // update file path
            }
        }
        else
        {
            // Strip drive specification (i.e. C:\)
            // TODO If mount to drive is ever enabled, this will need to change
            // NOTE(review): assumes filePath starts with a three-character drive specification;
            // shorter or UNC paths are not handled here
            String siaDriveFileName = GenerateSha256(&filePath[3]) + L".siadrive";

            // Combine into a fixed buffer and copy out up to the terminator, so the resulting
            // Strings hold exactly the path text and no trailing NUL padding
            wchar_t pathBuffer[MAX_PATH + 1] = { 0 };
            PathCombine(pathBuffer, rootPath.c_str(), siaDriveFileName.c_str());
            String siaDriveFilePath = pathBuffer;

            pathBuffer[0] = 0;
            PathCombine(pathBuffer, rootPath.c_str(), (siaDriveFileName + L".temp").c_str());
            String tempSourcePath = pathBuffer;

            // Add to db
            /*SQLite::Statement addOrUpdate(_uploadDatabase, ADD_UPDATE_UPLOAD);
            addOrUpdate.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
            addOrUpdate.bind("@file_path", CW2A(filePath.c_str()).m_psz);
            addOrUpdate.bind("@siadrive_path", CW2A(siaDriveFilePath.c_str()).m_psz);
            addOrUpdate.bind("@status", static_cast<unsigned>(UploadStatus::Copying));*/

            // Queue file upload operation
            std::lock_guard<std::mutex> l2(_fileQueueMutex);
            _fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
            CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath)));
        }
    }
    else
    {
        CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(SourceFileNotFound(siaPath, filePath)));
        ret = UploadError::SourceFileNotFound;
    }

    return ret;
}
// Placeholder: the body is currently empty, so calling this has no effect.
void CUploadManager::PurgeCompleteStatus()
{
}
// Placeholder: the body is currently empty, so calling this has no effect.
void CUploadManager::PurgeErrorStatus()
{
}
// Placeholder for removing an upload: nothing is done with siaPath yet and
// UploadError::Success is always returned.
UploadError CUploadManager::Remove(const String& siaPath)
{
    return UploadError::Success;
}