add support for byte range requests to ATP

This commit is contained in:
Stephen Birarda 2017-04-17 19:08:06 -07:00 committed by Atlante45
parent e708a71b64
commit 397a29039e
7 changed files with 132 additions and 112 deletions

View file

@ -11,6 +11,8 @@
#include "SendAssetTask.h" #include "SendAssetTask.h"
#include <cmath>
#include <QFile> #include <QFile>
#include <DependencyManager.h> #include <DependencyManager.h>
@ -21,6 +23,7 @@
#include <udt/Packet.h> #include <udt/Packet.h>
#include "AssetUtils.h" #include "AssetUtils.h"
#include "ByteRange.h"
#include "ClientServerUtils.h" #include "ClientServerUtils.h"
SendAssetTask::SendAssetTask(QSharedPointer<ReceivedMessage> message, const SharedNodePointer& sendToNode, const QDir& resourcesDir) : SendAssetTask::SendAssetTask(QSharedPointer<ReceivedMessage> message, const SharedNodePointer& sendToNode, const QDir& resourcesDir) :
@ -34,20 +37,21 @@ SendAssetTask::SendAssetTask(QSharedPointer<ReceivedMessage> message, const Shar
void SendAssetTask::run() { void SendAssetTask::run() {
MessageID messageID; MessageID messageID;
DataOffset start, end; ByteRange byteRange;
_message->readPrimitive(&messageID); _message->readPrimitive(&messageID);
QByteArray assetHash = _message->read(SHA256_HASH_LENGTH); QByteArray assetHash = _message->read(SHA256_HASH_LENGTH);
// `start` and `end` indicate the range of data to retrieve for the asset identified by `assetHash`. // `start` and `end` indicate the range of data to retrieve for the asset identified by `assetHash`.
// `start` is inclusive, `end` is exclusive. Requesting `start` = 1, `end` = 10 will retrieve 9 bytes of data, // `start` is inclusive, `end` is exclusive. Requesting `start` = 1, `end` = 10 will retrieve 9 bytes of data,
// starting at index 1. // starting at index 1.
_message->readPrimitive(&start); _message->readPrimitive(&byteRange.fromInclusive);
_message->readPrimitive(&end); _message->readPrimitive(&byteRange.toExclusive);
QString hexHash = assetHash.toHex(); QString hexHash = assetHash.toHex();
qDebug() << "Received a request for the file (" << messageID << "): " << hexHash << " from " << start << " to " << end; qDebug() << "Received a request for the file (" << messageID << "): " << hexHash << " from "
<< byteRange.fromInclusive << " to " << byteRange.toExclusive;
qDebug() << "Starting task to send asset: " << hexHash << " for messageID " << messageID; qDebug() << "Starting task to send asset: " << hexHash << " for messageID " << messageID;
auto replyPacketList = NLPacketList::create(PacketType::AssetGetReply, QByteArray(), true, true); auto replyPacketList = NLPacketList::create(PacketType::AssetGetReply, QByteArray(), true, true);
@ -56,7 +60,7 @@ void SendAssetTask::run() {
replyPacketList->writePrimitive(messageID); replyPacketList->writePrimitive(messageID);
if (end <= start) { if (byteRange.toExclusive < byteRange.fromInclusive) {
replyPacketList->writePrimitive(AssetServerError::InvalidByteRange); replyPacketList->writePrimitive(AssetServerError::InvalidByteRange);
} else { } else {
QString filePath = _resourcesDir.filePath(QString(hexHash)); QString filePath = _resourcesDir.filePath(QString(hexHash));
@ -64,15 +68,47 @@ void SendAssetTask::run() {
QFile file { filePath }; QFile file { filePath };
if (file.open(QIODevice::ReadOnly)) { if (file.open(QIODevice::ReadOnly)) {
if (file.size() < end) { if (byteRange.isSet()) {
// if the byte range is not set, force it to be from 0 to the end of the file
byteRange.fromInclusive = 0;
byteRange.toExclusive = file.size();
}
if (file.size() < std::abs(byteRange.toExclusive)) {
replyPacketList->writePrimitive(AssetServerError::InvalidByteRange); replyPacketList->writePrimitive(AssetServerError::InvalidByteRange);
qCDebug(networking) << "Bad byte range: " << hexHash << " " << start << ":" << end; qCDebug(networking) << "Bad byte range: " << hexHash << " "
<< byteRange.fromInclusive << ":" << byteRange.toExclusive;
} else { } else {
auto size = end - start; // we have a valid byte range, handle it and send the asset
file.seek(start); auto size = byteRange.size();
replyPacketList->writePrimitive(AssetServerError::NoError);
replyPacketList->writePrimitive(size); if (byteRange.fromInclusive > 0) {
replyPacketList->write(file.read(size)); // this range is positive, meaning we just need to seek into the file and then read from there
file.seek(byteRange.fromInclusive);
replyPacketList->writePrimitive(AssetServerError::NoError);
replyPacketList->writePrimitive(size);
replyPacketList->write(file.read(size));
} else {
// this range is negative, at least the first part of the read will be back into the end of the file
// seek to the part of the file where the negative range begins
file.seek(file.size() + byteRange.fromInclusive);
replyPacketList->writePrimitive(AssetServerError::NoError);
replyPacketList->writePrimitive(size);
// first write everything from the negative range to the end of the file
replyPacketList->write(file.read(-byteRange.fromInclusive));
if (byteRange.toExclusive != 0) {
// this range has a portion that is at the front of the file
// seek to the beginning and read what is left over
file.seek(0);
replyPacketList->write(file.read(byteRange.toExclusive));
}
}
qCDebug(networking) << "Sending asset: " << hexHash; qCDebug(networking) << "Sending asset: " << hexHash;
} }
file.close(); file.close();

View file

@ -67,7 +67,6 @@ void AssetClient::init() {
} }
} }
void AssetClient::cacheInfoRequest(QObject* reciever, QString slot) { void AssetClient::cacheInfoRequest(QObject* reciever, QString slot) {
if (QThread::currentThread() != thread()) { if (QThread::currentThread() != thread()) {
QMetaObject::invokeMethod(this, "cacheInfoRequest", Qt::QueuedConnection, QMetaObject::invokeMethod(this, "cacheInfoRequest", Qt::QueuedConnection,
@ -182,8 +181,8 @@ RenameMappingRequest* AssetClient::createRenameMappingRequest(const AssetPath& o
return request; return request;
} }
AssetRequest* AssetClient::createRequest(const AssetHash& hash) { AssetRequest* AssetClient::createRequest(const AssetHash& hash, ByteRange byteRange) {
auto request = new AssetRequest(hash); auto request = new AssetRequest(hash, byteRange);
// Move to the AssetClient thread in case we are not currently on that thread (which will usually be the case) // Move to the AssetClient thread in case we are not currently on that thread (which will usually be the case)
request->moveToThread(thread()); request->moveToThread(thread());

View file

@ -21,6 +21,7 @@
#include <DependencyManager.h> #include <DependencyManager.h>
#include "AssetUtils.h" #include "AssetUtils.h"
#include "ByteRange.h"
#include "ClientServerUtils.h" #include "ClientServerUtils.h"
#include "LimitedNodeList.h" #include "LimitedNodeList.h"
#include "Node.h" #include "Node.h"
@ -55,7 +56,7 @@ public:
Q_INVOKABLE DeleteMappingsRequest* createDeleteMappingsRequest(const AssetPathList& paths); Q_INVOKABLE DeleteMappingsRequest* createDeleteMappingsRequest(const AssetPathList& paths);
Q_INVOKABLE SetMappingRequest* createSetMappingRequest(const AssetPath& path, const AssetHash& hash); Q_INVOKABLE SetMappingRequest* createSetMappingRequest(const AssetPath& path, const AssetHash& hash);
Q_INVOKABLE RenameMappingRequest* createRenameMappingRequest(const AssetPath& oldPath, const AssetPath& newPath); Q_INVOKABLE RenameMappingRequest* createRenameMappingRequest(const AssetPath& oldPath, const AssetPath& newPath);
Q_INVOKABLE AssetRequest* createRequest(const AssetHash& hash); Q_INVOKABLE AssetRequest* createRequest(const AssetHash& hash, ByteRange byteRange = ByteRange());
Q_INVOKABLE AssetUpload* createUpload(const QString& filename); Q_INVOKABLE AssetUpload* createUpload(const QString& filename);
Q_INVOKABLE AssetUpload* createUpload(const QByteArray& data); Q_INVOKABLE AssetUpload* createUpload(const QByteArray& data);

View file

@ -23,10 +23,12 @@
static int requestID = 0; static int requestID = 0;
AssetRequest::AssetRequest(const QString& hash) : AssetRequest::AssetRequest(const QString& hash, ByteRange byteRange) :
_requestID(++requestID), _requestID(++requestID),
_hash(hash) _hash(hash),
_byteRange(byteRange)
{ {
} }
AssetRequest::~AssetRequest() { AssetRequest::~AssetRequest() {
@ -34,9 +36,6 @@ AssetRequest::~AssetRequest() {
if (_assetRequestID) { if (_assetRequestID) {
assetClient->cancelGetAssetRequest(_assetRequestID); assetClient->cancelGetAssetRequest(_assetRequestID);
} }
if (_assetInfoRequestID) {
assetClient->cancelGetAssetInfoRequest(_assetInfoRequestID);
}
} }
void AssetRequest::start() { void AssetRequest::start() {
@ -62,108 +61,74 @@ void AssetRequest::start() {
// Try to load from cache // Try to load from cache
_data = loadFromCache(getUrl()); _data = loadFromCache(getUrl());
if (!_data.isNull()) { if (!_data.isNull()) {
_info.hash = _hash;
_info.size = _data.size();
_error = NoError; _error = NoError;
_state = Finished; _state = Finished;
emit finished(this); emit finished(this);
return; return;
} }
_state = WaitingForInfo; _state = WaitingForData;
auto assetClient = DependencyManager::get<AssetClient>(); auto assetClient = DependencyManager::get<AssetClient>();
_assetInfoRequestID = assetClient->getAssetInfo(_hash, auto that = QPointer<AssetRequest>(this); // Used to track the request's lifetime
[this](bool responseReceived, AssetServerError serverError, AssetInfo info) { auto hash = _hash;
_assetInfoRequestID = INVALID_MESSAGE_ID; _assetRequestID = assetClient->getAsset(_hash, _byteRange.fromInclusive, _byteRange.toExclusive,
[this, that, hash](bool responseReceived, AssetServerError serverError, const QByteArray& data) {
_info = info; if (!that) {
qCWarning(asset_client) << "Got reply for dead asset request " << hash << "- error code" << _error;
// If the request is dead, return
return;
}
_assetRequestID = INVALID_MESSAGE_ID;
if (!responseReceived) { if (!responseReceived) {
_error = NetworkError; _error = NetworkError;
} else if (serverError != AssetServerError::NoError) { } else if (serverError != AssetServerError::NoError) {
switch(serverError) { switch (serverError) {
case AssetServerError::AssetNotFound: case AssetServerError::AssetNotFound:
_error = NotFound; _error = NotFound;
break; break;
case AssetServerError::InvalidByteRange:
_error = InvalidByteRange;
break;
default: default:
_error = UnknownError; _error = UnknownError;
break; break;
} }
} } else {
if (_byteRange.isSet()) {
// we had a byte range, the size of the data does not match what we expect, so we return an error
if (data.size() != _byteRange.size()) {
_error = SizeVerificationFailed;
}
} else if (hashData(data).toHex() != _hash) {
// the hash of the received data does not match what we expect, so we return an error
_error = HashVerificationFailed;
}
if (_error == NoError) {
_data = data;
_totalReceived += data.size();
emit progress(_totalReceived, data.size());
saveToCache(getUrl(), data);
}
}
if (_error != NoError) { if (_error != NoError) {
qCWarning(asset_client) << "Got error retrieving asset info for" << _hash; qCWarning(asset_client) << "Got error retrieving asset" << _hash << "- error code" << _error;
_state = Finished; }
emit finished(this);
_state = Finished;
emit finished(this);
}, [this, that](qint64 totalReceived, qint64 total) {
if (!that) {
// If the request is dead, return
return; return;
} }
emit progress(totalReceived, total);
_state = WaitingForData;
_data.resize(info.size);
qCDebug(asset_client) << "Got size of " << _hash << " : " << info.size << " bytes";
int start = 0, end = _info.size;
auto assetClient = DependencyManager::get<AssetClient>();
auto that = QPointer<AssetRequest>(this); // Used to track the request's lifetime
auto hash = _hash;
_assetRequestID = assetClient->getAsset(_hash, start, end,
[this, that, hash, start, end](bool responseReceived, AssetServerError serverError, const QByteArray& data) {
if (!that) {
qCWarning(asset_client) << "Got reply for dead asset request " << hash << "- error code" << _error;
// If the request is dead, return
return;
}
_assetRequestID = INVALID_MESSAGE_ID;
if (!responseReceived) {
_error = NetworkError;
} else if (serverError != AssetServerError::NoError) {
switch (serverError) {
case AssetServerError::AssetNotFound:
_error = NotFound;
break;
case AssetServerError::InvalidByteRange:
_error = InvalidByteRange;
break;
default:
_error = UnknownError;
break;
}
} else {
Q_ASSERT(data.size() == (end - start));
// we need to check the hash of the received data to make sure it matches what we expect
if (hashData(data).toHex() == _hash) {
memcpy(_data.data() + start, data.constData(), data.size());
_totalReceived += data.size();
emit progress(_totalReceived, _info.size);
saveToCache(getUrl(), data);
} else {
// hash doesn't match - we have an error
_error = HashVerificationFailed;
}
}
if (_error != NoError) {
qCWarning(asset_client) << "Got error retrieving asset" << _hash << "- error code" << _error;
}
_state = Finished;
emit finished(this);
}, [this, that](qint64 totalReceived, qint64 total) {
if (!that) {
// If the request is dead, return
return;
}
emit progress(totalReceived, total);
});
}); });
} }

View file

@ -17,15 +17,15 @@
#include <QString> #include <QString>
#include "AssetClient.h" #include "AssetClient.h"
#include "AssetUtils.h" #include "AssetUtils.h"
#include "ByteRange.h"
class AssetRequest : public QObject { class AssetRequest : public QObject {
Q_OBJECT Q_OBJECT
public: public:
enum State { enum State {
NotStarted = 0, NotStarted = 0,
WaitingForInfo,
WaitingForData, WaitingForData,
Finished Finished
}; };
@ -36,11 +36,12 @@ public:
InvalidByteRange, InvalidByteRange,
InvalidHash, InvalidHash,
HashVerificationFailed, HashVerificationFailed,
SizeVerificationFailed,
NetworkError, NetworkError,
UnknownError UnknownError
}; };
AssetRequest(const QString& hash); AssetRequest(const QString& hash, ByteRange byteRange);
virtual ~AssetRequest() override; virtual ~AssetRequest() override;
Q_INVOKABLE void start(); Q_INVOKABLE void start();
@ -59,13 +60,12 @@ private:
int _requestID; int _requestID;
State _state = NotStarted; State _state = NotStarted;
Error _error = NoError; Error _error = NoError;
AssetInfo _info;
uint64_t _totalReceived { 0 }; uint64_t _totalReceived { 0 };
QString _hash; QString _hash;
QByteArray _data; QByteArray _data;
int _numPendingRequests { 0 }; int _numPendingRequests { 0 };
MessageID _assetRequestID { INVALID_MESSAGE_ID }; MessageID _assetRequestID { INVALID_MESSAGE_ID };
MessageID _assetInfoRequestID { INVALID_MESSAGE_ID }; ByteRange _byteRange;
}; };
#endif #endif

View file

@ -0,0 +1,24 @@
//
// ByteRange.h
// libraries/networking/src
//
// Created by Stephen Birarda on 4/17/17.
// Copyright 2017 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_ByteRange_h
#define hifi_ByteRange_h
struct ByteRange {
int64_t fromInclusive { 0 };
int64_t toExclusive { 0 };
bool isSet() { return fromInclusive < 0 || fromInclusive < toExclusive; }
int64_t size() { return toExclusive - fromInclusive; }
};
#endif // hifi_ByteRange_h

View file

@ -17,12 +17,7 @@
#include <cstdint> #include <cstdint>
struct ByteRange { #include "ByteRange.h"
int64_t fromInclusive { 0 };
int64_t toExclusive { 0 };
bool isSet() { return fromInclusive < 0 || fromInclusive < toExclusive; }
};
class ResourceRequest : public QObject { class ResourceRequest : public QObject {
Q_OBJECT Q_OBJECT