diff --git a/assignment-client/src/octree/OctreeQueryNode.cpp b/assignment-client/src/octree/OctreeQueryNode.cpp index 57f408adf0..2d8d8d357e 100644 --- a/assignment-client/src/octree/OctreeQueryNode.cpp +++ b/assignment-client/src/octree/OctreeQueryNode.cpp @@ -389,7 +389,7 @@ const QByteArray* OctreeQueryNode::getNextNackedPacket() { return NULL; } -void OctreeQueryNode::parseNackPacket(QByteArray& packet) { +void OctreeQueryNode::parseNackPacket(const QByteArray& packet) { int numBytesPacketHeader = numBytesForPacketHeader(packet); const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; diff --git a/assignment-client/src/octree/OctreeQueryNode.h b/assignment-client/src/octree/OctreeQueryNode.h index aeb5c852e4..d7b660f18a 100644 --- a/assignment-client/src/octree/OctreeQueryNode.h +++ b/assignment-client/src/octree/OctreeQueryNode.h @@ -109,7 +109,7 @@ public: OCTREE_PACKET_SEQUENCE getSequenceNumber() const { return _sequenceNumber; } - void parseNackPacket(QByteArray& packet); + void parseNackPacket(const QByteArray& packet); bool hasNextNackedPacket() const; const QByteArray* getNextNackedPacket(); diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index ca6c498aa3..52a027852c 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -25,6 +25,7 @@ #include "OctreeServer.h" #include "OctreeServerConsts.h" +#include "OctreeServerDatagramProcessor.h" OctreeServer* OctreeServer::_instance = NULL; int OctreeServer::_clientCount = 0; @@ -827,55 +828,83 @@ void OctreeServer::parsePayload() { } } -void OctreeServer::readPendingDatagrams() { - QByteArray receivedPacket; - HifiSockAddr senderSockAddr; - +void OctreeServer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) { NodeList* nodeList = NodeList::getInstance(); - while (readAvailableDatagram(receivedPacket, senderSockAddr)) { - if (nodeList->packetVersionAndHashMatch(receivedPacket)) { - PacketType packetType = packetTypeForPacket(receivedPacket); - SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket); - if (packetType == getMyQueryMessageType()) { - // If we got a query packet, then we're talking to an agent, and we - // need to make sure we have it in our nodeList. - if (matchingNode) { - nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket); - OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); - if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { - - // NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to - // know that the OctreeServer/Assignment will not get deleted on it while it's still active. The - // solution is to get the shared pointer for the current assignment. We need to make sure this is the - // same SharedAssignmentPointer that was ref counted by the assignment client. - SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment(); - nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode); - } + if (nodeList->packetVersionAndHashMatch(receivedPacket)) { + PacketType packetType = packetTypeForPacket(receivedPacket); + SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket); + if (packetType == getMyQueryMessageType()) { + // If we got a query packet, then we're talking to an agent, and we + // need to make sure we have it in our nodeList. + if (matchingNode) { + nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket); + OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); + if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { + + // NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to + // know that the OctreeServer/Assignment will not get deleted on it while it's still active. The + // solution is to get the shared pointer for the current assignment. We need to make sure this is the + // same SharedAssignmentPointer that was ref counted by the assignment client. + SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment(); + nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode); } - } else if (packetType == PacketTypeOctreeDataNack) { - // If we got a nack packet, then we're talking to an agent, and we - // need to make sure we have it in our nodeList. - if (matchingNode) { - OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); - if (nodeData) { - nodeData->parseNackPacket(receivedPacket); - } - } - } else if (packetType == PacketTypeJurisdictionRequest) { - _jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket); - } else if (packetType == PacketTypeSignedTransactionPayment) { - handleSignedTransactionPayment(packetType, receivedPacket); - } else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) { - _octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket); - } else { - // let processNodeData handle it. - NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket); } + } else if (packetType == PacketTypeOctreeDataNack) { + // If we got a nack packet, then we're talking to an agent, and we + // need to make sure we have it in our nodeList. + if (matchingNode) { + OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); + if (nodeData) { + nodeData->parseNackPacket(receivedPacket); + } + } + } else if (packetType == PacketTypeJurisdictionRequest) { + _jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket); + } else if (packetType == PacketTypeSignedTransactionPayment) { + handleSignedTransactionPayment(packetType, receivedPacket); + } else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) { + _octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket); + } else { + // let processNodeData handle it. + NodeList::getInstance()->processNodeData(senderSockAddr, receivedPacket); } } } +void OctreeServer::setupDatagramProcessingThread() { + NodeList* nodeList = NodeList::getInstance(); + + // we do not want this event loop to be the handler for UDP datagrams, so disconnect + disconnect(&nodeList->getNodeSocket(), 0, this, 0); + + // setup a QThread with us as parent that will house the AudioMixerDatagramProcessor + _datagramProcessingThread = new QThread(this); + + // create an AudioMixerDatagramProcessor and move it to that thread + OctreeServerDatagramProcessor* datagramProcessor = new OctreeServerDatagramProcessor(nodeList->getNodeSocket(), thread()); + datagramProcessor->moveToThread(_datagramProcessingThread); + + // remove the NodeList as the parent of the node socket + nodeList->getNodeSocket().setParent(NULL); + nodeList->getNodeSocket().moveToThread(_datagramProcessingThread); + + // let the datagram processor handle readyRead from node socket + connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, + datagramProcessor, &OctreeServerDatagramProcessor::readPendingDatagrams); + + // connect to the datagram processing thread signal that tells us we have to handle a packet + connect(datagramProcessor, &OctreeServerDatagramProcessor::packetRequiresProcessing, this, &OctreeServer::readPendingDatagram); + + // delete the datagram processor and the associated thread when the QThread quits + connect(_datagramProcessingThread, &QThread::finished, datagramProcessor, &QObject::deleteLater); + connect(datagramProcessor, &QObject::destroyed, _datagramProcessingThread, &QThread::deleteLater); + + // start the datagram processing thread + _datagramProcessingThread->start(); + +} + void OctreeServer::run() { _safeServerName = getMyServerName(); @@ -887,6 +916,8 @@ void OctreeServer::run() { // use common init to setup common timers and logging commonInit(getMyLoggingServerTargetName(), getMyNodeType()); + setupDatagramProcessingThread(); + // Now would be a good time to parse our arguments, if we got them as assignment if (getPayload().size() > 0) { parsePayload(); diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index f24cdc5af2..fda3187892 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -123,13 +123,15 @@ public: public slots: /// runs the voxel server assignment void run(); - void readPendingDatagrams(); void nodeAdded(SharedNodePointer node); void nodeKilled(SharedNodePointer node); void sendStatsPacket(); void handleSignedTransactionPaymentResponse(const QJsonObject& jsonObject); + void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle + void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr); + protected: void parsePayload(); void initHTTPManager(int port); @@ -140,6 +142,7 @@ protected: QString getStatusLink(); void handleSignedTransactionPayment(PacketType packetType, const QByteArray& datagram); + void setupDatagramProcessingThread(); int _argc; const char** _argv; diff --git a/assignment-client/src/octree/OctreeServerDatagramProcessor.cpp b/assignment-client/src/octree/OctreeServerDatagramProcessor.cpp new file mode 100644 index 0000000000..0d3c622900 --- /dev/null +++ b/assignment-client/src/octree/OctreeServerDatagramProcessor.cpp @@ -0,0 +1,55 @@ +// +// OctreeServerDatagramProcessor.cpp +// assignment-client/src +// +// Created by Brad Hefta-Gaub on 2014-09-05 +// Copyright 2014 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include + +#include +#include +#include +#include + +#include "OctreeServerDatagramProcessor.h" + +OctreeServerDatagramProcessor::OctreeServerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) : + _nodeSocket(nodeSocket), + _previousNodeSocketThread(previousNodeSocketThread) +{ + +} + +OctreeServerDatagramProcessor::~OctreeServerDatagramProcessor() { + // return the node socket to its previous thread + _nodeSocket.moveToThread(_previousNodeSocketThread); +} + +void OctreeServerDatagramProcessor::readPendingDatagrams() { + + HifiSockAddr senderSockAddr; + static QByteArray incomingPacket; + + // read everything that is available + while (_nodeSocket.hasPendingDatagrams()) { + incomingPacket.resize(_nodeSocket.pendingDatagramSize()); + + // just get this packet off the stack + _nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(), + senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer()); + + PacketType packetType = packetTypeForPacket(incomingPacket); + if (packetType == PacketTypePing) { + NodeList::getInstance()->processNodeData(senderSockAddr, incomingPacket); + return; // don't emit + } + + // emit the signal to tell AudioMixer it needs to process a packet + emit packetRequiresProcessing(incomingPacket, senderSockAddr); + } +} diff --git a/assignment-client/src/octree/OctreeServerDatagramProcessor.h b/assignment-client/src/octree/OctreeServerDatagramProcessor.h new file mode 100644 index 0000000000..cefe03a64a --- /dev/null +++ b/assignment-client/src/octree/OctreeServerDatagramProcessor.h @@ -0,0 +1,32 @@ +// +// OctreeServerDatagramProcessor.h +// assignment-client/src +// +// Created by Stephen Birarda on 2014-08-14. +// Copyright 2014 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_OctreeServerDatagramProcessor_h +#define hifi_OctreeServerDatagramProcessor_h + +#include +#include + +class OctreeServerDatagramProcessor : public QObject { + Q_OBJECT +public: + OctreeServerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread); + ~OctreeServerDatagramProcessor(); +public slots: + void readPendingDatagrams(); +signals: + void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr); +private: + QUdpSocket& _nodeSocket; + QThread* _previousNodeSocketThread; +}; + +#endif // hifi_OctreeServerDatagramProcessor_h \ No newline at end of file