1
0

Upload manager changes

This commit is contained in:
Scott E. Graves
2017-02-25 20:48:27 -06:00
parent 169143dd74
commit ced91ecaa9
8 changed files with 79 additions and 36 deletions

View File

@@ -6,6 +6,7 @@
using namespace Sia::Api; using namespace Sia::Api;
CSiaApi::CSiaApi(const SiaHostConfig& hostConfig, CSiaDriveConfig* siaDriveConfig) : CSiaApi::CSiaApi(const SiaHostConfig& hostConfig, CSiaDriveConfig* siaDriveConfig) :
_hostConfig(hostConfig),
_siaCurl(hostConfig), _siaCurl(hostConfig),
_siaDriveConfig(siaDriveConfig), _siaDriveConfig(siaDriveConfig),
_wallet(new CSiaWallet(_siaCurl, siaDriveConfig)), _wallet(new CSiaWallet(_siaCurl, siaDriveConfig)),
@@ -56,4 +57,9 @@ CSiaRenterPtr CSiaApi::GetRenter() const
CSiaConsensusPtr CSiaApi::GetConsensus() const CSiaConsensusPtr CSiaApi::GetConsensus() const
{ {
return _consensus; return _consensus;
}
SiaHostConfig CSiaApi::GetHostConfig() const
{
return _hostConfig;
} }

View File

@@ -159,9 +159,7 @@ public:
public: public:
_SiaApiError FileExists(const String& siaPath, bool& exists) const; _SiaApiError FileExists(const String& siaPath, bool& exists) const;
_SiaApiError DeleteFile(const String& siaPath);
_SiaApiError DownloadFile(const String& siaPath, const String& location) const; _SiaApiError DownloadFile(const String& siaPath, const String& location) const;
_SiaApiError QueueUploadFile(const String& siaPath, const String& filePath);
_SiaApiError GetFileTree(std::shared_ptr<_CSiaFileTree>& siaFileTree) const; _SiaApiError GetFileTree(std::shared_ptr<_CSiaFileTree>& siaFileTree) const;
}; };
@@ -193,6 +191,7 @@ public:
~CSiaApi(); ~CSiaApi();
private: private:
SiaHostConfig _hostConfig;
CSiaCurl _siaCurl; CSiaCurl _siaCurl;
CSiaDriveConfig* _siaDriveConfig; CSiaDriveConfig* _siaDriveConfig;
std::shared_ptr<_CSiaWallet> _wallet; std::shared_ptr<_CSiaWallet> _wallet;
@@ -207,6 +206,7 @@ public:
std::shared_ptr<_CSiaRenter> GetRenter() const; std::shared_ptr<_CSiaRenter> GetRenter() const;
std::shared_ptr<_CSiaConsensus> GetConsensus() const; std::shared_ptr<_CSiaConsensus> GetConsensus() const;
String GetServerVersion() const; String GetServerVersion() const;
SiaHostConfig GetHostConfig() const;
}; };
typedef CSiaApi::_SiaApiError SiaApiError; typedef CSiaApi::_SiaApiError SiaApiError;

View File

