From 7955979599cced0a45d9cbf9da89ec643f073094 Mon Sep 17 00:00:00 2001 From: wangyix Date: Thu, 12 Jun 2014 09:17:12 -0700 Subject: [PATCH 01/30] added _missingSequenceNumbers tracking to OctreeInboundPacketProcessor --- .../octree/OctreeInboundPacketProcessor.cpp | 83 ++++++++++++++----- .../src/octree/OctreeInboundPacketProcessor.h | 9 +- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 545c502036..52407f78d5 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -123,13 +123,13 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin qDebug() << "sender has no known nodeUUID."; } } - trackInboundPackets(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); + trackInboundPacket(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); } else { qDebug("unknown packet ignored... packetType=%d", packetType); } } -void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, +void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { _totalTransitTime += transitTime; @@ -142,31 +142,74 @@ void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, in // see if this is the first we've heard of this node... if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) { SingleSenderStats stats; - - stats._totalTransitTime += transitTime; - stats._totalProcessTime += processTime; - stats._totalLockWaitTime += lockWaitTime; - stats._totalElementsInPacket += editsInPacket; - stats._totalPackets++; - + stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); _singleSenderStats[nodeUUID] = stats; } else { SingleSenderStats& stats = _singleSenderStats[nodeUUID]; - stats._totalTransitTime += transitTime; - stats._totalProcessTime += processTime; - stats._totalLockWaitTime += lockWaitTime; - stats._totalElementsInPacket += editsInPacket; - stats._totalPackets++; + stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); } } +SingleSenderStats::SingleSenderStats() + : _totalTransitTime(0), + _totalProcessTime(0), + _totalLockWaitTime(0), + _totalElementsInPacket(0), + _totalPackets(0), + _missingSequenceNumbers() +{ -SingleSenderStats::SingleSenderStats() { - _totalTransitTime = 0; - _totalProcessTime = 0; - _totalLockWaitTime = 0; - _totalElementsInPacket = 0; - _totalPackets = 0; } +void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, + int editsInPacket, quint64 processTime, quint64 lockWaitTime) { + const int MAX_REASONABLE_SEQUENCE_GAP = 1000; + const int MAX_MISSING_SEQUENCE_SIZE = 100; + + unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1; + + if (incomingSequence == expectedSequence) { // on time + _incomingLastSequence = incomingSequence; + } + else { + // ignore packet if sequence number gap is unreasonable + if (std::abs(incomingSequence - expectedSequence) > MAX_REASONABLE_SEQUENCE_GAP) { + qDebug() << "ignoring unreasonable packet... sequence:" << incomingSequence + << "_incomingLastSequence:" << _incomingLastSequence; + return; + } + + if (incomingSequence > expectedSequence) { // early + + // add all sequence numbers that were skipped to the missing sequence numbers list + for (int missingSequence = expectedSequence; missingSequence < incomingSequence; missingSequence++) { + _missingSequenceNumbers.insert(missingSequence); + } + _incomingLastSequence = incomingSequence; + + } else { // late + + // remove this from missing sequence number if it's in there + _missingSequenceNumbers.remove(incomingSequence); + + // do not update _incomingLastSequence + } + } + + // prune missing sequence list if it gets too big + if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + if (missingSequence <= std::max(0, _incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP)) { + _missingSequenceNumbers.remove(missingSequence); + } + } + } + + // update other stats + _totalTransitTime += transitTime; + _totalProcessTime += processTime; + _totalLockWaitTime += lockWaitTime; + _totalElementsInPacket += editsInPacket; + _totalPackets++; +} \ No newline at end of file diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index f637a9e7c9..82471bcddf 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -32,12 +32,19 @@ public: { return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; } quint64 getAverageLockWaitTimePerElement() const { return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; } + + void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, + int editsInPacket, quint64 processTime, quint64 lockWaitTime); + quint64 _totalTransitTime; quint64 _totalProcessTime; quint64 _totalLockWaitTime; quint64 _totalElementsInPacket; quint64 _totalPackets; + + unsigned short int _incomingLastSequence; + QSet _missingSequenceNumbers; }; typedef std::map NodeToSenderStatsMap; @@ -69,7 +76,7 @@ protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); private: - void trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, + void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int voxelsInPacket, quint64 processTime, quint64 lockWaitTime); OctreeServer* _myServer; From 15bd0878c407ede95f724009adadc81deeb7b4fd Mon Sep 17 00:00:00 2001 From: wangyix Date: Thu, 12 Jun 2014 11:37:05 -0700 Subject: [PATCH 02/30] added code for AC to send nack packets; no locking yet --- .../octree/OctreeInboundPacketProcessor.cpp | 2 +- .../src/octree/OctreeInboundPacketProcessor.h | 2 + .../src/octree/OctreeSendThread.cpp | 70 +++++++++++++++++++ .../src/octree/OctreeSendThread.h | 2 + assignment-client/src/octree/OctreeServer.h | 2 + libraries/networking/src/PacketHeaders.h | 3 +- 6 files changed, 79 insertions(+), 2 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 52407f78d5..c1c564b58c 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -212,4 +212,4 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, _totalLockWaitTime += lockWaitTime; _totalElementsInPacket += editsInPacket; _totalPackets++; -} \ No newline at end of file +} diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 82471bcddf..3e3f5f2dcb 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -35,6 +35,7 @@ public: void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime); + const QSet& getMissingSequenceNumbers() const { return _missingSequenceNumbers; } quint64 _totalTransitTime; @@ -71,6 +72,7 @@ public: void resetStats(); NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } + const NodeToSenderStatsMap& getSingleSenderStats() const { return _singleSenderStats; } protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); diff --git a/assignment-client/src/octree/OctreeSendThread.cpp b/assignment-client/src/octree/OctreeSendThread.cpp index cb149b1d96..6b1119fca1 100644 --- a/assignment-client/src/octree/OctreeSendThread.cpp +++ b/assignment-client/src/octree/OctreeSendThread.cpp @@ -85,6 +85,7 @@ bool OctreeSendThread::process() { if (nodeData && !nodeData->isShuttingDown()) { bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); packetDistributor(nodeData, viewFrustumChanged); + sendNack(nodeData); } } } @@ -111,6 +112,75 @@ bool OctreeSendThread::process() { return isStillRunning(); // keep running till they terminate us } + +int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { + + // if we're shutting down, then exit early + if (nodeData->isShuttingDown()) { + return 0; + } + + const OctreeInboundPacketProcessor* myServerPacketProcessor = _myServer->getInboundPacketProcessor(); + + // if there are packets from _node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. + if (myServerPacketProcessor->hasPacketsToProcessFrom(_node)) { + return 0; + } + + // lock unlock required ????????? prolly: _singleSenderStats may have our node's entry deleted during this or something + // maybe just make a copy instead!! + + // lock + + const QSet missingSequenceNumbersFromNode = myServerPacketProcessor + ->getSingleSenderStats().at(_node->getUUID()).getMissingSequenceNumbers(); + + // check if there are any sequence numbers that need to be nacked + int numSequenceNumbersAvailable = missingSequenceNumbersFromNode.size(); + if (numSequenceNumbersAvailable == 0) { + //unlock + return 0; + } + + // construct nack packet + + char packet[MAX_PACKET_SIZE]; + + char* dataAt = packet; + int bytesRemaining = MAX_PACKET_SIZE; + + // pack header + int numBytesPacketHeader = populatePacketHeader(packet, PacketTypeOctreeEditNack); + dataAt += numBytesPacketHeader; + bytesRemaining -= numBytesPacketHeader; + int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(unsigned short int); + + // calculate and pack the number of sequence numbers + uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor); + uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt; + *numSequenceNumbersAt = numSequenceNumbers; + dataAt += sizeof(uint16_t); + + // pack sequence numbers + QSet::const_iterator begin = missingSequenceNumbersFromNode.begin(), end = missingSequenceNumbersFromNode.end(); + for (QSet::const_iterator i = begin; i != end; i++) { + unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt; + *sequenceNumberAt = *i; + dataAt += sizeof(unsigned short int); + } + + // send it + OctreeServer::didCallWriteDatagram(this); + NodeList::getInstance()->writeDatagram((char*)packet, dataAt - packet, _node); + + return 1; +} + + + + + quint64 OctreeSendThread::_usleepTime = 0; quint64 OctreeSendThread::_usleepCalls = 0; diff --git a/assignment-client/src/octree/OctreeSendThread.h b/assignment-client/src/octree/OctreeSendThread.h index d8eed27802..041b44577c 100644 --- a/assignment-client/src/octree/OctreeSendThread.h +++ b/assignment-client/src/octree/OctreeSendThread.h @@ -50,6 +50,8 @@ private: int handlePacketSend(OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent); int packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged); + + int sendNack(OctreeQueryNode* nodeData); OctreePacketData _packetData; diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 5595d139be..39536c0981 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -51,6 +51,8 @@ public: int getPacketsPerClientPerSecond() const { return getPacketsPerClientPerInterval() * INTERVALS_PER_SECOND; } int getPacketsTotalPerInterval() const { return _packetsTotalPerInterval; } int getPacketsTotalPerSecond() const { return getPacketsTotalPerInterval() * INTERVALS_PER_SECOND; } + + const OctreeInboundPacketProcessor* getInboundPacketProcessor() const { return _octreeInboundPacketProcessor; } static int getCurrentClientCount() { return _clientCount; } static void clientConnected() { _clientCount++; } diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index 8ac5333d10..d7c6bdead6 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -66,7 +66,8 @@ enum PacketType { PacketTypeModelAddOrEdit, PacketTypeModelErase, PacketTypeModelAddResponse, - PacketTypeOctreeDataNack + PacketTypeOctreeDataNack, + PacketTypeOctreeEditNack, }; typedef char PacketVersion; From c542da97076ba0d27f4e191ae5d8249dbf8afe7e Mon Sep 17 00:00:00 2001 From: wangyix Date: Thu, 12 Jun 2014 12:03:22 -0700 Subject: [PATCH 03/30] added locking on _singleSenderStats; untested! --- .../src/octree/OctreeInboundPacketProcessor.cpp | 8 +++++++- .../src/octree/OctreeInboundPacketProcessor.h | 3 +++ assignment-client/src/octree/OctreeSendThread.cpp | 13 ++++++------- assignment-client/src/octree/OctreeServer.h | 2 +- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index c1c564b58c..1912103bf0 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -25,7 +25,9 @@ OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServe _totalProcessTime(0), _totalLockWaitTime(0), _totalElementsInPacket(0), - _totalPackets(0) + _totalPackets(0), + _singleSenderStats(), + _singleSenderStatsLock() { } @@ -36,7 +38,9 @@ void OctreeInboundPacketProcessor::resetStats() { _totalElementsInPacket = 0; _totalPackets = 0; + _singleSenderStatsLock.lockForWrite(); _singleSenderStats.clear(); + _singleSenderStatsLock.unlock(); } @@ -143,7 +147,9 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) { SingleSenderStats stats; stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); + _singleSenderStatsLock.lockForWrite(); _singleSenderStats[nodeUUID] = stats; + _singleSenderStatsLock.unlock(); } else { SingleSenderStats& stats = _singleSenderStats[nodeUUID]; stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 3e3f5f2dcb..c9760dcc5c 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -71,6 +71,8 @@ public: void resetStats(); + void lockSingleSenderStatsForRead() { _singleSenderStatsLock.lockForRead(); } + void unlockSingleSenderStats() { _singleSenderStatsLock.unlock(); } NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } const NodeToSenderStatsMap& getSingleSenderStats() const { return _singleSenderStats; } @@ -91,5 +93,6 @@ private: quint64 _totalPackets; NodeToSenderStatsMap _singleSenderStats; + QReadWriteLock _singleSenderStatsLock; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeSendThread.cpp b/assignment-client/src/octree/OctreeSendThread.cpp index 6b1119fca1..51696e35ea 100644 --- a/assignment-client/src/octree/OctreeSendThread.cpp +++ b/assignment-client/src/octree/OctreeSendThread.cpp @@ -120,7 +120,7 @@ int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { return 0; } - const OctreeInboundPacketProcessor* myServerPacketProcessor = _myServer->getInboundPacketProcessor(); + OctreeInboundPacketProcessor* myServerPacketProcessor = _myServer->getInboundPacketProcessor(); // if there are packets from _node that are waiting to be processed, // don't send a NACK since the missing packets may be among those waiting packets. @@ -128,18 +128,15 @@ int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { return 0; } - // lock unlock required ????????? prolly: _singleSenderStats may have our node's entry deleted during this or something - // maybe just make a copy instead!! + myServerPacketProcessor->lockSingleSenderStatsForRead(); - // lock - - const QSet missingSequenceNumbersFromNode = myServerPacketProcessor + const QSet& missingSequenceNumbersFromNode = myServerPacketProcessor ->getSingleSenderStats().at(_node->getUUID()).getMissingSequenceNumbers(); // check if there are any sequence numbers that need to be nacked int numSequenceNumbersAvailable = missingSequenceNumbersFromNode.size(); if (numSequenceNumbersAvailable == 0) { - //unlock + myServerPacketProcessor->unlockSingleSenderStats(); return 0; } @@ -170,6 +167,8 @@ int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { dataAt += sizeof(unsigned short int); } + myServerPacketProcessor->unlockSingleSenderStats(); + // send it OctreeServer::didCallWriteDatagram(this); NodeList::getInstance()->writeDatagram((char*)packet, dataAt - packet, _node); diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 39536c0981..22b2da0682 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -52,7 +52,7 @@ public: int getPacketsTotalPerInterval() const { return _packetsTotalPerInterval; } int getPacketsTotalPerSecond() const { return getPacketsTotalPerInterval() * INTERVALS_PER_SECOND; } - const OctreeInboundPacketProcessor* getInboundPacketProcessor() const { return _octreeInboundPacketProcessor; } + OctreeInboundPacketProcessor* getInboundPacketProcessor() { return _octreeInboundPacketProcessor; } static int getCurrentClientCount() { return _clientCount; } static void clientConnected() { _clientCount++; } From 1491216962d21fb6d273156af2a9ee21793b2592 Mon Sep 17 00:00:00 2001 From: wangyix Date: Fri, 13 Jun 2014 09:58:45 -0700 Subject: [PATCH 04/30] Revert "added locking on _singleSenderStats; untested!" This reverts commit c542da97076ba0d27f4e191ae5d8249dbf8afe7e. --- .../src/octree/OctreeInboundPacketProcessor.cpp | 8 +------- .../src/octree/OctreeInboundPacketProcessor.h | 3 --- assignment-client/src/octree/OctreeSendThread.cpp | 13 +++++++------ assignment-client/src/octree/OctreeServer.h | 2 +- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 1912103bf0..c1c564b58c 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -25,9 +25,7 @@ OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServe _totalProcessTime(0), _totalLockWaitTime(0), _totalElementsInPacket(0), - _totalPackets(0), - _singleSenderStats(), - _singleSenderStatsLock() + _totalPackets(0) { } @@ -38,9 +36,7 @@ void OctreeInboundPacketProcessor::resetStats() { _totalElementsInPacket = 0; _totalPackets = 0; - _singleSenderStatsLock.lockForWrite(); _singleSenderStats.clear(); - _singleSenderStatsLock.unlock(); } @@ -147,9 +143,7 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) { SingleSenderStats stats; stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); - _singleSenderStatsLock.lockForWrite(); _singleSenderStats[nodeUUID] = stats; - _singleSenderStatsLock.unlock(); } else { SingleSenderStats& stats = _singleSenderStats[nodeUUID]; stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index c9760dcc5c..3e3f5f2dcb 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -71,8 +71,6 @@ public: void resetStats(); - void lockSingleSenderStatsForRead() { _singleSenderStatsLock.lockForRead(); } - void unlockSingleSenderStats() { _singleSenderStatsLock.unlock(); } NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } const NodeToSenderStatsMap& getSingleSenderStats() const { return _singleSenderStats; } @@ -93,6 +91,5 @@ private: quint64 _totalPackets; NodeToSenderStatsMap _singleSenderStats; - QReadWriteLock _singleSenderStatsLock; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeSendThread.cpp b/assignment-client/src/octree/OctreeSendThread.cpp index 51696e35ea..6b1119fca1 100644 --- a/assignment-client/src/octree/OctreeSendThread.cpp +++ b/assignment-client/src/octree/OctreeSendThread.cpp @@ -120,7 +120,7 @@ int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { return 0; } - OctreeInboundPacketProcessor* myServerPacketProcessor = _myServer->getInboundPacketProcessor(); + const OctreeInboundPacketProcessor* myServerPacketProcessor = _myServer->getInboundPacketProcessor(); // if there are packets from _node that are waiting to be processed, // don't send a NACK since the missing packets may be among those waiting packets. @@ -128,15 +128,18 @@ int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { return 0; } - myServerPacketProcessor->lockSingleSenderStatsForRead(); + // lock unlock required ????????? prolly: _singleSenderStats may have our node's entry deleted during this or something + // maybe just make a copy instead!! - const QSet& missingSequenceNumbersFromNode = myServerPacketProcessor + // lock + + const QSet missingSequenceNumbersFromNode = myServerPacketProcessor ->getSingleSenderStats().at(_node->getUUID()).getMissingSequenceNumbers(); // check if there are any sequence numbers that need to be nacked int numSequenceNumbersAvailable = missingSequenceNumbersFromNode.size(); if (numSequenceNumbersAvailable == 0) { - myServerPacketProcessor->unlockSingleSenderStats(); + //unlock return 0; } @@ -167,8 +170,6 @@ int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { dataAt += sizeof(unsigned short int); } - myServerPacketProcessor->unlockSingleSenderStats(); - // send it OctreeServer::didCallWriteDatagram(this); NodeList::getInstance()->writeDatagram((char*)packet, dataAt - packet, _node); diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 22b2da0682..39536c0981 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -52,7 +52,7 @@ public: int getPacketsTotalPerInterval() const { return _packetsTotalPerInterval; } int getPacketsTotalPerSecond() const { return getPacketsTotalPerInterval() * INTERVALS_PER_SECOND; } - OctreeInboundPacketProcessor* getInboundPacketProcessor() { return _octreeInboundPacketProcessor; } + const OctreeInboundPacketProcessor* getInboundPacketProcessor() const { return _octreeInboundPacketProcessor; } static int getCurrentClientCount() { return _clientCount; } static void clientConnected() { _clientCount++; } From 3d4fae4b3f611bbc4ce98dade9c1845a92b55779 Mon Sep 17 00:00:00 2001 From: wangyix Date: Fri, 13 Jun 2014 09:58:56 -0700 Subject: [PATCH 05/30] Revert "added code for AC to send nack packets; no locking yet" This reverts commit 15bd0878c407ede95f724009adadc81deeb7b4fd. --- .../octree/OctreeInboundPacketProcessor.cpp | 2 +- .../src/octree/OctreeInboundPacketProcessor.h | 2 - .../src/octree/OctreeSendThread.cpp | 70 ------------------- .../src/octree/OctreeSendThread.h | 2 - assignment-client/src/octree/OctreeServer.h | 2 - libraries/networking/src/PacketHeaders.h | 3 +- 6 files changed, 2 insertions(+), 79 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index c1c564b58c..52407f78d5 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -212,4 +212,4 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, _totalLockWaitTime += lockWaitTime; _totalElementsInPacket += editsInPacket; _totalPackets++; -} +} \ No newline at end of file diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 3e3f5f2dcb..82471bcddf 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -35,7 +35,6 @@ public: void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime); - const QSet& getMissingSequenceNumbers() const { return _missingSequenceNumbers; } quint64 _totalTransitTime; @@ -72,7 +71,6 @@ public: void resetStats(); NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } - const NodeToSenderStatsMap& getSingleSenderStats() const { return _singleSenderStats; } protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); diff --git a/assignment-client/src/octree/OctreeSendThread.cpp b/assignment-client/src/octree/OctreeSendThread.cpp index 6b1119fca1..cb149b1d96 100644 --- a/assignment-client/src/octree/OctreeSendThread.cpp +++ b/assignment-client/src/octree/OctreeSendThread.cpp @@ -85,7 +85,6 @@ bool OctreeSendThread::process() { if (nodeData && !nodeData->isShuttingDown()) { bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); packetDistributor(nodeData, viewFrustumChanged); - sendNack(nodeData); } } } @@ -112,75 +111,6 @@ bool OctreeSendThread::process() { return isStillRunning(); // keep running till they terminate us } - -int OctreeSendThread::sendNack(OctreeQueryNode* nodeData) { - - // if we're shutting down, then exit early - if (nodeData->isShuttingDown()) { - return 0; - } - - const OctreeInboundPacketProcessor* myServerPacketProcessor = _myServer->getInboundPacketProcessor(); - - // if there are packets from _node that are waiting to be processed, - // don't send a NACK since the missing packets may be among those waiting packets. - if (myServerPacketProcessor->hasPacketsToProcessFrom(_node)) { - return 0; - } - - // lock unlock required ????????? prolly: _singleSenderStats may have our node's entry deleted during this or something - // maybe just make a copy instead!! - - // lock - - const QSet missingSequenceNumbersFromNode = myServerPacketProcessor - ->getSingleSenderStats().at(_node->getUUID()).getMissingSequenceNumbers(); - - // check if there are any sequence numbers that need to be nacked - int numSequenceNumbersAvailable = missingSequenceNumbersFromNode.size(); - if (numSequenceNumbersAvailable == 0) { - //unlock - return 0; - } - - // construct nack packet - - char packet[MAX_PACKET_SIZE]; - - char* dataAt = packet; - int bytesRemaining = MAX_PACKET_SIZE; - - // pack header - int numBytesPacketHeader = populatePacketHeader(packet, PacketTypeOctreeEditNack); - dataAt += numBytesPacketHeader; - bytesRemaining -= numBytesPacketHeader; - int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(unsigned short int); - - // calculate and pack the number of sequence numbers - uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor); - uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt; - *numSequenceNumbersAt = numSequenceNumbers; - dataAt += sizeof(uint16_t); - - // pack sequence numbers - QSet::const_iterator begin = missingSequenceNumbersFromNode.begin(), end = missingSequenceNumbersFromNode.end(); - for (QSet::const_iterator i = begin; i != end; i++) { - unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt; - *sequenceNumberAt = *i; - dataAt += sizeof(unsigned short int); - } - - // send it - OctreeServer::didCallWriteDatagram(this); - NodeList::getInstance()->writeDatagram((char*)packet, dataAt - packet, _node); - - return 1; -} - - - - - quint64 OctreeSendThread::_usleepTime = 0; quint64 OctreeSendThread::_usleepCalls = 0; diff --git a/assignment-client/src/octree/OctreeSendThread.h b/assignment-client/src/octree/OctreeSendThread.h index 041b44577c..d8eed27802 100644 --- a/assignment-client/src/octree/OctreeSendThread.h +++ b/assignment-client/src/octree/OctreeSendThread.h @@ -50,8 +50,6 @@ private: int handlePacketSend(OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent); int packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged); - - int sendNack(OctreeQueryNode* nodeData); OctreePacketData _packetData; diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 39536c0981..5595d139be 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -51,8 +51,6 @@ public: int getPacketsPerClientPerSecond() const { return getPacketsPerClientPerInterval() * INTERVALS_PER_SECOND; } int getPacketsTotalPerInterval() const { return _packetsTotalPerInterval; } int getPacketsTotalPerSecond() const { return getPacketsTotalPerInterval() * INTERVALS_PER_SECOND; } - - const OctreeInboundPacketProcessor* getInboundPacketProcessor() const { return _octreeInboundPacketProcessor; } static int getCurrentClientCount() { return _clientCount; } static void clientConnected() { _clientCount++; } diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index d7c6bdead6..8ac5333d10 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -66,8 +66,7 @@ enum PacketType { PacketTypeModelAddOrEdit, PacketTypeModelErase, PacketTypeModelAddResponse, - PacketTypeOctreeDataNack, - PacketTypeOctreeEditNack, + PacketTypeOctreeDataNack }; typedef char PacketVersion; From b210b07b81c8a12ad243e7af7ed9f65080115ef3 Mon Sep 17 00:00:00 2001 From: wangyix Date: Fri, 13 Jun 2014 11:08:15 -0700 Subject: [PATCH 06/30] rollovers are now handled in SingleSenderStats::trackInboundPacket --- .../octree/OctreeInboundPacketProcessor.cpp | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 52407f78d5..931814d4f8 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -156,6 +156,7 @@ SingleSenderStats::SingleSenderStats() _totalLockWaitTime(0), _totalElementsInPacket(0), _totalPackets(0), + _incomingLastSequence(0), _missingSequenceNumbers() { @@ -168,23 +169,42 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, const int MAX_MISSING_SEQUENCE_SIZE = 100; unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1; - + if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; } - else { - // ignore packet if sequence number gap is unreasonable - if (std::abs(incomingSequence - expectedSequence) > MAX_REASONABLE_SEQUENCE_GAP) { + else { // out of order + + const int UINT16_RANGE = 65536; + + int incoming = (int)incomingSequence; + int expected = (int)expectedSequence; + + // check if the gap between incoming and expected is reasonable, taking possible rollover into consideration + int absGap = std::abs(incoming - expected); + if (absGap >= UINT16_RANGE - MAX_REASONABLE_SEQUENCE_GAP) { + // rollover likely occurred between incoming and expected. + // correct the larger of the two so that it's within [-65536, -1] while the other remains within [0, 65535] + if (incoming > expected) { + incoming -= UINT16_RANGE; + } else { + expected -= UINT16_RANGE; + } + } else if (absGap > MAX_REASONABLE_SEQUENCE_GAP) { + // ignore packet if gap is unreasonable qDebug() << "ignoring unreasonable packet... sequence:" << incomingSequence << "_incomingLastSequence:" << _incomingLastSequence; return; } + + // now that rollover has been corrected for (if it occurred), incoming and expected can be + // compared to each other directly, though one of them might be negative - if (incomingSequence > expectedSequence) { // early + if (incoming > expected) { // early // add all sequence numbers that were skipped to the missing sequence numbers list - for (int missingSequence = expectedSequence; missingSequence < incomingSequence; missingSequence++) { - _missingSequenceNumbers.insert(missingSequence); + for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { + _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); } _incomingLastSequence = incomingSequence; From 2b20720f51578f365685845b1f835895b50535de Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 09:57:05 -0700 Subject: [PATCH 07/30] added sendNackPackets() to OctreeInboundPacketProcessor added rollover handling in _missingSequenceNumbers pruning; added EditNack packet types; added getMyEditNackType() to OctreeServer subclasses; added code to randomly skip edit packet sequence numbers for testing in OctreeEditPacketSender --- assignment-client/src/models/ModelServer.h | 1 + .../octree/OctreeInboundPacketProcessor.cpp | 111 ++++++++++++++++-- .../src/octree/OctreeInboundPacketProcessor.h | 11 +- assignment-client/src/octree/OctreeServer.cpp | 6 +- assignment-client/src/octree/OctreeServer.h | 1 + .../src/particles/ParticleServer.h | 1 + assignment-client/src/voxels/VoxelServer.h | 1 + libraries/networking/src/PacketHeaders.h | 5 +- .../networking/src/ReceivedPacketProcessor.h | 6 +- .../octree/src/OctreeEditPacketSender.cpp | 7 ++ libraries/shared/src/GenericThread.h | 2 +- 11 files changed, 133 insertions(+), 19 deletions(-) diff --git a/assignment-client/src/models/ModelServer.h b/assignment-client/src/models/ModelServer.h index 38acc7f1e1..8a26f07f11 100644 --- a/assignment-client/src/models/ModelServer.h +++ b/assignment-client/src/models/ModelServer.h @@ -33,6 +33,7 @@ public: virtual const char* getMyServerName() const { return MODEL_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return MODEL_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_MODELS_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeModelEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 931814d4f8..683fe188b4 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -39,7 +39,6 @@ void OctreeInboundPacketProcessor::resetStats() { _singleSenderStats.clear(); } - void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { bool debugProcessPacket = _myServer->wantsVerboseDebug(); @@ -150,6 +149,76 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns } } +int OctreeInboundPacketProcessor::sendNackPackets() { + + printf("\t\t sendNackPackets()\n"); + + int packetsSent = 0; + + // if there are packets from _node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. + + NodeToSenderStatsMap::const_iterator begin = _singleSenderStats.begin(), end = _singleSenderStats.end(); + for (NodeToSenderStatsMap::const_iterator i = begin; i != end; i++) { + + QUuid nodeUUID = i.key(); + SingleSenderStats nodeStats = i.value(); + + if (hasPacketsToProcessFrom(nodeUUID)) { + continue; + } + + const SharedNodePointer& destinationNode = NodeList::getInstance()->getNodeHash().value(nodeUUID); + const QSet& missingSequenceNumbers = nodeStats.getMissingSequenceNumbers(); + + // check if there are any sequence numbers that need to be nacked + int numSequenceNumbersAvailable = missingSequenceNumbers.size(); + + // construct nack packet(s) for this node + + QSet::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.begin(); + char packet[MAX_PACKET_SIZE]; + + while (numSequenceNumbersAvailable > 0) { + + char* dataAt = packet; + int bytesRemaining = MAX_PACKET_SIZE; + + // pack header + int numBytesPacketHeader = populatePacketHeader(packet, _myServer->getMyEditNackType()); + dataAt += numBytesPacketHeader; + bytesRemaining -= numBytesPacketHeader; + + // calculate and pack the number of sequence numbers to nack + int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(unsigned short int); + uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor); + uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt; + *numSequenceNumbersAt = numSequenceNumbers; + dataAt += sizeof(uint16_t); + + // pack sequence numbers to nack + printf("\t\t sending NACK with %d seq numbers:\n\t\t", numSequenceNumbers); + for (uint16_t i = 0; i < numSequenceNumbers; i++) { + unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt; + *sequenceNumberAt = *missingSequenceNumberIterator; + dataAt += sizeof(unsigned short int); + printf("%d, ", *missingSequenceNumberIterator); + + missingSequenceNumberIterator++; + } + numSequenceNumbersAvailable -= numSequenceNumbers; + + // send it + qint64 bytesWritten = NodeList::getInstance()->writeDatagram(packet, dataAt - packet, destinationNode); + printf("\t\t wrote %lld bytes\n\n", bytesWritten); + + packetsSent++; + } + } + return packetsSent; +} + + SingleSenderStats::SingleSenderStats() : _totalTransitTime(0), _totalProcessTime(0), @@ -165,17 +234,19 @@ SingleSenderStats::SingleSenderStats() void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { - const int MAX_REASONABLE_SEQUENCE_GAP = 1000; +printf("\t\t tracked seq %d\n", incomingSequence); + + const int UINT16_RANGE = 65536; + + const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1; - if (incomingSequence == expectedSequence) { // on time + if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; } - else { // out of order - - const int UINT16_RANGE = 65536; + else { // out of order int incoming = (int)incomingSequence; int expected = (int)expectedSequence; @@ -202,6 +273,8 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, if (incoming > expected) { // early + printf("\t\t\t packet is early! %d packets were skipped\n", incoming - expected); + // add all sequence numbers that were skipped to the missing sequence numbers list for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); @@ -210,6 +283,8 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, } else { // late + printf("\t\t\t packet is late!\n"); + // remove this from missing sequence number if it's in there _missingSequenceNumbers.remove(incomingSequence); @@ -217,11 +292,27 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, } } - // prune missing sequence list if it gets too big + // prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP + // will be removed. if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { - foreach(unsigned short int missingSequence, _missingSequenceNumbers) { - if (missingSequence <= std::max(0, _incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP)) { - _missingSequenceNumbers.remove(missingSequence); + + // the acceptable range of older sequence numbers may contain a rollover point; this must be handled. + // some sequence number in this list may be larger than _incomingLastSequence, indicating that they were received + // before the most recent rollover. + int cutoff = (int)_incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP; + if (cutoff >= 0) { + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + unsigned short int nonRolloverCutoff = (unsigned short int)cutoff; + if (missingSequence > _incomingLastSequence || missingSequence <= nonRolloverCutoff) { + _missingSequenceNumbers.remove(missingSequence); + } + } + } else { + unsigned short int rolloverCutoff = (unsigned short int)(cutoff + UINT16_RANGE); + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + if (missingSequence > _incomingLastSequence && missingSequence <= rolloverCutoff) { + _missingSequenceNumbers.remove(missingSequence); + } } } } diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 82471bcddf..9649da2d61 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -32,6 +32,7 @@ public: { return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; } quint64 getAverageLockWaitTimePerElement() const { return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; } + const QSet& getMissingSequenceNumbers() const { return _missingSequenceNumbers; } void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime); @@ -47,8 +48,9 @@ public: QSet _missingSequenceNumbers; }; -typedef std::map NodeToSenderStatsMap; -typedef std::map::iterator NodeToSenderStatsMapIterator; +typedef QHash NodeToSenderStatsMap; +typedef QHash::iterator NodeToSenderStatsMapIterator; +typedef QHash::const_iterator NodeToSenderStatsMapConstIterator; /// Handles processing of incoming network packets for the voxel-server. As with other ReceivedPacketProcessor classes @@ -75,6 +77,9 @@ public: protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); +public slots: + int sendNackPackets(); + private: void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int voxelsInPacket, quint64 processTime, quint64 lockWaitTime); @@ -87,7 +92,7 @@ private: quint64 _totalLockWaitTime; quint64 _totalElementsInPacket; quint64 _totalPackets; - + NodeToSenderStatsMap _singleSenderStats; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index cb60f0816e..39f51a0ba9 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -648,10 +648,10 @@ bool OctreeServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url int senderNumber = 0; NodeToSenderStatsMap& allSenderStats = _octreeInboundPacketProcessor->getSingleSenderStats(); - for (NodeToSenderStatsMapIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) { + for (NodeToSenderStatsMapConstIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) { senderNumber++; - QUuid senderID = i->first; - SingleSenderStats& senderStats = i->second; + QUuid senderID = i.key(); + const SingleSenderStats& senderStats = i.value(); statsString += QString("\r\n Stats for sender %1 uuid: %2\r\n") .arg(senderNumber).arg(senderID.toString()); diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 5595d139be..76b39c5771 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -68,6 +68,7 @@ public: virtual const char* getMyServerName() const = 0; virtual const char* getMyLoggingServerTargetName() const = 0; virtual const char* getMyDefaultPersistFilename() const = 0; + virtual PacketType getMyEditNackType() const = 0; // subclass may implement these method virtual void beforeRun() { }; diff --git a/assignment-client/src/particles/ParticleServer.h b/assignment-client/src/particles/ParticleServer.h index d444368a9d..0b379a903e 100644 --- a/assignment-client/src/particles/ParticleServer.h +++ b/assignment-client/src/particles/ParticleServer.h @@ -33,6 +33,7 @@ public: virtual const char* getMyServerName() const { return PARTICLE_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return PARTICLE_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_PARTICLES_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeParticleEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/assignment-client/src/voxels/VoxelServer.h b/assignment-client/src/voxels/VoxelServer.h index b13f83b65f..fadcca2d19 100644 --- a/assignment-client/src/voxels/VoxelServer.h +++ b/assignment-client/src/voxels/VoxelServer.h @@ -42,6 +42,7 @@ public: virtual const char* getMyServerName() const { return VOXEL_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return VOXEL_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_VOXELS_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeVoxelEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index 8ac5333d10..5ed4110627 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -66,7 +66,10 @@ enum PacketType { PacketTypeModelAddOrEdit, PacketTypeModelErase, PacketTypeModelAddResponse, - PacketTypeOctreeDataNack + PacketTypeOctreeDataNack, // 45 + PacketTypeVoxelEditNack, + PacketTypeParticleEditNack, + PacketTypeModelEditNack, }; typedef char PacketVersion; diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 94ad2d9c41..31cbc3a487 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -35,7 +35,11 @@ public: /// Are there received packets waiting to be processed from a certain node bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const { - return _nodePacketCounts[sendingNode->getUUID()] > 0; + return hasPacketsToProcessFrom(sendingNode->getUUID()); + } + + bool hasPacketsToProcessFrom(const QUuid& nodeUUID) const { + return _nodePacketCounts[nodeUUID] > 0; } /// How many received packets waiting are to be processed diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 898c41de08..da1438136b 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -308,7 +308,14 @@ void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, Pa unsigned short int* sequenceAt = (unsigned short int*)&packetBuffer._currentBuffer[packetBuffer._currentSize]; *sequenceAt = _sequenceNumber; packetBuffer._currentSize += sizeof(unsigned short int); // nudge past sequence + if (randFloat() < 0.6f) _sequenceNumber++; + else + { + int x = randIntInRange(2, 4); + printf("\t\t seq number jumped from %d to %d\n", _sequenceNumber, _sequenceNumber + x); + _sequenceNumber += x; + } // pack in timestamp quint64 now = usecTimestampNow(); diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index bbb01894ed..b2c0eb13db 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -57,7 +57,7 @@ protected: bool isStillRunning() const { return !_stopThread; } -private: +protected: QMutex _mutex; bool _stopThread; From ebfd65dea8cb86310f8c8c103b19c3217fac0e70 Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 12:30:16 -0700 Subject: [PATCH 08/30] added OctreeInboundPacketProcessor::process() override to send nack periodically added code to remove dead nodes' stats in sendNackPackets() --- .../octree/OctreeInboundPacketProcessor.cpp | 62 +++++++++++++++++-- .../src/octree/OctreeInboundPacketProcessor.h | 7 ++- assignment-client/src/octree/OctreeServer.cpp | 3 + .../src/ReceivedPacketProcessor.cpp | 1 + .../networking/src/ReceivedPacketProcessor.h | 10 ++- 5 files changed, 74 insertions(+), 9 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 683fe188b4..9b849905c5 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -25,7 +25,8 @@ OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServe _totalProcessTime(0), _totalLockWaitTime(0), _totalElementsInPacket(0), - _totalPackets(0) + _totalPackets(0), + _lastNackTime(usecTimestampNow()) { } @@ -35,10 +36,52 @@ void OctreeInboundPacketProcessor::resetStats() { _totalLockWaitTime = 0; _totalElementsInPacket = 0; _totalPackets = 0; + _lastNackTime = usecTimestampNow(); _singleSenderStats.clear(); } +bool OctreeInboundPacketProcessor::process() { + + const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; + quint64 now = usecTimestampNow(); + + if (_packets.size() == 0) { + // calculate time until next sendNackPackets() + quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; + quint64 now = usecTimestampNow(); + if (now >= nextNackTime) { + // send nacks if we're already past time to send it + _lastNackTime = now; + sendNackPackets(); + } + else { + // otherwise, wait until the next nack time or until a packet arrives + quint64 waitTimeMsecs = (nextNackTime - now) / USECS_PER_MSEC + 1; + _waitingOnPacketsMutex.lock(); + _hasPackets.wait(&_waitingOnPacketsMutex, waitTimeMsecs); + _waitingOnPacketsMutex.unlock(); + } + } + while (_packets.size() > 0) { + lock(); // lock to make sure nothing changes on us + NetworkPacket& packet = _packets.front(); // get the oldest packet + NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us + _packets.erase(_packets.begin()); // remove the oldest packet + _nodePacketCounts[temporary.getNode()->getUUID()]--; + unlock(); // let others add to the packets + processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy + + // if it's time to send nacks, send them. + if (usecTimestampNow() - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } + } + return isStillRunning(); // keep running till they terminate us +} + + void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { bool debugProcessPacket = _myServer->wantsVerboseDebug(); @@ -155,15 +198,21 @@ int OctreeInboundPacketProcessor::sendNackPackets() { int packetsSent = 0; - // if there are packets from _node that are waiting to be processed, - // don't send a NACK since the missing packets may be among those waiting packets. - - NodeToSenderStatsMap::const_iterator begin = _singleSenderStats.begin(), end = _singleSenderStats.end(); - for (NodeToSenderStatsMap::const_iterator i = begin; i != end; i++) { + NodeToSenderStatsMapIterator i = _singleSenderStats.begin(); + while (i != _singleSenderStats.end()) { QUuid nodeUUID = i.key(); SingleSenderStats nodeStats = i.value(); + // check if this node is still alive. Remove its stats if it's dead. + if (!isAlive(nodeUUID)) { + printf("\t\t removing node %s\n", nodeUUID.toString().toLatin1().data()); + i = _singleSenderStats.erase(i); + continue; + } + + // if there are packets from _node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. if (hasPacketsToProcessFrom(nodeUUID)) { continue; } @@ -214,6 +263,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() { packetsSent++; } + i++; } return packetsSent; } diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 9649da2d61..378cc9a891 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -75,9 +75,12 @@ public: NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } protected: + virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); -public slots: + virtual bool process(); + +public: int sendNackPackets(); private: @@ -94,5 +97,7 @@ private: quint64 _totalPackets; NodeToSenderStatsMap _singleSenderStats; + + quint64 _lastNackTime; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index 39f51a0ba9..c6ef4aa5aa 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -1060,6 +1060,9 @@ void OctreeServer::nodeAdded(SharedNodePointer node) { void OctreeServer::nodeKilled(SharedNodePointer node) { quint64 start = usecTimestampNow(); + // calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!! + _octreeInboundPacketProcessor->nodeKilled(node); + qDebug() << qPrintable(_safeServerName) << "server killed node:" << *node; OctreeQueryNode* nodeData = static_cast(node->getLinkedData()); if (nodeData) { diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index 3ef518bbc2..d85f09fb0a 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -54,4 +54,5 @@ void ReceivedPacketProcessor::nodeKilled(SharedNodePointer node) { lock(); _nodePacketCounts.remove(node->getUUID()); unlock(); + printf("\n\t\t nodeKilled()!!!!! --------------------------\n\n"); } diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 31cbc3a487..4322c87910 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -33,11 +33,17 @@ public: /// Are there received packets waiting to be processed bool hasPacketsToProcess() const { return _packets.size() > 0; } - /// Are there received packets waiting to be processed from a certain node + /// Is a specified node still alive? + bool isAlive(const QUuid& nodeUUID) const { + return _nodePacketCounts.contains(nodeUUID); + } + + /// Are there received packets waiting to be processed from a specified node bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const { return hasPacketsToProcessFrom(sendingNode->getUUID()); } + /// Are there received packets waiting to be processed from a specified node bool hasPacketsToProcessFrom(const QUuid& nodeUUID) const { return _nodePacketCounts[nodeUUID] > 0; } @@ -59,7 +65,7 @@ protected: virtual void terminating(); -private: +protected: QVector _packets; QHash _nodePacketCounts; From 0f7ce694c00dc2bd9da28eddcaf172c8970ac99d Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 13:52:01 -0700 Subject: [PATCH 09/30] minor changes before moving SentPacketHistory --- libraries/octree/src/OctreeEditPacketSender.cpp | 6 ++++-- libraries/octree/src/OctreeEditPacketSender.h | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index da1438136b..fb0059b96d 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -99,6 +99,8 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c if (node->getActiveSocket()) { queuePacketForSending(node, QByteArray(reinterpret_cast(buffer), length)); + + // debugging output... bool wantDebugging = false; if (wantDebugging) { @@ -287,8 +289,8 @@ void OctreeEditPacketSender::releaseQueuedMessages() { if (!serversExist()) { _releaseQueuedMessagesPending = true; } else { - for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - releaseQueuedPacket(i->second); + for (QHash::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { + releaseQueuedPacket(i.value()); } } } diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index 0dc628c433..167e43b200 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -101,7 +101,7 @@ protected: void processPreServerExistsPackets(); // These are packets which are destined from know servers but haven't been released because they're still too small - std::map _pendingEditPackets; + QHash _pendingEditPackets; // These are packets that are waiting to be processed because we don't yet know if there are servers int _maxPendingMessages; From 69c2a2d12b4823e30cb26b2ce8902856e3350d9c Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 14:04:21 -0700 Subject: [PATCH 10/30] moved SentPacketHistory to libraries/networking/src --- .../networking/src/SentPacketHistory.cpp | 44 +++++++++++++++++++ libraries/networking/src/SentPacketHistory.h | 35 +++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 libraries/networking/src/SentPacketHistory.cpp create mode 100644 libraries/networking/src/SentPacketHistory.h diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp new file mode 100644 index 0000000000..0ea7fd8b69 --- /dev/null +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -0,0 +1,44 @@ +// +// SentPacketHistory.cpp +// assignement-client/src/octree +// +// Created by Yixin Wang on 6/5/2014 +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include "SentPacketHistory.h" + +SentPacketHistory::SentPacketHistory(int size) + : _sentPackets(size), + _newestPacketAt(0), + _numExistingPackets(0), + _newestSequenceNumber(0) +{ +} + +void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet) { + _newestSequenceNumber = sequenceNumber; + + // increment _newestPacketAt cyclically, insert new packet there. + // this will overwrite the oldest packet in the buffer + _newestPacketAt = (_newestPacketAt == _sentPackets.size() - 1) ? 0 : _newestPacketAt + 1; + _sentPackets[_newestPacketAt] = packet; + if (_numExistingPackets < _sentPackets.size()) { + _numExistingPackets++; + } +} + + +const QByteArray* SentPacketHistory::getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const { + OCTREE_PACKET_SEQUENCE seqDiff = _newestSequenceNumber - sequenceNumber; + if (!(seqDiff >= 0 && seqDiff < _numExistingPackets)) { + return NULL; + } + int packetAt = _newestPacketAt - seqDiff; + if (packetAt < 0) { + packetAt += _sentPackets.size(); + } + return &_sentPackets.at(packetAt); +} diff --git a/libraries/networking/src/SentPacketHistory.h b/libraries/networking/src/SentPacketHistory.h new file mode 100644 index 0000000000..4231400ac1 --- /dev/null +++ b/libraries/networking/src/SentPacketHistory.h @@ -0,0 +1,35 @@ +// +// SentPacketHistory.h +// assignement-client/src/octree +// +// Created by Yixin Wang on 6/5/2014 +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_SentPacketHistory_h +#define hifi_SentPacketHistory_h + +#include +#include + +#include "OctreePacketData.h" + +class SentPacketHistory { + +public: + SentPacketHistory(int size); + + void packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet); + const QByteArray* getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const; + +private: + QVector _sentPackets; // circular buffer + int _newestPacketAt; + int _numExistingPackets; + + OCTREE_PACKET_SEQUENCE _newestSequenceNumber; +}; + +#endif From 14f50f4576b65dbe0bd257c5a30c3ced4efb8ceb Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 14:05:27 -0700 Subject: [PATCH 11/30] removed old SentPacketHistory --- .../src/octree/SentPacketHistory.cpp | 44 ------------------- .../src/octree/SentPacketHistory.h | 35 --------------- 2 files changed, 79 deletions(-) delete mode 100644 assignment-client/src/octree/SentPacketHistory.cpp delete mode 100644 assignment-client/src/octree/SentPacketHistory.h diff --git a/assignment-client/src/octree/SentPacketHistory.cpp b/assignment-client/src/octree/SentPacketHistory.cpp deleted file mode 100644 index 0ea7fd8b69..0000000000 --- a/assignment-client/src/octree/SentPacketHistory.cpp +++ /dev/null @@ -1,44 +0,0 @@ -// -// SentPacketHistory.cpp -// assignement-client/src/octree -// -// Created by Yixin Wang on 6/5/2014 -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include "SentPacketHistory.h" - -SentPacketHistory::SentPacketHistory(int size) - : _sentPackets(size), - _newestPacketAt(0), - _numExistingPackets(0), - _newestSequenceNumber(0) -{ -} - -void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet) { - _newestSequenceNumber = sequenceNumber; - - // increment _newestPacketAt cyclically, insert new packet there. - // this will overwrite the oldest packet in the buffer - _newestPacketAt = (_newestPacketAt == _sentPackets.size() - 1) ? 0 : _newestPacketAt + 1; - _sentPackets[_newestPacketAt] = packet; - if (_numExistingPackets < _sentPackets.size()) { - _numExistingPackets++; - } -} - - -const QByteArray* SentPacketHistory::getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const { - OCTREE_PACKET_SEQUENCE seqDiff = _newestSequenceNumber - sequenceNumber; - if (!(seqDiff >= 0 && seqDiff < _numExistingPackets)) { - return NULL; - } - int packetAt = _newestPacketAt - seqDiff; - if (packetAt < 0) { - packetAt += _sentPackets.size(); - } - return &_sentPackets.at(packetAt); -} diff --git a/assignment-client/src/octree/SentPacketHistory.h b/assignment-client/src/octree/SentPacketHistory.h deleted file mode 100644 index 4231400ac1..0000000000 --- a/assignment-client/src/octree/SentPacketHistory.h +++ /dev/null @@ -1,35 +0,0 @@ -// -// SentPacketHistory.h -// assignement-client/src/octree -// -// Created by Yixin Wang on 6/5/2014 -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#ifndef hifi_SentPacketHistory_h -#define hifi_SentPacketHistory_h - -#include -#include - -#include "OctreePacketData.h" - -class SentPacketHistory { - -public: - SentPacketHistory(int size); - - void packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet); - const QByteArray* getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const; - -private: - QVector _sentPackets; // circular buffer - int _newestPacketAt; - int _numExistingPackets; - - OCTREE_PACKET_SEQUENCE _newestSequenceNumber; -}; - -#endif From ddfe98ad43a1e4a545807f5faeac65a4d1d6fd0b Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 15:32:39 -0700 Subject: [PATCH 12/30] added code to parse nack packets in OctreeEditPacketSender --- .../octree/src/OctreeEditPacketSender.cpp | 49 +++++++++++++++++-- libraries/octree/src/OctreeEditPacketSender.h | 21 ++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index fb0059b96d..69b58aaa41 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -17,6 +17,43 @@ #include #include "OctreeEditPacketSender.h" +void NackedPacketHistory::packetSent(const QByteArray& packet) { + // extract sequence number for the sent packet history + int numBytesPacketHeader = numBytesForPacketHeader(packet); + const char* dataAt = reinterpret_cast(packet.data()); + unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader))); + + // add packet to history + _sentPacketHistory.packetSent(sequence, packet); +} + +bool NackedPacketHistory::hasNextNackedPacket() const { + return !_nackedSequenceNumbers.isEmpty(); +} + +const QByteArray* NackedPacketHistory::getNextNackedPacket() { + if (!_nackedSequenceNumbers.isEmpty()) { + // could return null if packet is not in the history + return _sentPacketHistory.getPacket(_nackedSequenceNumbers.dequeue()); + } + return NULL; +} + +void NackedPacketHistory::parseNackPacket(const QByteArray& packet) { + int numBytesPacketHeader = numBytesForPacketHeader(packet); + const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; + + // read number of sequence numbers + uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); + dataAt += sizeof(uint16_t); + + // read sequence numbers + for (int i = 0; i < numSequenceNumbers; i++) { + unsigned short int sequenceNumber = (*(unsigned short int*)dataAt); + _nackedSequenceNumbers.enqueue(sequenceNumber); + dataAt += sizeof(unsigned short int); + } +} EditPacketBuffer::EditPacketBuffer(PacketType type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) : _nodeUUID(nodeUUID), @@ -97,9 +134,9 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c if (node->getType() == getMyNodeType() && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { if (node->getActiveSocket()) { - queuePacketForSending(node, QByteArray(reinterpret_cast(buffer), length)); - - + QByteArray packet(reinterpret_cast(buffer), length); + queuePacketForSending(node, packet); + _nackedPacketHistories[nodeUUID].packetSent(packet); // debugging output... bool wantDebugging = false; @@ -338,3 +375,9 @@ bool OctreeEditPacketSender::process() { // base class does most of the work. return PacketSender::process(); } + +void OctreeEditPacketSender::parseNackPacket(const QByteArray& packet) { + // parse sending node from packet + QUuid sendingNodeUUID = uuidFromPacketHeader(packet); + _nackedPacketHistories[sendingNodeUUID].parseNackPacket(packet); +} diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index 167e43b200..2586ca034f 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -12,9 +12,24 @@ #ifndef hifi_OctreeEditPacketSender_h #define hifi_OctreeEditPacketSender_h +#include #include #include #include "JurisdictionMap.h" +#include "SentPacketHistory.h" + +class NackedPacketHistory { +public: + NackedPacketHistory() : _sentPacketHistory(1000), _nackedSequenceNumbers() { } +public: + void packetSent(const QByteArray& packet); + bool hasNextNackedPacket() const; + const QByteArray* getNextNackedPacket(); + void parseNackPacket(const QByteArray& packet); +private: + SentPacketHistory _sentPacketHistory; + QQueue _nackedSequenceNumbers; +}; /// Used for construction of edit packets class EditPacketBuffer { @@ -90,6 +105,9 @@ public: virtual char getMyNodeType() const = 0; virtual void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) { }; +public: + void parseNackPacket(const QByteArray& packet); + protected: bool _shouldSend; void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, ssize_t length); @@ -114,5 +132,8 @@ protected: unsigned short int _sequenceNumber; int _maxPacketSize; + + // TODO: garbage-collect this + QHash _nackedPacketHistories; }; #endif // hifi_OctreeEditPacketSender_h From 5e37704772405a9862fe5aec3329e938e9c5c055 Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 15:59:38 -0700 Subject: [PATCH 13/30] SentPacketHistory now handles rollover updated SentPacketHistory path in comments --- .../networking/src/SentPacketHistory.cpp | 19 ++++++++++++++----- libraries/networking/src/SentPacketHistory.h | 11 +++++------ libraries/octree/src/OctreeEditPacketSender.h | 2 +- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp index 0ea7fd8b69..37c8953861 100644 --- a/libraries/networking/src/SentPacketHistory.cpp +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -1,6 +1,6 @@ // // SentPacketHistory.cpp -// assignement-client/src/octree +// libraries/networking/src // // Created by Yixin Wang on 6/5/2014 // @@ -18,7 +18,7 @@ SentPacketHistory::SentPacketHistory(int size) { } -void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet) { +void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& packet) { _newestSequenceNumber = sequenceNumber; // increment _newestPacketAt cyclically, insert new packet there. @@ -31,9 +31,18 @@ void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const } -const QByteArray* SentPacketHistory::getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const { - OCTREE_PACKET_SEQUENCE seqDiff = _newestSequenceNumber - sequenceNumber; - if (!(seqDiff >= 0 && seqDiff < _numExistingPackets)) { +const QByteArray* SentPacketHistory::getPacket(uint16_t sequenceNumber) const { + + const int UINT16_RANGE = UINT16_MAX + 1; + + // if sequenceNumber > _newestSequenceNumber, assume sequenceNumber is from before the most recent rollover + // correct the diff so that it correctly represents how far back in the history sequenceNumber is + int seqDiff = (int)_newestSequenceNumber - (int)sequenceNumber; + if (seqDiff < 0) { + seqDiff += UINT16_RANGE; + } + // if desired sequence number is too old to be found in the history, return null + if (seqDiff >= _numExistingPackets) { return NULL; } int packetAt = _newestPacketAt - seqDiff; diff --git a/libraries/networking/src/SentPacketHistory.h b/libraries/networking/src/SentPacketHistory.h index 4231400ac1..e3c7736b72 100644 --- a/libraries/networking/src/SentPacketHistory.h +++ b/libraries/networking/src/SentPacketHistory.h @@ -1,6 +1,6 @@ // // SentPacketHistory.h -// assignement-client/src/octree +// libraries/networking/src // // Created by Yixin Wang on 6/5/2014 // @@ -11,25 +11,24 @@ #ifndef hifi_SentPacketHistory_h #define hifi_SentPacketHistory_h +#include #include #include -#include "OctreePacketData.h" - class SentPacketHistory { public: SentPacketHistory(int size); - void packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet); - const QByteArray* getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const; + void packetSent(uint16_t sequenceNumber, const QByteArray& packet); + const QByteArray* getPacket(uint16_t sequenceNumber) const; private: QVector _sentPackets; // circular buffer int _newestPacketAt; int _numExistingPackets; - OCTREE_PACKET_SEQUENCE _newestSequenceNumber; + uint16_t _newestSequenceNumber; }; #endif diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index 2586ca034f..a737131dff 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -133,7 +133,7 @@ protected: unsigned short int _sequenceNumber; int _maxPacketSize; - // TODO: garbage-collect this + // TODO: garbage-collect this and _pendingEditPackets QHash _nackedPacketHistories; }; #endif // hifi_OctreeEditPacketSender_h From 95b25247849aa63149eac4704e98ef3692a12fff Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 16:08:48 -0700 Subject: [PATCH 14/30] removed magic number 65536 from OctreeInboundPacketProcessor --- assignment-client/src/octree/OctreeInboundPacketProcessor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 9b849905c5..1902ba28c3 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -286,7 +286,7 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, printf("\t\t tracked seq %d\n", incomingSequence); - const int UINT16_RANGE = 65536; + const int UINT16_RANGE = UINT16_MAX + 1; const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; @@ -305,7 +305,7 @@ printf("\t\t tracked seq %d\n", incomingSequence); int absGap = std::abs(incoming - expected); if (absGap >= UINT16_RANGE - MAX_REASONABLE_SEQUENCE_GAP) { // rollover likely occurred between incoming and expected. - // correct the larger of the two so that it's within [-65536, -1] while the other remains within [0, 65535] + // correct the larger of the two so that it's within [-UINT16_RANGE, -1] while the other remains within [0, UINT16_RANGE-1] if (incoming > expected) { incoming -= UINT16_RANGE; } else { From 18a9d74b88cb48841a22a8e881e59df85bc0e3b3 Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 16:35:30 -0700 Subject: [PATCH 15/30] changed OctreeEditPacketSender to queue packets for resend as nack is parsed --- libraries/networking/src/SentPacketHistory.h | 2 +- .../octree/src/OctreeEditPacketSender.cpp | 79 ++++++++----------- libraries/octree/src/OctreeEditPacketSender.h | 17 +--- 3 files changed, 38 insertions(+), 60 deletions(-) diff --git a/libraries/networking/src/SentPacketHistory.h b/libraries/networking/src/SentPacketHistory.h index e3c7736b72..53a6919c42 100644 --- a/libraries/networking/src/SentPacketHistory.h +++ b/libraries/networking/src/SentPacketHistory.h @@ -18,7 +18,7 @@ class SentPacketHistory { public: - SentPacketHistory(int size); + SentPacketHistory(int size = 1000); void packetSent(uint16_t sequenceNumber, const QByteArray& packet); const QByteArray* getPacket(uint16_t sequenceNumber) const; diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 69b58aaa41..439148a9a5 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -17,44 +17,6 @@ #include #include "OctreeEditPacketSender.h" -void NackedPacketHistory::packetSent(const QByteArray& packet) { - // extract sequence number for the sent packet history - int numBytesPacketHeader = numBytesForPacketHeader(packet); - const char* dataAt = reinterpret_cast(packet.data()); - unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader))); - - // add packet to history - _sentPacketHistory.packetSent(sequence, packet); -} - -bool NackedPacketHistory::hasNextNackedPacket() const { - return !_nackedSequenceNumbers.isEmpty(); -} - -const QByteArray* NackedPacketHistory::getNextNackedPacket() { - if (!_nackedSequenceNumbers.isEmpty()) { - // could return null if packet is not in the history - return _sentPacketHistory.getPacket(_nackedSequenceNumbers.dequeue()); - } - return NULL; -} - -void NackedPacketHistory::parseNackPacket(const QByteArray& packet) { - int numBytesPacketHeader = numBytesForPacketHeader(packet); - const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; - - // read number of sequence numbers - uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); - dataAt += sizeof(uint16_t); - - // read sequence numbers - for (int i = 0; i < numSequenceNumbers; i++) { - unsigned short int sequenceNumber = (*(unsigned short int*)dataAt); - _nackedSequenceNumbers.enqueue(sequenceNumber); - dataAt += sizeof(unsigned short int); - } -} - EditPacketBuffer::EditPacketBuffer(PacketType type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) : _nodeUUID(nodeUUID), _currentType(type), @@ -126,7 +88,7 @@ bool OctreeEditPacketSender::serversExist() const { // This method is called when the edit packet layer has determined that it has a fully formed packet destined for // a known nodeID. -void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) { +void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsigned char* buffer, ssize_t length) { NodeList* nodeList = NodeList::getInstance(); foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { @@ -134,14 +96,19 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c if (node->getType() == getMyNodeType() && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { if (node->getActiveSocket()) { - QByteArray packet(reinterpret_cast(buffer), length); + QByteArray packet(reinterpret_cast(buffer), length); queuePacketForSending(node, packet); - _nackedPacketHistories[nodeUUID].packetSent(packet); + + // extract sequence number and add packet to history + int numBytesPacketHeader = numBytesForPacketHeader(packet); + const char* dataAt = reinterpret_cast(packet.data()); + unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader))); + _sentPacketHistories[nodeUUID].packetSent(sequence, packet); // debugging output... bool wantDebugging = false; if (wantDebugging) { - int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast(buffer)); + int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast(buffer)); unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader))); quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence)))); quint64 queuedAt = usecTimestampNow(); @@ -377,7 +344,31 @@ bool OctreeEditPacketSender::process() { } void OctreeEditPacketSender::parseNackPacket(const QByteArray& packet) { - // parse sending node from packet + // parse sending node from packet, retrieve packet history for that node QUuid sendingNodeUUID = uuidFromPacketHeader(packet); - _nackedPacketHistories[sendingNodeUUID].parseNackPacket(packet); + + // if packet history doesn't exist for the sender node (somehow), bail + if (!_sentPacketHistories.contains(sendingNodeUUID)) { + return; + } + const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID); + + int numBytesPacketHeader = numBytesForPacketHeader(packet); + const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; + + // read number of sequence numbers + uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); + dataAt += sizeof(uint16_t); + + // read sequence numbers and queue packets for resend + for (int i = 0; i < numSequenceNumbers; i++) { + unsigned short int sequenceNumber = (*(unsigned short int*)dataAt); + dataAt += sizeof(unsigned short int); + + // retrieve packet from history + const QByteArray* packet = sentPacketHistory.getPacket(sequenceNumber); + if (packet) { + queuePacketToNode(sendingNodeUUID, (const unsigned char*)packet->constData(), packet->length()); + } + } } diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index a737131dff..45913b8db2 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -18,19 +18,6 @@ #include "JurisdictionMap.h" #include "SentPacketHistory.h" -class NackedPacketHistory { -public: - NackedPacketHistory() : _sentPacketHistory(1000), _nackedSequenceNumbers() { } -public: - void packetSent(const QByteArray& packet); - bool hasNextNackedPacket() const; - const QByteArray* getNextNackedPacket(); - void parseNackPacket(const QByteArray& packet); -private: - SentPacketHistory _sentPacketHistory; - QQueue _nackedSequenceNumbers; -}; - /// Used for construction of edit packets class EditPacketBuffer { public: @@ -110,7 +97,7 @@ public: protected: bool _shouldSend; - void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, ssize_t length); + void queuePacketToNode(const QUuid& nodeID, const unsigned char* buffer, ssize_t length); void queuePendingPacketToNodes(PacketType type, unsigned char* buffer, ssize_t length); void queuePacketToNodes(unsigned char* buffer, ssize_t length); void initializePacket(EditPacketBuffer& packetBuffer, PacketType type); @@ -134,6 +121,6 @@ protected: int _maxPacketSize; // TODO: garbage-collect this and _pendingEditPackets - QHash _nackedPacketHistories; + QHash _sentPacketHistories; }; #endif // hifi_OctreeEditPacketSender_h From 4d84e1fff1c2269797fcd4b761cc016323b3bb6b Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 16:41:00 -0700 Subject: [PATCH 16/30] added processNackPacket calls to DatagramProcessor --- interface/src/DatagramProcessor.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/interface/src/DatagramProcessor.cpp b/interface/src/DatagramProcessor.cpp index cdd7e7ef0f..e032056fc8 100644 --- a/interface/src/DatagramProcessor.cpp +++ b/interface/src/DatagramProcessor.cpp @@ -145,6 +145,12 @@ void DatagramProcessor::processDatagrams() { } break; } + case PacketTypeVoxelEditNack: + application->_voxelEditSender.processNackPacket(incomingPacket); + case PacketTypeParticleEditNack: + application->_particleEditSender.processNackPacket(incomingPacket); + case PacketTypeModelEditNack: + application->_modelEditSender.processNackPacket(incomingPacket); default: nodeList->processNodeData(senderSockAddr, incomingPacket); break; From e3db60d1efa4f2dd750668aad6fcfca04158ca37 Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 16:42:04 -0700 Subject: [PATCH 17/30] forgot to add "break;"s --- interface/src/DatagramProcessor.cpp | 3 +++ libraries/octree/src/OctreeEditPacketSender.cpp | 2 +- libraries/octree/src/OctreeEditPacketSender.h | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/interface/src/DatagramProcessor.cpp b/interface/src/DatagramProcessor.cpp index e032056fc8..29528da126 100644 --- a/interface/src/DatagramProcessor.cpp +++ b/interface/src/DatagramProcessor.cpp @@ -147,10 +147,13 @@ void DatagramProcessor::processDatagrams() { } case PacketTypeVoxelEditNack: application->_voxelEditSender.processNackPacket(incomingPacket); + break; case PacketTypeParticleEditNack: application->_particleEditSender.processNackPacket(incomingPacket); + break; case PacketTypeModelEditNack: application->_modelEditSender.processNackPacket(incomingPacket); + break; default: nodeList->processNodeData(senderSockAddr, incomingPacket); break; diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 439148a9a5..16bb22987d 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -343,7 +343,7 @@ bool OctreeEditPacketSender::process() { return PacketSender::process(); } -void OctreeEditPacketSender::parseNackPacket(const QByteArray& packet) { +void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { // parse sending node from packet, retrieve packet history for that node QUuid sendingNodeUUID = uuidFromPacketHeader(packet); diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index 45913b8db2..419e850ff8 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -93,7 +93,7 @@ public: virtual void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) { }; public: - void parseNackPacket(const QByteArray& packet); + void processNackPacket(const QByteArray& packet); protected: bool _shouldSend; From 06f8464ec9ac54683cfbe0392ab790526a58ac04 Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 17:17:48 -0700 Subject: [PATCH 18/30] edit nacks ready for test; seq numbers sometimes repeat?? --- .../src/octree/OctreeInboundPacketProcessor.cpp | 6 +++--- libraries/networking/src/SentPacketHistory.cpp | 6 ++++++ libraries/octree/src/OctreeEditPacketSender.cpp | 17 +++++++++-------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 1902ba28c3..b795139ed5 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -194,8 +194,6 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns int OctreeInboundPacketProcessor::sendNackPackets() { - printf("\t\t sendNackPackets()\n"); - int packetsSent = 0; NodeToSenderStatsMapIterator i = _singleSenderStats.begin(); @@ -336,7 +334,9 @@ printf("\t\t tracked seq %d\n", incomingSequence); printf("\t\t\t packet is late!\n"); // remove this from missing sequence number if it's in there - _missingSequenceNumbers.remove(incomingSequence); + if (_missingSequenceNumbers.remove(incomingSequence)) { + printf("\t\t\t\t packet %d recovered!!!\n", incomingSequence); + } // do not update _incomingLastSequence } diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp index 37c8953861..eb55d2e1b4 100644 --- a/libraries/networking/src/SentPacketHistory.cpp +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -19,6 +19,12 @@ SentPacketHistory::SentPacketHistory(int size) } void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& packet) { + + if (sequenceNumber != 0 && sequenceNumber != _newestSequenceNumber + 1) { + printf("\t packet history received unexpected seq number! prev: %d received: %d\n", _newestSequenceNumber, sequenceNumber); + } + + _newestSequenceNumber = sequenceNumber; // increment _newestPacketAt cyclically, insert new packet there. diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 16bb22987d..2e0999c4f8 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -97,6 +97,9 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsi ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { if (node->getActiveSocket()) { QByteArray packet(reinterpret_cast(buffer), length); + + bool send = randFloat() < 0.7f; + if (send) queuePacketForSending(node, packet); // extract sequence number and add packet to history @@ -105,6 +108,10 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsi unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader))); _sentPacketHistories[nodeUUID].packetSent(sequence, packet); + if (!send) { + printf("\t\t dropped packet %d !!!\n", sequence); + } + // debugging output... bool wantDebugging = false; if (wantDebugging) { @@ -314,14 +321,7 @@ void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, Pa unsigned short int* sequenceAt = (unsigned short int*)&packetBuffer._currentBuffer[packetBuffer._currentSize]; *sequenceAt = _sequenceNumber; packetBuffer._currentSize += sizeof(unsigned short int); // nudge past sequence - if (randFloat() < 0.6f) _sequenceNumber++; - else - { - int x = randIntInRange(2, 4); - printf("\t\t seq number jumped from %d to %d\n", _sequenceNumber, _sequenceNumber + x); - _sequenceNumber += x; - } // pack in timestamp quint64 now = usecTimestampNow(); @@ -368,7 +368,8 @@ void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { // retrieve packet from history const QByteArray* packet = sentPacketHistory.getPacket(sequenceNumber); if (packet) { - queuePacketToNode(sendingNodeUUID, (const unsigned char*)packet->constData(), packet->length()); + const SharedNodePointer& node = NodeList::getInstance()->getNodeHash().value(sendingNodeUUID); + queuePacketForSending(node, *packet); } } } From dc71f87ea4f5ac2dc2270a4e48478b4bf4217929 Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 10:42:56 -0700 Subject: [PATCH 19/30] edit nacks seem to be working; added mutex for releaseQueuedPacket() to prevent duplicate packets being queued up due to the steps of queueing the packet and clearing it not being atomic. --- .../octree/OctreeInboundPacketProcessor.cpp | 2 +- .../networking/src/SentPacketHistory.cpp | 2 +- .../octree/src/OctreeEditPacketSender.cpp | 30 +++++++++++++++---- libraries/octree/src/OctreeEditPacketSender.h | 2 ++ 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index b795139ed5..b0c4d905bc 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -282,7 +282,7 @@ SingleSenderStats::SingleSenderStats() void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { -printf("\t\t tracked seq %d\n", incomingSequence); +printf("\t\t tracked seq %hu\n", incomingSequence); const int UINT16_RANGE = UINT16_MAX + 1; diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp index eb55d2e1b4..a34dada397 100644 --- a/libraries/networking/src/SentPacketHistory.cpp +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -21,7 +21,7 @@ SentPacketHistory::SentPacketHistory(int size) void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& packet) { if (sequenceNumber != 0 && sequenceNumber != _newestSequenceNumber + 1) { - printf("\t packet history received unexpected seq number! prev: %d received: %d\n", _newestSequenceNumber, sequenceNumber); + printf("\t\tpacket history received unexpected seq number! prev: %hu received: %hu **************** \n", _newestSequenceNumber, sequenceNumber); } diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 2e0999c4f8..251886f70c 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -104,12 +104,28 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsi // extract sequence number and add packet to history int numBytesPacketHeader = numBytesForPacketHeader(packet); - const char* dataAt = reinterpret_cast(packet.data()); - unsigned short int sequence = (*((unsigned short int*)(dataAt + numBytesPacketHeader))); + const char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; + unsigned short int sequence = *((unsigned short int*)dataAt); +/* +// debug +dataAt += sizeof(unsigned short int); + +// extract time stamp +quint64 sentTime = *((quint64*)dataAt); +dataAt += sizeof(quint64); + +PacketType type = packetTypeForPacket(packet); + + +printf("adding packet to history. size: %d\n", packet.length()); +printf("type: %d, seq: %hu, time: %llu\n", (unsigned char)type, sequence, sentTime); +printf("destination node: %s\n", nodeUUID.toString().toLatin1().data()); +fflush(stdout); +*/ _sentPacketHistories[nodeUUID].packetSent(sequence, packet); if (!send) { - printf("\t\t dropped packet %d !!!\n", sequence); + printf("\t dropped packet %d !!! ---------------------------\n", sequence); } // debugging output... @@ -307,11 +323,15 @@ void OctreeEditPacketSender::releaseQueuedMessages() { } void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { + _releaseQueuedPacketMutex.lock(); + if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketTypeUnknown) { queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); + packetBuffer._currentSize = 0; + packetBuffer._currentType = PacketTypeUnknown; } - packetBuffer._currentSize = 0; - packetBuffer._currentType = PacketTypeUnknown; + + _releaseQueuedPacketMutex.unlock(); } void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PacketType type) { diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index 419e850ff8..e9c616085f 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -122,5 +122,7 @@ protected: // TODO: garbage-collect this and _pendingEditPackets QHash _sentPacketHistories; + + QMutex _releaseQueuedPacketMutex; }; #endif // hifi_OctreeEditPacketSender_h From 6e71523346108b6cc2b5d59343413e409f6e6b44 Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 10:58:44 -0700 Subject: [PATCH 20/30] added OctreeEditPacketSender::nodeKilled(), no locks yet also added nodeKilled() calls to the 3 editsenders in Application::nodeKilled() --- interface/src/Application.cpp | 10 ++++++++-- libraries/octree/src/OctreeEditPacketSender.cpp | 7 +++++++ libraries/octree/src/OctreeEditPacketSender.h | 11 +++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 7a7d7c10f8..02cf59ad87 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -3249,10 +3249,16 @@ void Application::nodeAdded(SharedNodePointer node) { void Application::nodeKilled(SharedNodePointer node) { - // this is here because connecting NodeList::nodeKilled to OctreePacketProcessor::nodeKilled doesn't work: - // OctreePacketProcessor::nodeKilled is not called when NodeList::nodeKilled is emitted for some reason. + // These are here because connecting NodeList::nodeKilled to OctreePacketProcessor::nodeKilled doesn't work: + // OctreePacketProcessor::nodeKilled is not being called when NodeList::nodeKilled is emitted. + // This may have to do with GenericThread::threadRoutine() blocking the QThread event loop + _octreeProcessor.nodeKilled(node); + _voxelEditSender.nodeKilled(node); + _particleEditSender.nodeKilled(node); + _modelEditSender.nodeKilled(node); + if (node->getType() == NodeType::VoxelServer) { QUuid nodeUUID = node->getUUID(); // see if this is the first we've heard of this node... diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 251886f70c..f82e7e3101 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -393,3 +393,10 @@ void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { } } } + +void OctreeEditPacketSender::nodeKilled(SharedNodePointer node) { + // TODO: add locks + QUuid nodeUUID = node->getUUID(); + _pendingEditPackets.remove(nodeUUID); + _sentPacketHistories.remove(nodeUUID); +} diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index e9c616085f..c16c0a2d4b 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -91,7 +91,10 @@ public: // you must override these... virtual char getMyNodeType() const = 0; virtual void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) { }; - + +public slots: + void nodeKilled(SharedNodePointer node); + public: void processNackPacket(const QByteArray& packet); @@ -120,9 +123,9 @@ protected: unsigned short int _sequenceNumber; int _maxPacketSize; - // TODO: garbage-collect this and _pendingEditPackets - QHash _sentPacketHistories; - QMutex _releaseQueuedPacketMutex; + + // TODO: add locks for this and _pendingEditPackets + QHash _sentPacketHistories; }; #endif // hifi_OctreeEditPacketSender_h From 81879123844e1aade1362c4fb26006251800f3b6 Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 11:34:35 -0700 Subject: [PATCH 21/30] added qDebug() check in SentPacketHistory for seq numbers used (unsigned short int)1 instead of 1 when calculating expectedSequence --- .../src/octree/OctreeInboundPacketProcessor.cpp | 6 +++--- libraries/networking/src/SentPacketHistory.cpp | 13 ++++++++----- libraries/octree/src/OctreeEditPacketSender.cpp | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index b0c4d905bc..44c9576943 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -289,7 +289,7 @@ printf("\t\t tracked seq %hu\n", incomingSequence); const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; - unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1; + unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + (unsigned short int)1; if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; @@ -346,8 +346,8 @@ printf("\t\t tracked seq %hu\n", incomingSequence); // will be removed. if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { - // the acceptable range of older sequence numbers may contain a rollover point; this must be handled. - // some sequence number in this list may be larger than _incomingLastSequence, indicating that they were received + // some older sequence numbers may be from before a rollover point; this must be handled. + // some sequence numbers in this list may be larger than _incomingLastSequence, indicating that they were received // before the most recent rollover. int cutoff = (int)_incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP; if (cutoff >= 0) { diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp index a34dada397..1e1157ba71 100644 --- a/libraries/networking/src/SentPacketHistory.cpp +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -9,22 +9,26 @@ // #include "SentPacketHistory.h" +#include SentPacketHistory::SentPacketHistory(int size) : _sentPackets(size), _newestPacketAt(0), _numExistingPackets(0), - _newestSequenceNumber(0) + _newestSequenceNumber(UINT16_MAX) { } void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& packet) { - if (sequenceNumber != 0 && sequenceNumber != _newestSequenceNumber + 1) { - printf("\t\tpacket history received unexpected seq number! prev: %hu received: %hu **************** \n", _newestSequenceNumber, sequenceNumber); + // check if given seq number has the expected value. if not, something's wrong with + // the code calling this function + uint16_t expectedSequenceNumber = _newestSequenceNumber + (uint16_t)1; + if (sequenceNumber != expectedSequenceNumber) { + qDebug() << "Unexpected sequence number passed to SentPacketHistory::packetSent()!" + << "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber; } - _newestSequenceNumber = sequenceNumber; // increment _newestPacketAt cyclically, insert new packet there. @@ -36,7 +40,6 @@ void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& pa } } - const QByteArray* SentPacketHistory::getPacket(uint16_t sequenceNumber) const { const int UINT16_RANGE = UINT16_MAX + 1; diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index f82e7e3101..01d2ecf464 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -34,7 +34,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() : _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), _releaseQueuedMessagesPending(false), _serverJurisdictions(NULL), - _sequenceNumber(0), + _sequenceNumber(65500), _maxPacketSize(MAX_PACKET_SIZE) { } From 011e7c2de2f7ccfbfd2b50e7a47b3d36e7e913e0 Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 11:53:09 -0700 Subject: [PATCH 22/30] removed debug code --- .../octree/OctreeInboundPacketProcessor.cpp | 18 +++---------- .../src/octree/OctreeInboundPacketProcessor.h | 3 +-- .../src/ReceivedPacketProcessor.cpp | 1 - .../octree/src/OctreeEditPacketSender.cpp | 25 ++----------------- 4 files changed, 6 insertions(+), 41 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 44c9576943..9a4c6d7cbb 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -204,7 +204,6 @@ int OctreeInboundPacketProcessor::sendNackPackets() { // check if this node is still alive. Remove its stats if it's dead. if (!isAlive(nodeUUID)) { - printf("\t\t removing node %s\n", nodeUUID.toString().toLatin1().data()); i = _singleSenderStats.erase(i); continue; } @@ -244,12 +243,10 @@ int OctreeInboundPacketProcessor::sendNackPackets() { dataAt += sizeof(uint16_t); // pack sequence numbers to nack - printf("\t\t sending NACK with %d seq numbers:\n\t\t", numSequenceNumbers); for (uint16_t i = 0; i < numSequenceNumbers; i++) { unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt; *sequenceNumberAt = *missingSequenceNumberIterator; dataAt += sizeof(unsigned short int); - printf("%d, ", *missingSequenceNumberIterator); missingSequenceNumberIterator++; } @@ -257,7 +254,6 @@ int OctreeInboundPacketProcessor::sendNackPackets() { // send it qint64 bytesWritten = NodeList::getInstance()->writeDatagram(packet, dataAt - packet, destinationNode); - printf("\t\t wrote %lld bytes\n\n", bytesWritten); packetsSent++; } @@ -282,8 +278,6 @@ SingleSenderStats::SingleSenderStats() void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { -printf("\t\t tracked seq %hu\n", incomingSequence); - const int UINT16_RANGE = UINT16_MAX + 1; const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work @@ -321,8 +315,6 @@ printf("\t\t tracked seq %hu\n", incomingSequence); if (incoming > expected) { // early - printf("\t\t\t packet is early! %d packets were skipped\n", incoming - expected); - // add all sequence numbers that were skipped to the missing sequence numbers list for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); @@ -331,14 +323,10 @@ printf("\t\t tracked seq %hu\n", incomingSequence); } else { // late - printf("\t\t\t packet is late!\n"); - // remove this from missing sequence number if it's in there - if (_missingSequenceNumbers.remove(incomingSequence)) { - printf("\t\t\t\t packet %d recovered!!!\n", incomingSequence); - } + _missingSequenceNumbers.remove(incomingSequence); - // do not update _incomingLastSequence + // do not update _incomingLastSequence; it shouldn't become smaller } } @@ -373,4 +361,4 @@ printf("\t\t tracked seq %hu\n", incomingSequence); _totalLockWaitTime += lockWaitTime; _totalElementsInPacket += editsInPacket; _totalPackets++; -} \ No newline at end of file +} diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 378cc9a891..d3b3b80208 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -37,7 +37,6 @@ public: void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime); - quint64 _totalTransitTime; quint64 _totalProcessTime; quint64 _totalLockWaitTime; @@ -95,7 +94,7 @@ private: quint64 _totalLockWaitTime; quint64 _totalElementsInPacket; quint64 _totalPackets; - + NodeToSenderStatsMap _singleSenderStats; quint64 _lastNackTime; diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index d85f09fb0a..3ef518bbc2 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -54,5 +54,4 @@ void ReceivedPacketProcessor::nodeKilled(SharedNodePointer node) { lock(); _nodePacketCounts.remove(node->getUUID()); unlock(); - printf("\n\t\t nodeKilled()!!!!! --------------------------\n\n"); } diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 01d2ecf464..f49fe9f22f 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -34,7 +34,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() : _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), _releaseQueuedMessagesPending(false), _serverJurisdictions(NULL), - _sequenceNumber(65500), + _sequenceNumber(0), _maxPacketSize(MAX_PACKET_SIZE) { } @@ -98,36 +98,15 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsi if (node->getActiveSocket()) { QByteArray packet(reinterpret_cast(buffer), length); - bool send = randFloat() < 0.7f; - if (send) queuePacketForSending(node, packet); // extract sequence number and add packet to history int numBytesPacketHeader = numBytesForPacketHeader(packet); const char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; unsigned short int sequence = *((unsigned short int*)dataAt); -/* -// debug -dataAt += sizeof(unsigned short int); - -// extract time stamp -quint64 sentTime = *((quint64*)dataAt); -dataAt += sizeof(quint64); - -PacketType type = packetTypeForPacket(packet); - - -printf("adding packet to history. size: %d\n", packet.length()); -printf("type: %d, seq: %hu, time: %llu\n", (unsigned char)type, sequence, sentTime); -printf("destination node: %s\n", nodeUUID.toString().toLatin1().data()); -fflush(stdout); -*/ + _sentPacketHistories[nodeUUID].packetSent(sequence, packet); - if (!send) { - printf("\t dropped packet %d !!! ---------------------------\n", sequence); - } - // debugging output... bool wantDebugging = false; if (wantDebugging) { From 5f62b43ba6e0ebd1aafd5874c00d1377242c0649 Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 12:14:44 -0700 Subject: [PATCH 23/30] added edit nack types to non-verified list --- libraries/networking/src/PacketHeaders.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index 5ed4110627..0f87b0e607 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -79,7 +79,7 @@ const QSet NON_VERIFIED_PACKETS = QSet() << PacketTypeDomainList << PacketTypeDomainListRequest << PacketTypeDomainOAuthRequest << PacketTypeCreateAssignment << PacketTypeRequestAssignment << PacketTypeStunResponse << PacketTypeNodeJsonStats << PacketTypeVoxelQuery << PacketTypeParticleQuery << PacketTypeModelQuery - << PacketTypeOctreeDataNack; + << PacketTypeOctreeDataNack << PacketTypeVoxelEditNack << PacketTypeParticleEditNack << PacketTypeModelEditNack; const int NUM_BYTES_MD5_HASH = 16; const int NUM_STATIC_HEADER_BYTES = sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID; From 604b17185beee992101113a95205f1419b582590 Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 14:43:13 -0700 Subject: [PATCH 24/30] call writeUnverifiedDatagram for sending edit nacks --- assignment-client/src/octree/OctreeInboundPacketProcessor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 9a4c6d7cbb..91a5c62752 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -253,7 +253,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() { numSequenceNumbersAvailable -= numSequenceNumbers; // send it - qint64 bytesWritten = NodeList::getInstance()->writeDatagram(packet, dataAt - packet, destinationNode); + qint64 bytesWritten = NodeList::getInstance()->writeUnverifiedDatagram(packet, dataAt - packet, destinationNode); packetsSent++; } From 7f4cf3719e709078f07a61d62fe89441bf22a7ee Mon Sep 17 00:00:00 2001 From: wangyix Date: Tue, 17 Jun 2014 16:40:08 -0700 Subject: [PATCH 25/30] added rollover handling to OctreeSceneStats --- libraries/octree/src/OctreeSceneStats.cpp | 131 +++++++++++++++------- 1 file changed, 89 insertions(+), 42 deletions(-) diff --git a/libraries/octree/src/OctreeSceneStats.cpp b/libraries/octree/src/OctreeSceneStats.cpp index 868ef29886..e99096dd5c 100644 --- a/libraries/octree/src/OctreeSceneStats.cpp +++ b/libraries/octree/src/OctreeSceneStats.cpp @@ -842,8 +842,8 @@ const char* OctreeSceneStats::getItemValue(Item item) { } void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, - bool wasStatsPacket, int nodeClockSkewUsec) { - const bool wantExtraDebugging = false; + bool wasStatsPacket, int nodeClockSkewUsec) { + const bool wantExtraDebugging = true; int numBytesPacketHeader = numBytesForPacketHeader(packet); const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; @@ -852,10 +852,10 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, dataAt += sizeof(OCTREE_PACKET_FLAGS); OCTREE_PACKET_SEQUENCE sequence = (*(OCTREE_PACKET_SEQUENCE*)dataAt); dataAt += sizeof(OCTREE_PACKET_SEQUENCE); - + OCTREE_PACKET_SENT_TIME sentAt = (*(OCTREE_PACKET_SENT_TIME*)dataAt); dataAt += sizeof(OCTREE_PACKET_SENT_TIME); - + //bool packetIsColored = oneAtBit(flags, PACKET_IS_COLOR_BIT); //bool packetIsCompressed = oneAtBit(flags, PACKET_IS_COMPRESSED_BIT); @@ -877,49 +877,67 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, return; // ignore any packets that are unreasonable } + const int UINT16_RANGE = UINT16_MAX + 1; + // determine our expected sequence number... handle rollover appropriately - OCTREE_PACKET_SEQUENCE expected = _incomingPacket > 0 ? _incomingLastSequence + 1 : sequence; - - // Guard against possible corrupted packets... with bad sequence numbers - const int MAX_RESONABLE_SEQUENCE_OFFSET = 2000; - const int MIN_RESONABLE_SEQUENCE_OFFSET = -2000; - int sequenceOffset = (sequence - expected); - if (sequenceOffset > MAX_RESONABLE_SEQUENCE_OFFSET || sequenceOffset < MIN_RESONABLE_SEQUENCE_OFFSET) { - qDebug() << "ignoring unreasonable packet... sequence:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; - return; // ignore any packets that are unreasonable - } - - // track packets here... - _incomingPacket++; - _incomingBytes += packet.size(); - if (!wasStatsPacket) { - _incomingWastedBytes += (MAX_PACKET_SIZE - packet.size()); - } + OCTREE_PACKET_SEQUENCE expected = _incomingPacket > 0 ? _incomingLastSequence + (quint16)1 : sequence; const int USECS_PER_MSEC = 1000; float flightTimeMsecs = flightTime / USECS_PER_MSEC; _incomingFlightTimeAverage.updateAverage(flightTimeMsecs); - + // track out of order and possibly lost packets... if (sequence == _incomingLastSequence) { if (wantExtraDebugging) { qDebug() << "last packet duplicate got:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; } - } else { + } + else { if (sequence != expected) { if (wantExtraDebugging) { qDebug() << "out of order... got:" << sequence << "expected:" << expected; } + int sequenceInt = (int)sequence; + int expectedInt = (int)expected; + + // if distance between sequence and expected are more than half of the total range of possible seq numbers, + // assume that a rollover occurred between the two. + // correct the larger one so it's in the range [-UINT16_RANGE, -1] while the other remains in [0, UINT16_RANGE-1] + // after doing so, sequenceInt and expectedInt can be correctly compared to each other, though one may be negative + if (std::abs(sequenceInt - expectedInt) > UINT16_RANGE / 2) { + if (sequenceInt > expectedInt) { + sequenceInt -= UINT16_RANGE; + } + else { + expectedInt -= UINT16_RANGE; + } + } + + // Guard against possible corrupted packets... with bad sequence numbers + const int MAX_RESONABLE_SEQUENCE_OFFSET = 2000; + const int MIN_RESONABLE_SEQUENCE_OFFSET = -2000; + + int sequenceOffset = (sequenceInt - expectedInt); + if (sequenceOffset > MAX_RESONABLE_SEQUENCE_OFFSET || sequenceOffset < MIN_RESONABLE_SEQUENCE_OFFSET) { + qDebug() << "ignoring unreasonable packet... sequence:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; + return; // ignore any packets that are unreasonable + } + // if the sequence is less than our expected, then this might be a packet // that was delayed and so we should find it in our lostSequence list - if (sequence < expected) { + if (sequenceInt < expectedInt) { + + // if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1] + // if rollover between them: sequenceInt in [-UINT16_RANGE, -1], expectedInt in [0, UINT16_RANGE-1] + if (wantExtraDebugging) { qDebug() << "this packet is later than expected..."; } - if (sequence < std::max(0, (expected - MAX_MISSING_SEQUENCE_OLD_AGE))) { + if (sequenceInt < expectedInt - MAX_MISSING_SEQUENCE_OLD_AGE) { _incomingReallyLate++; - } else { + } + else { _incomingLate++; } @@ -931,57 +949,80 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, _sequenceNumbersToNack.remove(sequence); _incomingLikelyLost--; _incomingRecovered++; - } else { + } + else { // if we're still in our pruning window, and we didn't find it in our missing list, // than this is really unexpected and can probably only happen if the packet was a // duplicate - if (sequence >= std::max(0, (expected - MAX_MISSING_SEQUENCE_OLD_AGE))) { + if (sequenceInt >= expectedInt - MAX_MISSING_SEQUENCE_OLD_AGE) { if (wantExtraDebugging) { - qDebug() << "sequence:" << sequence << "WAS NOT found in _missingSequenceNumbers, and not that old... (expected - MAX_MISSING_SEQUENCE_OLD_AGE):" << (expected - MAX_MISSING_SEQUENCE_OLD_AGE); + qDebug() << "sequence:" << sequence << "WAS NOT found in _missingSequenceNumbers, and not that old... (expected - MAX_MISSING_SEQUENCE_OLD_AGE):" + << (uint16_t)(expectedInt - MAX_MISSING_SEQUENCE_OLD_AGE); } _incomingPossibleDuplicate++; } } + + // don't update _incomingLastSequence in this case. + // only bump the last sequence if it was greater than our expected sequence, this will keep us from + // accidentally going backwards when an out of order (recovered) packet comes in + } - - if (sequence > expected) { + else { // sequenceInt > expectedInt + + // if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1] + // if rollover between them: sequenceInt in [0, UINT16_RANGE-1], expectedInt in [-UINT16_RANGE, -1] + if (wantExtraDebugging) { qDebug() << "this packet is earlier than expected..."; } _incomingEarly++; // hmm... so, we either didn't get some packets, or this guy came early... - unsigned int missing = sequence - expected; + int missing = sequenceInt - expectedInt; if (wantExtraDebugging) { qDebug() << ">>>>>>>> missing gap=" << missing; } _incomingLikelyLost += missing; - for(unsigned int missingSequence = expected; missingSequence < sequence; missingSequence++) { + for (int missingSequenceInt = expectedInt; missingSequenceInt < sequenceInt; missingSequenceInt++) { + OCTREE_PACKET_SEQUENCE missingSequence = missingSequenceInt >= 0 ? missingSequenceInt : missingSequenceInt + UINT16_RANGE; _missingSequenceNumbers << missingSequence; _sequenceNumbersToNack << missingSequence; } + + _incomingLastSequence = sequence; } } + else { // sequence = expected + + _incomingLastSequence = sequence; + } } - // only bump the last sequence if it was greater than our expected sequence, this will keep us from - // accidentally going backwards when an out of order (recovered) packet comes in - if (sequence >= expected) { - _incomingLastSequence = sequence; - } - // do some garbage collecting on our _missingSequenceNumbers if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE) { if (wantExtraDebugging) { qDebug() << "too many _missingSequenceNumbers:" << _missingSequenceNumbers.size(); } + + int oldAgeCutoff = (int)_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE; + foreach(uint16_t missingItem, _missingSequenceNumbers) { if (wantExtraDebugging) { qDebug() << "checking item:" << missingItem << "is it in need of pruning?"; - qDebug() << "(_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE):" - << (_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE); + qDebug() << "(_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE):" + << (uint16_t)((int)_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE); } - if (missingItem <= std::max(0, _incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE)) { + + bool prune; + if (oldAgeCutoff >= 0) { + prune = (missingItem <= oldAgeCutoff || missingItem > _incomingLastSequence); + } + else { + prune = (missingItem <= oldAgeCutoff + UINT16_RANGE && missingItem > _incomingLastSequence); + } + + if (prune) { if (wantExtraDebugging) { qDebug() << "pruning really old missing sequence:" << missingItem; } @@ -991,6 +1032,12 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, } } + // track packets here... + _incomingPacket++; + _incomingBytes += packet.size(); + if (!wasStatsPacket) { + _incomingWastedBytes += (MAX_PACKET_SIZE - packet.size()); + } } int OctreeSceneStats::getNumSequenceNumbersToNack() const { From 8c4e365958654580132ed955ae22bb1cdbc72aed Mon Sep 17 00:00:00 2001 From: wangyix Date: Wed, 18 Jun 2014 09:28:42 -0700 Subject: [PATCH 26/30] added forgotten i++ in sendNackPackets() plus minor style fixes --- .../octree/OctreeInboundPacketProcessor.cpp | 71 +++++++------------ .../src/octree/OctreeInboundPacketProcessor.h | 6 +- .../src/ReceivedPacketProcessor.cpp | 5 +- .../networking/src/ReceivedPacketProcessor.h | 12 ++++ libraries/octree/src/OctreeSceneStats.cpp | 11 ++- 5 files changed, 50 insertions(+), 55 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 91a5c62752..6bc085df4b 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -17,6 +17,7 @@ #include "OctreeInboundPacketProcessor.h" static QUuid DEFAULT_NODE_ID_REF; +const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServer) : _myServer(myServer), @@ -41,46 +42,33 @@ void OctreeInboundPacketProcessor::resetStats() { _singleSenderStats.clear(); } -bool OctreeInboundPacketProcessor::process() { - - const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; +unsigned long OctreeInboundPacketProcessor::getMaxWait() const { + // calculate time until next sendNackPackets() + quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; quint64 now = usecTimestampNow(); - - if (_packets.size() == 0) { - // calculate time until next sendNackPackets() - quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; - quint64 now = usecTimestampNow(); - if (now >= nextNackTime) { - // send nacks if we're already past time to send it - _lastNackTime = now; - sendNackPackets(); - } - else { - // otherwise, wait until the next nack time or until a packet arrives - quint64 waitTimeMsecs = (nextNackTime - now) / USECS_PER_MSEC + 1; - _waitingOnPacketsMutex.lock(); - _hasPackets.wait(&_waitingOnPacketsMutex, waitTimeMsecs); - _waitingOnPacketsMutex.unlock(); - } + if (now >= nextNackTime) { + return 0; } - while (_packets.size() > 0) { - lock(); // lock to make sure nothing changes on us - NetworkPacket& packet = _packets.front(); // get the oldest packet - NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us - _packets.erase(_packets.begin()); // remove the oldest packet - _nodePacketCounts[temporary.getNode()->getUUID()]--; - unlock(); // let others add to the packets - processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy - - // if it's time to send nacks, send them. - if (usecTimestampNow() - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { - _lastNackTime = now; - sendNackPackets(); - } - } - return isStillRunning(); // keep running till they terminate us + return (nextNackTime - now) / USECS_PER_MSEC + 1; } +void OctreeInboundPacketProcessor::preProcess() { + // check if it's time to send a nack. If yes, do so + quint64 now = usecTimestampNow(); + if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } +} + +void OctreeInboundPacketProcessor::midProcess() { + // check if it's time to send a nack. If yes, do so + quint64 now = usecTimestampNow(); + if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } +} void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { @@ -211,6 +199,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() { // if there are packets from _node that are waiting to be processed, // don't send a NACK since the missing packets may be among those waiting packets. if (hasPacketsToProcessFrom(nodeUUID)) { + i++; continue; } @@ -279,7 +268,6 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { const int UINT16_RANGE = UINT16_MAX + 1; - const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; @@ -287,9 +275,7 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; - } - else { // out of order - + } else { // out of order int incoming = (int)incomingSequence; int expected = (int)expectedSequence; @@ -312,17 +298,13 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, // now that rollover has been corrected for (if it occurred), incoming and expected can be // compared to each other directly, though one of them might be negative - if (incoming > expected) { // early - // add all sequence numbers that were skipped to the missing sequence numbers list for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); } _incomingLastSequence = incomingSequence; - } else { // late - // remove this from missing sequence number if it's in there _missingSequenceNumbers.remove(incomingSequence); @@ -333,7 +315,6 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, // prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP // will be removed. if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { - // some older sequence numbers may be from before a rollover point; this must be handled. // some sequence numbers in this list may be larger than _incomingLastSequence, indicating that they were received // before the most recent rollover. diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index d3b3b80208..46a57205cb 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -77,9 +77,11 @@ protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); - virtual bool process(); + virtual unsigned long getMaxWait() const; + virtual void preProcess(); + virtual void midProcess(); -public: +private: int sendNackPackets(); private: diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index 3ef518bbc2..59e1ecd456 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -35,9 +35,10 @@ bool ReceivedPacketProcessor::process() { if (_packets.size() == 0) { _waitingOnPacketsMutex.lock(); - _hasPackets.wait(&_waitingOnPacketsMutex); + _hasPackets.wait(&_waitingOnPacketsMutex, getMaxWait()); _waitingOnPacketsMutex.unlock(); } + preProcess(); while (_packets.size() > 0) { lock(); // lock to make sure nothing changes on us NetworkPacket& packet = _packets.front(); // get the oldest packet @@ -46,7 +47,9 @@ bool ReceivedPacketProcessor::process() { _nodePacketCounts[temporary.getNode()->getUUID()]--; unlock(); // let others add to the packets processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy + midProcess(); } + postProcess(); return isStillRunning(); // keep running till they terminate us } diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 4322c87910..607f9e54c2 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -63,6 +63,18 @@ protected: /// Implements generic processing behavior for this thread. virtual bool process(); + /// Determines the timeout of the wait when there are no packets to process. Default value means no timeout + virtual unsigned long getMaxWait() const { return ULONG_MAX; } + + /// Override to do work before the packets processing loop. Default does nothing. + virtual void preProcess() { } + + /// Override to do work inside the packet processing loop after a packet is processed. Default does nothing. + virtual void midProcess() { } + + /// Override to do work after the packets processing loop. Default does nothing. + virtual void postProcess() { } + virtual void terminating(); protected: diff --git a/libraries/octree/src/OctreeSceneStats.cpp b/libraries/octree/src/OctreeSceneStats.cpp index e99096dd5c..1ed078ed14 100644 --- a/libraries/octree/src/OctreeSceneStats.cpp +++ b/libraries/octree/src/OctreeSceneStats.cpp @@ -843,7 +843,7 @@ const char* OctreeSceneStats::getItemValue(Item item) { void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, bool wasStatsPacket, int nodeClockSkewUsec) { - const bool wantExtraDebugging = true; + const bool wantExtraDebugging = false; int numBytesPacketHeader = numBytesForPacketHeader(packet); const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; @@ -891,8 +891,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, if (wantExtraDebugging) { qDebug() << "last packet duplicate got:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; } - } - else { + } else { if (sequence != expected) { if (wantExtraDebugging) { qDebug() << "out of order... got:" << sequence << "expected:" << expected; @@ -967,8 +966,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, // only bump the last sequence if it was greater than our expected sequence, this will keep us from // accidentally going backwards when an out of order (recovered) packet comes in - } - else { // sequenceInt > expectedInt + } else { // sequenceInt > expectedInt // if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1] // if rollover between them: sequenceInt in [0, UINT16_RANGE-1], expectedInt in [-UINT16_RANGE, -1] @@ -992,8 +990,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, _incomingLastSequence = sequence; } - } - else { // sequence = expected + } else { // sequence = expected _incomingLastSequence = sequence; } From b63c88f42c811adabe6c3148ee8a5238ec6347b2 Mon Sep 17 00:00:00 2001 From: wangyix Date: Wed, 18 Jun 2014 09:38:39 -0700 Subject: [PATCH 27/30] removed more spaces --- libraries/octree/src/OctreeEditPacketSender.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index f49fe9f22f..43e253b2da 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -97,14 +97,12 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsi ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { if (node->getActiveSocket()) { QByteArray packet(reinterpret_cast(buffer), length); - queuePacketForSending(node, packet); // extract sequence number and add packet to history int numBytesPacketHeader = numBytesForPacketHeader(packet); const char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; unsigned short int sequence = *((unsigned short int*)dataAt); - _sentPacketHistories[nodeUUID].packetSent(sequence, packet); // debugging output... @@ -303,13 +301,11 @@ void OctreeEditPacketSender::releaseQueuedMessages() { void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { _releaseQueuedPacketMutex.lock(); - if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketTypeUnknown) { queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); packetBuffer._currentSize = 0; packetBuffer._currentType = PacketTypeUnknown; } - _releaseQueuedPacketMutex.unlock(); } From 54f32d331e258a801d041a1e436ff42c044c4f96 Mon Sep 17 00:00:00 2001 From: wangyix Date: Wed, 18 Jun 2014 10:20:19 -0700 Subject: [PATCH 28/30] replaced UINT16_MAX with std::numeric_limits::max --- libraries/networking/src/SentPacketHistory.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp index 1e1157ba71..841b5e909c 100644 --- a/libraries/networking/src/SentPacketHistory.cpp +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -8,6 +8,7 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // +#include #include "SentPacketHistory.h" #include @@ -15,7 +16,7 @@ SentPacketHistory::SentPacketHistory(int size) : _sentPackets(size), _newestPacketAt(0), _numExistingPackets(0), - _newestSequenceNumber(UINT16_MAX) + _newestSequenceNumber(std::numeric_limits::max()) { } @@ -42,7 +43,7 @@ void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& pa const QByteArray* SentPacketHistory::getPacket(uint16_t sequenceNumber) const { - const int UINT16_RANGE = UINT16_MAX + 1; + const int UINT16_RANGE = std::numeric_limits::max() + 1; // if sequenceNumber > _newestSequenceNumber, assume sequenceNumber is from before the most recent rollover // correct the diff so that it correctly represents how far back in the history sequenceNumber is From 7cef5eeeec0cf31d05bd3905451d7251aea20ba5 Mon Sep 17 00:00:00 2001 From: wangyix Date: Wed, 18 Jun 2014 10:27:02 -0700 Subject: [PATCH 29/30] replaced UINT16_MAX at 2 other places --- assignment-client/src/octree/OctreeInboundPacketProcessor.cpp | 3 ++- libraries/octree/src/OctreeSceneStats.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 6bc085df4b..b13f3d7096 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -9,6 +9,7 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // +#include #include #include @@ -267,7 +268,7 @@ SingleSenderStats::SingleSenderStats() void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { - const int UINT16_RANGE = UINT16_MAX + 1; + const int UINT16_RANGE = std::numeric_limits::max() + 1; const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; diff --git a/libraries/octree/src/OctreeSceneStats.cpp b/libraries/octree/src/OctreeSceneStats.cpp index 1ed078ed14..d037ec79ad 100644 --- a/libraries/octree/src/OctreeSceneStats.cpp +++ b/libraries/octree/src/OctreeSceneStats.cpp @@ -9,6 +9,7 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // +#include #include #include @@ -877,7 +878,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, return; // ignore any packets that are unreasonable } - const int UINT16_RANGE = UINT16_MAX + 1; + const int UINT16_RANGE = std::numeric_limits::max() + 1; // determine our expected sequence number... handle rollover appropriately OCTREE_PACKET_SEQUENCE expected = _incomingPacket > 0 ? _incomingLastSequence + (quint16)1 : sequence; From d78ed66616fc6836a0efa935d145f369c6e0191d Mon Sep 17 00:00:00 2001 From: wangyix Date: Wed, 18 Jun 2014 11:37:54 -0700 Subject: [PATCH 30/30] removed spaces before comments in OctreeInboundPacketProcessor --- .../src/octree/OctreeInboundPacketProcessor.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index b13f3d7096..78ab9259fd 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -274,9 +274,9 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + (unsigned short int)1; - if (incomingSequence == expectedSequence) { // on time + if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; - } else { // out of order + } else { // out of order int incoming = (int)incomingSequence; int expected = (int)expectedSequence; @@ -299,13 +299,13 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, // now that rollover has been corrected for (if it occurred), incoming and expected can be // compared to each other directly, though one of them might be negative - if (incoming > expected) { // early + if (incoming > expected) { // early // add all sequence numbers that were skipped to the missing sequence numbers list for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); } _incomingLastSequence = incomingSequence; - } else { // late + } else { // late // remove this from missing sequence number if it's in there _missingSequenceNumbers.remove(incomingSequence);