From 3f85e8a2fed5b3c4ec5e211e1c0048cace4f9f36 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 11:12:52 -0700 Subject: [PATCH] add an UploadAssetTask so uploading is on diff thread --- assignment-client/src/assets/AssetServer.cpp | 77 ++------------- assignment-client/src/assets/AssetServer.h | 2 - assignment-client/src/assets/SendAssetTask.h | 3 - .../src/assets/UploadAssetTask.cpp | 99 +++++++++++++++++++ .../src/assets/UploadAssetTask.h | 37 +++++++ libraries/networking/src/AssetUtils.h | 4 +- libraries/networking/src/udt/SendQueue.h | 2 + 7 files changed, 147 insertions(+), 77 deletions(-) create mode 100644 assignment-client/src/assets/UploadAssetTask.cpp create mode 100644 assignment-client/src/assets/UploadAssetTask.h diff --git a/assignment-client/src/assets/AssetServer.cpp b/assignment-client/src/assets/AssetServer.cpp index 8c4807f412..c046b39f66 100644 --- a/assignment-client/src/assets/AssetServer.cpp +++ b/assignment-client/src/assets/AssetServer.cpp @@ -25,6 +25,7 @@ #include "NetworkLogging.h" #include "NodeType.h" #include "SendAssetTask.h" +#include "UploadAssetTask.h" const QString ASSET_SERVER_LOGGING_TARGET_NAME = "asset-server"; @@ -35,7 +36,8 @@ AssetServer::AssetServer(NLPacket& packet) : // Most of the work will be I/O bound, reading from disk and constructing packet objects, // so the ideal is greater than the number of cores on the system. - _taskPool.setMaxThreadCount(20); + static const int TASK_POOL_THREAD_COUNT = 50; + _taskPool.setMaxThreadCount(TASK_POOL_THREAD_COUNT); auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet"); @@ -153,76 +155,9 @@ void AssetServer::handleAssetGet(QSharedPointer packet, SharedNodePoin } void AssetServer::handleAssetUpload(QSharedPointer packetList, SharedNodePointer senderNode) { + qDebug() << "Starting an UploadAssetTask for upload from" << uuidStringWithoutCurlyBraces(senderNode->getUUID()); - auto data = packetList->getMessage(); - QBuffer buffer { &data }; - buffer.open(QIODevice::ReadOnly); - - MessageID messageID; - buffer.read(reinterpret_cast(&messageID), sizeof(messageID)); - - if (!senderNode->getCanRez()) { - // this is a node the domain told us is not allowed to rez entities - // for now this also means it isn't allowed to add assets - // so return a packet with error that indicates that - - auto permissionErrorPacket = NLPacket::create(PacketType::AssetUploadReply, sizeof(MessageID) + sizeof(AssetServerError)); - - // write the message ID and a permission denied error - permissionErrorPacket->writePrimitive(messageID); - permissionErrorPacket->writePrimitive(AssetServerError::PERMISSION_DENIED); - - // send off the packet - auto nodeList = DependencyManager::get(); - nodeList->sendPacket(std::move(permissionErrorPacket), *senderNode); - - // return so we're not attempting to handle upload - return; - } - - uint8_t extensionLength; - buffer.read(reinterpret_cast(&extensionLength), sizeof(extensionLength)); - - QByteArray extension = buffer.read(extensionLength); - - qDebug() << "Got extension: " << extension; - - uint64_t fileSize; - buffer.read(reinterpret_cast(&fileSize), sizeof(fileSize)); - - qDebug() << "Receiving a file of size " << fileSize; - - auto replyPacket = NLPacket::create(PacketType::AssetUploadReply); - replyPacket->writePrimitive(messageID); - - if (fileSize > MAX_UPLOAD_SIZE) { - replyPacket->writePrimitive(AssetServerError::ASSET_TOO_LARGE); - } else { - QByteArray fileData = buffer.read(fileSize); - - auto hash = hashData(fileData); - auto hexHash = hash.toHex(); - - qDebug() << "Got data: (" << hexHash << ") "; - - QFile file { _resourcesDirectory.filePath(QString(hexHash)) + "." + QString(extension) }; - - if (file.exists()) { - qDebug() << "[WARNING] This file already exists: " << hexHash; - } else { - file.open(QIODevice::WriteOnly); - file.write(fileData); - file.close(); - } - replyPacket->writePrimitive(AssetServerError::NO_ERROR); - replyPacket->write(hash); - } - - auto nodeList = DependencyManager::get(); - nodeList->sendPacket(std::move(replyPacket), *senderNode); -} - -QByteArray AssetServer::hashData(const QByteArray& data) { - return QCryptographicHash::hash(data, QCryptographicHash::Sha256); + auto task = new UploadAssetTask(packetList, senderNode, _resourcesDirectory); + _taskPool.start(task); } diff --git a/assignment-client/src/assets/AssetServer.h b/assignment-client/src/assets/AssetServer.h index 99a31b3768..1975f746a9 100644 --- a/assignment-client/src/assets/AssetServer.h +++ b/assignment-client/src/assets/AssetServer.h @@ -24,8 +24,6 @@ class AssetServer : public ThreadedAssignment { public: AssetServer(NLPacket& packet); - static QByteArray hashData(const QByteArray& data); - public slots: void run(); diff --git a/assignment-client/src/assets/SendAssetTask.h b/assignment-client/src/assets/SendAssetTask.h index 6b6c555326..477f42c1a3 100644 --- a/assignment-client/src/assets/SendAssetTask.h +++ b/assignment-client/src/assets/SendAssetTask.h @@ -27,9 +27,6 @@ public: void run(); -signals: - void finished(); - private: MessageID _messageID; QByteArray _assetHash; diff --git a/assignment-client/src/assets/UploadAssetTask.cpp b/assignment-client/src/assets/UploadAssetTask.cpp new file mode 100644 index 0000000000..66fdc80e90 --- /dev/null +++ b/assignment-client/src/assets/UploadAssetTask.cpp @@ -0,0 +1,99 @@ +// +// UploadAssetTask.cpp +// assignment-client/src/assets +// +// Created by Stephen Birarda on 2015-08-28. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include "UploadAssetTask.h" + +#include +#include + +#include +#include +#include + + +UploadAssetTask::UploadAssetTask(QSharedPointer packetList, SharedNodePointer senderNode, + const QDir& resourcesDir) : + _packetList(packetList), + _senderNode(senderNode), + _resourcesDir(resourcesDir) +{ + +} + +void UploadAssetTask::run() { + auto data = _packetList->getMessage(); + + QBuffer buffer { &data }; + buffer.open(QIODevice::ReadOnly); + + MessageID messageID; + buffer.read(reinterpret_cast(&messageID), sizeof(messageID)); + + if (!_senderNode->getCanRez()) { + // this is a node the domain told us is not allowed to rez entities + // for now this also means it isn't allowed to add assets + // so return a packet with error that indicates that + + auto permissionErrorPacket = NLPacket::create(PacketType::AssetUploadReply, sizeof(MessageID) + sizeof(AssetServerError)); + + // write the message ID and a permission denied error + permissionErrorPacket->writePrimitive(messageID); + permissionErrorPacket->writePrimitive(AssetServerError::PERMISSION_DENIED); + + // send off the packet + auto nodeList = DependencyManager::get(); + nodeList->sendPacket(std::move(permissionErrorPacket), *_senderNode); + + // return so we're not attempting to handle upload + return; + } + + uint8_t extensionLength; + buffer.read(reinterpret_cast(&extensionLength), sizeof(extensionLength)); + + QByteArray extension = buffer.read(extensionLength); + + uint64_t fileSize; + buffer.read(reinterpret_cast(&fileSize), sizeof(fileSize)); + + qDebug() << "UploadAssetTask reading a file of " << fileSize << "bytes and extension" << extension << "from" + << uuidStringWithoutCurlyBraces(_senderNode->getUUID()); + + auto replyPacket = NLPacket::create(PacketType::AssetUploadReply); + replyPacket->writePrimitive(messageID); + + if (fileSize > MAX_UPLOAD_SIZE) { + replyPacket->writePrimitive(AssetServerError::ASSET_TOO_LARGE); + } else { + QByteArray fileData = buffer.read(fileSize); + + auto hash = hashData(fileData); + auto hexHash = hash.toHex(); + + qDebug() << "Hash for uploaded file from" << uuidStringWithoutCurlyBraces(_senderNode->getUUID()) + << "is: (" << hexHash << ") "; + + QFile file { _resourcesDir.filePath(QString(hexHash)) + "." + QString(extension) }; + + if (file.exists()) { + qDebug() << "[WARNING] This file already exists: " << hexHash; + } else { + file.open(QIODevice::WriteOnly); + file.write(fileData); + file.close(); + } + replyPacket->writePrimitive(AssetServerError::NO_ERROR); + replyPacket->write(hash); + } + + auto nodeList = DependencyManager::get(); + nodeList->sendPacket(std::move(replyPacket), *_senderNode); +} diff --git a/assignment-client/src/assets/UploadAssetTask.h b/assignment-client/src/assets/UploadAssetTask.h new file mode 100644 index 0000000000..c310bfc948 --- /dev/null +++ b/assignment-client/src/assets/UploadAssetTask.h @@ -0,0 +1,37 @@ +// +// UploadAssetTask.h +// assignment-client/src/assets +// +// Created by Stephen Birarda on 2015-08-28. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#pragma once + +#ifndef hifi_UploadAssetTask_h +#define hifi_UploadAssetTask_h + +#include +#include +#include +#include + +class NLPacketList; +class Node; + +class UploadAssetTask : public QRunnable { +public: + UploadAssetTask(QSharedPointer packetList, QSharedPointer senderNode, const QDir& resourcesDir); + + void run(); + +private: + QSharedPointer _packetList; + QSharedPointer _senderNode; + QDir _resourcesDir; +}; + +#endif // hifi_UploadAssetTask_h diff --git a/libraries/networking/src/AssetUtils.h b/libraries/networking/src/AssetUtils.h index ce9f3f4354..ae6f51df64 100644 --- a/libraries/networking/src/AssetUtils.h +++ b/libraries/networking/src/AssetUtils.h @@ -12,7 +12,7 @@ #ifndef hifi_AssetUtils_h #define hifi_AssetUtils_h -#include "NLPacketList.h" +#include using MessageID = uint32_t; using DataOffset = int64_t; @@ -31,4 +31,6 @@ enum AssetServerError : uint8_t { const QString ATP_SCHEME = "atp"; +inline QByteArray hashData(const QByteArray& data) { return QCryptographicHash::hash(data, QCryptographicHash::Sha256); } + #endif diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index c6668f1d09..fe07495e55 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -111,6 +111,8 @@ private: std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC + time_point _flowWindowFullSince; + mutable std::mutex _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend