mirror of
https://thingvellir.net/git/overte
synced 2025-03-27 23:52:03 +01:00
Add thread pool to AssetGet requests
This commit is contained in:
parent
4191ccd60d
commit
b865a3f8fb
2 changed files with 92 additions and 8 deletions
|
@ -11,23 +11,108 @@
|
|||
|
||||
#include "AssetServer.h"
|
||||
|
||||
#include <QBuffer>
|
||||
#include <QCryptographicHash>
|
||||
#include <QDir>
|
||||
#include <QFile>
|
||||
#include <QFileInfo>
|
||||
#include <QCoreApplication>
|
||||
#include <QEventLoop>
|
||||
#include <QRunnable>
|
||||
#include <QString>
|
||||
|
||||
#include <NodeType.h>
|
||||
|
||||
const QString ASSET_SERVER_LOGGING_TARGET_NAME = "asset-server";
|
||||
|
||||
AssetServer::AssetServer(NLPacket& packet) : ThreadedAssignment(packet) {
|
||||
void writeError(NLPacketList* packetList, AssetServerError error) {
|
||||
packetList->writePrimitive(error);
|
||||
}
|
||||
|
||||
class SendAssetTask : public QRunnable {
|
||||
public:
|
||||
SendAssetTask(MessageID messageID, const QByteArray& assetHash, QString filePath, DataOffset start, DataOffset end, const SharedNodePointer& sendToNode) :
|
||||
QRunnable(),
|
||||
_messageID(messageID),
|
||||
_assetHash(assetHash),
|
||||
_filePath(filePath),
|
||||
_start(start),
|
||||
_end(end),
|
||||
_sendToNode(sendToNode)
|
||||
{
|
||||
}
|
||||
|
||||
void run();
|
||||
|
||||
signals:
|
||||
void finished();
|
||||
|
||||
private:
|
||||
MessageID _messageID;
|
||||
QByteArray _assetHash;
|
||||
QString _filePath;
|
||||
DataOffset _start;
|
||||
DataOffset _end;
|
||||
SharedNodePointer _sendToNode;
|
||||
};
|
||||
|
||||
void SendAssetTask::run() {
|
||||
qDebug() << "Starting task to send asset: " << _assetHash << " for messageID " << _messageID;
|
||||
auto replyPacketList = std::unique_ptr<NLPacketList>(new NLPacketList(PacketType::AssetGetReply, QByteArray(), true, true));
|
||||
|
||||
replyPacketList->write(_assetHash, HASH_HEX_LENGTH);
|
||||
|
||||
replyPacketList->writePrimitive(_messageID);
|
||||
|
||||
const int64_t MAX_LENGTH = 4294967296;
|
||||
|
||||
if (_end <= _start) {
|
||||
writeError(replyPacketList.get(), AssetServerError::INVALID_BYTE_RANGE);
|
||||
} else if (_end - _start > MAX_LENGTH) {
|
||||
writeError(replyPacketList.get(), AssetServerError::INVALID_BYTE_RANGE);
|
||||
} else {
|
||||
QFile file { _filePath };
|
||||
qDebug() << "Opening file: " << QString(QFileInfo(_assetHash).fileName());
|
||||
|
||||
if (file.open(QIODevice::ReadOnly)) {
|
||||
if (file.size() < _end) {
|
||||
writeError(replyPacketList.get(), AssetServerError::INVALID_BYTE_RANGE);
|
||||
} else {
|
||||
auto size = _end - _start;
|
||||
file.seek(_start);
|
||||
replyPacketList->writePrimitive(AssetServerError::NO_ERROR);
|
||||
replyPacketList->writePrimitive(size);
|
||||
while (file.pos() < file.size()) {
|
||||
static const int chunkSize = 20000;
|
||||
QByteArray data = file.read(chunkSize);
|
||||
replyPacketList->write(data, chunkSize);
|
||||
}
|
||||
qDebug() << "Done reading";
|
||||
}
|
||||
file.close();
|
||||
} else {
|
||||
qDebug() << "Asset not found";
|
||||
writeError(replyPacketList.get(), AssetServerError::ASSET_NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
qDebug() << "Sending asset";
|
||||
auto nodeList = DependencyManager::get<NodeList>();
|
||||
nodeList->sendPacketList(std::move(replyPacketList), *_sendToNode);
|
||||
}
|
||||
|
||||
AssetServer::AssetServer(NLPacket& packet) :
|
||||
ThreadedAssignment(packet),
|
||||
_taskPool(this) {
|
||||
|
||||
// Most of the work will be I/O bound, reading from disk and constructing packet objects,
|
||||
// so the ideal is greater than the number of cores on the system.
|
||||
_taskPool.setMaxThreadCount(20);
|
||||
|
||||
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
|
||||
packetReceiver.registerListener(PacketType::AssetGet, this, "handleAssetGet");
|
||||
packetReceiver.registerListener(PacketType::AssetGetInfo, this, "handleAssetGetInfo");
|
||||
packetReceiver.registerListener(PacketType::AssetUpload, this, "handleAssetUpload");
|
||||
packetReceiver.registerMessageListener(PacketType::AssetUpload, this, "handleAssetUpload");
|
||||
}
|
||||
|
||||
AssetServer::~AssetServer() {
|
||||
|
@ -75,13 +160,10 @@ void AssetServer::run() {
|
|||
while (!_isFinished) {
|
||||
// since we're a while loop we need to help Qt's event processing
|
||||
QCoreApplication::processEvents();
|
||||
// QCoreApplication::processEvents(QEventLoop::WaitForMoreEvents);
|
||||
}
|
||||
}
|
||||
|
||||
void AssetServer::writeError(NLPacket* packet, AssetServerError error) {
|
||||
packet->writePrimitive(error);
|
||||
}
|
||||
|
||||
void AssetServer::handleAssetGetInfo(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
|
||||
if (packet->getPayloadSize() < HASH_HEX_LENGTH) {
|
||||
qDebug() << "ERROR bad file request";
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include <QDir>
|
||||
|
||||
#include <ThreadedAssignment.h>
|
||||
#include <QThreadPool>
|
||||
|
||||
#include "AssetUtils.h"
|
||||
|
||||
|
@ -31,11 +32,12 @@ public slots:
|
|||
private slots:
|
||||
void handleAssetGetInfo(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
|
||||
void handleAssetGet(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
|
||||
void handleAssetUpload(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
|
||||
void handleAssetUpload(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode);
|
||||
|
||||
private:
|
||||
static void writeError(NLPacket* packet, AssetServerError error);
|
||||
static void writeError(NLPacketList* packetList, AssetServerError error);
|
||||
QDir _resourcesDirectory;
|
||||
QThreadPool _taskPool;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in a new issue