From ebfd65dea8cb86310f8c8c103b19c3217fac0e70 Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 12:30:16 -0700 Subject: [PATCH] added OctreeInboundPacketProcessor::process() override to send nack periodically added code to remove dead nodes' stats in sendNackPackets() --- .../octree/OctreeInboundPacketProcessor.cpp | 62 +++++++++++++++++-- .../src/octree/OctreeInboundPacketProcessor.h | 7 ++- assignment-client/src/octree/OctreeServer.cpp | 3 + .../src/ReceivedPacketProcessor.cpp | 1 + .../networking/src/ReceivedPacketProcessor.h | 10 ++- 5 files changed, 74 insertions(+), 9 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 683fe188b4..9b849905c5 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -25,7 +25,8 @@ OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServe _totalProcessTime(0), _totalLockWaitTime(0), _totalElementsInPacket(0), - _totalPackets(0) + _totalPackets(0), + _lastNackTime(usecTimestampNow()) { } @@ -35,10 +36,52 @@ void OctreeInboundPacketProcessor::resetStats() { _totalLockWaitTime = 0; _totalElementsInPacket = 0; _totalPackets = 0; + _lastNackTime = usecTimestampNow(); _singleSenderStats.clear(); } +bool OctreeInboundPacketProcessor::process() { + + const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; + quint64 now = usecTimestampNow(); + + if (_packets.size() == 0) { + // calculate time until next sendNackPackets() + quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; + quint64 now = usecTimestampNow(); + if (now >= nextNackTime) { + // send nacks if we're already past time to send it + _lastNackTime = now; + sendNackPackets(); + } + else { + // otherwise, wait until the next nack time or until a packet arrives + quint64 waitTimeMsecs = (nextNackTime - now) / USECS_PER_MSEC + 1; + _waitingOnPacketsMutex.lock(); + _hasPackets.wait(&_waitingOnPacketsMutex, waitTimeMsecs); + _waitingOnPacketsMutex.unlock(); + } + } + while (_packets.size() > 0) { + lock(); // lock to make sure nothing changes on us + NetworkPacket& packet = _packets.front(); // get the oldest packet + NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us + _packets.erase(_packets.begin()); // remove the oldest packet + _nodePacketCounts[temporary.getNode()->getUUID()]--; + unlock(); // let others add to the packets + processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy + + // if it's time to send nacks, send them. + if (usecTimestampNow() - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } + } + return isStillRunning(); // keep running till they terminate us +} + + void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { bool debugProcessPacket = _myServer->wantsVerboseDebug(); @@ -155,15 +198,21 @@ int OctreeInboundPacketProcessor::sendNackPackets() { int packetsSent = 0; - // if there are packets from _node that are waiting to be processed, - // don't send a NACK since the missing packets may be among those waiting packets. - - NodeToSenderStatsMap::const_iterator begin = _singleSenderStats.begin(), end = _singleSenderStats.end(); - for (NodeToSenderStatsMap::const_iterator i = begin; i != end; i++) { + NodeToSenderStatsMapIterator i = _singleSenderStats.begin(); + while (i != _singleSenderStats.end()) { QUuid nodeUUID = i.key(); SingleSenderStats nodeStats = i.value(); + // check if this node is still alive. Remove its stats if it's dead. + if (!isAlive(nodeUUID)) { + printf("\t\t removing node %s\n", nodeUUID.toString().toLatin1().data()); + i = _singleSenderStats.erase(i); + continue; + } + + // if there are packets from _node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. if (hasPacketsToProcessFrom(nodeUUID)) { continue; } @@ -214,6 +263,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() { packetsSent++; } + i++; } return packetsSent; } diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 9649da2d61..378cc9a891 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -75,9 +75,12 @@ public: NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } protected: + virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); -public slots: + virtual bool process(); + +public: int sendNackPackets(); private: @@ -94,5 +97,7 @@ private: quint64 _totalPackets; NodeToSenderStatsMap _singleSenderStats; + + quint64 _lastNackTime; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index 39f51a0ba9..c6ef4aa5aa 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -1060,6 +1060,9 @@ void OctreeServer::nodeAdded(SharedNodePointer node) { void OctreeServer::nodeKilled(SharedNodePointer node) { quint64 start = usecTimestampNow(); + // calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!! + _octreeInboundPacketProcessor->nodeKilled(node); + qDebug() << qPrintable(_safeServerName) << "server killed node:" << *node; OctreeQueryNode* nodeData = static_cast(node->getLinkedData()); if (nodeData) { diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index 3ef518bbc2..d85f09fb0a 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -54,4 +54,5 @@ void ReceivedPacketProcessor::nodeKilled(SharedNodePointer node) { lock(); _nodePacketCounts.remove(node->getUUID()); unlock(); + printf("\n\t\t nodeKilled()!!!!! --------------------------\n\n"); } diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 31cbc3a487..4322c87910 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -33,11 +33,17 @@ public: /// Are there received packets waiting to be processed bool hasPacketsToProcess() const { return _packets.size() > 0; } - /// Are there received packets waiting to be processed from a certain node + /// Is a specified node still alive? + bool isAlive(const QUuid& nodeUUID) const { + return _nodePacketCounts.contains(nodeUUID); + } + + /// Are there received packets waiting to be processed from a specified node bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const { return hasPacketsToProcessFrom(sendingNode->getUUID()); } + /// Are there received packets waiting to be processed from a specified node bool hasPacketsToProcessFrom(const QUuid& nodeUUID) const { return _nodePacketCounts[nodeUUID] > 0; } @@ -59,7 +65,7 @@ protected: virtual void terminating(); -private: +protected: QVector _packets; QHash _nodePacketCounts;