First draft for mapping requests

This commit is contained in:
Atlante45 2016-02-29 16:57:24 -08:00 committed by Ryan Huffman
parent b0c11e6562
commit 7b179ab91a
8 changed files with 128 additions and 21 deletions

View file

@ -39,6 +39,7 @@ AssetServer::AssetServer(ReceivedMessage& message) :
_taskPool.setMaxThreadCount(TASK_POOL_THREAD_COUNT);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetGetMapping, this, "handleAssetGetMapping");
packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet");
packetReceiver.registerListener(PacketType::AssetGetInfo, this, "handleAssetGetInfo");
packetReceiver.registerListener(PacketType::AssetUpload, this, "handleAssetUpload");
@ -161,6 +162,32 @@ void AssetServer::completeSetup() {
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
}
void AssetServer::handleAssetGetMapping(QSharedPointer<ReceivedMessage> packet, SharedNodePointer senderNode) {
MessageID messageID;
message->readPrimitive(&messageID);
QString assetPath = message->readAll();
auto replyPacket = NLPacket::create(PacketType::AssetGetMappingReply);
replyPacket->writePrimitive(messageID);
auto it = _fileMapping.find(assetPath.toStdString());
if (it != _fileMapping.end()) {
auto assetHash = QString::fromStdString(it.second);
qDebug() << "Found mapping for: " << assetPath << "=>" << assetHash;
replyPacket->writePrimitive(AssetServerError::NoError);
replyPacket->write(assetHash.toHex());
} else {
qDebug() << "Mapping not found for: " << assetPath;
replyPacket->writePrimitive(AssetServerError::AssetNotFound);
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(replyPacket), *senderNode);
}
void AssetServer::handleAssetGetInfo(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
QByteArray assetHash;
MessageID messageID;

View file

@ -31,6 +31,7 @@ public slots:
private slots:
void completeSetup();
void handleAssetGetMapping(QSharedPointer<ReceivedMessage> packet, SharedNodePointer senderNode);
void handleAssetGetInfo(QSharedPointer<ReceivedMessage> packet, SharedNodePointer senderNode);
void handleAssetGet(QSharedPointer<ReceivedMessage> packet, SharedNodePointer senderNode);
void handleAssetUpload(QSharedPointer<ReceivedMessage> packetList, SharedNodePointer senderNode);
@ -39,6 +40,12 @@ private slots:
private:
static void writeError(NLPacketList* packetList, AssetServerError error);
using Path = std::string;
using Hash = std::string;
using Mapping = std::map<Path, Hash>;
Mapping _fileMapping;
QDir _resourcesDirectory;
QThreadPool _taskPool;
};

View file

