From 8c4e365958654580132ed955ae22bb1cdbc72aed Mon Sep 17 00:00:00 2001 From: wangyix Date: Wed, 18 Jun 2014 09:28:42 -0700 Subject: [PATCH] added forgotten i++ in sendNackPackets() plus minor style fixes --- .../octree/OctreeInboundPacketProcessor.cpp | 71 +++++++------------ .../src/octree/OctreeInboundPacketProcessor.h | 6 +- .../src/ReceivedPacketProcessor.cpp | 5 +- .../networking/src/ReceivedPacketProcessor.h | 12 ++++ libraries/octree/src/OctreeSceneStats.cpp | 11 ++- 5 files changed, 50 insertions(+), 55 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 91a5c62752..6bc085df4b 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -17,6 +17,7 @@ #include "OctreeInboundPacketProcessor.h" static QUuid DEFAULT_NODE_ID_REF; +const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServer) : _myServer(myServer), @@ -41,46 +42,33 @@ void OctreeInboundPacketProcessor::resetStats() { _singleSenderStats.clear(); } -bool OctreeInboundPacketProcessor::process() { - - const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; +unsigned long OctreeInboundPacketProcessor::getMaxWait() const { + // calculate time until next sendNackPackets() + quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; quint64 now = usecTimestampNow(); - - if (_packets.size() == 0) { - // calculate time until next sendNackPackets() - quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; - quint64 now = usecTimestampNow(); - if (now >= nextNackTime) { - // send nacks if we're already past time to send it - _lastNackTime = now; - sendNackPackets(); - } - else { - // otherwise, wait until the next nack time or until a packet arrives - quint64 waitTimeMsecs = (nextNackTime - now) / USECS_PER_MSEC + 1; - _waitingOnPacketsMutex.lock(); - _hasPackets.wait(&_waitingOnPacketsMutex, waitTimeMsecs); - _waitingOnPacketsMutex.unlock(); - } + if (now >= nextNackTime) { + return 0; } - while (_packets.size() > 0) { - lock(); // lock to make sure nothing changes on us - NetworkPacket& packet = _packets.front(); // get the oldest packet - NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us - _packets.erase(_packets.begin()); // remove the oldest packet - _nodePacketCounts[temporary.getNode()->getUUID()]--; - unlock(); // let others add to the packets - processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy - - // if it's time to send nacks, send them. - if (usecTimestampNow() - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { - _lastNackTime = now; - sendNackPackets(); - } - } - return isStillRunning(); // keep running till they terminate us + return (nextNackTime - now) / USECS_PER_MSEC + 1; } +void OctreeInboundPacketProcessor::preProcess() { + // check if it's time to send a nack. If yes, do so + quint64 now = usecTimestampNow(); + if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } +} + +void OctreeInboundPacketProcessor::midProcess() { + // check if it's time to send a nack. If yes, do so + quint64 now = usecTimestampNow(); + if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } +} void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { @@ -211,6 +199,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() { // if there are packets from _node that are waiting to be processed, // don't send a NACK since the missing packets may be among those waiting packets. if (hasPacketsToProcessFrom(nodeUUID)) { + i++; continue; } @@ -279,7 +268,6 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { const int UINT16_RANGE = UINT16_MAX + 1; - const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; @@ -287,9 +275,7 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; - } - else { // out of order - + } else { // out of order int incoming = (int)incomingSequence; int expected = (int)expectedSequence; @@ -312,17 +298,13 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, // now that rollover has been corrected for (if it occurred), incoming and expected can be // compared to each other directly, though one of them might be negative - if (incoming > expected) { // early - // add all sequence numbers that were skipped to the missing sequence numbers list for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); } _incomingLastSequence = incomingSequence; - } else { // late - // remove this from missing sequence number if it's in there _missingSequenceNumbers.remove(incomingSequence); @@ -333,7 +315,6 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, // prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP // will be removed. if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { - // some older sequence numbers may be from before a rollover point; this must be handled. // some sequence numbers in this list may be larger than _incomingLastSequence, indicating that they were received // before the most recent rollover. diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index d3b3b80208..46a57205cb 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -77,9 +77,11 @@ protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); - virtual bool process(); + virtual unsigned long getMaxWait() const; + virtual void preProcess(); + virtual void midProcess(); -public: +private: int sendNackPackets(); private: diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index 3ef518bbc2..59e1ecd456 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -35,9 +35,10 @@ bool ReceivedPacketProcessor::process() { if (_packets.size() == 0) { _waitingOnPacketsMutex.lock(); - _hasPackets.wait(&_waitingOnPacketsMutex); + _hasPackets.wait(&_waitingOnPacketsMutex, getMaxWait()); _waitingOnPacketsMutex.unlock(); } + preProcess(); while (_packets.size() > 0) { lock(); // lock to make sure nothing changes on us NetworkPacket& packet = _packets.front(); // get the oldest packet @@ -46,7 +47,9 @@ bool ReceivedPacketProcessor::process() { _nodePacketCounts[temporary.getNode()->getUUID()]--; unlock(); // let others add to the packets processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy + midProcess(); } + postProcess(); return isStillRunning(); // keep running till they terminate us } diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 4322c87910..607f9e54c2 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -63,6 +63,18 @@ protected: /// Implements generic processing behavior for this thread. virtual bool process(); + /// Determines the timeout of the wait when there are no packets to process. Default value means no timeout + virtual unsigned long getMaxWait() const { return ULONG_MAX; } + + /// Override to do work before the packets processing loop. Default does nothing. + virtual void preProcess() { } + + /// Override to do work inside the packet processing loop after a packet is processed. Default does nothing. + virtual void midProcess() { } + + /// Override to do work after the packets processing loop. Default does nothing. + virtual void postProcess() { } + virtual void terminating(); protected: diff --git a/libraries/octree/src/OctreeSceneStats.cpp b/libraries/octree/src/OctreeSceneStats.cpp index e99096dd5c..1ed078ed14 100644 --- a/libraries/octree/src/OctreeSceneStats.cpp +++ b/libraries/octree/src/OctreeSceneStats.cpp @@ -843,7 +843,7 @@ const char* OctreeSceneStats::getItemValue(Item item) { void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, bool wasStatsPacket, int nodeClockSkewUsec) { - const bool wantExtraDebugging = true; + const bool wantExtraDebugging = false; int numBytesPacketHeader = numBytesForPacketHeader(packet); const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; @@ -891,8 +891,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, if (wantExtraDebugging) { qDebug() << "last packet duplicate got:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; } - } - else { + } else { if (sequence != expected) { if (wantExtraDebugging) { qDebug() << "out of order... got:" << sequence << "expected:" << expected; @@ -967,8 +966,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, // only bump the last sequence if it was greater than our expected sequence, this will keep us from // accidentally going backwards when an out of order (recovered) packet comes in - } - else { // sequenceInt > expectedInt + } else { // sequenceInt > expectedInt // if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1] // if rollover between them: sequenceInt in [0, UINT16_RANGE-1], expectedInt in [-UINT16_RANGE, -1] @@ -992,8 +990,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, _incomingLastSequence = sequence; } - } - else { // sequence = expected + } else { // sequence = expected _incomingLastSequence = sequence; }