diff --git a/assignment-client/src/models/ModelServer.cpp b/assignment-client/src/models/ModelServer.cpp index 07359f001a..ae6ffaf969 100644 --- a/assignment-client/src/models/ModelServer.cpp +++ b/assignment-client/src/models/ModelServer.cpp @@ -86,7 +86,7 @@ bool ModelServer::hasSpecialPacketToSend(const SharedNodePointer& node) { return shouldSendDeletedModels; } -int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { +int ModelServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { unsigned char outputBuffer[MAX_PACKET_SIZE]; size_t packetLength = 0; @@ -99,6 +99,7 @@ int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP bool hasMoreToSend = true; // TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 models? + packetsSent = 0; while (hasMoreToSend) { hasMoreToSend = tree->encodeModelsDeletedSince(queryNode->getSequenceNumber(), deletedModelsSentAt, outputBuffer, MAX_PACKET_SIZE, packetLength); @@ -107,6 +108,7 @@ int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node)); queryNode->packetSent(outputBuffer, packetLength); + packetsSent++; } nodeData->setLastDeletedModelsSentAt(deletePacketSentAt); diff --git a/assignment-client/src/models/ModelServer.h b/assignment-client/src/models/ModelServer.h index 7e7f239f2a..38acc7f1e1 100644 --- a/assignment-client/src/models/ModelServer.h +++ b/assignment-client/src/models/ModelServer.h @@ -37,7 +37,7 @@ public: // subclass may implement these method virtual void beforeRun(); virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); - virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node); + virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent); virtual void modelCreated(const ModelItem& newModel, const SharedNodePointer& senderNode); diff --git a/assignment-client/src/octree/OctreeSendThread.cpp b/assignment-client/src/octree/OctreeSendThread.cpp index 30e3011ae8..cb149b1d96 100644 --- a/assignment-client/src/octree/OctreeSendThread.cpp +++ b/assignment-client/src/octree/OctreeSendThread.cpp @@ -85,7 +85,6 @@ bool OctreeSendThread::process() { if (nodeData && !nodeData->isShuttingDown()) { bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); packetDistributor(nodeData, viewFrustumChanged); - resendNackedPackets(nodeData); } } } @@ -281,26 +280,6 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes return packetsSent; } -int OctreeSendThread::resendNackedPackets(OctreeQueryNode* nodeData) { - - const int MAX_PACKETS_RESEND = 10; - int packetsSent = 0; - - const QByteArray* packet; - while (nodeData->hasNextNackedPacket() && packetsSent < MAX_PACKETS_RESEND) { - packet = nodeData->getNextNackedPacket(); - if (packet) { - NodeList::getInstance()->writeDatagram(*packet, _node); - packetsSent++; - - _totalBytes += packet->size(); - _totalPackets++; - _totalWastedBytes += MAX_PACKET_SIZE - packet->size(); - } - } - return packetsSent; -} - /// Version of voxel distributor that sends the deepest LOD level at once int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) { @@ -311,6 +290,10 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus return 0; } + // calculate max number of packets that can be sent during this interval + int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxOctreePacketsPerSecond() / INTERVALS_PER_SECOND)); + int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval()); + int truePacketsSent = 0; int trueBytesSent = 0; int packetsSentThisInterval = 0; @@ -408,9 +391,6 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus //quint64 startCompressTimeMsecs = OctreePacketData::getCompressContentTime() / 1000; //quint64 startCompressCalls = OctreePacketData::getCompressContentCalls(); - int clientMaxPacketsPerInterval = std::max(1,(nodeData->getMaxOctreePacketsPerSecond() / INTERVALS_PER_SECOND)); - int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval()); - int extraPackingAttempts = 0; bool completedScene = false; while (somethingToSend && packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) { @@ -581,12 +561,26 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus // send the environment packet // TODO: should we turn this into a while loop to better handle sending multiple special packets if (_myServer->hasSpecialPacketToSend(_node) && !nodeData->isShuttingDown()) { - trueBytesSent += _myServer->sendSpecialPacket(nodeData, _node); + int specialPacketsSent; + trueBytesSent += _myServer->sendSpecialPacket(_node, nodeData, specialPacketsSent); nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed - truePacketsSent++; - packetsSentThisInterval++; + truePacketsSent += specialPacketsSent; + packetsSentThisInterval += specialPacketsSent; } + // Re-send packets that were nacked by the client + while (nodeData->hasNextNackedPacket() && packetsSentThisInterval < maxPacketsPerInterval) { + const QByteArray* packet = nodeData->getNextNackedPacket(); + if (packet) { + NodeList::getInstance()->writeDatagram(*packet, _node); + truePacketsSent++; + packetsSentThisInterval++; + + _totalBytes += packet->size(); + _totalPackets++; + _totalWastedBytes += MAX_PACKET_SIZE - packet->size(); + } + } quint64 end = usecTimestampNow(); int elapsedmsec = (end - start)/USECS_PER_MSEC; diff --git a/assignment-client/src/octree/OctreeSendThread.h b/assignment-client/src/octree/OctreeSendThread.h index e7599ebcd2..d8eed27802 100644 --- a/assignment-client/src/octree/OctreeSendThread.h +++ b/assignment-client/src/octree/OctreeSendThread.h @@ -55,9 +55,6 @@ private: int _nodeMissingCount; bool _isShuttingDown; - - int resendNackedPackets(OctreeQueryNode* nodeData); - }; #endif // hifi_OctreeSendThread_h diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index f0db93feb3..5595d139be 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -72,7 +72,7 @@ public: // subclass may implement these method virtual void beforeRun() { }; virtual bool hasSpecialPacketToSend(const SharedNodePointer& node) { return false; } - virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { return 0; } + virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { return 0; } static void attachQueryNodeToNode(Node* newNode); diff --git a/assignment-client/src/particles/ParticleServer.cpp b/assignment-client/src/particles/ParticleServer.cpp index e7a0f75dfd..674d22145f 100644 --- a/assignment-client/src/particles/ParticleServer.cpp +++ b/assignment-client/src/particles/ParticleServer.cpp @@ -86,7 +86,7 @@ bool ParticleServer::hasSpecialPacketToSend(const SharedNodePointer& node) { return shouldSendDeletedParticles; } -int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { +int ParticleServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { unsigned char outputBuffer[MAX_PACKET_SIZE]; size_t packetLength = 0; @@ -99,6 +99,7 @@ int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNo bool hasMoreToSend = true; // TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 particles? + packetsSent = 0; while (hasMoreToSend) { hasMoreToSend = tree->encodeParticlesDeletedSince(queryNode->getSequenceNumber(), deletedParticlesSentAt, outputBuffer, MAX_PACKET_SIZE, packetLength); @@ -107,6 +108,7 @@ int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNo NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node)); queryNode->packetSent(outputBuffer, packetLength); + packetsSent++; } nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt); diff --git a/assignment-client/src/particles/ParticleServer.h b/assignment-client/src/particles/ParticleServer.h index 3066c5fa98..d444368a9d 100644 --- a/assignment-client/src/particles/ParticleServer.h +++ b/assignment-client/src/particles/ParticleServer.h @@ -37,7 +37,7 @@ public: // subclass may implement these method virtual void beforeRun(); virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); - virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node); + virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent); virtual void particleCreated(const Particle& newParticle, const SharedNodePointer& senderNode); diff --git a/assignment-client/src/voxels/VoxelServer.cpp b/assignment-client/src/voxels/VoxelServer.cpp index 34b01f529a..b021ddd9f6 100644 --- a/assignment-client/src/voxels/VoxelServer.cpp +++ b/assignment-client/src/voxels/VoxelServer.cpp @@ -40,7 +40,7 @@ bool VoxelServer::hasSpecialPacketToSend(const SharedNodePointer& node) { return shouldSendEnvironments; } -int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { +int VoxelServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { unsigned char* copyAt = _tempOutputBuffer; @@ -76,6 +76,7 @@ int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node)); queryNode->packetSent(_tempOutputBuffer, envPacketLength); + packetsSent = 1; return envPacketLength; } diff --git a/assignment-client/src/voxels/VoxelServer.h b/assignment-client/src/voxels/VoxelServer.h index 4e04c48cfd..b13f83b65f 100644 --- a/assignment-client/src/voxels/VoxelServer.h +++ b/assignment-client/src/voxels/VoxelServer.h @@ -46,7 +46,7 @@ public: // subclass may implement these method virtual void beforeRun(); virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); - virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node); + virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent); private: bool _sendEnvironments; diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 3cfec3190e..2b3256141e 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -158,7 +158,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) : _mousePressed(false), _audio(STARTUP_JITTER_SAMPLES), _enableProcessVoxelsThread(true), - _voxelProcessor(), + _octreeProcessor(), _voxelHideShowThread(&_voxels), _packetsPerSecond(0), _bytesPerSecond(0), @@ -244,6 +244,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) : connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer))); connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), &_voxels, SLOT(nodeAdded(SharedNodePointer))); connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_voxels, SLOT(nodeKilled(SharedNodePointer))); + connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_octreeProcessor, SLOT(nodeKilled(SharedNodePointer))); connect(nodeList, &NodeList::uuidChanged, this, &Application::updateWindowTitle); connect(nodeList, SIGNAL(uuidChanged(const QUuid&)), _myAvatar, SLOT(setSessionUUID(const QUuid&))); connect(nodeList, &NodeList::limitOfSilentDomainCheckInsReached, nodeList, &NodeList::reset); @@ -418,7 +419,7 @@ Application::~Application() { _audio.thread()->quit(); _audio.thread()->wait(); - _voxelProcessor.terminate(); + _octreeProcessor.terminate(); _voxelHideShowThread.terminate(); _voxelEditSender.terminate(); _particleEditSender.terminate(); @@ -517,7 +518,7 @@ void Application::initializeGL() { qDebug( "init() complete."); // create thread for parsing of voxel data independent of the main network and rendering threads - _voxelProcessor.initialize(_enableProcessVoxelsThread); + _octreeProcessor.initialize(_enableProcessVoxelsThread); _voxelEditSender.initialize(_enableProcessVoxelsThread); _voxelHideShowThread.initialize(_enableProcessVoxelsThread); _particleEditSender.initialize(_enableProcessVoxelsThread); @@ -1884,7 +1885,7 @@ void Application::updateThreads(float deltaTime) { // parse voxel packets if (!_enableProcessVoxelsThread) { - _voxelProcessor.threadRoutine(); + _octreeProcessor.threadRoutine(); _voxelHideShowThread.threadRoutine(); _voxelEditSender.threadRoutine(); _particleEditSender.threadRoutine(); @@ -2095,7 +2096,7 @@ void Application::updateMyAvatar(float deltaTime) { } } - // sent a nack packet containing missing sequence numbers of received packets + // sent nack packets containing missing sequence numbers of received packets from nodes { quint64 now = usecTimestampNow(); quint64 sinceLastNack = now - _lastNackTime; @@ -2125,6 +2126,12 @@ void Application::sendNack() { || node->getType() == NodeType::ModelServer) ) { + // if there are octree packets from this node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. + if (_octreeProcessor.hasPacketsToProcessFrom(node)) { + continue; + } + QUuid nodeUUID = node->getUUID(); _octreeSceneStatsLock.lockForRead(); diff --git a/interface/src/Application.h b/interface/src/Application.h index 170be43493..2889dcb301 100644 --- a/interface/src/Application.h +++ b/interface/src/Application.h @@ -88,7 +88,7 @@ #include "voxels/VoxelFade.h" #include "voxels/VoxelHideShowThread.h" #include "voxels/VoxelImporter.h" -#include "voxels/VoxelPacketProcessor.h" +#include "voxels/OctreePacketProcessor.h" #include "voxels/VoxelSystem.h" @@ -129,7 +129,7 @@ static const float MIRROR_FIELD_OF_VIEW = 30.0f; class Application : public QApplication { Q_OBJECT - friend class VoxelPacketProcessor; + friend class OctreePacketProcessor; friend class VoxelEditPacketSender; friend class DatagramProcessor; @@ -192,7 +192,7 @@ public: ViewFrustum* getShadowViewFrustum() { return &_shadowViewFrustum; } VoxelSystem* getVoxels() { return &_voxels; } VoxelTree* getVoxelTree() { return _voxels.getTree(); } - const VoxelPacketProcessor& getVoxelPacketProcessor() const { return _voxelProcessor; } + const OctreePacketProcessor& getOctreePacketProcessor() const { return _octreeProcessor; } ParticleTreeRenderer* getParticles() { return &_particles; } MetavoxelSystem* getMetavoxels() { return &_metavoxels; } ModelTreeRenderer* getModels() { return &_models; } @@ -533,7 +533,7 @@ private: Audio _audio; bool _enableProcessVoxelsThread; - VoxelPacketProcessor _voxelProcessor; + OctreePacketProcessor _octreeProcessor; VoxelHideShowThread _voxelHideShowThread; VoxelEditPacketSender _voxelEditSender; ParticleEditPacketSender _particleEditSender; diff --git a/interface/src/DatagramProcessor.cpp b/interface/src/DatagramProcessor.cpp index 56078c1a8d..cdd7e7ef0f 100644 --- a/interface/src/DatagramProcessor.cpp +++ b/interface/src/DatagramProcessor.cpp @@ -71,7 +71,7 @@ void DatagramProcessor::processDatagrams() { case PacketTypeOctreeStats: case PacketTypeEnvironmentData: { PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), - "Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); + "Application::networkReceive()... _octreeProcessor.queueReceivedPacket()"); bool wantExtraDebugging = application->getLogger()->extraDebugging(); if (wantExtraDebugging && packetTypeForPacket(incomingPacket) == PacketTypeVoxelData) { @@ -92,7 +92,7 @@ void DatagramProcessor::processDatagrams() { if (matchedNode) { // add this packet to our list of voxel packets and process them on the voxel processing - application->_voxelProcessor.queueReceivedPacket(matchedNode, incomingPacket); + application->_octreeProcessor.queueReceivedPacket(matchedNode, incomingPacket); } break; diff --git a/interface/src/ui/ApplicationOverlay.cpp b/interface/src/ui/ApplicationOverlay.cpp index f5de7459a8..a48e0a2c1a 100644 --- a/interface/src/ui/ApplicationOverlay.cpp +++ b/interface/src/ui/ApplicationOverlay.cpp @@ -45,7 +45,7 @@ void ApplicationOverlay::renderOverlay(bool renderToTexture) { QGLWidget* glWidget = application->getGLWidget(); MyAvatar* myAvatar = application->getAvatar(); Audio* audio = application->getAudio(); - const VoxelPacketProcessor& voxelPacketProcessor = application->getVoxelPacketProcessor(); + const OctreePacketProcessor& octreePacketProcessor = application->getOctreePacketProcessor(); BandwidthMeter* bandwidthMeter = application->getBandwidthMeter(); NodeBounds& nodeBoundsDisplay = application->getNodeBoundsDisplay(); @@ -200,7 +200,7 @@ void ApplicationOverlay::renderOverlay(bool renderToTexture) { if (Menu::getInstance()->isOptionChecked(MenuOption::Stats)) { // let's set horizontal offset to give stats some margin to mirror int horizontalOffset = MIRROR_VIEW_WIDTH + MIRROR_VIEW_LEFT_PADDING * 2; - int voxelPacketsToProcess = voxelPacketProcessor.packetsToProcessCount(); + int voxelPacketsToProcess = octreePacketProcessor.packetsToProcessCount(); // Onscreen text about position, servers, etc Stats::getInstance()->display(WHITE_TEXT, horizontalOffset, application->getFps(), application->getPacketsPerSecond(), application->getBytesPerSecond(), voxelPacketsToProcess); // Bandwidth meter diff --git a/interface/src/voxels/VoxelPacketProcessor.cpp b/interface/src/voxels/OctreePacketProcessor.cpp similarity index 92% rename from interface/src/voxels/VoxelPacketProcessor.cpp rename to interface/src/voxels/OctreePacketProcessor.cpp index 095c378c04..66190a5689 100644 --- a/interface/src/voxels/VoxelPacketProcessor.cpp +++ b/interface/src/voxels/OctreePacketProcessor.cpp @@ -1,5 +1,5 @@ // -// VoxelPacketProcessor.cpp +// OctreePacketProcessor.cpp // interface/src/voxels // // Created by Brad Hefta-Gaub on 8/12/13. @@ -13,18 +13,18 @@ #include "Application.h" #include "Menu.h" -#include "VoxelPacketProcessor.h" +#include "OctreePacketProcessor.h" -void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { +void OctreePacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), - "VoxelPacketProcessor::processPacket()"); + "OctreePacketProcessor::processPacket()"); QByteArray mutablePacket = packet; const int WAY_BEHIND = 300; if (packetsToProcessCount() > WAY_BEHIND && Application::getInstance()->getLogger()->extraDebugging()) { - qDebug("VoxelPacketProcessor::processPacket() packets to process=%d", packetsToProcessCount()); + qDebug("OctreePacketProcessor::processPacket() packets to process=%d", packetsToProcessCount()); } ssize_t messageLength = mutablePacket.size(); diff --git a/interface/src/voxels/VoxelPacketProcessor.h b/interface/src/voxels/OctreePacketProcessor.h similarity index 76% rename from interface/src/voxels/VoxelPacketProcessor.h rename to interface/src/voxels/OctreePacketProcessor.h index 36456c5cc2..bdf25806f3 100644 --- a/interface/src/voxels/VoxelPacketProcessor.h +++ b/interface/src/voxels/OctreePacketProcessor.h @@ -1,5 +1,5 @@ // -// VoxelPacketProcessor.h +// OctreePacketProcessor.h // interface/src/voxels // // Created by Brad Hefta-Gaub on 8/12/13. @@ -9,16 +9,16 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // -#ifndef hifi_VoxelPacketProcessor_h -#define hifi_VoxelPacketProcessor_h +#ifndef hifi_OctreePacketProcessor_h +#define hifi_OctreePacketProcessor_h #include /// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes /// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket() -class VoxelPacketProcessor : public ReceivedPacketProcessor { +class OctreePacketProcessor : public ReceivedPacketProcessor { Q_OBJECT protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); }; -#endif // hifi_VoxelPacketProcessor_h +#endif // hifi_OctreePacketProcessor_h diff --git a/libraries/networking/src/NetworkPacket.cpp b/libraries/networking/src/NetworkPacket.cpp index bf18aa9b37..a8110847e1 100644 --- a/libraries/networking/src/NetworkPacket.cpp +++ b/libraries/networking/src/NetworkPacket.cpp @@ -17,9 +17,9 @@ #include "NetworkPacket.h" -void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const QByteArray& packet) { +void NetworkPacket::copyContents(const SharedNodePointer& node, const QByteArray& packet) { if (packet.size() && packet.size() <= MAX_PACKET_SIZE) { - _destinationNode = destinationNode; + _node = node; _byteArray = packet; } else { qDebug(">>> NetworkPacket::copyContents() unexpected length = %d", packet.size()); @@ -27,28 +27,28 @@ void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const } NetworkPacket::NetworkPacket(const NetworkPacket& packet) { - copyContents(packet.getDestinationNode(), packet.getByteArray()); + copyContents(packet.getNode(), packet.getByteArray()); } -NetworkPacket::NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { - copyContents(destinationNode, packet); +NetworkPacket::NetworkPacket(const SharedNodePointer& node, const QByteArray& packet) { + copyContents(node, packet); }; // copy assignment NetworkPacket& NetworkPacket::operator=(NetworkPacket const& other) { - copyContents(other.getDestinationNode(), other.getByteArray()); + copyContents(other.getNode(), other.getByteArray()); return *this; } #ifdef HAS_MOVE_SEMANTICS // move, same as copy, but other packet won't be used further NetworkPacket::NetworkPacket(NetworkPacket && packet) { - copyContents(packet.getDestinationNode(), packet.getByteArray()); + copyContents(packet.getNode(), packet.getByteArray()); } // move assignment NetworkPacket& NetworkPacket::operator=(NetworkPacket&& other) { - copyContents(other.getDestinationNode(), other.getByteArray()); + copyContents(other.getNode(), other.getByteArray()); return *this; } #endif diff --git a/libraries/networking/src/NetworkPacket.h b/libraries/networking/src/NetworkPacket.h index 94ddf8d56e..0be35f9fff 100644 --- a/libraries/networking/src/NetworkPacket.h +++ b/libraries/networking/src/NetworkPacket.h @@ -26,6 +26,7 @@ /// Storage of not-yet processed inbound, or not yet sent outbound generic UDP network packet class NetworkPacket { public: + NetworkPacket() { } NetworkPacket(const NetworkPacket& packet); // copy constructor NetworkPacket& operator= (const NetworkPacket& other); // copy assignment @@ -34,15 +35,15 @@ public: NetworkPacket& operator= (NetworkPacket&& other); // move assignment #endif - NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& byteArray); + NetworkPacket(const SharedNodePointer& node, const QByteArray& byteArray); - const SharedNodePointer& getDestinationNode() const { return _destinationNode; } + const SharedNodePointer& getNode() const { return _node; } const QByteArray& getByteArray() const { return _byteArray; } private: - void copyContents(const SharedNodePointer& destinationNode, const QByteArray& byteArray); + void copyContents(const SharedNodePointer& node, const QByteArray& byteArray); - SharedNodePointer _destinationNode; + SharedNodePointer _node; QByteArray _byteArray; }; diff --git a/libraries/networking/src/PacketSender.cpp b/libraries/networking/src/PacketSender.cpp index 5f7502a738..3edfc47c04 100644 --- a/libraries/networking/src/PacketSender.cpp +++ b/libraries/networking/src/PacketSender.cpp @@ -271,7 +271,7 @@ bool PacketSender::nonThreadedProcess() { unlock(); // send the packet through the NodeList... - NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getDestinationNode()); + NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getNode()); packetsSentThisCall++; _packetsOverCheckInterval++; _totalPacketsSent++; diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index d54e165285..d556e8a059 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -17,13 +17,14 @@ void ReceivedPacketProcessor::terminating() { _hasPackets.wakeAll(); } -void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { +void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { // Make sure our Node and NodeList knows we've heard from this node. - destinationNode->setLastHeardMicrostamp(usecTimestampNow()); + sendingNode->setLastHeardMicrostamp(usecTimestampNow()); - NetworkPacket networkPacket(destinationNode, packet); + NetworkPacket networkPacket(sendingNode, packet); lock(); _packets.push_back(networkPacket); + _nodePacketCounts[sendingNode->getUUID()]++; unlock(); // Make sure to wake our actual processing thread because we now have packets for it to process. @@ -42,8 +43,13 @@ bool ReceivedPacketProcessor::process() { NetworkPacket& packet = _packets.front(); // get the oldest packet NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us _packets.erase(_packets.begin()); // remove the oldest packet + _nodePacketCounts[temporary.getNode()->getUUID()]--; unlock(); // let others add to the packets - processPacket(temporary.getDestinationNode(), temporary.getByteArray()); // process our temporary copy + processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy } return isStillRunning(); // keep running till they terminate us } + +void ReceivedPacketProcessor::killNode(const SharedNodePointer& node) { + _nodePacketCounts.remove(node->getUUID()); +} diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index f8306b4896..80fe75aaa7 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -28,20 +28,26 @@ public: /// \param packetData pointer to received data /// \param ssize_t packetLength size of received data /// \thread network receive thread - void queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet); + void queueReceivedPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); /// Are there received packets waiting to be processed bool hasPacketsToProcess() const { return _packets.size() > 0; } + /// Are there received packets waiting to be processed from a certain node + bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const { + return _nodePacketCounts[sendingNode->getUUID()] > 0; + } + /// How many received packets waiting are to be processed int packetsToProcessCount() const { return _packets.size(); } +public slots: + void killNode(const SharedNodePointer& node); + protected: /// Callback for processing of recieved packets. Implement this to process the incoming packets. - /// \param sockaddr& senderAddress the address of the sender - /// \param packetData pointer to received data - /// \param ssize_t packetLength size of received data - /// \thread "this" individual processing thread + /// \param SharedNodePointer& sendingNode the node that sent this packet + /// \param QByteArray& the packet to be processed virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) = 0; /// Implements generic processing behavior for this thread. @@ -51,7 +57,9 @@ protected: private: - std::vector _packets; + QVector _packets; + QHash _nodePacketCounts; + QWaitCondition _hasPackets; QMutex _waitingOnPacketsMutex; };