1
0

Upload manager changes

This commit is contained in:
Scott E. Graves
2017-02-25 12:42:37 -06:00
parent 6135c1ffc9
commit 39dcd63279
4 changed files with 299 additions and 18 deletions

View File

@@ -7,11 +7,13 @@ using namespace Sia::Api;
#define TABLE_CREATE L"create table if not exists %s (%s);"
#define UPLOAD_TABLE L"upload_table"
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, siadrive_path text unique not null, status integer not null"
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, status integer not null"
#define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;"
#define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;"
#define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;"
#define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, status from upload_table where sia_path=@sia_path and (status=@status1 or status=@status2) order by id desc limit 1;"
#define UPDATE_STATUS "update upload_table set status=@status where sia_path=@sia_path;"
#define INSERT_UPLOAD "insert into upload_table (sia_path, status, file_path) values (@sia_path, @status, @file_path)"
template <typename... Ts>
String fmt(const String &fmt, Ts... vs)
@@ -94,7 +96,29 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(RenamingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath, siaDriveFilePath)));
if (RetryableAction(::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
{
// TODO Change status to 'Queued'
try
{
std::lock_guard<std::mutex> l(_uploadMutex);
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
update.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
if (update.exec() != 1)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(QueueUploadFailed(siaPath, filePath, CA2W(update.getErrorMsg()).m_psz)));
if (!RetryDeleteFileIfExists(siaDriveFilePath))
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
}
}
else
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadAddedToQueue(siaPath, filePath)));
}
}
catch (SQLite::Exception e)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(QueueUploadFailed(siaPath, filePath, CA2W(e.getErrorStr()).m_psz)));
}
}
else
{
@@ -169,6 +193,8 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
{
std::string tempSiaPath = query.getColumn(1);
String siaPath = CA2W(tempSiaPath.c_str()).m_psz;
std::string tempFilePath = query.getColumn(2);
String filePath = CA2W(tempSiaPath.c_str()).m_psz;
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(2).getUInt());
auto fileList = fileTree->GetFileList();
@@ -183,12 +209,46 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
}
else if (uploadStatus == UploadStatus::Modified)
{
// delete existing, change status to queued
try
{
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
update.bind("@status", static_cast<unsigned>(UploadStatus::Modified));
if (update.exec() != 1)
{
// TODO Delete from Sia first
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Modified, CA2W(update.getErrorMsg()).m_psz)));
}
else
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadModifiedInQueue(siaPath, filePath)));
}
}
catch (SQLite::Exception e)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Modified, CA2W(e.getErrorStr()).m_psz)));
}
}
else if ((*it)->GetUploadProgress() >= 100)
{
// upload complete - change status
try
{
SQLite::Statement update(_uploadDatabase, UPDATE_STATUS);
update.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
update.bind("@status", static_cast<unsigned>(UploadStatus::Complete));
if (update.exec() != 1)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Complete, CA2W(update.getErrorMsg()).m_psz)));
}
else
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(UploadComplete(siaPath, filePath)));
}
}
catch (SQLite::Exception e)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ModifyUploadInQueueFailed(siaPath, filePath, UploadStatus::Complete, CA2W(e.getErrorStr()).m_psz)));
}
}
else
{
@@ -294,6 +354,7 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
// start again later
std::lock_guard<std::mutex> l(_uploadMutex);
// TODO Handle SQLite::Exception
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS);
query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
query.bind("@status1", static_cast<unsigned>(UploadStatus::Uploading));
@@ -306,7 +367,6 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
if (uploadStatus == UploadStatus::Uploading)
{
// set to modified
// update file path
}
}
else
@@ -324,18 +384,25 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
PathCombine(&tempSourcePath[0], rootPath.c_str(), (siaDriveFileName + L".temp").c_str());
// Add to db
/*SQLite::Statement addOrUpdate(_uploadDatabase, ADD_UPDATE_UPLOAD);
addOrUpdate.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
addOrUpdate.bind("@file_path", CW2A(filePath.c_str()).m_psz);
addOrUpdate.bind("@siadrive_path", CW2A(siaDriveFilePath.c_str()).m_psz);
addOrUpdate.bind("@status", static_cast<unsigned>(UploadStatus::Copying));*/
// TODO Handle SQLite::Exception
SQLite::Statement insert(_uploadDatabase, INSERT_UPLOAD);
insert.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
insert.bind("@file_path", CW2A(filePath.c_str()).m_psz);
insert.bind("@status", static_cast<unsigned>(UploadStatus::Copying));
if (insert.exec() != 1)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseInsertFailed(siaPath, filePath, CA2W(insert.getErrorMsg()).m_psz)));
ret = UploadError::DatabaseError;
}
else
{
// Queue file upload operation
std::lock_guard<std::mutex> l2(_fileQueueMutex);
_fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath)));
}
}
}
else
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(SourceFileNotFound(siaPath, filePath)));

View File

