From 7955979599cced0a45d9cbf9da89ec643f073094 Mon Sep 17 00:00:00 2001 From: wangyix Date: Thu, 12 Jun 2014 09:17:12 -0700 Subject: [PATCH] added _missingSequenceNumbers tracking to OctreeInboundPacketProcessor --- .../octree/OctreeInboundPacketProcessor.cpp | 83 ++++++++++++++----- .../src/octree/OctreeInboundPacketProcessor.h | 9 +- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 545c502036..52407f78d5 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -123,13 +123,13 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin qDebug() << "sender has no known nodeUUID."; } } - trackInboundPackets(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); + trackInboundPacket(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); } else { qDebug("unknown packet ignored... packetType=%d", packetType); } } -void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, +void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { _totalTransitTime += transitTime; @@ -142,31 +142,74 @@ void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, in // see if this is the first we've heard of this node... if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) { SingleSenderStats stats; - - stats._totalTransitTime += transitTime; - stats._totalProcessTime += processTime; - stats._totalLockWaitTime += lockWaitTime; - stats._totalElementsInPacket += editsInPacket; - stats._totalPackets++; - + stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); _singleSenderStats[nodeUUID] = stats; } else { SingleSenderStats& stats = _singleSenderStats[nodeUUID]; - stats._totalTransitTime += transitTime; - stats._totalProcessTime += processTime; - stats._totalLockWaitTime += lockWaitTime; - stats._totalElementsInPacket += editsInPacket; - stats._totalPackets++; + stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); } } +SingleSenderStats::SingleSenderStats() + : _totalTransitTime(0), + _totalProcessTime(0), + _totalLockWaitTime(0), + _totalElementsInPacket(0), + _totalPackets(0), + _missingSequenceNumbers() +{ -SingleSenderStats::SingleSenderStats() { - _totalTransitTime = 0; - _totalProcessTime = 0; - _totalLockWaitTime = 0; - _totalElementsInPacket = 0; - _totalPackets = 0; } +void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, + int editsInPacket, quint64 processTime, quint64 lockWaitTime) { + const int MAX_REASONABLE_SEQUENCE_GAP = 1000; + const int MAX_MISSING_SEQUENCE_SIZE = 100; + + unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1; + + if (incomingSequence == expectedSequence) { // on time + _incomingLastSequence = incomingSequence; + } + else { + // ignore packet if sequence number gap is unreasonable + if (std::abs(incomingSequence - expectedSequence) > MAX_REASONABLE_SEQUENCE_GAP) { + qDebug() << "ignoring unreasonable packet... sequence:" << incomingSequence + << "_incomingLastSequence:" << _incomingLastSequence; + return; + } + + if (incomingSequence > expectedSequence) { // early + + // add all sequence numbers that were skipped to the missing sequence numbers list + for (int missingSequence = expectedSequence; missingSequence < incomingSequence; missingSequence++) { + _missingSequenceNumbers.insert(missingSequence); + } + _incomingLastSequence = incomingSequence; + + } else { // late + + // remove this from missing sequence number if it's in there + _missingSequenceNumbers.remove(incomingSequence); + + // do not update _incomingLastSequence + } + } + + // prune missing sequence list if it gets too big + if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + if (missingSequence <= std::max(0, _incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP)) { + _missingSequenceNumbers.remove(missingSequence); + } + } + } + + // update other stats + _totalTransitTime += transitTime; + _totalProcessTime += processTime; + _totalLockWaitTime += lockWaitTime; + _totalElementsInPacket += editsInPacket; + _totalPackets++; +} \ No newline at end of file diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index f637a9e7c9..82471bcddf 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -32,12 +32,19 @@ public: { return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; } quint64 getAverageLockWaitTimePerElement() const { return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; } + + void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, + int editsInPacket, quint64 processTime, quint64 lockWaitTime); + quint64 _totalTransitTime; quint64 _totalProcessTime; quint64 _totalLockWaitTime; quint64 _totalElementsInPacket; quint64 _totalPackets; + + unsigned short int _incomingLastSequence; + QSet _missingSequenceNumbers; }; typedef std::map NodeToSenderStatsMap; @@ -69,7 +76,7 @@ protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); private: - void trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, + void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int voxelsInPacket, quint64 processTime, quint64 lockWaitTime); OctreeServer* _myServer;