@@ -130,21 +130,11 @@ SiaApiError CSiaApi::_CSiaRenter::FileExists(const String& siaPath, bool& exists
return ret; return ret;
} }
SiaApiError CSiaApi::_CSiaRenter::DeleteFile(const String& siaPath)
{
return SiaApiError::NotImplemented;
}
SiaApiError CSiaApi::_CSiaRenter::DownloadFile(const String& siaPath, const String& location) const SiaApiError CSiaApi::_CSiaRenter::DownloadFile(const String& siaPath, const String& location) const
{ {
return SiaApiError::NotImplemented; return SiaApiError::NotImplemented;
} }
SiaApiError CSiaApi::_CSiaRenter::QueueUploadFile(const String& siaPath, const String& filePath)
{
return SiaApiError::NotImplemented;
}
SiaApiError CSiaApi::_CSiaRenter::GetFileTree(CSiaFileTreePtr& siaFileTree) const SiaApiError CSiaApi::_CSiaRenter::GetFileTree(CSiaFileTreePtr& siaFileTree) const
{ {
SiaApiError ret = SiaApiError::RequestError; SiaApiError ret = SiaApiError::RequestError;

View File

@@ -7,13 +7,13 @@ using namespace Sia::Api;
#define TABLE_CREATE L"create table if not exists %s (%s);" #define TABLE_CREATE L"create table if not exists %s (%s);"
#define UPLOAD_TABLE L"upload_table" #define UPLOAD_TABLE L"upload_table"
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, status integer not null" #define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, file_path text unique not null, sd_file_path text unique not null, status integer not null"
#define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;" #define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;"
#define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;" #define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;"
#define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;" #define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;"
#define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, status from upload_table where sia_path=@sia_path and (status=@status1 or status=@status2) order by id desc limit 1;" #define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, sd_file_path, status from upload_table where sia_path=@sia_path and (status=@status1 or status=@status2) order by id desc limit 1;"
#define UPDATE_STATUS "update upload_table set status=@status where sia_path=@sia_path;" #define UPDATE_STATUS "update upload_table set status=@status where sia_path=@sia_path;"
#define INSERT_UPLOAD "insert into upload_table (sia_path, status, file_path) values (@sia_path, @status, @file_path)" #define INSERT_UPLOAD "insert into upload_table (sia_path, status, file_path, sd_file_path) values (@sia_path, @status, @file_path, @sd_file_path)"
#define SET_STATUS(status, success_event, fail_event)\ #define SET_STATUS(status, success_event, fail_event)\
bool statusUpdated = false;\ bool statusUpdated = false;\
@@ -88,7 +88,6 @@ String CUploadManager::UploadStatusToString(const UploadStatus& uploadStatus)
CUploadManager::CUploadManager(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) : CUploadManager::CUploadManager(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) :
CAutoThread(siaCurl, siaDriveConfig), CAutoThread(siaCurl, siaDriveConfig),
_uploadDatabase(siaDriveConfig->GetRenter_UploadDbFilePath(), SQLite::OPEN_CREATE | SQLite::OPEN_READWRITE), _uploadDatabase(siaDriveConfig->GetRenter_UploadDbFilePath(), SQLite::OPEN_CREATE | SQLite::OPEN_READWRITE),
_siaDriveConfig(siaDriveConfig),
_fileThread(siaCurl, siaDriveConfig, [this](const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) { this->FileThreadCallback(siaCurl, siaDriveConfig); }) _fileThread(siaCurl, siaDriveConfig, [this](const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) { this->FileThreadCallback(siaCurl, siaDriveConfig); })
{ {
CreateTableIfNotFound(&_uploadDatabase, UPLOAD_TABLE, UPLOAD_TABLE_COLUMNS); CreateTableIfNotFound(&_uploadDatabase, UPLOAD_TABLE, UPLOAD_TABLE_COLUMNS);
@@ -196,7 +195,8 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
{ {
String siaPath = CA2W(query.getColumn(1)).m_psz; String siaPath = CA2W(query.getColumn(1)).m_psz;
String filePath = CA2W(query.getColumn(2)).m_psz; String filePath = CA2W(query.getColumn(2)).m_psz;
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(3).getUInt()); String siaDriveFilePath = CA2W(query.getColumn(3)).m_psz;
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(4).getUInt());
auto fileList = fileTree->GetFileList(); auto fileList = fileTree->GetFileList();
auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr) auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr)
@@ -225,6 +225,10 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
else if ((*it)->GetUploadProgress() >= 100) else if ((*it)->GetUploadProgress() >= 100)
{ {
SET_STATUS(UploadStatus::Complete, UploadComplete, ModifyUploadStatusFailed) SET_STATUS(UploadStatus::Complete, UploadComplete, ModifyUploadStatusFailed)
if (!RetryDeleteFileIfExists(siaDriveFilePath))
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DeleteSiaDriveFileFailed(siaPath, filePath, siaDriveFilePath)));
}
} }
else else
{ {
@@ -242,7 +246,7 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
catch (const SQLite::Exception& e) catch (const SQLite::Exception& e)
{ {
// error condition - database not initialized (i.e. no table)? // error condition - database not initialized (i.e. no table)?
std::string msg = e.what(); CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred(e)));
processNext = false; processNext = false;
} }
@@ -261,6 +265,7 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
String siaPath = CA2W(query.getColumn(1)).m_psz; String siaPath = CA2W(query.getColumn(1)).m_psz;
String filePath = CA2W(query.getColumn(2)).m_psz; String filePath = CA2W(query.getColumn(2)).m_psz;
// TODO Validate response
json response; json response;
SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", filePath} }, response); SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", filePath} }, response);
if (ApiSuccess(cerror)) if (ApiSuccess(cerror))
@@ -272,7 +277,7 @@ void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig
catch (const SQLite::Exception& e) catch (const SQLite::Exception& e)
{ {
// error condition // error condition
std::string msg = e.what(); CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred(e)));
} }
} }
} }
@@ -343,7 +348,7 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ExistingUploadFound(siaPath, filePath, uploadStatus))); CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(ExistingUploadFound(siaPath, filePath, uploadStatus)));
if (uploadStatus == UploadStatus::Uploading) if (uploadStatus == UploadStatus::Uploading)
{ {
SET_STATUS(UploadStatus::Modified, UploadModifiedInQueue, ModifyUploadStatusFailed) SET_STATUS(UploadStatus::Modified, UploadStatusModified, ModifyUploadStatusFailed)
} }
} }
else else
@@ -366,6 +371,7 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
SQLite::Statement insert(_uploadDatabase, INSERT_UPLOAD); SQLite::Statement insert(_uploadDatabase, INSERT_UPLOAD);
insert.bind("@sia_path", CW2A(siaPath.c_str()).m_psz); insert.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
insert.bind("@file_path", CW2A(filePath.c_str()).m_psz); insert.bind("@file_path", CW2A(filePath.c_str()).m_psz);
insert.bind("@sd_file_path", CW2A(siaDriveFileName.c_str()).m_psz);
insert.bind("@status", static_cast<unsigned>(UploadStatus::Copying)); insert.bind("@status", static_cast<unsigned>(UploadStatus::Copying));
if (insert.exec() != 1) if (insert.exec() != 1)
{ {
@@ -382,7 +388,7 @@ UploadError CUploadManager::AddOrUpdate(const String& siaPath, String filePath)
} }
catch (SQLite::Exception e) catch (SQLite::Exception e)
{ {
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseInsertFailed(siaPath, filePath, CA2W(e.getErrorStr()).m_psz))); CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred(e)));
ret = UploadError::DatabaseError; ret = UploadError::DatabaseError;
} }
} }

