1
0

Fix database exceptions

This commit is contained in:
Scott E. Graves
2017-04-15 22:00:33 -05:00
parent f71259d0c6
commit c79976c116
2 changed files with 225 additions and 114 deletions

View File

@@ -55,10 +55,10 @@ private:
std::shared_ptr<std::vector<CSiaFilePtr>> _uploadFileList;
private:
CSiaDriveConfig* GetSiaDriveConfig() const { return _siaDriveConfig; }
void HandleFileRemove(const CSiaCurl& siaCurl, const SString& siaPath);
void DeleteFilesNotFound(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig);
void DeleteFilesRemovedFromSia(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig, const bool& isStartup = false);
CSiaDriveConfig* GetSiaDriveConfig() const { return _siaDriveConfig; }
void HandleFileRemove(const CSiaCurl& siaCurl, const SString& siaPath);
protected:
virtual void AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) override;

View File

@@ -253,6 +253,55 @@ public:
}
};
class FileRemovedFromDatabase :
public CEvent
{
public:
FileRemovedFromDatabase(const SString& siaPath, const SString& filePath) :
_siaPath(siaPath),
_filePath(filePath)
{
}
public:
virtual ~FileRemovedFromDatabase()
{
}
private:
const SString _siaPath;
const SString _filePath;
public:
virtual SString GetSingleLineMessage() const override
{
return GetEventName() +
"|SP|" + _siaPath +
"|FP|" + _filePath;
}
virtual std::shared_ptr<CEvent> Clone() const override
{
return std::shared_ptr<CEvent>(new FileRemovedFromDatabase(_siaPath, _filePath));
}
virtual SString GetEventName() const override
{
return "FileRemovedFromDatabase";
}
virtual json GetEventJson() const override
{
return
{
{ "event", GetEventName() },
{ "sia_path", _siaPath },
{ "file_path", _filePath }
};
}
};
class FailedToRemoveFileFromSia :
public CEvent
{
@@ -644,11 +693,12 @@ SString CUploadManager::UploadStatusToString(const UploadStatus& uploadStatus)
CUploadManager::CUploadManager(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) :
CAutoThread(siaCurl, siaDriveConfig),
_siaDriveConfig(siaDriveConfig),
_uploadDatabase(siaDriveConfig->GetRenter_UploadDbFilePath(), SQLite::OPEN_CREATE | SQLite::OPEN_READWRITE)
_uploadDatabase(siaDriveConfig->GetRenter_UploadDbFilePath(), SQLite::OPEN_CREATE | SQLite::OPEN_READWRITE, 1000)
{
CreateTableIfNotFound(&_uploadDatabase, UPLOAD_TABLE, UPLOAD_TABLE_COLUMNS);
// Detect files that have been removed since last startup
DeleteFilesNotFound(siaCurl, siaDriveConfig);
DeleteFilesRemovedFromSia(siaCurl, siaDriveConfig, true);
// Begin normal processing
@@ -661,6 +711,31 @@ CUploadManager::~CUploadManager()
StopAutoThread();
}
void CUploadManager::DeleteFilesNotFound(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
{
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_2_STATUS);
query.bind("@status1", static_cast<unsigned>(UploadStatus::Queued));
query.bind("@status2", static_cast<unsigned>(UploadStatus::Uploading));
while (query.executeStep())
{
SString filePath = static_cast<const char*>(query.getColumn(query.getColumnIndex("file_path")));
if (!FilePath(filePath).IsFile())
{
SString siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
SQLite::Statement del(_uploadDatabase, DELETE_UPLOAD);
del.bind("@sia_path", SString::ToUtf8(siaPath).c_str());
if (del.exec() >= 0)
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(FileRemovedFromDatabase(siaPath, filePath)));
}
else
{
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseDeleteFailed(siaPath, filePath, del.getErrorMsg())));
}
}
}
}
void CUploadManager::DeleteFilesRemovedFromSia(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig, const bool& isStartup)
{
CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig));
@@ -707,132 +782,168 @@ void CUploadManager::HandleFileRemove(const CSiaCurl& siaCurl, const SString& si
void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
{
bool processNext = true;
do
{
try
{
CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig));
json result;
if (ApiSuccess(siaCurl.Get(L"/renter/files", result)))
{
fileTree->BuildTree(result);
try
{
UploadFileListPtr uploadFileList(new UploadFileList());
std::lock_guard<std::mutex> l(_uploadMutex);
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_2_STATUS);
query.bind("@status1", static_cast<unsigned>(UploadStatus::Queued));
query.bind("@status2", static_cast<unsigned>(UploadStatus::Uploading));
while (query.executeStep())
{
SString siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
auto fileList = fileTree->GetFileList();
auto it = std::find_if(fileList->begin(), fileList->end(), [&](const CSiaFilePtr& ptr)
{
return ptr->GetSiaPath() == siaPath;
});
try
{
CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig));
json result;
if (ApiSuccess(siaCurl.Get(L"/renter/files", result)))
{
fileTree->BuildTree(result);
if (it == fileList->end())
{
// TODO Fix file size
CSiaFilePtr ptr(new CSiaFile(siaCurl, siaDriveConfig,
{
{"siapath", siaPath},
{"filesize", 0},
{"available", false},
{"renewing", false},
{"redundancy", 0},
{"uploadprogress", 0},
{"expiration", 0}
}));
uploadFileList->push_back(ptr);
}
else
{
uploadFileList->push_back(*it);
}
}
_uploadFileList = uploadFileList;
}
catch (const SQLite::Exception& e)
{
// error condition
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred("AutoThreadCallback(uploadStatus)", e)));
}
// Lock here - if file is modified again before previously queued upload is complete, delete it and
// start again later
bool complete = false;
SString siaPath;
SString filePath;
std::lock_guard<std::mutex> l(_uploadMutex);
{
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
query.bind("@status", static_cast<unsigned>(UploadStatus::Uploading));
if (query.executeStep())
{
siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
filePath = static_cast<const char*>(query.getColumn(query.getColumnIndex("file_path")));
auto fileList = fileTree->GetFileList();
auto it = std::find_if(fileList->begin(), fileList->end(), [&](const CSiaFilePtr& ptr)
{
return ptr->GetSiaPath() == siaPath;
});
// Removed by another client
if (it == fileList->end())
{
HandleFileRemove(siaCurl, siaPath);
}
// Upload is complete
else if ((*it)->GetAvailable())
{
complete = true;
}
// Upload still active, don't process another file
else
{
SQLite::Statement count(_uploadDatabase, QUERY_UPLOAD_COUNT_BY_STATUS);
count.bind("@status", static_cast<unsigned>(UploadStatus::Uploading));
processNext = (count.executeStep() && (count.getColumn(0).getInt64() < _siaDriveConfig->GetMaxUploadCount()));
}
}
}
if (complete)
{
SET_STATUS(UploadStatus::Complete, UploadToSiaComplete, ModifyUploadStatusFailed)
}
}
else
{
// error condition - host down?
processNext = false;
}
}
catch (const SQLite::Exception& e)
{
// error condition - database not initialized (i.e. no table)?
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred("AutoThreadCallback(/renter/files)", e)));
processNext = false;
}
if (processNext)
{
try
{
UploadFileListPtr uploadFileList(new UploadFileList());
std::lock_guard<std::mutex> l(_uploadMutex);
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_2_STATUS);
query.bind("@status1", static_cast<unsigned>(UploadStatus::Queued));
query.bind("@status2", static_cast<unsigned>(UploadStatus::Uploading));
while (query.executeStep())
{
SString siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
auto fileList = fileTree->GetFileList();
auto it = std::find_if(fileList->begin(), fileList->end(), [&](const CSiaFilePtr& ptr)
{
return ptr->GetSiaPath() == siaPath;
});
SString siaPath;
SString filePath;
bool uploading = false;
if (it == fileList->end())
std::lock_guard<std::mutex> l(_uploadMutex);
{
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
query.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
if (query.executeStep())
{
// TODO Fix file size
CSiaFilePtr ptr(new CSiaFile(siaCurl, siaDriveConfig,
siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
filePath = static_cast<const char*>(query.getColumn(query.getColumnIndex("file_path")));
if (FilePath(filePath).IsFile())
{
{"siapath", siaPath},
{"filesize", 0},
{"available", false},
{"renewing", false},
{"redundancy", 0},
{"uploadprogress", 0},
{"expiration", 0}
}));
uploadFileList->push_back(ptr);
json response;
SiaCurlError cerror = siaCurl.Post(SString(L"/renter/upload/") + siaPath, { {L"source", filePath} }, response);
if (ApiSuccess(cerror) || cerror.GetReason().Contains("already exists"))
{
uploading = true;
}
}
else
{
HandleFileRemove(siaCurl, siaPath);
}
}
else
{
uploadFileList->push_back(*it);
processNext = false;
}
}
_uploadFileList = uploadFileList;
if (uploading)
{
SET_STATUS(UploadStatus::Uploading, UploadToSiaStarted, ModifyUploadStatusFailed)
}
}
catch (const SQLite::Exception& e)
{
// error condition
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred("AutoThreadCallback(uploadStatus)", e)));
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred("AutoThreadCallback(processNext)", e)));
processNext = false;
}
// Lock here - if file is modified again before previously queued upload is complete, delete it and
// start again later
std::lock_guard<std::mutex> l(_uploadMutex);
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
query.bind("@status", static_cast<unsigned>(UploadStatus::Uploading));
if (query.executeStep())
{
SString siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
SString filePath = static_cast<const char*>(query.getColumn(query.getColumnIndex("file_path")));
auto fileList = fileTree->GetFileList();
auto it = std::find_if(fileList->begin(), fileList->end(), [&](const CSiaFilePtr& ptr)
{
return ptr->GetSiaPath() == siaPath;
});
// Removed by another client
if (it == fileList->end())
{
HandleFileRemove(siaCurl, siaPath);
}
// Upload is complete
else if ((*it)->GetAvailable())
{
SET_STATUS(UploadStatus::Complete, UploadToSiaComplete, ModifyUploadStatusFailed)
}
// Upload still active, don't process another file
else
{
SQLite::Statement count(_uploadDatabase, QUERY_UPLOAD_COUNT_BY_STATUS);
count.bind("@status", static_cast<unsigned>(UploadStatus::Uploading));
processNext = (count.executeStep() && (count.getColumn(0).getInt64() < _siaDriveConfig->GetMaxUploadCount()));
}
}
}
else
{
// error condition - host down?
processNext = false;
}
}
catch (const SQLite::Exception& e)
{
// error condition - database not initialized (i.e. no table)?
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred("AutoThreadCallback(/renter/files)", e)));
processNext = false;
}
if (processNext)
{
try
{
std::lock_guard<std::mutex> l(_uploadMutex);
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
query.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
if (query.executeStep())
{
SString siaPath = static_cast<const char*>(query.getColumn(query.getColumnIndex("sia_path")));
SString filePath = static_cast<const char*>(query.getColumn(query.getColumnIndex("file_path")));
json response;
SiaCurlError cerror = siaCurl.Post(SString(L"/renter/upload/") + siaPath, { {L"source", filePath} }, response);
if (ApiSuccess(cerror) || cerror.GetReason().Contains("already exists"))
{
SET_STATUS(UploadStatus::Uploading, UploadToSiaStarted, ModifyUploadStatusFailed)
}
}
}
catch (const SQLite::Exception& e)
{
// error condition
CEventSystem::EventSystem.NotifyEvent(CreateSystemEvent(DatabaseExceptionOccurred("AutoThreadCallback(processNext)", e)));
}
}
}
}
while (processNext);
}
UploadStatus CUploadManager::GetUploadStatus(const SString& siaPath)