Upload manager changes
This commit is contained in:
@@ -8,6 +8,10 @@ using namespace Sia::Api;
|
|||||||
#define TABLE_CREATE L"create table if not exists %s (%s);"
|
#define TABLE_CREATE L"create table if not exists %s (%s);"
|
||||||
#define UPLOAD_TABLE L"upload_table"
|
#define UPLOAD_TABLE L"upload_table"
|
||||||
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, cache_path text unique not null, status integer not null"
|
#define UPLOAD_TABLE_COLUMNS L"id integer primary key autoincrement, sia_path text unique not null, cache_path text unique not null, status integer not null"
|
||||||
|
#define QUERY_UPLOADS_BY_STATUS "select id, sia_path, status from upload_table where status=@status order by id desc limit 1;"
|
||||||
|
#define QUERY_UPLOADS_BY_2_STATUS "select id, sia_path, status from upload_table where (status=@status1 or status=@status2) order by id desc limit 1;"
|
||||||
|
#define QUERY_UPLOADS_BY_SIA_PATH "select id, sia_path, status from upload_table where sia_path=@sia_path order by id desc limit 1;"
|
||||||
|
#define QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS "select id, sia_path, status from upload_table where sia_path=@sia_path (status=@status1 or status=@status2) order by id desc limit 1;"
|
||||||
|
|
||||||
template <typename... Ts>
|
template <typename... Ts>
|
||||||
String fmt(const String &fmt, Ts... vs)
|
String fmt(const String &fmt, Ts... vs)
|
||||||
@@ -42,81 +46,127 @@ CUploadManager::~CUploadManager()
|
|||||||
|
|
||||||
void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
|
void CUploadManager::AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig)
|
||||||
{
|
{
|
||||||
// TODO Lock here - if file is modified again before a prior upload is complete, delete it and start again later
|
|
||||||
bool processNext = true;
|
bool processNext = true;
|
||||||
|
|
||||||
|
try
|
||||||
{
|
{
|
||||||
try
|
CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig));
|
||||||
|
json result;
|
||||||
|
if (ApiSuccess(siaCurl.Get(L"/renter/files", result)))
|
||||||
{
|
{
|
||||||
SQLite::Statement query(_uploadDatabase, "select sia_path, status from upload_table where status=@status1 or status=@status2");
|
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_2_STATUS);
|
||||||
query.bind("@status1", static_cast<unsigned>(UploadStatus::Uploading));
|
query.bind("@status1", static_cast<unsigned>(UploadStatus::Uploading));
|
||||||
query.bind("@status2", static_cast<unsigned>(UploadStatus::Modified));
|
query.bind("@status2", static_cast<unsigned>(UploadStatus::Modified));
|
||||||
|
|
||||||
CSiaFileTreePtr fileTree(new CSiaFileTree(siaCurl, siaDriveConfig));
|
// TODO Lock here - if file is modified again before a prior upload is complete, delete it and start again later
|
||||||
json result;
|
fileTree->BuildTree(result);
|
||||||
if (ApiSuccess(siaCurl.Get(L"/renter/files", result)))
|
if (query.executeStep())
|
||||||
{
|
{
|
||||||
fileTree->BuildTree(result);
|
std::string tempSiaPath = query.getColumn(1);
|
||||||
while (processNext && query.executeStep())
|
String siaPath = CA2W(tempSiaPath.c_str()).m_psz;
|
||||||
|
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(2).getUInt());
|
||||||
|
|
||||||
|
auto fileList = fileTree->GetFileList();
|
||||||
|
auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr)
|
||||||
{
|
{
|
||||||
std::string tempSiaPath = query.getColumn(0);
|
return ptr->GetSiaPath() == siaPath;
|
||||||
String siaPath = CA2W(tempSiaPath.c_str()).m_psz;
|
});
|
||||||
UploadStatus uploadStatus = static_cast<UploadStatus>(query.getColumn(1).getUInt());
|
|
||||||
|
|
||||||
auto fileList = fileTree->GetFileList();
|
if (it == fileList.end())
|
||||||
auto it = std::find_if(fileList.begin(), fileList.end(), [&](const CSiaFilePtr& ptr)
|
{
|
||||||
{
|
// error condition - should always exist. delete from db and log warning, but continue processing
|
||||||
return ptr->GetSiaPath() == siaPath;
|
}
|
||||||
});
|
else if (uploadStatus == UploadStatus::Modified)
|
||||||
|
{
|
||||||
if (it == fileList.end())
|
// delete existing, change status to queued
|
||||||
{
|
}
|
||||||
// error condition - should always exist. delete from db and log warning, but continue processing
|
else if ((*it)->GetUploadProgress() >= 100)
|
||||||
}
|
{
|
||||||
else if (uploadStatus == UploadStatus::Modified)
|
// upload complete - change status
|
||||||
{
|
}
|
||||||
// delete existing, change status to uploading and upload to sia
|
else
|
||||||
processNext = false;
|
{
|
||||||
}
|
// upload still active
|
||||||
else if ((*it)->GetUploadProgress() >= 100)
|
processNext = false;
|
||||||
{
|
|
||||||
// upload complete - change status
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// upload still active
|
|
||||||
processNext = false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
// error condition - host down?
|
|
||||||
processNext = false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (const SQLite::Exception& e)
|
else
|
||||||
{
|
{
|
||||||
// error condition - database not initialized (i.e. no table)?
|
// error condition - host down?
|
||||||
std::string msg = e.what();
|
|
||||||
processNext = false;
|
processNext = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
catch (const SQLite::Exception& e)
|
||||||
|
{
|
||||||
|
// error condition - database not initialized (i.e. no table)?
|
||||||
|
std::string msg = e.what();
|
||||||
|
processNext = false;
|
||||||
|
}
|
||||||
|
|
||||||
if (processNext)
|
if (processNext)
|
||||||
{
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_STATUS);
|
||||||
|
query.bind("@status", static_cast<unsigned>(UploadStatus::Queued));
|
||||||
|
// TODO Lock here - if file is modified again before a prior upload is complete, delete it and start again later
|
||||||
|
if (query.executeStep())
|
||||||
|
{
|
||||||
|
String siaPath = CA2W(query.getColumn(1)).m_psz;
|
||||||
|
String tempFilePath;
|
||||||
|
|
||||||
|
json response;
|
||||||
|
SiaCurlError cerror = siaCurl.Post(String(L"/renter/upload/") + siaPath, { {L"source", tempFilePath} }, response);
|
||||||
|
if (ApiSuccess(cerror))
|
||||||
|
{
|
||||||
|
// TODO Update status in DB
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (const SQLite::Exception& e)
|
||||||
|
{
|
||||||
|
// error condition
|
||||||
|
std::string msg = e.what();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
UploadStatus CUploadManager::GetUploadStatus(const String& siaPath) const
|
UploadStatus CUploadManager::GetUploadStatus(const String& siaPath)
|
||||||
{
|
{
|
||||||
return UploadStatus::NotFound;
|
UploadStatus uploadStatus = UploadStatus::NotFound;
|
||||||
|
|
||||||
|
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH);
|
||||||
|
query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||||
|
if (query.executeStep())
|
||||||
|
{
|
||||||
|
uploadStatus = static_cast<UploadStatus>(static_cast<unsigned>(query.getColumn(2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return uploadStatus;
|
||||||
}
|
}
|
||||||
|
|
||||||
void CUploadManager::AddOrUpdate(const String& siaPath, const String& filePath)
|
void CUploadManager::AddOrUpdate(const String& siaPath, const String& filePath)
|
||||||
{
|
{
|
||||||
|
SQLite::Statement query(_uploadDatabase, QUERY_UPLOADS_BY_SIA_PATH_AND_2_STATUS);
|
||||||
|
query.bind("@sia_path", CW2A(siaPath.c_str()).m_psz);
|
||||||
|
query.bind("@status1", static_cast<unsigned>(UploadStatus::Uploading));
|
||||||
|
query.bind("@status2", static_cast<unsigned>(UploadStatus::Modified));
|
||||||
|
|
||||||
|
// TODO Lock here - if file is modified again before a prior upload is complete, delete it and start again later
|
||||||
|
if (query.executeStep())
|
||||||
|
{
|
||||||
|
if (static_cast<UploadStatus>(static_cast<unsigned>(query.getColumn(2))) == UploadStatus::Uploading)
|
||||||
|
{
|
||||||
|
// set to modified
|
||||||
|
// update file path
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void CUploadManager::PurgeCompleteStatus()
|
void CUploadManager::PurgeCompleteStatus()
|
||||||
|
@@ -43,7 +43,7 @@ protected:
|
|||||||
virtual void AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) override;
|
virtual void AutoThreadCallback(const CSiaCurl& siaCurl, CSiaDriveConfig* siaDriveConfig) override;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
_UploadStatus GetUploadStatus(const String& siaPath) const;
|
_UploadStatus GetUploadStatus(const String& siaPath);
|
||||||
void AddOrUpdate(const String& siaPath, const String& filePath);
|
void AddOrUpdate(const String& siaPath, const String& filePath);
|
||||||
void Remove(const String& siaPath);
|
void Remove(const String& siaPath);
|
||||||
void PurgeCompleteStatus();
|
void PurgeCompleteStatus();
|
||||||
|
@@ -232,6 +232,7 @@ private:
|
|||||||
// If file isn't cached, delete from Sia only
|
// If file isn't cached, delete from Sia only
|
||||||
if (!PathFileExists(cacheFilePath.c_str()) || ::DeleteFile(cacheFilePath.c_str()))
|
if (!PathFileExists(cacheFilePath.c_str()) || ::DeleteFile(cacheFilePath.c_str()))
|
||||||
{
|
{
|
||||||
|
// TODO May not be necessary to delete, just queue another upload
|
||||||
if (!ApiSuccess(_siaApi->GetRenter()->DeleteFile(siaPath)))
|
if (!ApiSuccess(_siaApi->GetRenter()->DeleteFile(siaPath)))
|
||||||
{
|
{
|
||||||
ret = STATUS_INVALID_SERVER_STATE;
|
ret = STATUS_INVALID_SERVER_STATE;
|
||||||
|
Reference in New Issue
Block a user