View File

@@ -48,7 +48,6 @@ public:
private: private:
SQLite::Database _uploadDatabase; SQLite::Database _uploadDatabase;
std::mutex _uploadMutex; std::mutex _uploadMutex;
CSiaDriveConfig* _siaDriveConfig;
CAutoThread _fileThread; CAutoThread _fileThread;
std::mutex _fileQueueMutex; std::mutex _fileQueueMutex;
std::deque<std::function<void()>> _fileQueue; std::deque<std::function<void()>> _fileQueue;
@@ -205,11 +204,11 @@ public:
} }
}; };
class UploadModifiedInQueue : class UploadStatusModified :
public CEvent public CEvent
{ {
public: public:
UploadModifiedInQueue(const String& siaPath, const String& filePath) : UploadStatusModified(const String& siaPath, const String& filePath) :
_siaPath(siaPath), _siaPath(siaPath),
_filePath(filePath) _filePath(filePath)
{ {
@@ -217,7 +216,7 @@ public:
} }
public: public:
virtual ~UploadModifiedInQueue() virtual ~UploadStatusModified()
{ {
} }
@@ -228,12 +227,12 @@ private:
public: public:
virtual String GetSingleLineMessage() const override virtual String GetSingleLineMessage() const override
{ {
return L"UploadModifiedInQueue|SP|" + _siaPath + L"|FP|" + _filePath; return L"UploadStatusModified|SP|" + _siaPath + L"|FP|" + _filePath;
} }
virtual std::shared_ptr<CEvent> Clone() const override virtual std::shared_ptr<CEvent> Clone() const override
{ {
return std::shared_ptr<CEvent>(new UploadModifiedInQueue(_siaPath, _filePath)); return std::shared_ptr<CEvent>(new UploadStatusModified(_siaPath, _filePath));
} }
}; };
@@ -615,4 +614,34 @@ public:
} }
}; };
class DatabaseExceptionOccurred :
public CEvent
{
public:
DatabaseExceptionOccurred(const SQLite::Exception& exception) :
_exception(exception)
{
}
public:
virtual ~DatabaseExceptionOccurred()
{
}
private:
const SQLite::Exception _exception;
public:
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new DatabaseExceptionOccurred(_exception));
}
virtual String GetSingleLineMessage() const override
{
return L"DatabaseExceptionOccurred|MSG|" + String(CA2W(_exception.getErrorStr()).m_psz);
}
};
NS_END(2) NS_END(2)

