Merge pull request #12286 from Atlante45/feat/asset-server-backup

Add asset server backups capabilities to the Domain Server
This commit is contained in:
Stephen Birarda 2018-02-13 17:40:11 -07:00 committed by GitHub
commit 6ba2f4b279
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 675 additions and 144 deletions

View file

@ -464,32 +464,41 @@ void AssetServer::handleAssetMappingOperation(QSharedPointer<ReceivedMessage> me
auto replyPacket = NLPacketList::create(PacketType::AssetMappingOperationReply, QByteArray(), true, true);
replyPacket->writePrimitive(messageID);
bool canWriteToAssetServer = true;
if (senderNode) {
canWriteToAssetServer = senderNode->getCanWriteToAssetServer();
}
switch (operationType) {
case AssetMappingOperationType::Get:
handleGetMappingOperation(*message, senderNode, *replyPacket);
handleGetMappingOperation(*message, *replyPacket);
break;
case AssetMappingOperationType::GetAll:
handleGetAllMappingOperation(*message, senderNode, *replyPacket);
handleGetAllMappingOperation(*replyPacket);
break;
case AssetMappingOperationType::Set:
handleSetMappingOperation(*message, senderNode, *replyPacket);
handleSetMappingOperation(*message, canWriteToAssetServer, *replyPacket);
break;
case AssetMappingOperationType::Delete:
handleDeleteMappingsOperation(*message, senderNode, *replyPacket);
handleDeleteMappingsOperation(*message, canWriteToAssetServer, *replyPacket);
break;
case AssetMappingOperationType::Rename:
handleRenameMappingOperation(*message, senderNode, *replyPacket);
handleRenameMappingOperation(*message, canWriteToAssetServer, *replyPacket);
break;
case AssetMappingOperationType::SetBakingEnabled:
handleSetBakingEnabledOperation(*message, senderNode, *replyPacket);
handleSetBakingEnabledOperation(*message, canWriteToAssetServer, *replyPacket);
break;
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(replyPacket), *senderNode);
if (senderNode) {
nodeList->sendPacketList(std::move(replyPacket), *senderNode);
} else {
nodeList->sendPacketList(std::move(replyPacket), message->getSenderSockAddr());
}
}
void AssetServer::handleGetMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket) {
void AssetServer::handleGetMappingOperation(ReceivedMessage& message, NLPacketList& replyPacket) {
QString assetPath = message.readString();
QUrl url { assetPath };
@ -568,7 +577,7 @@ void AssetServer::handleGetMappingOperation(ReceivedMessage& message, SharedNode
}
}
void AssetServer::handleGetAllMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket) {
void AssetServer::handleGetAllMappingOperation(NLPacketList& replyPacket) {
replyPacket.writePrimitive(AssetUtils::AssetServerError::NoError);
uint32_t count = (uint32_t)_fileMappings.size();
@ -591,8 +600,8 @@ void AssetServer::handleGetAllMappingOperation(ReceivedMessage& message, SharedN
}
}
void AssetServer::handleSetMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket) {
if (senderNode->getCanWriteToAssetServer()) {
void AssetServer::handleSetMappingOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket) {
if (hasWriteAccess) {
QString assetPath = message.readString();
auto assetHash = message.read(AssetUtils::SHA256_HASH_LENGTH).toHex();
@ -614,8 +623,8 @@ void AssetServer::handleSetMappingOperation(ReceivedMessage& message, SharedNode
}
}
void AssetServer::handleDeleteMappingsOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket) {
if (senderNode->getCanWriteToAssetServer()) {
void AssetServer::handleDeleteMappingsOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket) {
if (hasWriteAccess) {
int numberOfDeletedMappings { 0 };
message.readPrimitive(&numberOfDeletedMappings);
@ -642,8 +651,8 @@ void AssetServer::handleDeleteMappingsOperation(ReceivedMessage& message, Shared
}
}
void AssetServer::handleRenameMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket) {
if (senderNode->getCanWriteToAssetServer()) {
void AssetServer::handleRenameMappingOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket) {
if (hasWriteAccess) {
QString oldPath = message.readString();
QString newPath = message.readString();
@ -664,8 +673,8 @@ void AssetServer::handleRenameMappingOperation(ReceivedMessage& message, SharedN
}
}
void AssetServer::handleSetBakingEnabledOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket) {
if (senderNode->getCanWriteToAssetServer()) {
void AssetServer::handleSetBakingEnabledOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket) {
if (hasWriteAccess) {
bool enabled { true };
message.readPrimitive(&enabled);
@ -739,9 +748,14 @@ void AssetServer::handleAssetGet(QSharedPointer<ReceivedMessage> message, Shared
}
void AssetServer::handleAssetUpload(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
bool canWriteToAssetServer = true;
if (senderNode) {
canWriteToAssetServer = senderNode->getCanWriteToAssetServer();
}
if (senderNode->getCanWriteToAssetServer()) {
qCDebug(asset_server) << "Starting an UploadAssetTask for upload from" << uuidStringWithoutCurlyBraces(senderNode->getUUID());
if (canWriteToAssetServer) {
qCDebug(asset_server) << "Starting an UploadAssetTask for upload from" << uuidStringWithoutCurlyBraces(message->getSourceID());
auto task = new UploadAssetTask(message, senderNode, _filesDirectory, _filesizeLimit);
_transferTaskPool.start(task);
@ -761,7 +775,11 @@ void AssetServer::handleAssetUpload(QSharedPointer<ReceivedMessage> message, Sha
// send off the packet
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(permissionErrorPacket), *senderNode);
if (senderNode) {
nodeList->sendPacket(std::move(permissionErrorPacket), *senderNode);
} else {
nodeList->sendPacket(std::move(permissionErrorPacket), message->getSenderSockAddr());
}
}
}

View file

@ -29,9 +29,6 @@ namespace std {
}
struct AssetMeta {
AssetMeta() {
}
int bakeVersion { 0 };
bool failedLastBake { false };
QString lastBakeErrors;
@ -60,14 +57,15 @@ private slots:
void sendStatsPacket() override;
private:
using Mappings = std::unordered_map<QString, QString>;
void handleGetMappingOperation(ReceivedMessage& message, NLPacketList& replyPacket);
void handleGetAllMappingOperation(NLPacketList& replyPacket);
void handleSetMappingOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket);
void handleDeleteMappingsOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket);
void handleRenameMappingOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket);
void handleSetBakingEnabledOperation(ReceivedMessage& message, bool hasWriteAccess, NLPacketList& replyPacket);
void handleGetMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket);
void handleGetAllMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket);
void handleSetMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket);
void handleDeleteMappingsOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket);
void handleRenameMappingOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket);
void handleSetBakingEnabledOperation(ReceivedMessage& message, SharedNodePointer senderNode, NLPacketList& replyPacket);
void handleAssetServerBackup(ReceivedMessage& message, NLPacketList& replyPacket);
void handleAssetServerRestore(ReceivedMessage& message, NLPacketList& replyPacket);
// Mapping file operations must be called from main assignment thread only
bool loadMappingsFromFile();
@ -111,7 +109,7 @@ private:
/// Remove baked paths when the original asset is deleteds
void removeBakedPathsForDeletedAsset(AssetUtils::AssetHash originalAssetHash);
Mappings _fileMappings;
AssetUtils::Mappings _fileMappings;
QDir _resourcesDirectory;
QDir _filesDirectory;

