add an UploadAssetTask so uploading is on diff thread

This commit is contained in:
Stephen Birarda 2015-08-28 11:12:52 -07:00
parent 9575b47e4e
commit 3f85e8a2fe
7 changed files with 147 additions and 77 deletions

View file

@ -25,6 +25,7 @@
#include "NetworkLogging.h"
#include "NodeType.h"
#include "SendAssetTask.h"
#include "UploadAssetTask.h"
const QString ASSET_SERVER_LOGGING_TARGET_NAME = "asset-server";
@ -35,7 +36,8 @@ AssetServer::AssetServer(NLPacket& packet) :
// Most of the work will be I/O bound, reading from disk and constructing packet objects,
// so the ideal is greater than the number of cores on the system.
_taskPool.setMaxThreadCount(20);
static const int TASK_POOL_THREAD_COUNT = 50;
_taskPool.setMaxThreadCount(TASK_POOL_THREAD_COUNT);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet");
@ -153,76 +155,9 @@ void AssetServer::handleAssetGet(QSharedPointer<NLPacket> packet, SharedNodePoin
}
void AssetServer::handleAssetUpload(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
qDebug() << "Starting an UploadAssetTask for upload from" << uuidStringWithoutCurlyBraces(senderNode->getUUID());
auto data = packetList->getMessage();
QBuffer buffer { &data };
buffer.open(QIODevice::ReadOnly);
MessageID messageID;
buffer.read(reinterpret_cast<char*>(&messageID), sizeof(messageID));
if (!senderNode->getCanRez()) {
// this is a node the domain told us is not allowed to rez entities
// for now this also means it isn't allowed to add assets
// so return a packet with error that indicates that
auto permissionErrorPacket = NLPacket::create(PacketType::AssetUploadReply, sizeof(MessageID) + sizeof(AssetServerError));
// write the message ID and a permission denied error
permissionErrorPacket->writePrimitive(messageID);
permissionErrorPacket->writePrimitive(AssetServerError::PERMISSION_DENIED);
// send off the packet
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(permissionErrorPacket), *senderNode);
// return so we're not attempting to handle upload
return;
}
uint8_t extensionLength;
buffer.read(reinterpret_cast<char*>(&extensionLength), sizeof(extensionLength));
QByteArray extension = buffer.read(extensionLength);
qDebug() << "Got extension: " << extension;
uint64_t fileSize;
buffer.read(reinterpret_cast<char*>(&fileSize), sizeof(fileSize));
qDebug() << "Receiving a file of size " << fileSize;
auto replyPacket = NLPacket::create(PacketType::AssetUploadReply);
replyPacket->writePrimitive(messageID);
if (fileSize > MAX_UPLOAD_SIZE) {
replyPacket->writePrimitive(AssetServerError::ASSET_TOO_LARGE);
} else {
QByteArray fileData = buffer.read(fileSize);
auto hash = hashData(fileData);
auto hexHash = hash.toHex();
qDebug() << "Got data: (" << hexHash << ") ";
QFile file { _resourcesDirectory.filePath(QString(hexHash)) + "." + QString(extension) };
if (file.exists()) {
qDebug() << "[WARNING] This file already exists: " << hexHash;
} else {
file.open(QIODevice::WriteOnly);
file.write(fileData);
file.close();
}
replyPacket->writePrimitive(AssetServerError::NO_ERROR);
replyPacket->write(hash);
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(replyPacket), *senderNode);
}
QByteArray AssetServer::hashData(const QByteArray& data) {
return QCryptographicHash::hash(data, QCryptographicHash::Sha256);
auto task = new UploadAssetTask(packetList, senderNode, _resourcesDirectory);
_taskPool.start(task);
}

View file

