Merge pull request #3045 from wangyix/edit_nack_pull

Changed octree data NACK handling to repeatedly NACK missing seq numbers, like edit NACKs
This commit is contained in:
Brad Hefta-Gaub 2014-06-18 15:02:40 -07:00
commit f2b2784fa9
6 changed files with 56 additions and 72 deletions

View file

@ -184,7 +184,8 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns
int OctreeInboundPacketProcessor::sendNackPackets() { int OctreeInboundPacketProcessor::sendNackPackets() {
int packetsSent = 0; int packetsSent = 0;
char packet[MAX_PACKET_SIZE];
NodeToSenderStatsMapIterator i = _singleSenderStats.begin(); NodeToSenderStatsMapIterator i = _singleSenderStats.begin();
while (i != _singleSenderStats.end()) { while (i != _singleSenderStats.end()) {
@ -206,15 +207,10 @@ int OctreeInboundPacketProcessor::sendNackPackets() {
const SharedNodePointer& destinationNode = NodeList::getInstance()->getNodeHash().value(nodeUUID); const SharedNodePointer& destinationNode = NodeList::getInstance()->getNodeHash().value(nodeUUID);
const QSet<unsigned short int>& missingSequenceNumbers = nodeStats.getMissingSequenceNumbers(); const QSet<unsigned short int>& missingSequenceNumbers = nodeStats.getMissingSequenceNumbers();
// check if there are any sequence numbers that need to be nacked
int numSequenceNumbersAvailable = missingSequenceNumbers.size();
// construct nack packet(s) for this node // construct nack packet(s) for this node
int numSequenceNumbersAvailable = missingSequenceNumbers.size();
QSet<unsigned short int>::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.begin(); QSet<unsigned short int>::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.constBegin();
char packet[MAX_PACKET_SIZE];
while (numSequenceNumbersAvailable > 0) { while (numSequenceNumbersAvailable > 0) {
char* dataAt = packet; char* dataAt = packet;
@ -243,8 +239,7 @@ int OctreeInboundPacketProcessor::sendNackPackets() {
numSequenceNumbersAvailable -= numSequenceNumbers; numSequenceNumbersAvailable -= numSequenceNumbers;
// send it // send it
qint64 bytesWritten = NodeList::getInstance()->writeUnverifiedDatagram(packet, dataAt - packet, destinationNode); NodeList::getInstance()->writeUnverifiedDatagram(packet, dataAt - packet, destinationNode);
packetsSent++; packetsSent++;
} }
i++; i++;

View file

@ -851,7 +851,6 @@ void OctreeServer::readPendingDatagrams() {
// If we got a nack packet, then we're talking to an agent, and we // If we got a nack packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList. // need to make sure we have it in our nodeList.
if (matchingNode) { if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData(); OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData) { if (nodeData) {
nodeData->parseNackPacket(receivedPacket); nodeData->parseNackPacket(receivedPacket);

View file

@ -2123,19 +2123,19 @@ void Application::updateMyAvatar(float deltaTime) {
const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND;
if (sinceLastNack > TOO_LONG_SINCE_LAST_NACK) { if (sinceLastNack > TOO_LONG_SINCE_LAST_NACK) {
_lastNackTime = now; _lastNackTime = now;
sendNack(); sendNackPackets();
} }
} }
} }
void Application::sendNack() { int Application::sendNackPackets() {
if (Menu::getInstance()->isOptionChecked(MenuOption::DisableNackPackets)) { if (Menu::getInstance()->isOptionChecked(MenuOption::DisableNackPackets)) {
return; return 0;
} }
int packetsSent = 0;
char packet[MAX_PACKET_SIZE]; char packet[MAX_PACKET_SIZE];
NodeList* nodeList = NodeList::getInstance();
// iterates thru all nodes in NodeList // iterates thru all nodes in NodeList
foreach(const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) { foreach(const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) {
@ -2146,14 +2146,14 @@ void Application::sendNack() {
|| node->getType() == NodeType::ModelServer) || node->getType() == NodeType::ModelServer)
) { ) {
// if there are octree packets from this node that are waiting to be processed,
// don't send a NACK since the missing packets may be among those waiting packets.
if (_octreeProcessor.hasPacketsToProcessFrom(node)) {
continue;
}
QUuid nodeUUID = node->getUUID(); QUuid nodeUUID = node->getUUID();
// if there are octree packets from this node that are waiting to be processed,
// don't send a NACK since the missing packets may be among those waiting packets.
if (_octreeProcessor.hasPacketsToProcessFrom(nodeUUID)) {
continue;
}
_octreeSceneStatsLock.lockForRead(); _octreeSceneStatsLock.lockForRead();
// retreive octree scene stats of this node // retreive octree scene stats of this node
@ -2163,40 +2163,48 @@ void Application::sendNack() {
} }
OctreeSceneStats& stats = _octreeServerSceneStats[nodeUUID]; OctreeSceneStats& stats = _octreeServerSceneStats[nodeUUID];
// check if there are any sequence numbers that need to be nacked // make copy of missing sequence numbers from stats
int numSequenceNumbersAvailable = stats.getNumSequenceNumbersToNack(); const QSet<OCTREE_PACKET_SEQUENCE> missingSequenceNumbers = stats.getMissingSequenceNumbers();
if (numSequenceNumbersAvailable == 0) {
_octreeSceneStatsLock.unlock();
continue;
}
char* dataAt = packet;
int bytesRemaining = MAX_PACKET_SIZE;
// pack header
int numBytesPacketHeader = populatePacketHeader(packet, PacketTypeOctreeDataNack);
dataAt += numBytesPacketHeader;
bytesRemaining -= numBytesPacketHeader;
int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(OCTREE_PACKET_SEQUENCE);
// calculate and pack the number of sequence numbers
uint16_t numSequenceNumbers = min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor);
uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt;
*numSequenceNumbersAt = numSequenceNumbers;
dataAt += sizeof(uint16_t);
// pack sequence numbers
for (int i = 0; i < numSequenceNumbers; i++) {
OCTREE_PACKET_SEQUENCE* sequenceNumberAt = (OCTREE_PACKET_SEQUENCE*)dataAt;
*sequenceNumberAt = stats.getNextSequenceNumberToNack();
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
}
_octreeSceneStatsLock.unlock(); _octreeSceneStatsLock.unlock();
nodeList->writeUnverifiedDatagram(packet, dataAt - packet, node); // construct nack packet(s) for this node
int numSequenceNumbersAvailable = missingSequenceNumbers.size();
QSet<OCTREE_PACKET_SEQUENCE>::const_iterator missingSequenceNumbersIterator = missingSequenceNumbers.constBegin();
while (numSequenceNumbersAvailable > 0) {
char* dataAt = packet;
int bytesRemaining = MAX_PACKET_SIZE;
// pack header
int numBytesPacketHeader = populatePacketHeader(packet, PacketTypeOctreeDataNack);
dataAt += numBytesPacketHeader;
bytesRemaining -= numBytesPacketHeader;
// calculate and pack the number of sequence numbers
int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(OCTREE_PACKET_SEQUENCE);
uint16_t numSequenceNumbers = min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor);
uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt;
*numSequenceNumbersAt = numSequenceNumbers;
dataAt += sizeof(uint16_t);
// pack sequence numbers
for (int i = 0; i < numSequenceNumbers; i++) {
OCTREE_PACKET_SEQUENCE* sequenceNumberAt = (OCTREE_PACKET_SEQUENCE*)dataAt;
*sequenceNumberAt = *missingSequenceNumbersIterator;
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
missingSequenceNumbersIterator++;
}
numSequenceNumbersAvailable -= numSequenceNumbers;
// send it
NodeList::getInstance()->writeUnverifiedDatagram(packet, dataAt - packet, node);
packetsSent++;
}
} }
} }
return packetsSent;
} }
void Application::queryOctree(NodeType_t serverType, PacketType packetType, NodeToJurisdictionMap& jurisdictions) { void Application::queryOctree(NodeType_t serverType, PacketType packetType, NodeToJurisdictionMap& jurisdictions) {

View file

@ -413,7 +413,7 @@ private:
static void attachNewHeadToNode(Node *newNode); static void attachNewHeadToNode(Node *newNode);
static void* networkReceive(void* args); // network receive thread static void* networkReceive(void* args); // network receive thread
void sendNack(); int sendNackPackets();
MainWindow* _window; MainWindow* _window;
GLCanvas* _glWidget; // our GLCanvas has a couple extra features GLCanvas* _glWidget; // our GLCanvas has a couple extra features

View file

@ -47,7 +47,6 @@ OctreeSceneStats::OctreeSceneStats() :
_incomingReallyLate(0), _incomingReallyLate(0),
_incomingPossibleDuplicate(0), _incomingPossibleDuplicate(0),
_missingSequenceNumbers(), _missingSequenceNumbers(),
_sequenceNumbersToNack(),
_incomingFlightTimeAverage(samples), _incomingFlightTimeAverage(samples),
_jurisdictionRoot(NULL) _jurisdictionRoot(NULL)
{ {
@ -160,7 +159,6 @@ void OctreeSceneStats::copyFromOther(const OctreeSceneStats& other) {
_incomingPossibleDuplicate = other._incomingPossibleDuplicate; _incomingPossibleDuplicate = other._incomingPossibleDuplicate;
_missingSequenceNumbers = other._missingSequenceNumbers; _missingSequenceNumbers = other._missingSequenceNumbers;
_sequenceNumbersToNack = other._sequenceNumbersToNack;
} }
@ -946,7 +944,6 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
qDebug() << "found it in _missingSequenceNumbers"; qDebug() << "found it in _missingSequenceNumbers";
} }
_missingSequenceNumbers.remove(sequence); _missingSequenceNumbers.remove(sequence);
_sequenceNumbersToNack.remove(sequence);
_incomingLikelyLost--; _incomingLikelyLost--;
_incomingRecovered++; _incomingRecovered++;
} }
@ -986,7 +983,6 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
for (int missingSequenceInt = expectedInt; missingSequenceInt < sequenceInt; missingSequenceInt++) { for (int missingSequenceInt = expectedInt; missingSequenceInt < sequenceInt; missingSequenceInt++) {
OCTREE_PACKET_SEQUENCE missingSequence = missingSequenceInt >= 0 ? missingSequenceInt : missingSequenceInt + UINT16_RANGE; OCTREE_PACKET_SEQUENCE missingSequence = missingSequenceInt >= 0 ? missingSequenceInt : missingSequenceInt + UINT16_RANGE;
_missingSequenceNumbers << missingSequence; _missingSequenceNumbers << missingSequence;
_sequenceNumbersToNack << missingSequence;
} }
_incomingLastSequence = sequence; _incomingLastSequence = sequence;
@ -1025,7 +1021,6 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
qDebug() << "pruning really old missing sequence:" << missingItem; qDebug() << "pruning really old missing sequence:" << missingItem;
} }
_missingSequenceNumbers.remove(missingItem); _missingSequenceNumbers.remove(missingItem);
_sequenceNumbersToNack.remove(missingItem);
} }
} }
} }
@ -1037,14 +1032,3 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet,
_incomingWastedBytes += (MAX_PACKET_SIZE - packet.size()); _incomingWastedBytes += (MAX_PACKET_SIZE - packet.size());
} }
} }
int OctreeSceneStats::getNumSequenceNumbersToNack() const {
return _sequenceNumbersToNack.size();
}
uint16_t OctreeSceneStats::getNextSequenceNumberToNack() {
QSet<uint16_t>::Iterator it = _sequenceNumbersToNack.begin();
uint16_t sequenceNumber = *it;
_sequenceNumbersToNack.remove(sequenceNumber);
return sequenceNumber;
}

