#include "stdafx.h" #include "UploadManager.h" #include "SiaDriveConfig.h" #include "SiaApi.h" #include "EventSystem.h" using namespace Sia::Api; #define TABLE_CREATE L"create table if not exists %s (%s);" #define UPLOAD_TABLE L"upload_table" #define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, siadrive_path text unique not null, status integer not null" #define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;" #define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;" #define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;" #define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, status from upload_table where sia_path=@sia_path and (status=@status1 or status=@status2) order by id desc limit 1;" template String fmt(const String &fmt, Ts... vs) { size_t required = _sntprintf(nullptr, 0, fmt.c_str(), vs...); String ret; ret.resize(required); _sntprintf(&ret[0], required, fmt.c_str(), vs...); return ret; } static void CreateTableIfNotFound(SQLite::Database* database, const String& tableName, const String& columns) { String sqlCreate = fmt(TABLE_CREATE, &tableName[0], &columns[0]); database->exec(CW2A(sqlCreate.c_str())); } String CUploadManager::UploadStatusToString(const UploadStatus& uploadStatus) { switch (uploadStatus) { case UploadStatus::Complete: return L"Complete"; case UploadStatus::Copying: return L"Copying"; case UploadStatus::Error: return L"Error"; case UploadStatus::Modified: return L"Modified"; case UploadStatus::NotFound: return L"Not Found"; case UploadStatus::Queued: return L"Queued"; case UploadStatus::Uploading: return L"Uploading"; default: return L"!!Not Defined!!"; } } CUploadManager::CUploadManager(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) : CAutoThread(siaCurl, siaDriveConfig), _uploadDatabase(siaDriveConfig->GetRenter_UploadDbFilePath(), SQLite::OPEN_CREATE | SQLite::OPEN_READWRITE), _siaDriveConfig(siaDriveConfig), _fileThread(siaCurl, siaDriveConfig, [this](const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) { this->FileThreadCallback(siaCurl, siaDriveConfig); }) { CreateTableIfNotFound(&_uploadDatabase, UPLOAD_TABLE, UPLOAD_TABLE_COLUMNS); StartAutoThread(); _fileThread.StartAutoThread(); } CUploadManager::~CUploadManager() { _fileThread.StopAutoThread(); StopAutoThread(); } void CUploadManager::FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath) { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath))); if (RetryableAction(::CopyFile(filePath.c_str(), tempSourcePath.c_str(), FALSE), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS)) { // Delete existing '.siadrive' file, if found // !!Should never come here. If so, there was a problem with startup clean-up if (!RetryDeleteFileIfExists(siaDriveFilePath)) { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath))); } // Rename '.siadrive.temp' to '.siadrive' CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath, siaDriveFilePath))); if (RetryableAction(::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS)) { // TODO Change status to 'Queued' } else { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath, siaDriveFilePath))); if (!RetryDeleteFileIfExists(tempSourcePath)) { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath))); } if (!RetryDeleteFileIfExists(siaDriveFilePath)) { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath))); } // Requeue AddOrUpdate(siaPath, filePath); } } else { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath))); // If temp copy fails, try to delete // If partial copy and file is unable to be deleted, log warning if (!RetryDeleteFileIfExists(tempSourcePath)) { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath))); } // Requeue AddOrUpdate(siaPath, filePath); } } void CUploadManager::FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) { std::function nextFile = nullptr; { std::lock_guard l(_fileQueueMutex); if (_fileQueue.size()) { nextFile = _fileQueue.front(); _fileQueue.pop_front(); } } if (nextFile) { nextFile(); } } void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) { bool processNext = true; try { CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig)); json result; if (ApiSuccess(siaCurl.Get(L"/renter/files", result))) { // Lock here - if file is modified again before a prior upload is complete, delete it and // start again later std::lock_guard l(_uploadMutex); SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_2_STATUS); query.bind("@status1", static_cast(UploadStatus::Uploading)); query.bind("@status2", static_cast(UploadStatus::Modified)); fileTree->BuildTree(result); if (query.executeStep()) { std::string tempSiaPath = query.getColumn(1); String siaPath = CA2W(tempSiaPath.c_str()).m_psz; UploadStatus uploadStatus = static_cast(query.getColumn(2).getUInt()); auto fileList = fileTree->GetFileList(); auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr) { return ptr->GetSiaPath() == siaPath; }); if (it == fileList.end()) { // error condition - should always exist. delete from db and log warning, but continue processing } else if (uploadStatus == UploadStatus::Modified) { // delete existing, change status to queued } else if ((*it)->GetUploadProgress() >= 100) { // upload complete - change status } else { // upload still active processNext = false; } } } else { // error condition - host down? processNext = false; } } catch (const SQLite::Exception& e) { // error condition - database not initialized (i.e. no table)? std::string msg = e.what(); processNext = false; } if (processNext) { try { SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS); query.bind("@status", static_cast(UploadStatus::Queued)); // Lock here - if file is modified again before a prior upload is complete, delete it and // start again later if (query.executeStep()) { String siaPath = CA2W(query.getColumn(1)).m_psz; String tempFilePath; json response; SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", tempFilePath} }, response); if (ApiSuccess(cerror)) { // TODO Update status in DB } } } catch (const SQLite::Exception& e) { // error condition std::string msg = e.what(); } } } UploadStatus CUploadManager::GetUploadStatus(const String& siaPath) { UploadStatus uploadStatus = UploadStatus::NotFound; SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH); query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); if (query.executeStep()) { uploadStatus = static_cast(static_cast(query.getColumn(2))); } return uploadStatus; } // The real source file is copied to a hidden file. The hidden filename is constructed by generating an SHA25 hash of the // source path (all lowercase). '.siadrive.temp' will be used as the extension. After copy is successful, the extension // is renamed to '.siadrive'. '.siadrive' files will be hidden/system and used by the Dokan API for file i/o until Sia upload // is complete. If a change occurs, the file will be deleted from Sia (cancelling the in-progress upload) and renamed to the // real source path. The process will then start over again as if the file was new. // If the file has been fully uploaded, the hidden file should not exist, so rename will not occur; however, the file // will be deleted from Sia and treated as new. // Uploads will always use the real file. User i/o will occur against the hidden file only temporarily. Since upload will take // longer to handle than a normal file copy, this seems to be the best compromise for performance. // // ERROR SCENARIOS // While copy is active, point to normal file. // After copy, use temp file as real file until upload is complete // This allows modifications to the file to occur in a more timely manner. // Error Scenarios: // Crash before db update to status copying - file will be re-uploaded automatically, if complete; otherwise, deleted // Need to keep track of files as being copied and then there status // Crash before copy begins - on startup, check for copying status with no .siadrive // Crash during copy - on startup, check for copying status and delete .siadrive // Crash after copy but before db update - on startup, check for copying status and delete .siadrive UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath) { UploadError ret = UploadError::Success; // Relative to absolute and grab parent folder of source String rootPath; { String temp; if (::PathIsRelative(filePath.c_str())) { temp.resize(MAX_PATH + 1); filePath = _wfullpath(&temp[0], filePath.c_str(), MAX_PATH); } temp = filePath; ::PathRemoveFileSpec(&temp[0]); rootPath = temp; } if (::PathFileExists(filePath.c_str())) { // Lock here - if file is modified again before a prior upload is complete, delete it and // start again later std::lock_guard l(_uploadMutex); SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS); query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); query.bind("@status1", static_cast(UploadStatus::Uploading)); query.bind("@status2", static_cast(UploadStatus::Modified)); // Check copying if (query.executeStep()) { UploadStatus uploadStatus = static_cast(static_cast(query.getColumn(2))); CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ExistingUploadFound(siaPath, filePath, uploadStatus))); if (uploadStatus == UploadStatus::Uploading) { // set to modified // update file path } } else { // Strip drive specification (i.e. C:\) // TODO If mount to drive is ever enabled, this will need to change String siaDriveFileName = GenerateSha256(&filePath[3]) + L".siadrive"; String siaDriveFilePath; siaDriveFilePath.resize(MAX_PATH + 1); PathCombine(&siaDriveFilePath[0], rootPath.c_str(), siaDriveFileName.c_str()); String tempSourcePath; tempSourcePath.resize(MAX_PATH + 1); PathCombine(&tempSourcePath[0], rootPath.c_str(), (siaDriveFileName + L".temp").c_str()); // Add to db /*SQLite::Statement addOrUpdate(_uploadDatabase, ADD_UPDATE_UPLOAD); addOrUpdate.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); addOrUpdate.bind("@file_path", CW2A(filePath.c_str()).m_psz); addOrUpdate.bind("@siadrive_path", CW2A(siaDriveFilePath.c_str()).m_psz); addOrUpdate.bind("@status", static_cast(UploadStatus::Copying));*/ // Queue file upload operation std::lock_guard l2(_fileQueueMutex); _fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); }); CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath))); } } else { CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(SourceFileNotFound(siaPath, filePath))); ret = UploadError::SourceFileNotFound; } return ret; } void CUploadManager::PurgeCompleteStatus() { } void CUploadManager::PurgeErrorStatus() { } UploadError CUploadManager::Remove(const String& siaPath) { UploadError ret = UploadError::Success; return ret; }