added _missingSequenceNumbers tracking to OctreeInboundPacketProcessor

This commit is contained in:
wangyix 2014-06-12 09:17:12 -07:00
parent 7aef5edb8f
commit 7955979599
2 changed files with 71 additions and 21 deletions

View file

@ -123,13 +123,13 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin
qDebug() << "sender has no known nodeUUID."; qDebug() << "sender has no known nodeUUID.";
} }
} }
trackInboundPackets(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); trackInboundPacket(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime);
} else { } else {
qDebug("unknown packet ignored... packetType=%d", packetType); qDebug("unknown packet ignored... packetType=%d", packetType);
} }
} }
void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime,
int editsInPacket, quint64 processTime, quint64 lockWaitTime) { int editsInPacket, quint64 processTime, quint64 lockWaitTime) {
_totalTransitTime += transitTime; _totalTransitTime += transitTime;
@ -142,31 +142,74 @@ void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, in
// see if this is the first we've heard of this node... // see if this is the first we've heard of this node...
if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) { if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) {
SingleSenderStats stats; SingleSenderStats stats;
stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime);
stats._totalTransitTime += transitTime;
stats._totalProcessTime += processTime;
stats._totalLockWaitTime += lockWaitTime;
stats._totalElementsInPacket += editsInPacket;
stats._totalPackets++;
_singleSenderStats[nodeUUID] = stats; _singleSenderStats[nodeUUID] = stats;
} else { } else {
SingleSenderStats& stats = _singleSenderStats[nodeUUID]; SingleSenderStats& stats = _singleSenderStats[nodeUUID];
stats._totalTransitTime += transitTime; stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime);
stats._totalProcessTime += processTime;
stats._totalLockWaitTime += lockWaitTime;
stats._totalElementsInPacket += editsInPacket;
stats._totalPackets++;
} }
} }
SingleSenderStats::SingleSenderStats()
: _totalTransitTime(0),
_totalProcessTime(0),
_totalLockWaitTime(0),
_totalElementsInPacket(0),
_totalPackets(0),
_missingSequenceNumbers()
{
SingleSenderStats::SingleSenderStats() {
_totalTransitTime = 0;
_totalProcessTime = 0;
_totalLockWaitTime = 0;
_totalElementsInPacket = 0;
_totalPackets = 0;
} }
void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime,
int editsInPacket, quint64 processTime, quint64 lockWaitTime) {
const int MAX_REASONABLE_SEQUENCE_GAP = 1000;
const int MAX_MISSING_SEQUENCE_SIZE = 100;
unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1;
if (incomingSequence == expectedSequence) { // on time
_incomingLastSequence = incomingSequence;
}
else {
// ignore packet if sequence number gap is unreasonable
if (std::abs(incomingSequence - expectedSequence) > MAX_REASONABLE_SEQUENCE_GAP) {
qDebug() << "ignoring unreasonable packet... sequence:" << incomingSequence
<< "_incomingLastSequence:" << _incomingLastSequence;
return;
}
if (incomingSequence > expectedSequence) { // early
// add all sequence numbers that were skipped to the missing sequence numbers list
for (int missingSequence = expectedSequence; missingSequence < incomingSequence; missingSequence++) {
_missingSequenceNumbers.insert(missingSequence);
}
_incomingLastSequence = incomingSequence;
} else { // late
// remove this from missing sequence number if it's in there
_missingSequenceNumbers.remove(incomingSequence);
// do not update _incomingLastSequence
}
}
// prune missing sequence list if it gets too big
if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) {
foreach(unsigned short int missingSequence, _missingSequenceNumbers) {
if (missingSequence <= std::max(0, _incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP)) {
_missingSequenceNumbers.remove(missingSequence);
}
}
}
// update other stats
_totalTransitTime += transitTime;
_totalProcessTime += processTime;
_totalLockWaitTime += lockWaitTime;
_totalElementsInPacket += editsInPacket;
_totalPackets++;
}

View file

@ -32,12 +32,19 @@ public:
{ return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; } { return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; }
quint64 getAverageLockWaitTimePerElement() const quint64 getAverageLockWaitTimePerElement() const
{ return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; } { return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; }
void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime,
int editsInPacket, quint64 processTime, quint64 lockWaitTime);
quint64 _totalTransitTime; quint64 _totalTransitTime;
quint64 _totalProcessTime; quint64 _totalProcessTime;
quint64 _totalLockWaitTime; quint64 _totalLockWaitTime;
quint64 _totalElementsInPacket; quint64 _totalElementsInPacket;
quint64 _totalPackets; quint64 _totalPackets;
unsigned short int _incomingLastSequence;
QSet<unsigned short int> _missingSequenceNumbers;
}; };
typedef std::map<QUuid, SingleSenderStats> NodeToSenderStatsMap; typedef std::map<QUuid, SingleSenderStats> NodeToSenderStatsMap;
@ -69,7 +76,7 @@ protected:
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
private: private:
void trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime,
int voxelsInPacket, quint64 processTime, quint64 lockWaitTime); int voxelsInPacket, quint64 processTime, quint64 lockWaitTime);
OctreeServer* _myServer; OctreeServer* _myServer;