View File

@@ -22,6 +22,8 @@ private:
private: private:
static std::mutex _dokanMutex; static std::mutex _dokanMutex;
static CSiaApi* _siaApi; static CSiaApi* _siaApi;
static CSiaDriveConfig* _siaDriveConfig;
static std::unique_ptr<CUploadManager> _uploadManager;
static DOKAN_OPERATIONS _dokanOps; static DOKAN_OPERATIONS _dokanOps;
static DOKAN_OPTIONS _dokanOptions; static DOKAN_OPTIONS _dokanOptions;
static String _cacheLocation; static String _cacheLocation;
@@ -64,13 +66,15 @@ private:
if (size > 0) if (size > 0)
{ {
// TODO Always save for now - need to change to detect modifications // TODO Always save for now - need to change to detect modifications
_siaApi->GetRenter()->QueueUploadFile(_openFileMap[id].SiaPath, _openFileMap[id].CacheFilePath); // TODO Handle error return
_uploadManager->AddOrUpdate(_openFileMap[id].SiaPath, _openFileMap[id].CacheFilePath);
} }
else else
{ {
// Treat 0 length files as deleted in Sia - cache retains 0-length // Treat 0 length files as deleted in Sia - cache retains 0-length
// TODO Retain 0 length in cache? // TODO Retain 0 length in cache?
_siaApi->GetRenter()->DeleteFile(_openFileMap[id].SiaPath); // TODO Handle error return
_uploadManager->Remove(_openFileMap[id].SiaPath);
} }
} }
} }
@@ -232,8 +236,7 @@ private:
// If file isn't cached, delete from Sia only // If file isn't cached, delete from Sia only
if (!PathFileExists(cacheFilePath.c_str()) || ::DeleteFile(cacheFilePath.c_str())) if (!PathFileExists(cacheFilePath.c_str()) || ::DeleteFile(cacheFilePath.c_str()))
{ {
// TODO May not be necessary to delete, just queue another upload if (!ApiSuccess(_uploadManager->Remove(siaPath)))
if (!ApiSuccess(_siaApi->GetRenter()->DeleteFile(siaPath)))
{ {
ret = STATUS_INVALID_SERVER_STATE; ret = STATUS_INVALID_SERVER_STATE;
} }
@@ -389,9 +392,11 @@ private:
} }
public: public:
static void Initialize(CSiaApi* siaApi) static void Initialize(CSiaApi* siaApi, CSiaDriveConfig* siaDriveConfig)
{ {
_siaApi = siaApi; _siaApi = siaApi;
_siaDriveConfig = siaDriveConfig;
_uploadManager.reset(new CUploadManager(CSiaCurl(siaApi->GetHostConfig()), siaDriveConfig));
_dokanOps.Cleanup = nullptr; _dokanOps.Cleanup = nullptr;
_dokanOps.CloseFile = Sia_CloseFile; _dokanOps.CloseFile = Sia_CloseFile;
_dokanOps.DeleteDirectory = nullptr; _dokanOps.DeleteDirectory = nullptr;
@@ -453,8 +458,9 @@ public:
static void Shutdown() static void Shutdown()
{ {
Unmount(); Unmount();
_uploadManager.reset(nullptr);
_siaApi = nullptr; _siaApi = nullptr;
_siaDriveConfig = nullptr;
ZeroMemory(&_dokanOps, sizeof(_dokanOps)); ZeroMemory(&_dokanOps, sizeof(_dokanOps));
ZeroMemory(&_dokanOptions, sizeof(_dokanOptions)); ZeroMemory(&_dokanOptions, sizeof(_dokanOptions));
} }
@@ -472,6 +478,8 @@ public:
// Static member variables // Static member variables
std::mutex DokanImpl::_dokanMutex; std::mutex DokanImpl::_dokanMutex;
CSiaApi* DokanImpl::_siaApi = nullptr; CSiaApi* DokanImpl::_siaApi = nullptr;
CSiaDriveConfig* DokanImpl::_siaDriveConfig = nullptr;
std::unique_ptr<CUploadManager> DokanImpl::_uploadManager;
DOKAN_OPERATIONS DokanImpl::_dokanOps; DOKAN_OPERATIONS DokanImpl::_dokanOps;
DOKAN_OPTIONS DokanImpl::_dokanOptions; DOKAN_OPTIONS DokanImpl::_dokanOptions;
String DokanImpl::_cacheLocation; String DokanImpl::_cacheLocation;
@@ -485,15 +493,16 @@ NTSTATUS DokanImpl::_mountStatus = STATUS_SUCCESS;
String DokanImpl::_mountPoint; String DokanImpl::_mountPoint;
CSiaDokanDrive::CSiaDokanDrive(CSiaApi& siaApi) : CSiaDokanDrive::CSiaDokanDrive(CSiaApi& siaApi, CSiaDriveConfig* siaDriveConfig) :
_siaApi(siaApi), _siaApi(siaApi),
_siaDriveConfig(siaDriveConfig),
_Mounted(false) _Mounted(false)
{ {
std::lock_guard<std::mutex> l(DokanImpl::GetMutex()); std::lock_guard<std::mutex> l(DokanImpl::GetMutex());
if (DokanImpl::IsInitialized()) if (DokanImpl::IsInitialized())
throw SiaDokanDriveException("Sia drive has already been activated"); throw SiaDokanDriveException("Sia drive has already been activated");
DokanImpl::Initialize(&_siaApi); DokanImpl::Initialize(&_siaApi, _siaDriveConfig);
} }
CSiaDokanDrive::~CSiaDokanDrive() CSiaDokanDrive::~CSiaDokanDrive()

View File

@@ -21,7 +21,7 @@ class AFX_EXT_CLASS CSiaDokanDrive
{ {
public: public:
// throws SiaDokenDriveException // throws SiaDokenDriveException
CSiaDokanDrive(CSiaApi& siaApi); CSiaDokanDrive(CSiaApi& siaApi, CSiaDriveConfig* siaDriveConfig);
public: public:
~CSiaDokanDrive(); ~CSiaDokanDrive();
@@ -29,6 +29,8 @@ public:
private: private:
CSiaApi& _siaApi; CSiaApi& _siaApi;
CSiaDriveConfig* _siaDriveConfig;
Property(bool, Mounted, public, private) Property(bool, Mounted, public, private)
public: public:

View File

@@ -130,6 +130,7 @@ namespace UnitTests
Assert::IsTrue(_eventAccumulator.WaitForEvent<UploadAddedToQueue>(5000)); Assert::IsTrue(_eventAccumulator.WaitForEvent<UploadAddedToQueue>(5000));
Assert::IsFalse(_eventAccumulator.Contains<DatabaseInsertFailed>()); Assert::IsFalse(_eventAccumulator.Contains<DatabaseInsertFailed>());
Assert::IsFalse(_eventAccumulator.Contains<DatabaseExceptionOccurred>());
Assert::IsFalse(_eventAccumulator.Contains<DeleteSiaDriveFileFailed>()); Assert::IsFalse(_eventAccumulator.Contains<DeleteSiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<RenamingTemporarySiaDriveFileFailed>()); Assert::IsFalse(_eventAccumulator.Contains<RenamingTemporarySiaDriveFileFailed>());
Assert::IsFalse(_eventAccumulator.Contains<DeleteTemporarySiaDriveFileFailed>()); Assert::IsFalse(_eventAccumulator.Contains<DeleteTemporarySiaDriveFileFailed>());