Merge pull request #5787 from huffman/cleanup-asset-requests-on-node-kill

DO NOT MERGE: Cleanup asset requests on node kill
This commit is contained in:
Stephen Birarda 2015-09-14 13:08:01 -07:00
commit 8308dbf4a3
11 changed files with 219 additions and 93 deletions

View file

@ -134,6 +134,9 @@ void AssetUploadDialogFactory::handleUploadFinished(AssetUpload* upload, const Q
case AssetUpload::FileOpenError:
additionalError = "The file could not be opened. Please check your permissions and try again.";
break;
case AssetUpload::NetworkError:
additionalError = "The file could not be opened. Please check your network connectivity.";
break;
default:
// not handled, do not show a message box
return;

View file

@ -31,23 +31,16 @@ AssetClient::AssetClient() {
static_cast<AssetClient*>(dependency)->deleteLater();
});
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
auto nodeList = DependencyManager::get<NodeList>();
auto& packetReceiver = nodeList->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetGetInfoReply, this, "handleAssetGetInfoReply");
packetReceiver.registerMessageListener(PacketType::AssetGetReply, this, "handleAssetGetReply");
packetReceiver.registerListener(PacketType::AssetUploadReply, this, "handleAssetUploadReply");
connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &AssetClient::handleNodeKilled);
}
AssetRequest* AssetClient::createRequest(const QString& hash, const QString& extension) {
if (QThread::currentThread() != thread()) {
AssetRequest* req;
QMetaObject::invokeMethod(this, "createRequest",
Qt::BlockingQueuedConnection,
Q_RETURN_ARG(AssetRequest*, req),
Q_ARG(QString, hash),
Q_ARG(QString, extension));
return req;
}
if (hash.length() != SHA256_HASH_HEX_LENGTH) {
qDebug() << "Invalid hash size";
return nullptr;
@ -62,19 +55,15 @@ AssetRequest* AssetClient::createRequest(const QString& hash, const QString& ext
return nullptr;
}
return new AssetRequest(this, hash, extension);
auto request = new AssetRequest(hash, extension);
// Move to the AssetClient thread in case we are not currently on that thread (which will usually be the case)
request->moveToThread(thread());
return request;
}
AssetUpload* AssetClient::createUpload(const QString& filename) {
if (QThread::currentThread() != thread()) {
AssetUpload* upload;
QMetaObject::invokeMethod(this, "createUpload",
Qt::BlockingQueuedConnection,
Q_RETURN_ARG(AssetUpload*, upload),
Q_ARG(QString, filename));
return upload;
}
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
@ -83,7 +72,11 @@ AssetUpload* AssetClient::createUpload(const QString& filename) {
return nullptr;
}
return new AssetUpload(this, filename);
auto upload = new AssetUpload(this, filename);
upload->moveToThread(thread());
return upload;
}
bool AssetClient::getAsset(const QString& hash, const QString& extension, DataOffset start, DataOffset end,
@ -118,7 +111,7 @@ bool AssetClient::getAsset(const QString& hash, const QString& extension, DataOf
nodeList->sendPacket(std::move(packet), *assetServer);
_pendingRequests[messageID] = callback;
_pendingRequests[assetServer][messageID] = callback;
return true;
}
@ -143,7 +136,7 @@ bool AssetClient::getAssetInfo(const QString& hash, const QString& extension, Ge
nodeList->sendPacket(std::move(packet), *assetServer);
_pendingInfoRequests[messageID] = callback;
_pendingInfoRequests[assetServer][messageID] = callback;
return true;
}
@ -165,9 +158,23 @@ void AssetClient::handleAssetGetInfoReply(QSharedPointer<NLPacket> packet, Share
packet->readPrimitive(&info.size);
}
if (_pendingInfoRequests.contains(messageID)) {
auto callback = _pendingInfoRequests.take(messageID);
callback(error, info);
// Check if we have any pending requests for this node
auto messageMapIt = _pendingInfoRequests.find(senderNode);
if (messageMapIt != _pendingInfoRequests.end()) {
// Found the node, get the MessageID -> Callback map
auto& messageCallbackMap = messageMapIt->second;
// Check if we have this pending request
auto requestIt = messageCallbackMap.find(messageID);
if (requestIt != messageCallbackMap.end()) {
auto callback = requestIt->second;
callback(true, error, info);
messageCallbackMap.erase(requestIt);
}
// Although the messageCallbackMap may now be empty, we won't delete the node until we have disconnected from
// it to avoid constantly creating/deleting the map on subsequent requests.
}
}
@ -194,9 +201,23 @@ void AssetClient::handleAssetGetReply(QSharedPointer<NLPacketList> packetList, S
qDebug() << "Failure getting asset: " << error;
}
if (_pendingRequests.contains(messageID)) {
auto callback = _pendingRequests.take(messageID);
callback(error, data);
// Check if we have any pending requests for this node
auto messageMapIt = _pendingRequests.find(senderNode);
if (messageMapIt != _pendingRequests.end()) {
// Found the node, get the MessageID -> Callback map
auto& messageCallbackMap = messageMapIt->second;
// Check if we have this pending request
auto requestIt = messageCallbackMap.find(messageID);
if (requestIt != messageCallbackMap.end()) {
auto callback = requestIt->second;
callback(true, error, data);
messageCallbackMap.erase(requestIt);
}
// Although the messageCallbackMap may now be empty, we won't delete the node until we have disconnected from
// it to avoid constantly creating/deleting the map on subsequent requests.
}
}
@ -219,7 +240,7 @@ bool AssetClient::uploadAsset(const QByteArray& data, const QString& extension,
nodeList->sendPacketList(std::move(packetList), *assetServer);
_pendingUploads[messageID] = callback;
_pendingUploads[assetServer][messageID] = callback;
return true;
}
@ -244,8 +265,59 @@ void AssetClient::handleAssetUploadReply(QSharedPointer<NLPacket> packet, Shared
qDebug() << "Successfully uploaded asset to asset-server - SHA256 hash is " << hashString;
}
if (_pendingUploads.contains(messageID)) {
auto callback = _pendingUploads.take(messageID);
callback(error, hashString);
// Check if we have any pending requests for this node
auto messageMapIt = _pendingUploads.find(senderNode);
if (messageMapIt != _pendingUploads.end()) {
// Found the node, get the MessageID -> Callback map
auto& messageCallbackMap = messageMapIt->second;
// Check if we have this pending request
auto requestIt = messageCallbackMap.find(messageID);
if (requestIt != messageCallbackMap.end()) {
auto callback = requestIt->second;
callback(true, error, hashString);
messageCallbackMap.erase(requestIt);
}
// Although the messageCallbackMap may now be empty, we won't delete the node until we have disconnected from
// it to avoid constantly creating/deleting the map on subsequent requests.
}
}
void AssetClient::handleNodeKilled(SharedNodePointer node) {
if (node->getType() != NodeType::AssetServer) {
return;
}
{
auto messageMapIt = _pendingRequests.find(node);
if (messageMapIt != _pendingRequests.end()) {
for (const auto& value : messageMapIt->second) {
value.second(false, AssetServerError::NoError, QByteArray());
}
messageMapIt->second.clear();
}
}
{
auto messageMapIt = _pendingInfoRequests.find(node);
if (messageMapIt != _pendingInfoRequests.end()) {
AssetInfo info { "", 0 };
for (const auto& value : messageMapIt->second) {
value.second(false, AssetServerError::NoError, info);
}
messageMapIt->second.clear();
}
}
{
auto messageMapIt = _pendingUploads.find(node);
if (messageMapIt != _pendingUploads.end()) {
for (const auto& value : messageMapIt->second) {
value.second(false, AssetServerError::NoError, "");
}
messageMapIt->second.clear();
}
}
}

View file

@ -20,6 +20,7 @@
#include "AssetUtils.h"
#include "LimitedNodeList.h"
#include "NLPacket.h"
#include "Node.h"
class AssetRequest;
class AssetUpload;
@ -29,9 +30,11 @@ struct AssetInfo {
int64_t size;
};
using ReceivedAssetCallback = std::function<void(AssetServerError serverError, const QByteArray& data)>;
using GetInfoCallback = std::function<void(AssetServerError serverError, AssetInfo info)>;
using UploadResultCallback = std::function<void(AssetServerError serverError, const QString& hash)>;
using ReceivedAssetCallback = std::function<void(bool responseReceived, AssetServerError serverError, const QByteArray& data)>;
using GetInfoCallback = std::function<void(bool responseReceived, AssetServerError serverError, AssetInfo info)>;
using UploadResultCallback = std::function<void(bool responseReceived, AssetServerError serverError, const QString& hash)>;
class AssetClient : public QObject, public Dependency {
Q_OBJECT
@ -46,15 +49,17 @@ private slots:
void handleAssetGetReply(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode);
void handleAssetUploadReply(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleNodeKilled(SharedNodePointer node);
private:
bool getAssetInfo(const QString& hash, const QString& extension, GetInfoCallback callback);
bool getAsset(const QString& hash, const QString& extension, DataOffset start, DataOffset end, ReceivedAssetCallback callback);
bool uploadAsset(const QByteArray& data, const QString& extension, UploadResultCallback callback);
static MessageID _currentID;
QHash<MessageID, ReceivedAssetCallback> _pendingRequests;
QHash<MessageID, GetInfoCallback> _pendingInfoRequests;
QHash<MessageID, UploadResultCallback> _pendingUploads;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, ReceivedAssetCallback>> _pendingRequests;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, GetInfoCallback>> _pendingInfoRequests;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, UploadResultCallback>> _pendingUploads;
friend class AssetRequest;
friend class AssetUpload;

View file

@ -19,8 +19,8 @@
#include "NetworkLogging.h"
#include "NodeList.h"
AssetRequest::AssetRequest(QObject* parent, const QString& hash, const QString& extension) :
QObject(parent),
AssetRequest::AssetRequest(const QString& hash, const QString& extension) :
QObject(),
_hash(hash),
_extension(extension)
{
@ -33,29 +33,40 @@ void AssetRequest::start() {
return;
}
if (_state != NOT_STARTED) {
if (_state != NotStarted) {
qCWarning(networking) << "AssetRequest already started.";
return;
}
_state = WAITING_FOR_INFO;
_state = WaitingForInfo;
auto assetClient = DependencyManager::get<AssetClient>();
assetClient->getAssetInfo(_hash, _extension, [this](AssetServerError serverError, AssetInfo info) {
assetClient->getAssetInfo(_hash, _extension, [this](bool responseReceived, AssetServerError serverError, AssetInfo info) {
_info = info;
if (serverError != AssetServerError::NoError) {
if (!responseReceived) {
_error = NetworkError;
} else if (serverError != AssetServerError::NoError) {
switch(serverError) {
case AssetServerError::AssetNotFound:
_error = NotFound;
break;
default:
_error = UnknownError;
break;
}
}
if (_error != NoError) {
qCDebug(networking) << "Got error retrieving asset info for" << _hash;
_state = FINISHED;
_state = Finished;
emit finished(this);
_error = (serverError == AssetServerError::AssetNotFound) ? NotFound : UnknownError;
return;
}
_state = WAITING_FOR_DATA;
_state = WaitingForData;
_data.resize(info.size);
qCDebug(networking) << "Got size of " << _hash << " : " << info.size << " bytes";
@ -63,23 +74,13 @@ void AssetRequest::start() {
int start = 0, end = _info.size;
auto assetClient = DependencyManager::get<AssetClient>();
assetClient->getAsset(_hash, _extension, start, end, [this, start, end](AssetServerError serverError,
assetClient->getAsset(_hash, _extension, start, end, [this, start, end](bool responseReceived, AssetServerError serverError,
const QByteArray& data) {
Q_ASSERT(data.size() == (end - start));
if (serverError == AssetServerError::NoError) {
// we need to check the hash of the received data to make sure it matches what we expect
if (hashData(data).toHex() == _hash) {
memcpy(_data.data() + start, data.constData(), data.size());
_totalReceived += data.size();
emit progress(_totalReceived, _info.size);
} else {
// hash doesn't match - we have an error
_error = HashVerificationFailed;
}
} else {
if (!responseReceived) {
_error = NetworkError;
} else if (serverError != AssetServerError::NoError) {
switch (serverError) {
case AssetServerError::AssetNotFound:
_error = NotFound;
@ -91,13 +92,25 @@ void AssetRequest::start() {
_error = UnknownError;
break;
}
} else {
// we need to check the hash of the received data to make sure it matches what we expect
if (hashData(data).toHex() == _hash) {
memcpy(_data.data() + start, data.constData(), data.size());
_totalReceived += data.size();
emit progress(_totalReceived, _info.size);
} else {
// hash doesn't match - we have an error
_error = HashVerificationFailed;
}
}
if (_error != NoError) {
qCDebug(networking) << "Got error retrieving asset" << _hash << "- error code" << _error;
}
_state = FINISHED;
_state = Finished;
emit finished(this);
});
});

View file

@ -24,10 +24,10 @@ class AssetRequest : public QObject {
Q_OBJECT
public:
enum State {
NOT_STARTED = 0,
WAITING_FOR_INFO,
WAITING_FOR_DATA,
FINISHED
NotStarted = 0,
WaitingForInfo,
WaitingForData,
Finished
};
enum Error {
@ -35,10 +35,11 @@ public:
NotFound,
InvalidByteRange,
HashVerificationFailed,
NetworkError,
UnknownError
};
AssetRequest(QObject* parent, const QString& hash, const QString& extension);
AssetRequest(const QString& hash, const QString& extension);
Q_INVOKABLE void start();
@ -51,7 +52,7 @@ signals:
void progress(qint64 totalReceived, qint64 total);
private:
State _state = NOT_STARTED;
State _state = NotStarted;
Error _error = NoError;
AssetInfo _info;
uint64_t _totalReceived { 0 };

View file

@ -12,9 +12,14 @@
#include "AssetResourceRequest.h"
#include "AssetClient.h"
#include "AssetRequest.h"
#include "AssetUtils.h"
AssetResourceRequest::~AssetResourceRequest() {
if (_assetRequest) {
_assetRequest->deleteLater();
}
}
void AssetResourceRequest::doSend() {
// Make request to atp
auto assetClient = DependencyManager::get<AssetClient>();
@ -31,9 +36,9 @@ void AssetResourceRequest::doSend() {
return;
}
auto request = assetClient->createRequest(hash, extension);
_assetRequest = assetClient->createRequest(hash, extension);
if (!request) {
if (!_assetRequest) {
_result = ServerUnavailable;
_state = Finished;
@ -42,9 +47,10 @@ void AssetResourceRequest::doSend() {
return;
}
connect(request, &AssetRequest::progress, this, &AssetResourceRequest::progress);
QObject::connect(request, &AssetRequest::finished, [this](AssetRequest* req) mutable {
connect(_assetRequest, &AssetRequest::progress, this, &AssetResourceRequest::progress);
QObject::connect(_assetRequest, &AssetRequest::finished, [this](AssetRequest* req) mutable {
Q_ASSERT(_state == InProgress);
Q_ASSERT(req == _assetRequest);
Q_ASSERT(req->getState() == AssetRequest::FINISHED);
switch (req->getError()) {
@ -55,6 +61,9 @@ void AssetResourceRequest::doSend() {
case AssetRequest::Error::NotFound:
_result = NotFound;
break;
case AssetRequest::Error::NetworkError:
_result = ServerUnavailable;
break;
default:
_result = Error;
break;
@ -62,9 +71,12 @@ void AssetResourceRequest::doSend() {
_state = Finished;
emit finished();
_assetRequest->deleteLater();
_assetRequest = nullptr;
});
request->start();
_assetRequest->start();
}
void AssetResourceRequest::onDownloadProgress(qint64 bytesReceived, qint64 bytesTotal) {

View file

@ -14,18 +14,23 @@
#include <QUrl>
#include "AssetRequest.h"
#include "ResourceRequest.h"
class AssetResourceRequest : public ResourceRequest {
Q_OBJECT
public:
AssetResourceRequest(QObject* parent, const QUrl& url) : ResourceRequest(parent, url) { }
~AssetResourceRequest();
protected:
virtual void doSend() override;
private slots:
void onDownloadProgress(qint64 bytesReceived, qint64 bytesTotal);
private:
AssetRequest* _assetRequest;
};
#endif

View file

@ -43,20 +43,24 @@ void AssetUpload::start() {
qDebug() << "Attempting to upload" << _filename << "to asset-server.";
assetClient->uploadAsset(data, _extension, [this](AssetServerError error, const QString& hash){
switch (error) {
case AssetServerError::NoError:
_error = NoError;
break;
case AssetServerError::AssetTooLarge:
_error = TooLarge;
break;
case AssetServerError::PermissionDenied:
_error = PermissionDenied;
break;
default:
_error = FileOpenError;
break;
assetClient->uploadAsset(data, _extension, [this](bool responseReceived, AssetServerError error, const QString& hash){
if (!responseReceived) {
_error = NetworkError;
} else {
switch (error) {
case AssetServerError::NoError:
_error = NoError;
break;
case AssetServerError::AssetTooLarge:
_error = TooLarge;
break;
case AssetServerError::PermissionDenied:
_error = PermissionDenied;
break;
default:
_error = FileOpenError;
break;
}
}
emit finished(this, hash);
});

View file

@ -28,6 +28,7 @@ public:
enum Error {
NoError = 0,
NetworkError,
Timeout,
TooLarge,
PermissionDenied,

View file

@ -24,7 +24,7 @@ const size_t SHA256_HASH_HEX_LENGTH = 64;
const uint64_t MAX_UPLOAD_SIZE = 1000 * 1000 * 1000; // 1GB
enum AssetServerError : uint8_t {
NoError,
NoError = 0,
AssetNotFound,
InvalidByteRange,
AssetTooLarge,

View file

@ -88,6 +88,16 @@ private:
typedef QSharedPointer<Node> SharedNodePointer;
Q_DECLARE_METATYPE(SharedNodePointer)
namespace std {
template<>
struct hash<SharedNodePointer> {
size_t operator()(const SharedNodePointer& p) const {
// Return the hash of the pointer
return hash<Node*>()(p.data());
}
};
}
QDebug operator<<(QDebug debug, const Node& node);
#endif // hifi_Node_h