View file

@ -112,5 +112,9 @@ void SendAssetTask::run() {
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(replyPacketList), *_senderNode);
if (_senderNode) {
nodeList->sendPacketList(std::move(replyPacketList), *_senderNode);
} else {
nodeList->sendPacketList(std::move(replyPacketList), _message->getSenderSockAddr());
}
}

View file

@ -41,9 +41,12 @@ void UploadAssetTask::run() {
uint64_t fileSize;
buffer.read(reinterpret_cast<char*>(&fileSize), sizeof(fileSize));
qDebug() << "UploadAssetTask reading a file of " << fileSize << "bytes from"
<< uuidStringWithoutCurlyBraces(_senderNode->getUUID());
if (_senderNode) {
qDebug() << "UploadAssetTask reading a file of " << fileSize << "bytes from" << uuidStringWithoutCurlyBraces(_senderNode->getUUID());
} else {
qDebug() << "UploadAssetTask reading a file of " << fileSize << "bytes from" << _receivedMessage->getSenderSockAddr();
}
auto replyPacket = NLPacket::create(PacketType::AssetUploadReply, -1, true);
replyPacket->writePrimitive(messageID);
@ -55,9 +58,12 @@ void UploadAssetTask::run() {
auto hash = AssetUtils::hashData(fileData);
auto hexHash = hash.toHex();
qDebug() << "Hash for uploaded file from" << uuidStringWithoutCurlyBraces(_senderNode->getUUID())
<< "is: (" << hexHash << ") ";
if (_senderNode) {
qDebug() << "Hash for uploaded file from" << uuidStringWithoutCurlyBraces(_senderNode->getUUID()) << "is: (" << hexHash << ")";
} else {
qDebug() << "Hash for uploaded file from" << _receivedMessage->getSenderSockAddr() << "is: (" << hexHash << ")";
}
QFile file { _resourcesDir.filePath(QString(hexHash)) };
@ -103,5 +109,9 @@ void UploadAssetTask::run() {
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(replyPacket), *_senderNode);
if (_senderNode) {
nodeList->sendPacket(std::move(replyPacket), *_senderNode);
} else {
nodeList->sendPacket(std::move(replyPacket), _receivedMessage->getSenderSockAddr());
}
}

View file

@ -0,0 +1,400 @@
//
// BackupSupervisor.cpp
// domain-server/src
//
// Created by Clement Brisset on 1/12/18.
// Copyright 2018 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "BackupSupervisor.h"
#include <QJsonDocument>
#include <QDate>
#include <AssetClient.h>
#include <AssetRequest.h>
#include <AssetUpload.h>
#include <MappingRequest.h>
#include <PathUtils.h>
const QString BACKUPS_DIR = "backups/";
const QString ASSETS_DIR = "files/";
const QString MAPPINGS_PREFIX = "mappings-";
using namespace std;
BackupSupervisor::BackupSupervisor() {
_backupsDirectory = PathUtils::getAppDataPath() + BACKUPS_DIR;
QDir backupDir { _backupsDirectory };
if (!backupDir.exists()) {
backupDir.mkpath(".");
}
_assetsDirectory = PathUtils::getAppDataPath() + BACKUPS_DIR + ASSETS_DIR;
QDir assetsDir { _assetsDirectory };
if (!assetsDir.exists()) {
assetsDir.mkpath(".");
}
loadAllBackups();
}
void BackupSupervisor::loadAllBackups() {
_backups.clear();
_assetsInBackups.clear();
_assetsOnDisk.clear();
_allBackupsLoadedSuccessfully = true;
QDir assetsDir { _assetsDirectory };
auto assetNames = assetsDir.entryList(QDir::Files);
qDebug() << "Loading" << assetNames.size() << "assets.";
// store all valid hashes
copy_if(begin(assetNames), end(assetNames),
inserter(_assetsOnDisk, begin(_assetsOnDisk)), AssetUtils::isValidHash);
QDir backupsDir { _backupsDirectory };
auto files = backupsDir.entryList({ MAPPINGS_PREFIX + "*.json" }, QDir::Files);
qDebug() << "Loading" << files.size() << "backups.";
for (const auto& fileName : files) {
auto filePath = backupsDir.filePath(fileName);
auto success = loadBackup(filePath);
if (!success) {
qCritical() << "Failed to load backup file" << filePath;
_allBackupsLoadedSuccessfully = false;
}
}
vector<AssetUtils::AssetHash> missingAssets;
set_difference(begin(_assetsInBackups), end(_assetsInBackups),
begin(_assetsOnDisk), end(_assetsOnDisk),
back_inserter(missingAssets));
if (missingAssets.size() > 0) {
qWarning() << "Found" << missingAssets.size() << "assets missing.";
}
vector<AssetUtils::AssetHash> deprecatedAssets;
set_difference(begin(_assetsOnDisk), end(_assetsOnDisk),
begin(_assetsInBackups), end(_assetsInBackups),
back_inserter(deprecatedAssets));
if (deprecatedAssets.size() > 0) {
qDebug() << "Found" << deprecatedAssets.size() << "assets to delete.";
if (_allBackupsLoadedSuccessfully) {
for (const auto& hash : deprecatedAssets) {
QFile::remove(_assetsDirectory + hash);
}
} else {
qWarning() << "Some backups did not load properly, aborting deleting for safety.";
}
}
}
bool BackupSupervisor::loadBackup(const QString& backupFile) {
_backups.push_back({ backupFile.toStdString(), {}, false });
auto& backup = _backups.back();
QFile file { backupFile };
if (!file.open(QFile::ReadOnly)) {
qCritical() << "Could not open backup file:" << backupFile;
backup.corruptedBackup = true;
return false;
}
QJsonParseError error;
auto document = QJsonDocument::fromJson(file.readAll(), &error);
if (document.isNull() || !document.isObject()) {
qCritical() << "Could not parse backup file to JSON object:" << backupFile;
qCritical() << " Error:" << error.errorString();
backup.corruptedBackup = true;
return false;
}
auto jsonObject = document.object();
for (auto it = begin(jsonObject); it != end(jsonObject); ++it) {
const auto& assetPath = it.key();
const auto& assetHash = it.value().toString();
if (!AssetUtils::isValidHash(assetHash)) {
qCritical() << "Corrupted mapping in backup file" << backupFile << ":" << it.key();
backup.corruptedBackup = true;
return false;
}
backup.mappings[assetPath] = assetHash;
_assetsInBackups.insert(assetHash);
}
_backups.push_back(backup);
return true;
}
void BackupSupervisor::backupAssetServer() {
if (backupInProgress() || restoreInProgress()) {
qWarning() << "There is already a backup/restore in progress.";
return;
}
auto assetClient = DependencyManager::get<AssetClient>();
auto request = assetClient->createGetAllMappingsRequest();
connect(request, &GetAllMappingsRequest::finished, this, [this](GetAllMappingsRequest* request) {
qDebug() << "Got" << request->getMappings().size() << "mappings!";
if (request->getError() != MappingRequest::NoError) {
qCritical() << "Could not complete backup.";
qCritical() << " Error:" << request->getErrorString();
finishBackup();
request->deleteLater();
return;
}
if (!writeBackupFile(request->getMappings())) {
finishBackup();
request->deleteLater();
return;
}
assert(!_backups.empty());
const auto& mappings = _backups.back().mappings;
backupMissingFiles(mappings);
request->deleteLater();
});
startBackup();
request->start();
}
void BackupSupervisor::backupMissingFiles(const AssetUtils::Mappings& mappings) {
_assetsLeftToRequest.reserve(mappings.size());
for (auto& mapping : mappings) {
const auto& hash = mapping.second;
if (_assetsOnDisk.find(hash) == end(_assetsOnDisk)) {
_assetsLeftToRequest.push_back(hash);
}
}
backupNextMissingFile();
}
void BackupSupervisor::backupNextMissingFile() {
if (_assetsLeftToRequest.empty()) {
finishBackup();
return;
}
auto hash = _assetsLeftToRequest.back();
_assetsLeftToRequest.pop_back();
auto assetClient = DependencyManager::get<AssetClient>();
auto assetRequest = assetClient->createRequest(hash);
connect(assetRequest, &AssetRequest::finished, this, [this](AssetRequest* request) {
if (request->getError() == AssetRequest::NoError) {
qDebug() << "Got" << request->getHash();
bool success = writeAssetFile(request->getHash(), request->getData());
if (!success) {
qCritical() << "Failed to write asset file" << request->getHash();
}
} else {
qCritical() << "Failed to backup asset" << request->getHash();
}
backupNextMissingFile();
request->deleteLater();
});
assetRequest->start();
}
bool BackupSupervisor::writeBackupFile(const AssetUtils::AssetMappings& mappings) {
auto filename = MAPPINGS_PREFIX + QDateTime::currentDateTimeUtc().toString(Qt::ISODate) + ".json";
QFile file { PathUtils::getAppDataPath() + BACKUPS_DIR + filename };
if (!file.open(QFile::WriteOnly)) {
qCritical() << "Could not open backup file" << file.fileName();
return false;
}
AssetServerBackup backup;
QJsonObject jsonObject;
for (auto& mapping : mappings) {
backup.mappings[mapping.first] = mapping.second.hash;
_assetsInBackups.insert(mapping.second.hash);
jsonObject.insert(mapping.first, mapping.second.hash);
}
QJsonDocument document(jsonObject);
file.write(document.toJson());
backup.filePath = file.fileName().toStdString();
_backups.push_back(backup);
return true;
}
bool BackupSupervisor::writeAssetFile(const AssetUtils::AssetHash& hash, const QByteArray& data) {
QDir assetsDir { _assetsDirectory };
QFile file { assetsDir.filePath(hash) };
if (!file.open(QFile::WriteOnly)) {
qCritical() << "Could not open backup file" << file.fileName();
return false;
}
file.write(data);
_assetsOnDisk.insert(hash);
return true;
}
void BackupSupervisor::restoreAssetServer(int backupIndex) {
if (backupInProgress() || restoreInProgress()) {
qWarning() << "There is already a backup/restore in progress.";
return;
}
auto assetClient = DependencyManager::get<AssetClient>();
auto request = assetClient->createGetAllMappingsRequest();
connect(request, &GetAllMappingsRequest::finished, this, [this, backupIndex](GetAllMappingsRequest* request) {
if (request->getError() == MappingRequest::NoError) {
const auto& newMappings = _backups.at(backupIndex).mappings;
computeServerStateDifference(request->getMappings(), newMappings);
restoreAllAssets();
} else {
finishRestore();
}
request->deleteLater();
});
startRestore();
request->start();
}
void BackupSupervisor::computeServerStateDifference(const AssetUtils::AssetMappings& currentMappings,
const AssetUtils::Mappings& newMappings) {
_mappingsLeftToSet.reserve((int)newMappings.size());
_assetsLeftToUpload.reserve((int)newMappings.size());
_mappingsLeftToDelete.reserve((int)currentMappings.size());
set<AssetUtils::AssetHash> currentAssets;
for (const auto& currentMapping : currentMappings) {
const auto& currentPath = currentMapping.first;
const auto& currentHash = currentMapping.second.hash;
if (newMappings.find(currentPath) == end(newMappings)) {
_mappingsLeftToDelete.push_back(currentPath);
}
currentAssets.insert(currentHash);
}
for (const auto& newMapping : newMappings) {
const auto& newPath = newMapping.first;
const auto& newHash = newMapping.second;
auto it = currentMappings.find(newPath);
if (it == end(currentMappings) || it->second.hash != newHash) {
_mappingsLeftToSet.push_back({ newPath, newHash });
}
if (currentAssets.find(newHash) == end(currentAssets)) {
_assetsLeftToUpload.push_back(newHash);
}
}
qDebug() << "Mappings to set:" << _mappingsLeftToSet.size();
qDebug() << "Mappings to del:" << _mappingsLeftToDelete.size();
qDebug() << "Assets to upload:" << _assetsLeftToUpload.size();
}
void BackupSupervisor::restoreAllAssets() {
restoreNextAsset();
}
void BackupSupervisor::restoreNextAsset() {
if (_assetsLeftToUpload.empty()) {
updateMappings();
return;
}
auto hash = _assetsLeftToUpload.back();
_assetsLeftToUpload.pop_back();
auto assetFilename = _assetsDirectory + hash;
auto assetClient = DependencyManager::get<AssetClient>();
auto request = assetClient->createUpload(assetFilename);
connect(request, &AssetUpload::finished, this, [this](AssetUpload* request) {
if (request->getError() != AssetUpload::NoError) {
qCritical() << "Failed to restore asset:" << request->getFilename();
qCritical() << " Error:" << request->getErrorString();
}
restoreNextAsset();
request->deleteLater();
});
request->start();
}
void BackupSupervisor::updateMappings() {
auto assetClient = DependencyManager::get<AssetClient>();
for (const auto& mapping : _mappingsLeftToSet) {
auto request = assetClient->createSetMappingRequest(mapping.first, mapping.second);
connect(request, &SetMappingRequest::finished, this, [this](SetMappingRequest* request) {
if (request->getError() != MappingRequest::NoError) {
qCritical() << "Failed to set mapping:" << request->getPath();
qCritical() << " Error:" << request->getErrorString();
}
if (--_mappingRequestsInFlight == 0) {
finishRestore();
}
request->deleteLater();
});
request->start();
++_mappingRequestsInFlight;
}
_mappingsLeftToSet.clear();
auto request = assetClient->createDeleteMappingsRequest(_mappingsLeftToDelete);
connect(request, &DeleteMappingsRequest::finished, this, [this](DeleteMappingsRequest* request) {
if (request->getError() != MappingRequest::NoError) {
qCritical() << "Failed to delete mappings";
qCritical() << " Error:" << request->getErrorString();
}
if (--_mappingRequestsInFlight == 0) {
finishRestore();
}
request->deleteLater();
});
_mappingsLeftToDelete.clear();
request->start();
++_mappingRequestsInFlight;
}
bool BackupSupervisor::deleteBackup(int backupIndex) {
if (backupInProgress() || restoreInProgress()) {
qWarning() << "There is a backup/restore in progress.";
return false;
}
const auto& filePath = _backups.at(backupIndex).filePath;
auto success = QFile::remove(filePath.c_str());
loadAllBackups();
return success;
}

View file

@ -0,0 +1,85 @@
//
// BackupSupervisor.h
// domain-server/src
//
// Created by Clement Brisset on 1/12/18.
// Copyright 2018 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_BackupSupervisor_h
#define hifi_BackupSupervisor_h
#include <set>
#include <map>
#include <QObject>
#include <AssetUtils.h>
#include <ReceivedMessage.h>
struct AssetServerBackup {
std::string filePath;
AssetUtils::Mappings mappings;
bool corruptedBackup;
};
class BackupSupervisor : public QObject {
Q_OBJECT
public:
BackupSupervisor();
void backupAssetServer();
void restoreAssetServer(int backupIndex);
bool deleteBackup(int backupIndex);
const std::vector<AssetServerBackup>& getBackups() const { return _backups; };
bool backupInProgress() const { return _backupInProgress; }
bool restoreInProgress() const { return _restoreInProgress; }
private:
void loadAllBackups();
bool loadBackup(const QString& backupFile);
void startBackup() { _backupInProgress = true; }
void finishBackup() { _backupInProgress = false; }
void backupMissingFiles(const AssetUtils::Mappings& mappings);
void backupNextMissingFile();
bool writeBackupFile(const AssetUtils::AssetMappings& mappings);
bool writeAssetFile(const AssetUtils::AssetHash& hash, const QByteArray& data);
void startRestore() { _restoreInProgress = true; }
void finishRestore() { _restoreInProgress = false; }
void computeServerStateDifference(const AssetUtils::AssetMappings& currentMappings,
const AssetUtils::Mappings& newMappings);
void restoreAllAssets();
void restoreNextAsset();
void updateMappings();
QString _backupsDirectory;
QString _assetsDirectory;
// Internal storage for backups on disk
bool _allBackupsLoadedSuccessfully { false };
std::vector<AssetServerBackup> _backups;
std::set<AssetUtils::AssetHash> _assetsInBackups;
std::set<AssetUtils::AssetHash> _assetsOnDisk;
// Internal storage for backup in progress
bool _backupInProgress { false };
std::vector<AssetUtils::AssetHash> _assetsLeftToRequest;
// Internal storage for restore in progress
bool _restoreInProgress { false };
std::vector<AssetUtils::AssetHash> _assetsLeftToUpload;
std::vector<std::pair<AssetUtils::AssetPath, AssetUtils::AssetHash>> _mappingsLeftToSet;
AssetUtils::AssetPathList _mappingsLeftToDelete;
int _mappingRequestsInFlight { 0 };
};
#endif /* hifi_BackupSupervisor_h */

View file

@ -26,6 +26,7 @@
#include <QCommandLineParser>
#include <AccountManager.h>
#include <AssetClient.h>
#include <BuildInfo.h>
#include <DependencyManager.h>
#include <HifiConfigVariantMap.h>
@ -343,6 +344,12 @@ void DomainServer::parseCommandLine() {
DomainServer::~DomainServer() {
qInfo() << "Domain Server is shutting down.";
// cleanup the AssetClient thread
DependencyManager::destroy<AssetClient>();
_assetClientThread.quit();
_assetClientThread.wait();
// destroy the LimitedNodeList before the DomainServer QCoreApplication is down
DependencyManager::destroy<LimitedNodeList>();
}
@ -684,11 +691,17 @@ void DomainServer::setupNodeListAndAssignments() {
packetReceiver.registerListener(PacketType::ICEServerHeartbeatDenied, this, "processICEServerHeartbeatDenialPacket");
packetReceiver.registerListener(PacketType::ICEServerHeartbeatACK, this, "processICEServerHeartbeatACK");
// add whatever static assignments that have been parsed to the queue
addStaticAssignmentsToQueue();
// set a custom packetVersionMatch as the verify packet operator for the udt::Socket
nodeList->setPacketFilterOperator(&DomainServer::isPacketVerified);
_assetClientThread.setObjectName("AssetClient Thread");
auto assetClient = DependencyManager::set<AssetClient>();
assetClient->moveToThread(&_assetClientThread);
QObject::connect(&_assetClientThread, &QThread::started, assetClient.data(), &AssetClient::init);
_assetClientThread.start();
// add whatever static assignments that have been parsed to the queue
addStaticAssignmentsToQueue();
}
bool DomainServer::resetAccountManagerAccessToken() {

View file

@ -18,6 +18,7 @@
#include <QtCore/QQueue>
#include <QtCore/QSharedPointer>
#include <QtCore/QStringList>
#include <QtCore/QThread>
#include <QtCore/QUrl>
#include <QAbstractNativeEventFilter>
@ -25,6 +26,7 @@
#include <HTTPSConnection.h>
#include <LimitedNodeList.h>
#include "BackupSupervisor.h"
#include "DomainGatekeeper.h"
#include "DomainMetadata.h"
#include "DomainServerSettingsManager.h"
@ -251,6 +253,8 @@ private:
bool _sendICEServerAddressToMetaverseAPIRedo { false };
QHash<QUuid, QPointer<HTTPSConnection>> _pendingOAuthConnections;
QThread _assetClientThread;
};

View file

@ -40,7 +40,7 @@ AssetClient::AssetClient() {
static_cast<AssetClient*>(dependency)->deleteLater();
});
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
auto& packetReceiver = nodeList->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetMappingOperationReply, this, "handleAssetMappingOperationReply");
@ -308,7 +308,7 @@ void AssetClient::handleAssetMappingOperationReply(QSharedPointer<ReceivedMessag
}
bool haveAssetServer() {
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (!assetServer) {
@ -402,7 +402,7 @@ MessageID AssetClient::getAsset(const QString& hash, AssetUtils::DataOffset star
return false;
}
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -435,7 +435,7 @@ MessageID AssetClient::getAsset(const QString& hash, AssetUtils::DataOffset star
MessageID AssetClient::getAssetInfo(const QString& hash, GetInfoCallback callback) {
Q_ASSERT(QThread::currentThread() == thread());
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -635,7 +635,7 @@ void AssetClient::handleCompleteCallback(const QWeakPointer<Node>& node, Message
MessageID AssetClient::getAssetMapping(const AssetUtils::AssetPath& path, MappingOperationCallback callback) {
Q_ASSERT(QThread::currentThread() == thread());
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -662,7 +662,7 @@ MessageID AssetClient::getAssetMapping(const AssetUtils::AssetPath& path, Mappin
MessageID AssetClient::getAllAssetMappings(MappingOperationCallback callback) {
Q_ASSERT(QThread::currentThread() == thread());
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -685,7 +685,7 @@ MessageID AssetClient::getAllAssetMappings(MappingOperationCallback callback) {
}
MessageID AssetClient::deleteAssetMappings(const AssetUtils::AssetPathList& paths, MappingOperationCallback callback) {
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -716,7 +716,7 @@ MessageID AssetClient::deleteAssetMappings(const AssetUtils::AssetPathList& path
MessageID AssetClient::setAssetMapping(const QString& path, const AssetUtils::AssetHash& hash, MappingOperationCallback callback) {
Q_ASSERT(QThread::currentThread() == thread());
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -742,7 +742,7 @@ MessageID AssetClient::setAssetMapping(const QString& path, const AssetUtils::As
}
MessageID AssetClient::renameAssetMapping(const AssetUtils::AssetPath& oldPath, const AssetUtils::AssetPath& newPath, MappingOperationCallback callback) {
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -769,7 +769,7 @@ MessageID AssetClient::renameAssetMapping(const AssetUtils::AssetPath& oldPath,
}
MessageID AssetClient::setBakingEnabled(const AssetUtils::AssetPathList& paths, bool enabled, MappingOperationCallback callback) {
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
@ -859,7 +859,7 @@ bool AssetClient::cancelUploadAssetRequest(MessageID id) {
MessageID AssetClient::uploadAsset(const QByteArray& data, UploadResultCallback callback) {
Q_ASSERT(QThread::currentThread() == thread());
auto nodeList = DependencyManager::get<NodeList>();
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {

View file

@ -44,7 +44,9 @@ enum AssetServerError : uint8_t {
AssetTooLarge,
PermissionDenied,
MappingOperationFailed,
FileOperationFailed
FileOperationFailed,
NoAssetServer,
LostConnection
};
enum AssetMappingOperationType : uint8_t {
@ -71,7 +73,8 @@ struct MappingInfo {
QString bakingErrors;
};
using AssetMapping = std::map<AssetPath, MappingInfo>;
using AssetMappings = std::map<AssetPath, MappingInfo>;
using Mappings = std::map<AssetPath, AssetHash>;
QUrl getATPUrl(const QString& input);
AssetHash extractAssetHash(const QString& input);

View file

@ -49,7 +49,7 @@ public:
const QHostAddress& getIP() const { return _sockAddr.getAddress(); }
void setIPToLocalhost() { _sockAddr.setAddress(QHostAddress(QHostAddress::LocalHost)); }
const HifiSockAddr& getSockAddr() { return _sockAddr; }
const HifiSockAddr& getSockAddr() const { return _sockAddr; }
void setSockAddr(const HifiSockAddr& sockAddr, const QString& hostname);
unsigned short getPort() const { return _sockAddr.getPort(); }

View file

@ -90,21 +90,16 @@ LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) :
updateLocalSocket();
// set &PacketReceiver::handleVerifiedPacket as the verified packet callback for the udt::Socket
_nodeSocket.setPacketHandler(
[this](std::unique_ptr<udt::Packet> packet) {
_nodeSocket.setPacketHandler([this](std::unique_ptr<udt::Packet> packet) {
_packetReceiver->handleVerifiedPacket(std::move(packet));
}
);
_nodeSocket.setMessageHandler(
[this](std::unique_ptr<udt::Packet> packet) {
});
_nodeSocket.setMessageHandler([this](std::unique_ptr<udt::Packet> packet) {
_packetReceiver->handleVerifiedMessagePacket(std::move(packet));
}
);
_nodeSocket.setMessageFailureHandler(
[this](HifiSockAddr from, udt::Packet::MessageNumber messageNumber) {
});
_nodeSocket.setMessageFailureHandler([this](HifiSockAddr from,
udt::Packet::MessageNumber messageNumber) {
_packetReceiver->handleMessageFailure(from, messageNumber);
}
);
});
// set our isPacketVerified method as the verify operator for the udt::Socket
using std::placeholders::_1;
@ -309,8 +304,19 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
sourceNode = matchingNode.data();
}
if (!sourceNode &&
sourceID == getDomainUUID() &&
packet.getSenderSockAddr() == getDomainSockAddr() &&
PacketTypeEnum::getDomainSourcedPackets().contains(headerType)) {
// This is a packet sourced by the domain server
emit dataReceived(NodeType::Unassigned, packet.getPayloadSize());
return true;
}
if (sourceNode) {
if (!PacketTypeEnum::getNonVerifiedPackets().contains(headerType)) {
if (!PacketTypeEnum::getNonVerifiedPackets().contains(headerType) &&
!isDomainServer()) {
QByteArray packetHeaderHash = NLPacket::verificationHashInHeader(packet);
QByteArray expectedHash = NLPacket::hashForPacketAndSecret(packet, sourceNode->getConnectionSecret());

View file

@ -124,6 +124,10 @@ public:
PacketReceiver& getPacketReceiver() { return *_packetReceiver; }
virtual bool isDomainServer() const { return true; }
virtual QUuid getDomainUUID() const { assert(false); return QUuid(); }
virtual HifiSockAddr getDomainSockAddr() const { assert(false); return HifiSockAddr(); }
// use sendUnreliablePacket to send an unrelaible packet (that you do not need to move)
// either to a node (via its active socket) or to a manual sockaddr
qint64 sendUnreliablePacket(const NLPacket& packet, const Node& destinationNode);

View file

@ -106,9 +106,6 @@ void GetMappingRequest::doStart() {
});
};
GetAllMappingsRequest::GetAllMappingsRequest() {
};
void GetAllMappingsRequest::doStart() {
auto assetClient = DependencyManager::get<AssetClient>();
_mappingRequestID = assetClient->getAllAssetMappings(

View file

@ -120,17 +120,15 @@ private:
class GetAllMappingsRequest : public MappingRequest {
Q_OBJECT
public:
GetAllMappingsRequest();
AssetUtils::AssetMapping getMappings() const { return _mappings; }
AssetUtils::AssetMappings getMappings() const { return _mappings; }
signals:
void finished(GetAllMappingsRequest* thisRequest);
private:
virtual void doStart() override;
AssetUtils::AssetMapping _mappings;
AssetUtils::AssetMappings _mappings;
};
class SetBakingEnabledRequest : public MappingRequest {

View file

@ -92,6 +92,10 @@ public:
void removeFromIgnoreMuteSets(const QUuid& nodeID);
virtual bool isDomainServer() const override { return false; }
virtual QUuid getDomainUUID() const override { return _domainHandler.getUUID(); }
virtual HifiSockAddr getDomainSockAddr() const override { return _domainHandler.getSockAddr(); }
public slots:
void reset(bool skipDomainHandlerReset = false);
void resetFromDomainHandler() { reset(true); }

View file

@ -267,10 +267,7 @@ void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> recei
QMutexLocker packetListenerLocker(&_packetListenerLock);
bool listenerIsDead = false;
auto it = _messageListenerMap.find(receivedMessage->getType());
if (it != _messageListenerMap.end() && it->method.isValid()) {
auto listener = it.value();
@ -278,82 +275,61 @@ void PacketReceiver::handleVerifiedMessage(QSharedPointer<ReceivedMessage> recei
if ((listener.deliverPending && !justReceived) || (!listener.deliverPending && !receivedMessage->isComplete())) {
return;
}
if (listener.object) {
bool success = false;
bool success = false;
Qt::ConnectionType connectionType;
// check if this is a directly connected listener
{
QMutexLocker directConnectLocker(&_directConnectSetMutex);
connectionType = _directlyConnectedObjects.contains(listener.object) ? Qt::DirectConnection : Qt::AutoConnection;
}
PacketType packetType = receivedMessage->getType();
if (matchingNode) {
matchingNode->recordBytesReceived(receivedMessage->getSize());
QMetaMethod metaMethod = listener.method;
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer");
// one final check on the QPointer before we go to invoke
if (listener.object) {
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.object,
connectionType,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage),
Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.object,
connectionType,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage),
Q_ARG(QSharedPointer<Node>, matchingNode));
} else {
success = metaMethod.invoke(listener.object,
connectionType,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage));
}
} else {
listenerIsDead = true;
}
} else {
// one final check on the QPointer before we invoke
if (listener.object) {
success = listener.method.invoke(listener.object,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage));
} else {
listenerIsDead = true;
}
}
if (!success) {
qCDebug(networking).nospace() << "Error delivering packet " << packetType << " to listener "
<< listener.object << "::" << qPrintable(listener.method.methodSignature());
}
} else {
listenerIsDead = true;
Qt::ConnectionType connectionType;
// check if this is a directly connected listener
{
QMutexLocker directConnectLocker(&_directConnectSetMutex);
connectionType = _directlyConnectedObjects.contains(listener.object) ? Qt::DirectConnection : Qt::AutoConnection;
}
if (listenerIsDead) {
if (matchingNode) {
matchingNode->recordBytesReceived(receivedMessage->getSize());
}
QMetaMethod metaMethod = listener.method;
static const QByteArray QSHAREDPOINTER_NODE_NORMALIZED = QMetaObject::normalizedType("QSharedPointer<Node>");
static const QByteArray SHARED_NODE_NORMALIZED = QMetaObject::normalizedType("SharedNodePointer");
// one final check on the QPointer before we go to invoke
if (listener.object) {
if (metaMethod.parameterTypes().contains(SHARED_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.object,
connectionType,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage),
Q_ARG(SharedNodePointer, matchingNode));
} else if (metaMethod.parameterTypes().contains(QSHAREDPOINTER_NODE_NORMALIZED)) {
success = metaMethod.invoke(listener.object,
connectionType,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage),
Q_ARG(QSharedPointer<Node>, matchingNode));
} else {
success = metaMethod.invoke(listener.object,
connectionType,
Q_ARG(QSharedPointer<ReceivedMessage>, receivedMessage));
}
} else {
qCDebug(networking).nospace() << "Listener for packet " << receivedMessage->getType()
<< " has been destroyed. Removing from listener map.";
it = _messageListenerMap.erase(it);
// if it exists, remove the listener from _directlyConnectedObjects
{
QMutexLocker directConnectLocker(&_directConnectSetMutex);
_directlyConnectedObjects.remove(listener.object);
}
}
if (!success) {
qCDebug(networking).nospace() << "Error delivering packet " << receivedMessage->getType() << " to listener "
<< listener.object << "::" << qPrintable(listener.method.methodSignature());
}
} else if (it == _messageListenerMap.end()) {
qCWarning(networking) << "No listener found for packet type" << receivedMessage->getType();

View file

@ -177,6 +177,17 @@ public:
<< PacketTypeEnum::Value::ReplicatedKillAvatar << PacketTypeEnum::Value::ReplicatedBulkAvatarData;
return NON_SOURCED_PACKETS;
}
const static QSet<PacketTypeEnum::Value> getDomainSourcedPackets() {
const static QSet<PacketTypeEnum::Value> DOMAIN_SOURCED_PACKETS = QSet<PacketTypeEnum::Value>()
<< PacketTypeEnum::Value::AssetMappingOperation
<< PacketTypeEnum::Value::AssetMappingOperationReply
<< PacketTypeEnum::Value::AssetGet
<< PacketTypeEnum::Value::AssetGetReply
<< PacketTypeEnum::Value::AssetUpload
<< PacketTypeEnum::Value::AssetUploadReply;
return DOMAIN_SOURCED_PACKETS;
}
};
using PacketType = PacketTypeEnum::Value;