@@ -25,7 +25,8 @@ public:
enum class _UploadError
{
Success,
SourceFileNotFound
SourceFileNotFound,
DatabaseError
};
private:
@@ -108,6 +109,172 @@ public:
}
};
class QueueUploadFailed :
public CEvent
{
public:
QueueUploadFailed(const String& siaPath, const String& filePath, const String& errorMsg) :
_siaPath(siaPath),
_filePath(filePath),
_errorMsg(errorMsg)
{
}
public:
virtual ~QueueUploadFailed()
{
}
private:
const String _siaPath;
const String _filePath;
const String _errorMsg;
public:
virtual String GetSingleLineMessage() const override
{
return L"QueueUploadFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|MSG|" + _errorMsg;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new QueueUploadFailed(_siaPath, _filePath, _errorMsg));
}
};
class UploadAddedToQueue :
public CEvent
{
public:
UploadAddedToQueue(const String& siaPath, const String& filePath) :
_siaPath(siaPath),
_filePath(filePath)
{
}
public:
virtual ~UploadAddedToQueue()
{
}
private:
const String _siaPath;
const String _filePath;
public:
virtual String GetSingleLineMessage() const override
{
return L"UploadAddedToQueue|SP|" + _siaPath + L"|FP|" + _filePath;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new UploadAddedToQueue(_siaPath, _filePath));
}
};
class UploadComplete :
public CEvent
{
public:
UploadComplete(const String& siaPath, const String& filePath) :
_siaPath(siaPath),
_filePath(filePath)
{
}
public:
virtual ~UploadComplete()
{
}
private:
const String _siaPath;
const String _filePath;
public:
virtual String GetSingleLineMessage() const override
{
return L"UploadComplete|SP|" + _siaPath + L"|FP|" + _filePath;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new UploadComplete(_siaPath, _filePath));
}
};
class UploadModifiedInQueue :
public CEvent
{
public:
UploadModifiedInQueue(const String& siaPath, const String& filePath) :
_siaPath(siaPath),
_filePath(filePath)
{
}
public:
virtual ~UploadModifiedInQueue()
{
}
private:
const String _siaPath;
const String _filePath;
public:
virtual String GetSingleLineMessage() const override
{
return L"UploadModifiedInQueue|SP|" + _siaPath + L"|FP|" + _filePath;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new UploadModifiedInQueue(_siaPath, _filePath));
}
};
class ModifyUploadInQueueFailed :
public CEvent
{
public:
ModifyUploadInQueueFailed(const String& siaPath, const String& filePath, const UploadStatus& uploadStatus, const String& errorMsg) :
_siaPath(siaPath),
_filePath(filePath),
_uploadStatus(uploadStatus),
_errorMsg(errorMsg)
{
}
public:
virtual ~ModifyUploadInQueueFailed()
{
}
private:
const String _siaPath;
const String _filePath;
const UploadStatus _uploadStatus;
const String _errorMsg;
public:
virtual String GetSingleLineMessage() const override
{
return L"ModifyUploadInQueueFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|ST|" + CUploadManager::UploadStatusToString(_uploadStatus) + L"|MSG|" + _errorMsg;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new ModifyUploadInQueueFailed(_siaPath, _filePath, _uploadStatus, _errorMsg));
}
};
class CreatingTemporarySiaDriveFileFailed :
public CEvent
{
@@ -282,6 +449,40 @@ public:
}
};
class DatabaseInsertFailed :
public CEvent
{
public:
DatabaseInsertFailed(const String& siaPath, const String& filePath, const String& errorMessage) :
_siaPath(siaPath),
_filePath(filePath),
_errorMessage(errorMessage)
{
}
public:
virtual ~DatabaseInsertFailed()
{
}
private:
const String _siaPath;
const String _filePath;
const String _errorMessage;
public:
virtual String GetSingleLineMessage() const override
{
return L"DatabaseInsertFailed|SP|" + _siaPath + L"|FP|" + _filePath + L"|MSG|" + _errorMessage;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new DatabaseInsertFailed(_siaPath, _filePath, _errorMessage));
}
};
class ExistingUploadFound :
public CEvent
{

View File

@@ -191,6 +191,12 @@
</ClCompile>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\3rd-party\SRombauts-SQLiteCpp-f69986a\build\sqlite3\sqlite3.vcxproj">
<Project>{92ef9cae-3f0c-31d5-9556-62586cc5072d}</Project>
</ProjectReference>
<ProjectReference Include="..\3rd-party\SRombauts-SQLiteCpp-f69986a\build\SQLiteCpp.vcxproj">
<Project>{be7ee71d-6608-36dd-9687-d84aae20c0a3}</Project>
</ProjectReference>
<ProjectReference Include="..\SiaDrive.Api\SiaDrive.Api.vcxproj">
<Project>{aa357195-d159-4cb7-bfff-cc8666d7ad77}</Project>
</ProjectReference>

View File

@@ -127,21 +127,28 @@ namespace UnitTests
Assert::IsTrue(_eventAccumulator.WaitForEvent<NewUploadAdded>(5000));
Assert::IsTrue(_eventAccumulator.WaitForEvent<CreatingTemporarySiaDriveFile>(5000));
Assert::IsTrue(_eventAccumulator.WaitForEvent<RenamingTemporarySiaDriveFile>(5000));
Assert::IsTrue(_eventAccumulator.WaitForEvent<UploadAddedToQueue>(5000));
Assert::IsFalse(_eventAccumulator.Contains<DeleteSiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<RenamingTemporarySiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<DeleteTemporarySiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<CreatingTemporarySiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<CreatingTemporarySiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<QueueUploadFailed>());
Assert::IsFalse(_eventAccumulator.Contains<ExistingUploadFound>());
Assert::IsTrue(uploadManager.GetUploadStatus(L"/test1/test.rtf") == UploadStatus::Queued);
}
catch (SQLite::Exception e)
{
siad->Stop();
CEventSystem::EventSystem.Stop();
_eventAccumulator.Clear();
Assert::Fail(CA2W(e.getErrorStr()));
}
siad->Stop();
CEventSystem::EventSystem.Stop();
_eventAccumulator.Clear();
}
};