Merge pull request #37 from birarda/upload-task

add a task for upload in asset-server
This commit is contained in:
Ryan Huffman 2015-08-31 14:55:37 -07:00
commit 423370dccb
9 changed files with 180 additions and 114 deletions

View file

@ -25,6 +25,7 @@
#include "NetworkLogging.h"
#include "NodeType.h"
#include "SendAssetTask.h"
#include "UploadAssetTask.h"
const QString ASSET_SERVER_LOGGING_TARGET_NAME = "asset-server";
@ -35,7 +36,8 @@ AssetServer::AssetServer(NLPacket& packet) :
// Most of the work will be I/O bound, reading from disk and constructing packet objects,
// so the ideal is greater than the number of cores on the system.
_taskPool.setMaxThreadCount(20);
static const int TASK_POOL_THREAD_COUNT = 50;
_taskPool.setMaxThreadCount(TASK_POOL_THREAD_COUNT);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet");
@ -123,51 +125,36 @@ void AssetServer::handleAssetGetInfo(QSharedPointer<NLPacket> packet, SharedNode
}
void AssetServer::handleAssetGet(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
MessageID messageID;
QByteArray assetHash;
uint8_t extensionLength;
DataOffset start;
DataOffset end;
auto minSize = qint64(sizeof(messageID) + SHA256_HASH_LENGTH + sizeof(extensionLength) + sizeof(start) + sizeof(end));
auto minSize = qint64(sizeof(MessageID) + SHA256_HASH_LENGTH + sizeof(uint8_t) + sizeof(DataOffset) + sizeof(DataOffset));
if (packet->getPayloadSize() < minSize) {
qDebug() << "ERROR bad file request";
return;
}
packet->readPrimitive(&messageID);
assetHash = packet->read(SHA256_HASH_LENGTH);
packet->readPrimitive(&extensionLength);
QByteArray extension = packet->read(extensionLength);
packet->readPrimitive(&start);
packet->readPrimitive(&end);
QByteArray hexHash = assetHash.toHex();
qDebug() << "Received a request for the file (" << messageID << "): " << hexHash << " from " << start << " to " << end;
// Queue task
QString filePath = _resourcesDirectory.filePath(QString(hexHash) + "." + QString(extension));
auto task = new SendAssetTask(messageID, assetHash, filePath, start, end, senderNode);
auto task = new SendAssetTask(packet, senderNode, _resourcesDirectory);
_taskPool.start(task);
}
void AssetServer::handleAssetUpload(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
auto data = packetList->getMessage();
QBuffer buffer { &data };
buffer.open(QIODevice::ReadOnly);
MessageID messageID;
buffer.read(reinterpret_cast<char*>(&messageID), sizeof(messageID));
if (!senderNode->getCanRez()) {
if (senderNode->getCanRez()) {
qDebug() << "Starting an UploadAssetTask for upload from" << uuidStringWithoutCurlyBraces(senderNode->getUUID());
auto task = new UploadAssetTask(packetList, senderNode, _resourcesDirectory);
_taskPool.start(task);
} else {
// this is a node the domain told us is not allowed to rez entities
// for now this also means it isn't allowed to add assets
// so return a packet with error that indicates that
auto permissionErrorPacket = NLPacket::create(PacketType::AssetUploadReply, sizeof(MessageID) + sizeof(AssetServerError));
MessageID messageID;
packetList->readPrimitive(&messageID);
// write the message ID and a permission denied error
permissionErrorPacket->writePrimitive(messageID);
permissionErrorPacket->writePrimitive(AssetServerError::PERMISSION_DENIED);
@ -175,54 +162,6 @@ void AssetServer::handleAssetUpload(QSharedPointer<NLPacketList> packetList, Sha
// send off the packet
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(permissionErrorPacket), *senderNode);
// return so we're not attempting to handle upload
return;
}
uint8_t extensionLength;
buffer.read(reinterpret_cast<char*>(&extensionLength), sizeof(extensionLength));
QByteArray extension = buffer.read(extensionLength);
qDebug() << "Got extension: " << extension;
uint64_t fileSize;
buffer.read(reinterpret_cast<char*>(&fileSize), sizeof(fileSize));
qDebug() << "Receiving a file of size " << fileSize;
auto replyPacket = NLPacket::create(PacketType::AssetUploadReply);
replyPacket->writePrimitive(messageID);
if (fileSize > MAX_UPLOAD_SIZE) {
replyPacket->writePrimitive(AssetServerError::ASSET_TOO_LARGE);
} else {
QByteArray fileData = buffer.read(fileSize);
auto hash = hashData(fileData);
auto hexHash = hash.toHex();
qDebug() << "Got data: (" << hexHash << ") ";
QFile file { _resourcesDirectory.filePath(QString(hexHash)) + "." + QString(extension) };
if (file.exists()) {
qDebug() << "[WARNING] This file already exists: " << hexHash;
} else {
file.open(QIODevice::WriteOnly);
file.write(fileData);
file.close();
}
replyPacket->writePrimitive(AssetServerError::NO_ERROR);
replyPacket->write(hash);
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(replyPacket), *senderNode);
}
QByteArray AssetServer::hashData(const QByteArray& data) {
return QCryptographicHash::hash(data, QCryptographicHash::Sha256);
}

View file

@ -24,8 +24,6 @@ class AssetServer : public ThreadedAssignment {
public:
AssetServer(NLPacket& packet);
static QByteArray hashData(const QByteArray& data);
public slots:
void run();

View file

@ -18,42 +18,56 @@
#include <NLPacket.h>
#include <NLPacketList.h>
#include <NodeList.h>
#include <udt/Packet.h>
#include "AssetUtils.h"
SendAssetTask::SendAssetTask(MessageID messageID, const QByteArray& assetHash, QString filePath, DataOffset start, DataOffset end,
const SharedNodePointer& sendToNode) :
SendAssetTask::SendAssetTask(QSharedPointer<NLPacket> packet, const SharedNodePointer& sendToNode, const QDir& resourcesDir) :
QRunnable(),
_messageID(messageID),
_assetHash(assetHash),
_filePath(filePath),
_start(start),
_end(end),
_sendToNode(sendToNode)
_packet(packet),
_senderNode(sendToNode),
_resourcesDir(resourcesDir)
{
}
void SendAssetTask::run() {
QString hexHash = _assetHash.toHex();
qDebug() << "Starting task to send asset: " << hexHash << " for messageID " << _messageID;
MessageID messageID;
uint8_t extensionLength;
DataOffset start, end;
_packet->readPrimitive(&messageID);
QByteArray assetHash = _packet->read(SHA256_HASH_LENGTH);
_packet->readPrimitive(&extensionLength);
QByteArray extension = _packet->read(extensionLength);
_packet->readPrimitive(&start);
_packet->readPrimitive(&end);
QString hexHash = assetHash.toHex();
qDebug() << "Received a request for the file (" << messageID << "): " << hexHash << " from " << start << " to " << end;
qDebug() << "Starting task to send asset: " << hexHash << " for messageID " << messageID;
auto replyPacketList = std::unique_ptr<NLPacketList>(new NLPacketList(PacketType::AssetGetReply, QByteArray(), true, true));
replyPacketList->write(_assetHash);
replyPacketList->write(assetHash);
replyPacketList->writePrimitive(_messageID);
replyPacketList->writePrimitive(messageID);
if (_end <= _start) {
if (end <= start) {
writeError(replyPacketList.get(), AssetServerError::INVALID_BYTE_RANGE);
} else {
QFile file { _filePath };
QString filePath = _resourcesDir.filePath(QString(hexHash) + "." + QString(extension));
QFile file { filePath };
if (file.open(QIODevice::ReadOnly)) {
if (file.size() < _end) {
if (file.size() < end) {
writeError(replyPacketList.get(), AssetServerError::INVALID_BYTE_RANGE);
qCDebug(networking) << "Bad byte range: " << hexHash << " " << _start << ":" << _end;
qCDebug(networking) << "Bad byte range: " << hexHash << " " << start << ":" << end;
} else {
auto size = _end - _start;
file.seek(_start);
auto size = end - start;
file.seek(start);
replyPacketList->writePrimitive(AssetServerError::NO_ERROR);
replyPacketList->writePrimitive(size);
replyPacketList->write(file.read(size));
@ -61,11 +75,11 @@ void SendAssetTask::run() {
}
file.close();
} else {
qCDebug(networking) << "Asset not found: " << _filePath << "(" << hexHash << ")";
qCDebug(networking) << "Asset not found: " << filePath << "(" << hexHash << ")";
writeError(replyPacketList.get(), AssetServerError::ASSET_NOT_FOUND);
}
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(replyPacketList), *_sendToNode);
nodeList->sendPacketList(std::move(replyPacketList), *_senderNode);
}

View file

@ -12,31 +12,27 @@
#ifndef hifi_SendAssetTask_h
#define hifi_SendAssetTask_h
#include <QByteArray>
#include <QString>
#include <QRunnable>
#include <QtCore/QByteArray>
#include <QtCore/QSharedPointer>
#include <QtCore/QString>
#include <QtCore/QRunnable>
#include "AssetUtils.h"
#include "AssetServer.h"
#include "Node.h"
class NLPacket;
class SendAssetTask : public QRunnable {
public:
SendAssetTask(MessageID messageID, const QByteArray& assetHash, QString filePath, DataOffset start, DataOffset end,
const SharedNodePointer& sendToNode);
SendAssetTask(QSharedPointer<NLPacket> packet, const SharedNodePointer& sendToNode, const QDir& resourcesDir);
void run();
signals:
void finished();
private:
MessageID _messageID;
QByteArray _assetHash;
QString _filePath;
DataOffset _start;
DataOffset _end;
SharedNodePointer _sendToNode;
QSharedPointer<NLPacket> _packet;
SharedNodePointer _senderNode;
QDir _resourcesDir;
};
#endif

View file

@ -0,0 +1,80 @@
//
// UploadAssetTask.cpp
// assignment-client/src/assets
//
// Created by Stephen Birarda on 2015-08-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "UploadAssetTask.h"
#include <QtCore/QBuffer>
#include <QtCore/QFile>
#include <AssetUtils.h>
#include <NodeList.h>
#include <NLPacketList.h>
UploadAssetTask::UploadAssetTask(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode,
const QDir& resourcesDir) :
_packetList(packetList),
_senderNode(senderNode),
_resourcesDir(resourcesDir)
{
}
void UploadAssetTask::run() {
auto data = _packetList->getMessage();
QBuffer buffer { &data };
buffer.open(QIODevice::ReadOnly);
MessageID messageID;
buffer.read(reinterpret_cast<char*>(&messageID), sizeof(messageID));
uint8_t extensionLength;
buffer.read(reinterpret_cast<char*>(&extensionLength), sizeof(extensionLength));
QByteArray extension = buffer.read(extensionLength);
uint64_t fileSize;
buffer.read(reinterpret_cast<char*>(&fileSize), sizeof(fileSize));
qDebug() << "UploadAssetTask reading a file of " << fileSize << "bytes and extension" << extension << "from"
<< uuidStringWithoutCurlyBraces(_senderNode->getUUID());
auto replyPacket = NLPacket::create(PacketType::AssetUploadReply);
replyPacket->writePrimitive(messageID);
if (fileSize > MAX_UPLOAD_SIZE) {
replyPacket->writePrimitive(AssetServerError::ASSET_TOO_LARGE);
} else {
QByteArray fileData = buffer.read(fileSize);
auto hash = hashData(fileData);
auto hexHash = hash.toHex();
qDebug() << "Hash for uploaded file from" << uuidStringWithoutCurlyBraces(_senderNode->getUUID())
<< "is: (" << hexHash << ") ";
QFile file { _resourcesDir.filePath(QString(hexHash)) + "." + QString(extension) };
if (file.exists()) {
qDebug() << "[WARNING] This file already exists: " << hexHash;
} else {
file.open(QIODevice::WriteOnly);
file.write(fileData);
file.close();
}
replyPacket->writePrimitive(AssetServerError::NO_ERROR);
replyPacket->write(hash);
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacket(std::move(replyPacket), *_senderNode);
}

View file

@ -0,0 +1,37 @@
//
// UploadAssetTask.h
// assignment-client/src/assets
//
// Created by Stephen Birarda on 2015-08-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_UploadAssetTask_h
#define hifi_UploadAssetTask_h
#include <QtCore/QDir>
#include <QtCore/QObject>
#include <QtCore/QRunnable>
#include <QtCore/QSharedPointer>
class NLPacketList;
class Node;
class UploadAssetTask : public QRunnable {
public:
UploadAssetTask(QSharedPointer<NLPacketList> packetList, QSharedPointer<Node> senderNode, const QDir& resourcesDir);
void run();
private:
QSharedPointer<NLPacketList> _packetList;
QSharedPointer<Node> _senderNode;
QDir _resourcesDir;
};
#endif // hifi_UploadAssetTask_h

View file

@ -12,7 +12,7 @@
#ifndef hifi_AssetUtils_h
#define hifi_AssetUtils_h
#include "NLPacketList.h"
#include <QtCore/QCryptographicHash>
using MessageID = uint32_t;
using DataOffset = int64_t;
@ -31,4 +31,6 @@ enum AssetServerError : uint8_t {
const QString ATP_SCHEME = "atp";
inline QByteArray hashData(const QByteArray& data) { return QCryptographicHash::hash(data, QCryptographicHash::Sha256); }
#endif

View file

@ -415,6 +415,7 @@ void Connection::processControl(std::unique_ptr<ControlPacket> controlPacket) {
if (_hasReceivedHandshakeACK) {
processLightACK(move(controlPacket));
}
break;
case ControlPacket::ACK2:
if (_hasReceivedHandshake) {
processACK2(move(controlPacket));

View file

@ -20,7 +20,6 @@
#include <QtCore/QObject>
#include <QtCore/QReadWriteLock>
#include <QtCore/QTimer>
#include "../HifiSockAddr.h"