1
0

Upload manager changes and added initial event system

This commit is contained in:
Scott E. Graves
2017-02-23 11:36:18 -06:00
parent ace1e7292e
commit 9bd8d9881b
9 changed files with 266 additions and 70 deletions

View File

@@ -0,0 +1,19 @@
#include "stdafx.h"
#include "EventSystem.h"
using namespace Sia::Api;
CEventSystem CEventSystem::EventSystem;
CEventSystem::CEventSystem()
{
}
CEventSystem::~CEventSystem()
{
}
void CEventSystem::NotifyEvent(const CEvent& eventData)
{
}

View File

@@ -0,0 +1,34 @@
#pragma once
#include <SiaCommon.h>
NS_BEGIN(Sia)
NS_BEGIN(Api)
class AFX_EXT_CLASS CEvent
{
};
// Singleton
class AFX_EXT_CLASS CEventSystem
{
private:
CEventSystem();
private:
~CEventSystem();
public:
// Singleton setup
CEventSystem(const CEventSystem&) = delete;
CEventSystem(CEventSystem&&) = delete;
CEventSystem& operator=(CEventSystem&&) = delete;
CEventSystem& operator=(const CEventSystem&) = delete;
public:
static CEventSystem EventSystem;
public:
void NotifyEvent(const CEvent& eventData);
};
NS_END(2)

View File

