Upload manager changes
This commit is contained in:
@@ -15,6 +15,28 @@ using namespace Sia::Api;
|
||||
#define UPDATE_STATUS "update upload_table set status=@status where sia_path=@sia_path;"
|
||||
#define INSERT_UPLOAD "insert into upload_table (sia_path, status, file_path) values (@sia_path, @status, @file_path)"
|
||||
|
||||
#define SET_STATUS(status, success_event, fail_event)\
|
||||
bool statusUpdated = false;\
|
||||
try\
|
||||
{\
|
||||
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);\
|
||||
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);\
|
||||
update.bind("@status", static_cast<unsigned>(status));\
|
||||
if (update.exec() != 1)\
|
||||
{\
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(fail_event(siaPath, filePath, status, CA2W(update.getErrorMsg()).m_psz)));\
|
||||
}\
|
||||
else\
|
||||
{\
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(success_event(siaPath, filePath)));\
|
||||
statusUpdated = true;\
|
||||
}\
|
||||
}\
|
||||
catch (SQLite::Exception e)\
|
||||
{\
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(fail_event(siaPath, filePath, status, CA2W(e.getErrorStr()).m_psz)));\
|
||||
}
|
||||
|
||||
template <typename... Ts>
|
||||
String fmt(const String &fmt, Ts... vs)
|
||||
{
|
||||
@@ -80,6 +102,24 @@ CUploadManager::~CUploadManager()
|
||||
StopAutoThread();
|
||||
}
|
||||
|
||||
void CUploadManager::FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
|
||||
{
|
||||
std::function<void()> nextFile = nullptr;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_fileQueueMutex);
|
||||
if (_fileQueue.size())
|
||||
{
|
||||
nextFile = _fileQueue.front();
|
||||
_fileQueue.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
if (nextFile)
|
||||
{
|
||||
nextFile();
|
||||
}
|
||||
}
|
||||
|
||||
void CUploadManager::FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath)));
|
||||
@@ -96,28 +136,11 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath, siaDriveFilePath)));
|
||||
if (RetryableAction(::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
|
||||
{
|
||||
try
|
||||
std::lock_guard<std::mutex> l(_uploadMutex);
|
||||
SET_STATUS(UploadStatus::Queued, UploadAddedToQueue, ModifyUploadStatusFailed)
|
||||
if (statusUpdated && !RetryDeleteFileIfExists(siaDriveFilePath))
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_uploadMutex);
|
||||
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);
|
||||
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||
update.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
|
||||
if (update.exec() != 1)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(QueueUploadFailed(siaPath, filePath, CA2W(update.getErrorMsg()).m_psz)));
|
||||
if (!RetryDeleteFileIfExists(siaDriveFilePath))
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadAddedToQueue(siaPath, filePath)));
|
||||
}
|
||||
}
|
||||
catch (SQLite::Exception e)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(QueueUploadFailed(siaPath, filePath, CA2W(e.getErrorStr()).m_psz)));
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -134,7 +157,6 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
|
||||
}
|
||||
|
||||
// Requeue
|
||||
AddOrUpdate(siaPath, filePath);
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -149,25 +171,6 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
|
||||
}
|
||||
|
||||
// Requeue
|
||||
AddOrUpdate(siaPath, filePath);
|
||||
}
|
||||
}
|
||||
|
||||
void CUploadManager::FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
|
||||
{
|
||||
std::function<void()> nextFile = nullptr;
|
||||
{
|
||||
std::lock_guard<std::mutex> l(_fileQueueMutex);
|
||||
if (_fileQueue.size())
|
||||
{
|
||||
nextFile = _fileQueue.front();
|
||||
_fileQueue.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
if (nextFile)
|
||||
{
|
||||
nextFile();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -209,46 +212,12 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
|
||||
}
|
||||
else if (uploadStatus == UploadStatus::Modified)
|
||||
{
|
||||
try
|
||||
{
|
||||
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);
|
||||
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||
update.bind("@status", static_cast<unsigned>(UploadStatus::Modified));
|
||||
if (update.exec() != 1)
|
||||
{
|
||||
// TODO Delete from Sia first
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Modified, CA2W(update.getErrorMsg()).m_psz)));
|
||||
}
|
||||
else
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadModifiedInQueue(siaPath, filePath)));
|
||||
}
|
||||
}
|
||||
catch (SQLite::Exception e)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Modified, CA2W(e.getErrorStr()).m_psz)));
|
||||
}
|
||||
// TODO Delete from Sia first
|
||||
SET_STATUS(UploadStatus::Modified, UploadModifiedInQueue, ModifyUploadStatusFailed)
|
||||
}
|
||||
else if ((*it)->GetUploadProgress() >= 100)
|
||||
{
|
||||
try
|
||||
{
|
||||
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);
|
||||
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||
update.bind("@status", static_cast<unsigned>(UploadStatus::Complete));
|
||||
if (update.exec() != 1)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Complete, CA2W(update.getErrorMsg()).m_psz)));
|
||||
}
|
||||
else
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadComplete(siaPath, filePath)));
|
||||
}
|
||||
}
|
||||
catch (SQLite::Exception e)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Complete, CA2W(e.getErrorStr()).m_psz)));
|
||||
}
|
||||
SET_STATUS(UploadStatus::Complete, UploadComplete, ModifyUploadStatusFailed)
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -372,7 +341,7 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
|
||||
else
|
||||
{
|
||||
// Strip drive specification (i.e. C:\)
|
||||
// TODO If mount to drive is ever enabled, this will need to change
|
||||
// TODO If mount to folder is ever enabled, this will need to change
|
||||
String siaDriveFileName = GenerateSha256(&filePath[3]) + L".siadrive";
|
||||
|
||||
String siaDriveFilePath;
|
||||
@@ -384,22 +353,29 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
|
||||
PathCombine(&tempSourcePath[0], rootPath.c_str(), (siaDriveFileName + L".temp").c_str());
|
||||
|
||||
// Add to db
|
||||
// TODO Handle SQLite::Exception
|
||||
SQLite::Statement insert(_uploadDatabase, INSERT_UPLOAD);
|
||||
insert.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||
insert.bind("@file_path", CW2A(filePath.c_str()).m_psz);
|
||||
insert.bind("@status", static_cast<unsigned>(UploadStatus::Copying));
|
||||
if (insert.exec() != 1)
|
||||
try
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseInsertFailed(siaPath, filePath, CA2W(insert.getErrorMsg()).m_psz)));
|
||||
ret = UploadError::DatabaseError;
|
||||
SQLite::Statement insert(_uploadDatabase, INSERT_UPLOAD);
|
||||
insert.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||
insert.bind("@file_path", CW2A(filePath.c_str()).m_psz);
|
||||
insert.bind("@status", static_cast<unsigned>(UploadStatus::Copying));
|
||||
if (insert.exec() != 1)
|
||||
{
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseInsertFailed(siaPath, filePath, CA2W(insert.getErrorMsg()).m_psz)));
|
||||
ret = UploadError::DatabaseError;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Queue file upload operation
|
||||
std::lock_guard<std::mutex> l2(_fileQueueMutex);
|
||||
_fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath)));
|
||||
}
|
||||
}
|
||||
else
|
||||
catch (SQLite::Exception e)
|
||||
{
|
||||
// Queue file upload operation
|
||||
std::lock_guard<std::mutex> l2(_fileQueueMutex);
|
||||
_fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath)));
|
||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseInsertFailed(siaPath, filePath, CA2W(e.getErrorStr()).m_psz)));
|
||||
ret = UploadError::DatabaseError;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -109,40 +109,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class QueueUploadFailed :
|
||||
public CEvent
|
||||
{
|
||||
public:
|
||||
QueueUploadFailed(const String& siaPath, const String& filePath, const String& errorMsg) :
|
||||
_siaPath(siaPath),
|
||||
_filePath(filePath),
|
||||
_errorMsg(errorMsg)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public:
|
||||
virtual ~QueueUploadFailed()
|
||||
{
|
||||
}
|
||||
|
||||
private:
|
||||
const String _siaPath;
|
||||
const String _filePath;
|
||||
const String _errorMsg;
|
||||
|
||||
public:
|
||||
virtual String GetSingleLineMessage() const override
|
||||
{
|
||||
return L"QueueUploadFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|MSG|" + _errorMsg;
|
||||
}
|
||||
|
||||
virtual std::shared_ptr<CEvent> Clone() const override
|
||||
{
|
||||
return std::shared_ptr<CEvent>(new QueueUploadFailed(_siaPath, _filePath, _errorMsg));
|
||||
}
|
||||
};
|
||||
|
||||
class UploadAddedToQueue :
|
||||
public CEvent
|
||||
{
|
||||
@@ -239,11 +205,11 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class ModifyUploadInQueueFailed :
|
||||
class ModifyUploadStatusFailed :
|
||||
public CEvent
|
||||
{
|
||||
public:
|
||||
ModifyUploadInQueueFailed(const String& siaPath, const String& filePath, const UploadStatus& uploadStatus, const String& errorMsg) :
|
||||
ModifyUploadStatusFailed(const String& siaPath, const String& filePath, const UploadStatus& uploadStatus, const String& errorMsg) :
|
||||
_siaPath(siaPath),
|
||||
_filePath(filePath),
|
||||
_uploadStatus(uploadStatus),
|
||||
@@ -253,7 +219,7 @@ public:
|
||||
}
|
||||
|
||||
public:
|
||||
virtual ~ModifyUploadInQueueFailed()
|
||||
virtual ~ModifyUploadStatusFailed()
|
||||
{
|
||||
}
|
||||
|
||||
@@ -266,12 +232,12 @@ private:
|
||||
public:
|
||||
virtual String GetSingleLineMessage() const override
|
||||
{
|
||||
return L"ModifyUploadInQueueFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|ST|" + CUploadManager::UploadStatusToString(_uploadStatus) + L"|MSG|" + _errorMsg;
|
||||
return L"ModifyUploadStatusFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|ST|" + CUploadManager::UploadStatusToString(_uploadStatus) + L"|MSG|" + _errorMsg;
|
||||
}
|
||||
|
||||
virtual std::shared_ptr<CEvent> Clone() const override
|
||||
{
|
||||
return std::shared_ptr<CEvent>(new ModifyUploadInQueueFailed(_siaPath, _filePath, _uploadStatus, _errorMsg));
|
||||
return std::shared_ptr<CEvent>(new ModifyUploadStatusFailed(_siaPath, _filePath, _uploadStatus, _errorMsg));
|
||||
}
|
||||
};
|
||||
|
||||
|
@@ -129,11 +129,12 @@ namespace UnitTests
|
||||
Assert::IsTrue(_eventAccumulator.WaitForEvent<RenamingTemporarySiaDriveFile>(5000));
|
||||
Assert::IsTrue(_eventAccumulator.WaitForEvent<UploadAddedToQueue>(5000));
|
||||
|
||||
Assert::IsFalse(_eventAccumulator.Contains<DatabaseInsertFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<DeleteSiaDriveFileFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<RenamingTemporarySiaDriveFileFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<DeleteTemporarySiaDriveFileFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<CreatingTemporarySiaDriveFileFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<QueueUploadFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<ModifyUploadStatusFailed>());
|
||||
Assert::IsFalse(_eventAccumulator.Contains<ExistingUploadFound>());
|
||||
|
||||
Assert::IsTrue(uploadManager.GetUploadStatus(L"/test1/test.rtf") == UploadStatus::Queued);
|
||||
|
Reference in New Issue
Block a user