From 39dcd632793718e9d7c8c7eed663806618cebac2 Mon Sep 17 00:00:00 2001 From: "Scott E. Graves" Date: Sat, 25 Feb 2017 12:42:37 -0600 Subject: [PATCH] Upload manager changes --- SiaDrive.Api/UploadManager.cpp | 99 +++++++++++++--- SiaDrive.Api/UploadManager.h | 203 +++++++++++++++++++++++++++++++- UnitTests/UnitTests.vcxproj | 6 + UnitTests/UploadManagerTest.cpp | 9 +- 4 files changed, 299 insertions(+), 18 deletions(-) diff --git a/SiaDrive.Api/UploadManager.cpp b/SiaDrive.Api/UploadManager.cpp index ca642e0..15cc8a2 100644 --- a/SiaDrive.Api/UploadManager.cpp +++ b/SiaDrive.Api/UploadManager.cpp @@ -7,11 +7,13 @@ using namespace Sia::Api; #define TABLE_CREATE L"create table if not exists %s (%s);" #define UPLOAD_TABLE L"upload_table" -#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, siadrive_path text unique not null, status integer not null" +#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, status integer not null" #define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;" #define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;" #define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;" #define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, status from upload_table where sia_path=@sia_path and (status=@status1 or status=@status2) order by id desc limit 1;" +#define UPDATE_STATUS "update upload_table set status=@status where sia_path=@sia_path;" +#define INSERT_UPLOAD "insert into upload_table (sia_path, status, file_path) values (@sia_path, @status, @file_path)" template String fmt(const String &fmt, Ts... vs) @@ -94,7 +96,29 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath, siaDriveFilePath))); if (RetryableAction(::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS)) { - // TODO Change status to 'Queued' + try + { + std::lock_guard l(_uploadMutex); + SQLite::Statement update(_uploadDatabase, UPDATE_STATUS); + update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); + update.bind("@status", static_cast(UploadStatus::Queued)); + if (update.exec() != 1) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(QueueUploadFailed(siaPath, filePath, CA2W(update.getErrorMsg()).m_psz))); + if (!RetryDeleteFileIfExists(siaDriveFilePath)) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath))); + } + } + else + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadAddedToQueue(siaPath, filePath))); + } + } + catch (SQLite::Exception e) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(QueueUploadFailed(siaPath, filePath, CA2W(e.getErrorStr()).m_psz))); + } } else { @@ -169,6 +193,8 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig { std::string tempSiaPath = query.getColumn(1); String siaPath = CA2W(tempSiaPath.c_str()).m_psz; + std::string tempFilePath = query.getColumn(2); + String filePath = CA2W(tempSiaPath.c_str()).m_psz; UploadStatus uploadStatus = static_cast(query.getColumn(2).getUInt()); auto fileList = fileTree->GetFileList(); @@ -183,12 +209,46 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig } else if (uploadStatus == UploadStatus::Modified) { - // delete existing, change status to queued + try + { + SQLite::Statement update(_uploadDatabase, UPDATE_STATUS); + update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); + update.bind("@status", static_cast(UploadStatus::Modified)); + if (update.exec() != 1) + { + // TODO Delete from Sia first + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Modified, CA2W(update.getErrorMsg()).m_psz))); + } + else + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadModifiedInQueue(siaPath, filePath))); + } + } + catch (SQLite::Exception e) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Modified, CA2W(e.getErrorStr()).m_psz))); + } } else if ((*it)->GetUploadProgress() >= 100) { - // upload complete - change status - + try + { + SQLite::Statement update(_uploadDatabase, UPDATE_STATUS); + update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); + update.bind("@status", static_cast(UploadStatus::Complete)); + if (update.exec() != 1) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Complete, CA2W(update.getErrorMsg()).m_psz))); + } + else + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadComplete(siaPath, filePath))); + } + } + catch (SQLite::Exception e) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Complete, CA2W(e.getErrorStr()).m_psz))); + } } else { @@ -294,6 +354,7 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath) // start again later std::lock_guard l(_uploadMutex); + // TODO Handle SQLite::Exception SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS); query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); query.bind("@status1", static_cast(UploadStatus::Uploading)); @@ -306,7 +367,6 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath) if (uploadStatus == UploadStatus::Uploading) { // set to modified - // update file path } } else @@ -324,16 +384,23 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath) PathCombine(&tempSourcePath[0], rootPath.c_str(), (siaDriveFileName + L".temp").c_str()); // Add to db - /*SQLite::Statement addOrUpdate(_uploadDatabase, ADD_UPDATE_UPLOAD); - addOrUpdate.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); - addOrUpdate.bind("@file_path", CW2A(filePath.c_str()).m_psz); - addOrUpdate.bind("@siadrive_path", CW2A(siaDriveFilePath.c_str()).m_psz); - addOrUpdate.bind("@status", static_cast(UploadStatus::Copying));*/ - - // Queue file upload operation - std::lock_guard l2(_fileQueueMutex); - _fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); }); - CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath))); + // TODO Handle SQLite::Exception + SQLite::Statement insert(_uploadDatabase, INSERT_UPLOAD); + insert.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); + insert.bind("@file_path", CW2A(filePath.c_str()).m_psz); + insert.bind("@status", static_cast(UploadStatus::Copying)); + if (insert.exec() != 1) + { + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseInsertFailed(siaPath, filePath, CA2W(insert.getErrorMsg()).m_psz))); + ret = UploadError::DatabaseError; + } + else + { + // Queue file upload operation + std::lock_guard l2(_fileQueueMutex); + _fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); }); + CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath))); + } } } else diff --git a/SiaDrive.Api/UploadManager.h b/SiaDrive.Api/UploadManager.h index 6094a21..3e13566 100644 --- a/SiaDrive.Api/UploadManager.h +++ b/SiaDrive.Api/UploadManager.h @@ -25,7 +25,8 @@ public: enum class _UploadError { Success, - SourceFileNotFound + SourceFileNotFound, + DatabaseError }; private: @@ -108,6 +109,172 @@ public: } }; +class QueueUploadFailed : + public CEvent +{ +public: + QueueUploadFailed(const String& siaPath, const String& filePath, const String& errorMsg) : + _siaPath(siaPath), + _filePath(filePath), + _errorMsg(errorMsg) + { + + } + +public: + virtual ~QueueUploadFailed() + { + } + +private: + const String _siaPath; + const String _filePath; + const String _errorMsg; + +public: + virtual String GetSingleLineMessage() const override + { + return L"QueueUploadFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|MSG|" + _errorMsg; + } + + virtual std::shared_ptr Clone() const override + { + return std::shared_ptr(new QueueUploadFailed(_siaPath, _filePath, _errorMsg)); + } +}; + +class UploadAddedToQueue : + public CEvent +{ +public: + UploadAddedToQueue(const String& siaPath, const String& filePath) : + _siaPath(siaPath), + _filePath(filePath) + { + + } + +public: + virtual ~UploadAddedToQueue() + { + } + +private: + const String _siaPath; + const String _filePath; + +public: + virtual String GetSingleLineMessage() const override + { + return L"UploadAddedToQueue|SP|" + _siaPath + L"|FP|" + _filePath; + } + + virtual std::shared_ptr Clone() const override + { + return std::shared_ptr(new UploadAddedToQueue(_siaPath, _filePath)); + } +}; + +class UploadComplete : + public CEvent +{ +public: + UploadComplete(const String& siaPath, const String& filePath) : + _siaPath(siaPath), + _filePath(filePath) + { + + } + +public: + virtual ~UploadComplete() + { + } + +private: + const String _siaPath; + const String _filePath; + +public: + virtual String GetSingleLineMessage() const override + { + return L"UploadComplete|SP|" + _siaPath + L"|FP|" + _filePath; + } + + virtual std::shared_ptr Clone() const override + { + return std::shared_ptr(new UploadComplete(_siaPath, _filePath)); + } +}; + +class UploadModifiedInQueue : + public CEvent +{ +public: + UploadModifiedInQueue(const String& siaPath, const String& filePath) : + _siaPath(siaPath), + _filePath(filePath) + { + + } + +public: + virtual ~UploadModifiedInQueue() + { + } + +private: + const String _siaPath; + const String _filePath; + +public: + virtual String GetSingleLineMessage() const override + { + return L"UploadModifiedInQueue|SP|" + _siaPath + L"|FP|" + _filePath; + } + + virtual std::shared_ptr Clone() const override + { + return std::shared_ptr(new UploadModifiedInQueue(_siaPath, _filePath)); + } +}; + +class ModifyUploadInQueueFailed : + public CEvent +{ +public: + ModifyUploadInQueueFailed(const String& siaPath, const String& filePath, const UploadStatus& uploadStatus, const String& errorMsg) : + _siaPath(siaPath), + _filePath(filePath), + _uploadStatus(uploadStatus), + _errorMsg(errorMsg) + { + + } + +public: + virtual ~ModifyUploadInQueueFailed() + { + } + +private: + const String _siaPath; + const String _filePath; + const UploadStatus _uploadStatus; + const String _errorMsg; + +public: + virtual String GetSingleLineMessage() const override + { + return L"ModifyUploadInQueueFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|ST|" + CUploadManager::UploadStatusToString(_uploadStatus) + L"|MSG|" + _errorMsg; + } + + virtual std::shared_ptr Clone() const override + { + return std::shared_ptr(new ModifyUploadInQueueFailed(_siaPath, _filePath, _uploadStatus, _errorMsg)); + } +}; + class CreatingTemporarySiaDriveFileFailed : public CEvent { @@ -282,6 +449,40 @@ public: } }; +class DatabaseInsertFailed : + public CEvent +{ +public: + DatabaseInsertFailed(const String& siaPath, const String& filePath, const String& errorMessage) : + _siaPath(siaPath), + _filePath(filePath), + _errorMessage(errorMessage) + { + + } + +public: + virtual ~DatabaseInsertFailed() + { + } + +private: + const String _siaPath; + const String _filePath; + const String _errorMessage; + +public: + virtual String GetSingleLineMessage() const override + { + return L"DatabaseInsertFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|MSG|" + _errorMessage; + } + + virtual std::shared_ptr Clone() const override + { + return std::shared_ptr(new DatabaseInsertFailed(_siaPath, _filePath, _errorMessage)); + } +}; + class ExistingUploadFound : public CEvent { diff --git a/UnitTests/UnitTests.vcxproj b/UnitTests/UnitTests.vcxproj index 71026a1..5b29185 100644 --- a/UnitTests/UnitTests.vcxproj +++ b/UnitTests/UnitTests.vcxproj @@ -191,6 +191,12 @@ + + {92ef9cae-3f0c-31d5-9556-62586cc5072d} + + + {be7ee71d-6608-36dd-9687-d84aae20c0a3} + {aa357195-d159-4cb7-bfff-cc8666d7ad77} diff --git a/UnitTests/UploadManagerTest.cpp b/UnitTests/UploadManagerTest.cpp index af1aca6..5c76057 100644 --- a/UnitTests/UploadManagerTest.cpp +++ b/UnitTests/UploadManagerTest.cpp @@ -127,21 +127,28 @@ namespace UnitTests Assert::IsTrue(_eventAccumulator.WaitForEvent(5000)); Assert::IsTrue(_eventAccumulator.WaitForEvent(5000)); Assert::IsTrue(_eventAccumulator.WaitForEvent(5000)); + Assert::IsTrue(_eventAccumulator.WaitForEvent(5000)); Assert::IsFalse(_eventAccumulator.Contains()); Assert::IsFalse(_eventAccumulator.Contains()); Assert::IsFalse(_eventAccumulator.Contains()); Assert::IsFalse(_eventAccumulator.Contains()); - Assert::IsFalse(_eventAccumulator.Contains()); + Assert::IsFalse(_eventAccumulator.Contains()); + Assert::IsFalse(_eventAccumulator.Contains()); + + Assert::IsTrue(uploadManager.GetUploadStatus(L"/test1/test.rtf") == UploadStatus::Queued); } catch (SQLite::Exception e) { siad->Stop(); CEventSystem::EventSystem.Stop(); + _eventAccumulator.Clear(); + Assert::Fail(CA2W(e.getErrorStr())); } siad->Stop(); CEventSystem::EventSystem.Stop(); + _eventAccumulator.Clear(); } };