added forgotten i++ in sendNackPackets()

plus minor style fixes
This commit is contained in:
wangyix 2014-06-18 09:28:42 -07:00
parent 7f4cf3719e
commit 8c4e365958
5 changed files with 50 additions and 55 deletions

View file

@ -17,6 +17,7 @@
#include "OctreeInboundPacketProcessor.h"
static QUuid DEFAULT_NODE_ID_REF;
const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND;
OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServer) :
_myServer(myServer),
@ -41,46 +42,33 @@ void OctreeInboundPacketProcessor::resetStats() {
_singleSenderStats.clear();
}
bool OctreeInboundPacketProcessor::process() {
const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND;
unsigned long OctreeInboundPacketProcessor::getMaxWait() const {
// calculate time until next sendNackPackets()
quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK;
quint64 now = usecTimestampNow();
if (_packets.size() == 0) {
// calculate time until next sendNackPackets()
quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK;
quint64 now = usecTimestampNow();
if (now >= nextNackTime) {
// send nacks if we're already past time to send it
_lastNackTime = now;
sendNackPackets();
}
else {
// otherwise, wait until the next nack time or until a packet arrives
quint64 waitTimeMsecs = (nextNackTime - now) / USECS_PER_MSEC + 1;
_waitingOnPacketsMutex.lock();
_hasPackets.wait(&_waitingOnPacketsMutex, waitTimeMsecs);
_waitingOnPacketsMutex.unlock();
}
if (now >= nextNackTime) {
return 0;
}
while (_packets.size() > 0) {
lock(); // lock to make sure nothing changes on us
NetworkPacket& packet = _packets.front(); // get the oldest packet
NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us
_packets.erase(_packets.begin()); // remove the oldest packet
_nodePacketCounts[temporary.getNode()->getUUID()]--;
unlock(); // let others add to the packets
processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy
// if it's time to send nacks, send them.
if (usecTimestampNow() - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) {
_lastNackTime = now;
sendNackPackets();
}
}
return isStillRunning(); // keep running till they terminate us
return (nextNackTime - now) / USECS_PER_MSEC + 1;
}
void OctreeInboundPacketProcessor::preProcess() {
// check if it's time to send a nack. If yes, do so
quint64 now = usecTimestampNow();
if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) {
_lastNackTime = now;
sendNackPackets();
}
}
void OctreeInboundPacketProcessor::midProcess() {
// check if it's time to send a nack. If yes, do so
quint64 now = usecTimestampNow();
if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) {
_lastNackTime = now;
sendNackPackets();
}
}
void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
@ -211,6 +199,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() {
// if there are packets from _node that are waiting to be processed,
// don't send a NACK since the missing packets may be among those waiting packets.
if (hasPacketsToProcessFrom(nodeUUID)) {
i++;
continue;
}
@ -279,7 +268,6 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
int editsInPacket, quint64 processTime, quint64 lockWaitTime) {
const int UINT16_RANGE = UINT16_MAX + 1;
const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work
const int MAX_MISSING_SEQUENCE_SIZE = 100;
@ -287,9 +275,7 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
if (incomingSequence == expectedSequence) { // on time
_incomingLastSequence = incomingSequence;
}
else { // out of order
} else { // out of order
int incoming = (int)incomingSequence;
int expected = (int)expectedSequence;
@ -312,17 +298,13 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
// now that rollover has been corrected for (if it occurred), incoming and expected can be
// compared to each other directly, though one of them might be negative
if (incoming > expected) { // early
// add all sequence numbers that were skipped to the missing sequence numbers list
for (int missingSequence = expected; missingSequence < incoming; missingSequence++) {
_missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence);
}
_incomingLastSequence = incomingSequence;
} else { // late
// remove this from missing sequence number if it's in there
_missingSequenceNumbers.remove(incomingSequence);
@ -333,7 +315,6 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence,
// prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP
// will be removed.
if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) {
// some older sequence numbers may be from before a rollover point; this must be handled.
// some sequence numbers in this list may be larger than _incomingLastSequence, indicating that they were received
// before the most recent rollover.

View file

@ -77,9 +77,11 @@ protected:
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
virtual bool process();
virtual unsigned long getMaxWait() const;
virtual void preProcess();
virtual void midProcess();
public:
private:
int sendNackPackets();
private:

View file

@ -35,9 +35,10 @@ bool ReceivedPacketProcessor::process() {
if (_packets.size() == 0) {
_waitingOnPacketsMutex.lock();
_hasPackets.wait(&_waitingOnPacketsMutex);
_hasPackets.wait(&_waitingOnPacketsMutex, getMaxWait());
_waitingOnPacketsMutex.unlock();
}
preProcess();
while (_packets.size() > 0) {
lock(); // lock to make sure nothing changes on us
NetworkPacket& packet = _packets.front(); // get the oldest packet
@ -46,7 +47,9 @@ bool ReceivedPacketProcessor::process() {
_nodePacketCounts[temporary.getNode()->getUUID()]--;
unlock(); // let others add to the packets
processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy
midProcess();
}
postProcess();
return isStillRunning(); // keep running till they terminate us
}

View file

@ -63,6 +63,18 @@ protected:
/// Implements generic processing behavior for this thread.
virtual bool process();
/// Determines the timeout of the wait when there are no packets to process. Default value means no timeout
virtual unsigned long getMaxWait() const { return ULONG_MAX; }
/// Override to do work before the packets processing loop. Default does nothing.
virtual void preProcess() { }
/// Override to do work inside the packet processing loop after a packet is processed. Default does nothing.
virtual void midProcess() { }
/// Override to do work after the packets processing loop. Default does nothing.
virtual void postProcess() { }
virtual void terminating();
protected:

View file

@ -843,7 +843,7 @@ const char* OctreeSceneStats::getItemValue(Item item) {
void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
bool wasStatsPacket, int nodeClockSkewUsec) {
const bool wantExtraDebugging = true;
const bool wantExtraDebugging = false;
int numBytesPacketHeader = numBytesForPacketHeader(packet);
const unsigned char* dataAt = reinterpret_cast<const unsigned char*>(packet.data()) + numBytesPacketHeader;
@ -891,8 +891,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
if (wantExtraDebugging) {
qDebug() << "last packet duplicate got:" << sequence << "_incomingLastSequence:" << _incomingLastSequence;
}
}
else {
} else {
if (sequence != expected) {
if (wantExtraDebugging) {
qDebug() << "out of order... got:" << sequence << "expected:" << expected;
@ -967,8 +966,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
// only bump the last sequence if it was greater than our expected sequence, this will keep us from
// accidentally going backwards when an out of order (recovered) packet comes in
}
else { // sequenceInt > expectedInt
} else { // sequenceInt > expectedInt
// if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1]
// if rollover between them: sequenceInt in [0, UINT16_RANGE-1], expectedInt in [-UINT16_RANGE, -1]
@ -992,8 +990,7 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
_incomingLastSequence = sequence;
}
}
else { // sequence = expected
} else { // sequence = expected
_incomingLastSequence = sequence;
}