@@ -59,14 +59,14 @@ const hastingsPerSiacoin = new BigNumber('10').toPower(24)
const siacoinsToHastings = (siacoins) => new BigNumber(siacoins).times(hastingsPerSiacoin)
const hastingsToSiacoins = (hastings) => new BigNumber(hastings).dividedBy(hastingsPerSiacoin)
*/
static inline SiaCurrency HastingsStringToSiaCurrency(const String& value)
inline static SiaCurrency HastingsStringToSiaCurrency(const String& value)
{
ttmath::Parser<SiaCurrency> parser;
parser.Parse(value + L" / (10^24)");
return parser.stack[0].value;
}
static inline String SiaCurrencyToString(const SiaCurrency& value)
inline static String SiaCurrencyToString(const SiaCurrency& value)
{
ttmath::Conv conv;
conv.base = 10;
@@ -88,7 +88,7 @@ public:
};
template<typename T, typename R>
R CalculateAveragePrice(const std::vector<T>& v, std::function<R(const T& t)> PriceGetter)
inline static R CalculateAveragePrice(const std::vector<T>& v, std::function<R(const T& t)> PriceGetter)
{
R result = v.size() ? std::accumulate(std::next(v.begin()), v.end(), PriceGetter(v[0]), [&](const R& a, const T& b) {
return a + PriceGetter(b);
@@ -97,17 +97,17 @@ R CalculateAveragePrice(const std::vector<T>& v, std::function<R(const T& t)> Pr
return result;
}
inline Hastings CalculateAverageHostPrice(const std::vector<IHost>& hosts)
inline static Hastings CalculateAverageHostPrice(const std::vector<IHost>& hosts)
{
return CalculateAveragePrice<IHost, Hastings>(hosts, [](const IHost& host)->Hastings { return host.GetStoragePrice(); });
}
inline Hastings CalculateAverageDownloadPrice(const std::vector<IHost>& hosts)
inline static Hastings CalculateAverageDownloadPrice(const std::vector<IHost>& hosts)
{
return CalculateAveragePrice<IHost, Hastings>(hosts, [](const IHost& host)->Hastings { return host.GetDownloadPrice(); });
}
inline Hastings CalculateAverageUploadPrice(const std::vector<IHost>& hosts)
inline static Hastings CalculateAverageUploadPrice(const std::vector<IHost>& hosts)
{
return CalculateAveragePrice<IHost, Hastings>(hosts, [](const IHost& host)->Hastings { return host.GetUploadPrice(); });
}
@@ -127,17 +127,41 @@ static T& ReplaceStringInPlace(T& subject, const T& search, const T& replace)
template<typename T>
static T& ReplaceStringInPlace(T& subject, typename T::value_type* search, const T& replace)
inline static T& ReplaceStringInPlace(T& subject, typename T::value_type* search, const T& replace)
{
return ReplaceStringInPlace(subject, T(search), replace);
}
template<typename T>
static T& ReplaceStringInPlace(T& subject, typename T::value_type* search, typename T::value_type* replace)
inline static T& ReplaceStringInPlace(T& subject, typename T::value_type* search, typename T::value_type* replace)
{
return ReplaceStringInPlace(subject, T(search), T(replace));
}
String AFX_EXT_API GenerateSha256(const String& str);
static BOOL RetryAction(std::function<BOOL()> func, std::uint16_t retryCount, const DWORD& retryDelay)
{
BOOL ret = FALSE;
while (retryCount-- && !(ret = func()))
{
::Sleep(retryDelay);
}
return ret;
}
#define RetryableAction(exec, count, delayMs) RetryAction([&]()->BOOL{return exec;}, count, delayMs)
#define DEFAULT_RETRY_COUNT 10
#define DEFAULT_RETRY_DELAY_MS 1000
static BOOL RetryDeleteFileIfExists(const String& filePath)
{
BOOL ret = TRUE;
if (::PathFileExists(filePath.c_str()))
{
ret = RetryableAction(::DeleteFile(filePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS);
}
return ret;
}
String AFX_EXT_API GenerateSha256(const String& str);
NS_END(2)

View File

@@ -204,6 +204,7 @@
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">
</PrecompiledHeader>
</ClCompile>
<ClCompile Include="EventSystem.cpp" />
<ClCompile Include="SiaApi.cpp" />
<ClCompile Include="SiaCommon.cpp" />
<ClCompile Include="SiaConsensus.cpp" />
@@ -228,6 +229,7 @@
</ItemGroup>
<ItemGroup>
<ClInclude Include="AutoThread.h" />
<ClInclude Include="EventSystem.h" />
<ClInclude Include="json.hpp" />
<ClInclude Include="Resource.h" />
<ClInclude Include="SiaApi.h" />

View File

@@ -60,6 +60,9 @@
<ClCompile Include="UploadManager.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="EventSystem.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<None Include="SiaDrive.Api.def">
@@ -100,6 +103,9 @@
<ClInclude Include="UploadManager.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="EventSystem.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="SiaDrive.Api.rc">

View File

@@ -39,7 +39,7 @@ void CSiaDriveConfig::Load( )
if (f.Open(GetFilePath().c_str(), CFile::modeRead))
{
std::string s;
s.resize(f.GetLength());
s.resize(static_cast<std::size_t>(f.GetLength()));
f.Read(&s[0], static_cast<UINT>(s.length()));

View File

@@ -2,12 +2,12 @@
#include "UploadManager.h"
#include "SiaDriveConfig.h"
#include "SiaApi.h"
#include "EventSystem.h"
using namespace Sia::Api;
#define TABLE_CREATE L"create table if not exists %s (%s);"
#define UPLOAD_TABLE L"upload_table"
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, temp_path text unique not null, status integer not null"
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, siadrive_path text unique not null, status integer not null"
#define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;"
#define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;"
#define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;"
@@ -48,6 +48,57 @@ CUploadManager::~CUploadManager()
StopAutoThread();
}
void CUploadManager::FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
{
CEventSystem::EventSystem.NotifyEvent(CreatingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath));
if (RetryableAction(::CopyFile(filePath.c_str(), tempSourcePath.c_str(), FALSE), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
{
// Delete existing '.siadrive' file, if found
// !!Should never come here. If so, there was a problem with startup clean-up
if (!RetryDeleteFileIfExists(siaDriveFilePath))
{
CEventSystem::EventSystem.NotifyEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath));
}
// Rename '.siadrive.temp' to '.siadrive'
CEventSystem::EventSystem.NotifyEvent(RenamingTemporarySiaDriveFile(siaPath, filePath, tempSourcePath, siaDriveFilePath));
if (RetryableAction(::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()), DEFAULT_RETRY_COUNT, DEFAULT_RETRY_DELAY_MS))
{
// TODO Change status to 'Queued'
}
else
{
CEventSystem::EventSystem.NotifyEvent(RenamingTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath, siaDriveFilePath));
if (!RetryDeleteFileIfExists(tempSourcePath))
{
CEventSystem::EventSystem.NotifyEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath));
}
if (!RetryDeleteFileIfExists(siaDriveFilePath))
{
CEventSystem::EventSystem.NotifyEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath));
}
// Requeue
AddOrUpdate(siaPath, filePath);
}
}
else
{
CEventSystem::EventSystem.NotifyEvent(CreatingTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath));
// If temp copy fails, try to delete
// If partial copy and file is unable to be deleted, log warning
if (!RetryDeleteFileIfExists(tempSourcePath))
{
CEventSystem::EventSystem.NotifyEvent(DeleteTemporarySiaDriveFileFailed(siaPath, filePath, tempSourcePath));
}
// Requeue
AddOrUpdate(siaPath, filePath);
}
}
void CUploadManager::FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
{
std::function<void()> nextFile = nullptr;
@@ -107,6 +158,7 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
else if ((*it)->GetUploadProgress() >= 100)
{
// upload complete - change status
}
else
{
@@ -158,7 +210,6 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
}
}
UploadStatus CUploadManager::GetUploadStatus(const String& siaPath)
{
UploadStatus uploadStatus = UploadStatus::NotFound;
@@ -182,16 +233,29 @@ UploadStatus CUploadManager::GetUploadStatus(const String& siaPath)
// will be deleted from Sia and treated as new.
// Uploads will always use the real file. User i/o will occur against the hidden file only temporarily. Since upload will take
// longer to handle than a normal file copy, this seems to be the best compromise for performance.
//
// ERROR SCENARIOS
// While copy is active, point to normal file.
// After copy, use temp file as real file until upload is complete
// This allows modifications to the file to occur in a more timely manner.
// Error Scenarios:
// Crash before db update to status copying - file will be re-uploaded automatically, if complete; otherwise, deleted
// Need to keep track of files as being copied and then there status
// Crash before copy begins - on startup, check for copying status with no .siadrive
// Crash during copy - on startup, check for copying status and delete .siadrive
// Crash after copy but before db update - on startup, check for copying status and delete .siadrive
UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
{
UploadError ret = UploadError::Success;
// Relative to absolute and grab parent folder of source
String rootPath;
{
String temp;
temp.resize(MAX_PATH + 1);
filePath = _wfullpath(&temp[0], filePath.c_str(), MAX_PATH);
if (::PathIsRelative(filePath.c_str()))
{
temp.resize(MAX_PATH + 1);
filePath = _wfullpath(&temp[0], filePath.c_str(), MAX_PATH);
}
temp = filePath;
::PathRemoveFileSpec(&temp[0]);
@@ -209,7 +273,9 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
// Check copying
if (query.executeStep())
{
if (static_cast<UploadStatus>(static_cast<unsigned>(query.getColumn(2))) == UploadStatus::Uploading)
UploadStatus uploadStatus = static_cast<UploadStatus>(static_cast<unsigned>(query.getColumn(2)));
CEventSystem::EventSystem.NotifyEvent(ExistingUploadFound(siaPath, filePath, uploadStatus));
if (uploadStatus == UploadStatus::Uploading)
{
// set to modified
// update file path
@@ -217,23 +283,6 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
}
else
{
/*SQLite::Statement addOrUpdate(_uploadDatabase, ADD_UPDATE_UPLOAD);
addOrUpdate.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
addOrUpdate.bind("@file_path", CW2A(filePath.c_str()).m_psz);
addOrUpdate.bind("@temp_path", CW2A(tempPath.c_str()).m_psz);
addOrUpdate.bind("@status", static_cast<unsigned>(UploadStatus::Copying));*/
// While copy is active, point to normal file.
// After copy, use temp file as real file until upload is complete
// This allows modifications to the file to occur in a more timely manner.
// Error Scenarios:
// Crash before db update to status copying - file will be re-uploaded automatically, if complete; otherwise, deleted
// Need to keep track of files as being copied and then there status
// Crash before copy begins - on startup, check for copying status with no .siadrive
// Crash during copy - on startup, check for copying status and delete .siadrive
// Crash after copy but before db update - on startup, check for copying status and delete .siadrive
// Strip drive specification (i.e. C:\)
// TODO If mount to drive is ever enabled, this will need to change
String siaDriveFileName = GenerateSha256(&filePath[3]) + L".siadrive";
@@ -246,39 +295,17 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
tempSourcePath.resize(MAX_PATH + 1);
PathCombine(&tempSourcePath[0], rootPath.c_str(), (siaDriveFileName + L".temp").c_str());
// Queue file operations
// Add to db
/*SQLite::Statement addOrUpdate(_uploadDatabase, ADD_UPDATE_UPLOAD);
addOrUpdate.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
addOrUpdate.bind("@file_path", CW2A(filePath.c_str()).m_psz);
addOrUpdate.bind("@siadrive_path", CW2A(siaDriveFilePath.c_str()).m_psz);
addOrUpdate.bind("@status", static_cast<unsigned>(UploadStatus::Copying));*/
// Queue file upload operation
std::lock_guard<std::mutex> l2(_fileQueueMutex);
_fileQueue.push_back([this,filePath, tempSourcePath, siaDriveFilePath]()
{
// TODO Retry a few times
if (::CopyFile(filePath.c_str(), tempSourcePath.c_str(), FALSE))
{
// Rename to real temp
// TODO Retry a few times
if (::PathFileExists(siaDriveFilePath.c_str()))
{
// TODO Retry a few times
::DeleteFile(siaDriveFilePath.c_str());
}
if (::MoveFile(tempSourcePath.c_str(), siaDriveFilePath.c_str()))
{
// Change status to queued
}
}
else
{
// If temp copy fails, try to delete
// If partial copy and file is unable to be deleted, log warning
if (::PathFileExists(tempSourcePath.c_str()))
{
// TODO Retry a few times
::DeleteFile(tempSourcePath.c_str());
}
// Requeue
}
});
_fileQueue.push_back([=]() { this->FileUploadAction(siaPath, filePath, tempSourcePath, siaDriveFilePath); });
CEventSystem::EventSystem.NotifyEvent(NewUploadAdded(siaPath, filePath, siaDriveFilePath));
}
return ret;

View File

@@ -2,6 +2,7 @@
#include "AutoThread.h"
#include "SQLiteCpp/Database.h"
#include <deque>
#include <EventSystem.h>
NS_BEGIN(Sia)
NS_BEGIN(Api)
@@ -50,6 +51,9 @@ private:
std::mutex _fileQueueMutex;
std::deque<std::function<void()>> _fileQueue;
private:
void FileUploadAction(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath);
protected:
virtual void AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) override;
void FileThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig);
@@ -65,4 +69,84 @@ public:
typedef Sia::Api::CUploadManager::_UploadStatus UploadStatus;
typedef Sia::Api::CUploadManager::_UploadError UploadError;
// Event Notifications
class CreatingTemporarySiaDriveFile :
public CEvent
{
public:
CreatingTemporarySiaDriveFile(const String& siaPath, const String& filePath, const String& tempSourcePath)
{
}
};
class CreatingTemporarySiaDriveFileFailed :
public CEvent
{
public:
CreatingTemporarySiaDriveFileFailed(const String& siaPath, const String& filePath, const String& tempSourcePath)
{
}
};
class DeleteSiaDriveFileFailed :
public CEvent
{
public:
DeleteSiaDriveFileFailed(const String& siaPath, const String& filePath, const String& siaDriveFilePath)
{
}
};
class DeleteTemporarySiaDriveFileFailed :
public CEvent
{
public:
DeleteTemporarySiaDriveFileFailed(const String& siaPath, const String& filePath, const String& tempSourcePath)
{
}
};
class RenamingTemporarySiaDriveFile :
public CEvent
{
public:
RenamingTemporarySiaDriveFile(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
{
}
};
class RenamingTemporarySiaDriveFileFailed :
public CEvent
{
public:
RenamingTemporarySiaDriveFileFailed(const String& siaPath, const String& filePath, const String& tempSourcePath, const String& siaDriveFilePath)
{
}
};
class ExistingUploadFound :
public CEvent
{
public:
ExistingUploadFound(const String& siaPath, const String& filePath, const UploadStatus& uploadStatus)
{
}
};
class NewUploadAdded :
public CEvent
{
public:
NewUploadAdded(const String& siaPath, const String& filePath, const String& siaDriveFilePath)
{
}
};
NS_END(2)

View File

@@ -15,18 +15,18 @@ namespace UnitTests
TEST_METHOD(DefaultFileCreation)
{
DeleteFile(TEST_CONFIG_FILE);
Assert::IsFalse(PathFileExists(TEST_CONFIG_FILE));
Assert::IsFalse(PathFileExists(TEST_CONFIG_FILE) ? true : false);
Sia::Api::CSiaDriveConfig cfg(TEST_CONFIG_FILE);
Assert::AreEqual(static_cast<uint8_t>(0), cfg.GetUI_Main_TabIndex());
Assert::IsTrue(PathFileExists(TEST_CONFIG_FILE));
Assert::IsTrue(PathFileExists(TEST_CONFIG_FILE) ? true : false);
Assert::AreEqual(TEST_CONFIG_FILE, cfg.GetFilePath().c_str());
CFile f;
Assert::IsTrue(f.Open(cfg.GetFilePath().c_str(), CFile::modeRead));
Assert::IsTrue(f.Open(cfg.GetFilePath().c_str(), CFile::modeRead) ? true : false);
std::string s;
s.resize(f.GetLength());
s.resize(static_cast<std::size_t>(f.GetLength()));
f.Read(&s[0], s.length());