@ -39,6 +39,7 @@ AssetClient::AssetClient() {
auto nodeList = DependencyManager::get<NodeList>();
auto& packetReceiver = nodeList->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetGetMappingReply, this, "handleAssetGetMappingReply");
packetReceiver.registerListener(PacketType::AssetGetInfoReply, this, "handleAssetGetInfoReply");
packetReceiver.registerListener(PacketType::AssetGetReply, this, "handleAssetGetReply", true);
packetReceiver.registerListener(PacketType::AssetUploadReply, this, "handleAssetUploadReply");
@ -110,7 +111,22 @@ bool haveAssetServer() {
return true;
}
AssetRequest* AssetClient::createRequest(const QString& hash, const QString& extension) {
AssetRequest* AssetClient::createRequest(const QUrl& url) {
auto parts = _url.path().split(".", QString::SkipEmptyParts);
auto hash = parts.length() > 0 ? parts[0] : "";
auto extension = parts.length() > 1 ? parts[1] : "";
if (hash.length() != SHA256_HASH_HEX_LENGTH) {
_result = InvalidURL;
_state = Finished;
emit finished();
return;
}
if (hash.length() != SHA256_HASH_HEX_LENGTH) {
qCWarning(asset_client) << "Invalid hash size";
return nullptr;
@ -155,6 +171,34 @@ AssetUpload* AssetClient::createUpload(const QByteArray& data, const QString& ex
}
}
bool AssetClient::getAssetMapping(const QString& path, GetMappingCallback callback) {
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer assetServer = nodeList->soloNodeOfType(NodeType::AssetServer);
if (assetServer) {
auto messageID = ++_currentID;
auto payload = path.toLatin1();
auto payloadSize = sizeof(messageID) + payload.size();
auto packet = NLPacket::create(PacketType::AssetGetMapping, payloadSize, true);
qCDebug(asset_client) << "Requesting mapping for" << path << "from asset-server.";
packet->writePrimitive(messageID);
auto bytesWritten = packet->write(payload);
Q_ASSERT(bytesWritten == payload.size());
nodeList->sendPacket(std::move(packet), *assetServer);
_pendingMappingRequests[assetServer][messageID] = callback;
return true;
}
return false;
}
bool AssetClient::getAsset(const QString& hash, const QString& extension, DataOffset start, DataOffset end,
ReceivedAssetCallback callback, ProgressCallback progressCallback) {
if (hash.length() != SHA256_HASH_HEX_LENGTH) {
@ -220,6 +264,39 @@ bool AssetClient::getAssetInfo(const QString& hash, const QString& extension, Ge
return false;
}
void AssetClient::handleAssetGetMappingReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
MessageID messageID;
message->readPrimitive(&messageID);
AssetServerError error;
message->readPrimitive(&error);
QString assetHash;
if (error == AssetServerError::NoError) {
assetHash = message->read(SHA256_HASH_LENGTH);
}
// Check if we have any pending requests for this node
auto messageMapIt = _pendingMappingRequests.find(senderNode);
if (messageMapIt != _pendingMappingRequests.end()) {
// Found the node, get the MessageID -> Callback map
auto& messageCallbackMap = messageMapIt->second;
// Check if we have this pending request
auto requestIt = messageCallbackMap.find(messageID);
if (requestIt != messageCallbackMap.end()) {
auto callback = requestIt->second;
callback(true, error, assetHash);
messageCallbackMap.erase(requestIt);
}
// Although the messageCallbackMap may now be empty, we won't delete the node until we have disconnected from
// it to avoid constantly creating/deleting the map on subsequent requests.
}
}
void AssetClient::handleAssetGetInfoReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
MessageID messageID;
message->readPrimitive(&messageID);

View file

@ -34,6 +34,7 @@ struct AssetInfo {
};
using ReceivedAssetCallback = std::function<void(bool responseReceived, AssetServerError serverError, const QByteArray& data)>;
using GetMappingCallback = std::function<void(bool responseReceived, AssetServerError serverError, const QString& hash)>;
using GetInfoCallback = std::function<void(bool responseReceived, AssetServerError serverError, AssetInfo info)>;
using UploadResultCallback = std::function<void(bool responseReceived, AssetServerError serverError, const QString& hash)>;
using ProgressCallback = std::function<void(qint64 totalReceived, qint64 total)>;
@ -44,7 +45,7 @@ class AssetClient : public QObject, public Dependency {
public:
AssetClient();
Q_INVOKABLE AssetRequest* createRequest(const QString& hash, const QString& extension);
Q_INVOKABLE AssetRequest* createRequest(const QUrl& url);
Q_INVOKABLE AssetUpload* createUpload(const QString& filename);
Q_INVOKABLE AssetUpload* createUpload(const QByteArray& data, const QString& extension);
@ -55,6 +56,7 @@ public slots:
void clearCache();
private slots:
void handleAssetGetMappingReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleAssetGetInfoReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleAssetGetReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleAssetUploadReply(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
@ -62,6 +64,7 @@ private slots:
void handleNodeKilled(SharedNodePointer node);
private:
bool getAssetMapping(const QString& path, GetMappingCallback callback);
bool getAssetInfo(const QString& hash, const QString& extension, GetInfoCallback callback);
bool getAsset(const QString& hash, const QString& extension, DataOffset start, DataOffset end,
ReceivedAssetCallback callback, ProgressCallback progressCallback);
@ -73,6 +76,7 @@ private:
};
static MessageID _currentID;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, GetMappingCallback>> _pendingMappingRequests;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, GetAssetCallbacks>> _pendingRequests;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, GetInfoCallback>> _pendingInfoRequests;
std::unordered_map<SharedNodePointer, std::unordered_map<MessageID, UploadResultCallback>> _pendingUploads;

View file

@ -20,9 +20,8 @@
#include "NodeList.h"
#include "ResourceCache.h"
AssetRequest::AssetRequest(const QString& hash, const QString& extension) :
QObject(),
_hash(hash),
AssetRequest::AssetRequest(const QString& url, const QString& extension) :
_url(url),
_extension(extension)
{
}

View file

@ -39,7 +39,7 @@ public:
UnknownError
};
AssetRequest(const QString& hash, const QString& extension);
AssetRequest(const QString& url, const QString& extension);
Q_INVOKABLE void start();
@ -52,12 +52,15 @@ signals:
void finished(AssetRequest* thisRequest);
void progress(qint64 totalReceived, qint64 total);
private slots:
void getAssetCallback(bool responseReceived, AssetServerError serverError, const QByteArray& data);
private:
State _state = NotStarted;
Error _error = NoError;
AssetInfo _info;
uint64_t _totalReceived { 0 };
QString _hash;
QString _url;
QString _extension;
QByteArray _data;
int _numPendingRequests { 0 };

View file

@ -23,19 +23,7 @@ AssetResourceRequest::~AssetResourceRequest() {
void AssetResourceRequest::doSend() {
// Make request to atp
auto assetClient = DependencyManager::get<AssetClient>();
auto parts = _url.path().split(".", QString::SkipEmptyParts);
auto hash = parts.length() > 0 ? parts[0] : "";
auto extension = parts.length() > 1 ? parts[1] : "";
if (hash.length() != SHA256_HASH_HEX_LENGTH) {
_result = InvalidURL;
_state = Finished;
emit finished();
return;
}
_assetRequest = assetClient->createRequest(hash, extension);
_assetRequest = assetClient->createRequest(_url);
if (!_assetRequest) {
_result = ServerUnavailable;

View file

@ -90,7 +90,9 @@ public:
DomainServerRemovedNode,
MessagesData,
MessagesSubscribe,
MessagesUnsubscribe
MessagesUnsubscribe,
AssetGetMapping,
AssetGetMappingReply
};
};