@ -24,8 +24,6 @@ class AssetServer : public ThreadedAssignment {
public:
AssetServer(NLPacket& packet);
static QByteArray hashData(const QByteArray& data);
public slots:
void run();

View file

@ -27,9 +27,6 @@ public:
void run();
signals:
void finished();
private:
MessageID _messageID;
QByteArray _assetHash;

View file

@ -0,0 +1,99 @@
//
// UploadAssetTask.cpp
// assignment-client/src/assets
//
// Created by Stephen Birarda on 2015-08-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "UploadAssetTask.h"
#include <QtCore/QBuffer>
#include <QtCore/QFile>
#include <AssetUtils.h>
#include <NodeList.h>
#include <NLPacketList.h>
UploadAssetTask::UploadAssetTask(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode,
const QDir& resourcesDir) :
_packetList(packetList),
_senderNode(senderNode),
_resourcesDir(resourcesDir)
{
}
void UploadAssetTask::run() {
auto data = _packetList->getMessage();
QBuffer buffer { &data };
buffer.open(QIODevice::ReadOnly);
MessageID messageID;
buffer.read(reinterpret_cast<char*>(&messageID), sizeof(messageID));
if (!_senderNode->getCanRez()) {
// this is a node the domain told us is not allowed to rez entities
// for now this also means it isn't allowed to add assets
// so return a packet with error that indicates that
auto permissionErrorPacket = NLPacket::create(PacketType::AssetUploadReply, sizeof(MessageID) + sizeof(AssetServerError));
// write the message ID and a permission denied error
permissionErrorPacket->writePrimitive(messageID);
permissionErrorPacket->writePrimitive(AssetServerError::PERMISSION_DENIED);
// send off the packet
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(permissionErrorPacket), *_senderNode);
// return so we're not attempting to handle upload
return;
}
uint8_t extensionLength;
buffer.read(reinterpret_cast<char*>(&extensionLength), sizeof(extensionLength));
QByteArray extension = buffer.read(extensionLength);
uint64_t fileSize;
buffer.read(reinterpret_cast<char*>(&fileSize), sizeof(fileSize));
qDebug() << "UploadAssetTask reading a file of " << fileSize << "bytes and extension" << extension << "from"
<< uuidStringWithoutCurlyBraces(_senderNode->getUUID());
auto replyPacket = NLPacket::create(PacketType::AssetUploadReply);
replyPacket->writePrimitive(messageID);
if (fileSize > MAX_UPLOAD_SIZE) {
replyPacket->writePrimitive(AssetServerError::ASSET_TOO_LARGE);
} else {
QByteArray fileData = buffer.read(fileSize);
auto hash = hashData(fileData);
auto hexHash = hash.toHex();
qDebug() << "Hash for uploaded file from" << uuidStringWithoutCurlyBraces(_senderNode->getUUID())
<< "is: (" << hexHash << ") ";
QFile file { _resourcesDir.filePath(QString(hexHash)) + "." + QString(extension) };
if (file.exists()) {
qDebug() << "[WARNING] This file already exists: " << hexHash;
} else {
file.open(QIODevice::WriteOnly);
file.write(fileData);
file.close();
}
replyPacket->writePrimitive(AssetServerError::NO_ERROR);
replyPacket->write(hash);
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(replyPacket), *_senderNode);
}

View file

@ -0,0 +1,37 @@
//
// UploadAssetTask.h
// assignment-client/src/assets
//
// Created by Stephen Birarda on 2015-08-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_UploadAssetTask_h
#define hifi_UploadAssetTask_h
#include <QtCore/QDir>
#include <QtCore/QObject>
#include <QtCore/QRunnable>
#include <QtCore/QSharedPointer>
class NLPacketList;
class Node;
class UploadAssetTask : public QRunnable {
public:
UploadAssetTask(QSharedPointer<NLPacketList> packetList, QSharedPointer<Node> senderNode, const QDir& resourcesDir);
void run();
private:
QSharedPointer<NLPacketList> _packetList;
QSharedPointer<Node> _senderNode;
QDir _resourcesDir;
};
#endif // hifi_UploadAssetTask_h

View file

@ -12,7 +12,7 @@
#ifndef hifi_AssetUtils_h
#define hifi_AssetUtils_h
#include "NLPacketList.h"
#include <QtCore/QCryptographicHash>
using MessageID = uint32_t;
using DataOffset = int64_t;
@ -31,4 +31,6 @@ enum AssetServerError : uint8_t {
const QString ATP_SCHEME = "atp";
inline QByteArray hashData(const QByteArray& data) { return QCryptographicHash::hash(data, QCryptographicHash::Sha256); }
#endif

View file

@ -111,6 +111,8 @@ private:
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
time_point _flowWindowFullSince;
mutable std::mutex _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend