Upload manager changes
This commit is contained in:
@@ -120,7 +120,7 @@ void CUploadManager::FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CUploadManager::FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
|
void CUploadManager::NewFileAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
|
||||||
{
|
{
|
||||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath)));
|
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(CreatingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath)));
|
||||||
if (RetryableAction(::CopyFile(filePath.c_str(), tempSourcePath.c_str(), FALSE), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
|
if (RetryableAction(::CopyFile(filePath.c_str(), tempSourcePath.c_str(), FALSE), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
|
||||||
@@ -138,7 +138,7 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
|
|||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> l(_uploadMutex);
|
std::lock_guard<std::mutex> l(_uploadMutex);
|
||||||
SET_STATUS(UploadStatus::Queued, UploadAddedToQueue, ModifyUploadStatusFailed)
|
SET_STATUS(UploadStatus::Queued, UploadAddedToQueue, ModifyUploadStatusFailed)
|
||||||
if (statusUpdated && !RetryDeleteFileIfExists(siaDriveFilePath))
|
if (statusUpdated && !RetryDeleteFileIfExists(tempSourcePath))
|
||||||
{
|
{
|
||||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
|
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
|
||||||
}
|
}
|
||||||
@@ -156,7 +156,7 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
|
|||||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath)));
|
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Requeue
|
// Requeued
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@@ -170,7 +170,7 @@ void CUploadManager::FileUploadAction(const String& siaPath, const String& fileP
|
|||||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
|
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Requeue
|
// Requeued
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,11 +194,9 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
|
|||||||
fileTree->BuildTree(result);
|
fileTree->BuildTree(result);
|
||||||
if (query.executeStep())
|
if (query.executeStep())
|
||||||
{
|
{
|
||||||
std::string tempSiaPath = query.getColumn(1);
|
String siaPath = CA2W(query.getColumn(1)).m_psz;
|
||||||
String siaPath = CA2W(tempSiaPath.c_str()).m_psz;
|
String filePath = CA2W(query.getColumn(2)).m_psz;
|
||||||
std::string tempFilePath = query.getColumn(2);
|
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(3).getUInt());
|
||||||
String filePath = CA2W(tempSiaPath.c_str()).m_psz;
|
|
||||||
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(2).getUInt());
|
|
||||||
|
|
||||||
auto fileList = fileTree->GetFileList();
|
auto fileList = fileTree->GetFileList();
|
||||||
auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr)
|
auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr)
|
||||||
@@ -212,9 +210,18 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
|
|||||||
}
|
}
|
||||||
else if (uploadStatus == UploadStatus::Modified)
|
else if (uploadStatus == UploadStatus::Modified)
|
||||||
{
|
{
|
||||||
// TODO Delete from Sia first
|
json response;
|
||||||
|
SiaCurlError cerror = siaCurl.Post(String(L"/renter/delete/") + siaPath, {}, response);
|
||||||
|
if (ApiSuccess(cerror))
|
||||||
|
{
|
||||||
|
// TODO validate response
|
||||||
SET_STATUS(UploadStatus::Queued, ModifiedUploadQueued, ModifyUploadStatusFailed)
|
SET_STATUS(UploadStatus::Queued, ModifiedUploadQueued, ModifyUploadStatusFailed)
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(FailedToDeleteFromSia(siaPath, filePath, cerror)));
|
||||||
|
}
|
||||||
|
}
|
||||||
else if ((*it)->GetUploadProgress() >= 100)
|
else if ((*it)->GetUploadProgress() >= 100)
|
||||||
{
|
{
|
||||||
SET_STATUS(UploadStatus::Complete, UploadComplete, ModifyUploadStatusFailed)
|
SET_STATUS(UploadStatus::Complete, UploadComplete, ModifyUploadStatusFailed)
|
||||||
@@ -243,6 +250,7 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
|
|||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> l(_uploadMutex);
|
||||||
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
|
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
|
||||||
query.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
|
query.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
|
||||||
|
|
||||||
@@ -251,13 +259,13 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
|
|||||||
if (query.executeStep())
|
if (query.executeStep())
|
||||||
{
|
{
|
||||||
String siaPath = CA2W(query.getColumn(1)).m_psz;
|
String siaPath = CA2W(query.getColumn(1)).m_psz;
|
||||||
String tempFilePath;
|
String filePath = CA2W(query.getColumn(2)).m_psz;
|
||||||
|
|
||||||
json response;
|
json response;
|
||||||
SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", tempFilePath} }, response);
|
SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", filePath} }, response);
|
||||||
if (ApiSuccess(cerror))
|
if (ApiSuccess(cerror))
|
||||||
{
|
{
|
||||||
// TODO Update status in DB
|
SET_STATUS(UploadStatus::Uploading, UploadComplete, ModifyUploadStatusFailed)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -368,8 +376,8 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
|
|||||||
{
|
{
|
||||||
// Queue file upload operation
|
// Queue file upload operation
|
||||||
std::lock_guard<std::mutex> l2(_fileQueueMutex);
|
std::lock_guard<std::mutex> l2(_fileQueueMutex);
|
||||||
_fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
|
_fileQueue.push_back([=]() { this->NewFileAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
|
||||||
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath)));
|
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(NewFileAdded(siaPath, filePath, siaDriveFilePath)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (SQLite::Exception e)
|
catch (SQLite::Exception e)
|
||||||
|
@@ -54,7 +54,7 @@ private:
|
|||||||
std::deque<std::function<void()>> _fileQueue;
|
std::deque<std::function<void()>> _fileQueue;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath);
|
void NewFileAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual void AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) override;
|
virtual void AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) override;
|
||||||
@@ -237,6 +237,40 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class FailedToDeleteFromSia :
|
||||||
|
public CEvent
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
FailedToDeleteFromSia(const String& siaPath, const String& filePath, const SiaCurlError& curlError) :
|
||||||
|
_siaPath(siaPath),
|
||||||
|
_filePath(filePath),
|
||||||
|
_curlError(curlError)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
virtual ~FailedToDeleteFromSia()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
const String _siaPath;
|
||||||
|
const String _filePath;
|
||||||
|
const SiaCurlError _curlError = SiaCurlError::Success;
|
||||||
|
|
||||||
|
public:
|
||||||
|
virtual String GetSingleLineMessage() const override
|
||||||
|
{
|
||||||
|
return L"FailedToDeleteFromSia|SP|" + _siaPath + L"|FP|" + _filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual std::shared_ptr<CEvent> Clone() const override
|
||||||
|
{
|
||||||
|
return std::shared_ptr<CEvent>(new FailedToDeleteFromSia(_siaPath, _filePath, _curlError));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
class ModifyUploadStatusFailed :
|
class ModifyUploadStatusFailed :
|
||||||
public CEvent
|
public CEvent
|
||||||
{
|
{
|
||||||
@@ -515,11 +549,11 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
class NewUploadAdded :
|
class NewFileAdded :
|
||||||
public CEvent
|
public CEvent
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
NewUploadAdded(const String& siaPath, const String& filePath, const String& siaDriveFilePath) :
|
NewFileAdded(const String& siaPath, const String& filePath, const String& siaDriveFilePath) :
|
||||||
_siaPath(siaPath),
|
_siaPath(siaPath),
|
||||||
_filePath(filePath),
|
_filePath(filePath),
|
||||||
_siaDriveFilePath(siaDriveFilePath)
|
_siaDriveFilePath(siaDriveFilePath)
|
||||||
@@ -528,7 +562,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
virtual ~NewUploadAdded()
|
virtual ~NewFileAdded()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -540,12 +574,12 @@ private:
|
|||||||
public:
|
public:
|
||||||
virtual String GetSingleLineMessage() const override
|
virtual String GetSingleLineMessage() const override
|
||||||
{
|
{
|
||||||
return L"NewUploadAdded|SP|" + _siaPath + L"|FP|" + _filePath + L"|SDP|" + _siaDriveFilePath;
|
return L"NewFileAdded|SP|" + _siaPath + L"|FP|" + _filePath + L"|SDP|" + _siaDriveFilePath;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual std::shared_ptr<CEvent> Clone() const override
|
virtual std::shared_ptr<CEvent> Clone() const override
|
||||||
{
|
{
|
||||||
return std::shared_ptr<CEvent>(new NewUploadAdded(_siaPath, _filePath, _siaDriveFilePath));
|
return std::shared_ptr<CEvent>(new NewFileAdded(_siaPath, _filePath, _siaDriveFilePath));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -12,7 +12,7 @@ private:
|
|||||||
const std::string SERVER_VERSION_SUCCESS_CONTENT = "{\"version\": \"1.1.0\"}";
|
const std::string SERVER_VERSION_SUCCESS_CONTENT = "{\"version\": \"1.1.0\"}";
|
||||||
|
|
||||||
public:
|
public:
|
||||||
CMockClient(SOCKET socket, std::function<void(CMockClient* client)> removedCallback) :
|
CMockClient(SOCKET socket, std::function<void(CMockClient*)> removedCallback) :
|
||||||
_socket(socket),
|
_socket(socket),
|
||||||
_stopEvent(::CreateEvent(nullptr, FALSE, FALSE, nullptr))
|
_stopEvent(::CreateEvent(nullptr, FALSE, FALSE, nullptr))
|
||||||
{
|
{
|
||||||
@@ -29,7 +29,7 @@ public:
|
|||||||
private:
|
private:
|
||||||
SOCKET _socket;
|
SOCKET _socket;
|
||||||
HANDLE _stopEvent;
|
HANDLE _stopEvent;
|
||||||
std::function<void(CMockClient* client)> NotifyClientRemoved;
|
std::function<void(CMockClient*)> NotifyClientRemoved;
|
||||||
std::unique_ptr<std::thread> _clientThread;
|
std::unique_ptr<std::thread> _clientThread;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@@ -124,7 +124,7 @@ namespace UnitTests
|
|||||||
CUploadManager uploadManager(siaCurl, &driveConfig);
|
CUploadManager uploadManager(siaCurl, &driveConfig);
|
||||||
uploadManager.AddOrUpdate(L"/test1/test.rtf", L"./TestCacheFolder/test1/test.rtf");
|
uploadManager.AddOrUpdate(L"/test1/test.rtf", L"./TestCacheFolder/test1/test.rtf");
|
||||||
|
|
||||||
Assert::IsTrue(_eventAccumulator.WaitForEvent<NewUploadAdded>(5000));
|
Assert::IsTrue(_eventAccumulator.WaitForEvent<NewFileAdded>(5000));
|
||||||
Assert::IsTrue(_eventAccumulator.WaitForEvent<CreatingTemporarySiaDriveFile>(5000));
|
Assert::IsTrue(_eventAccumulator.WaitForEvent<CreatingTemporarySiaDriveFile>(5000));
|
||||||
Assert::IsTrue(_eventAccumulator.WaitForEvent<RenamingTemporarySiaDriveFile>(5000));
|
Assert::IsTrue(_eventAccumulator.WaitForEvent<RenamingTemporarySiaDriveFile>(5000));
|
||||||
Assert::IsTrue(_eventAccumulator.WaitForEvent<UploadAddedToQueue>(5000));
|
Assert::IsTrue(_eventAccumulator.WaitForEvent<UploadAddedToQueue>(5000));
|
||||||
|
Reference in New Issue
Block a user