View file

@ -172,9 +172,8 @@ public:
quint32 getIncomingReallyLate() const { return _incomingReallyLate; } quint32 getIncomingReallyLate() const { return _incomingReallyLate; }
quint32 getIncomingPossibleDuplicate() const { return _incomingPossibleDuplicate; } quint32 getIncomingPossibleDuplicate() const { return _incomingPossibleDuplicate; }
float getIncomingFlightTimeAverage() { return _incomingFlightTimeAverage.getAverage(); } float getIncomingFlightTimeAverage() { return _incomingFlightTimeAverage.getAverage(); }
int getNumSequenceNumbersToNack() const; const QSet<OCTREE_PACKET_SEQUENCE>& getMissingSequenceNumbers() const { return _missingSequenceNumbers; }
OCTREE_PACKET_SEQUENCE getNextSequenceNumberToNack();
private: private:
@ -277,7 +276,6 @@ private:
quint32 _incomingReallyLate; /// out of order and later than MAX_MISSING_SEQUENCE_OLD_AGE late quint32 _incomingReallyLate; /// out of order and later than MAX_MISSING_SEQUENCE_OLD_AGE late
quint32 _incomingPossibleDuplicate; /// out of order possibly a duplicate quint32 _incomingPossibleDuplicate; /// out of order possibly a duplicate
QSet<OCTREE_PACKET_SEQUENCE> _missingSequenceNumbers; QSet<OCTREE_PACKET_SEQUENCE> _missingSequenceNumbers;
QSet<OCTREE_PACKET_SEQUENCE> _sequenceNumbersToNack;
SimpleMovingAverage _incomingFlightTimeAverage; SimpleMovingAverage _incomingFlightTimeAverage;
// features related items // features related items