From 2b20720f51578f365685845b1f835895b50535de Mon Sep 17 00:00:00 2001 From: wangyix Date: Mon, 16 Jun 2014 09:57:05 -0700 Subject: [PATCH] added sendNackPackets() to OctreeInboundPacketProcessor added rollover handling in _missingSequenceNumbers pruning; added EditNack packet types; added getMyEditNackType() to OctreeServer subclasses; added code to randomly skip edit packet sequence numbers for testing in OctreeEditPacketSender --- assignment-client/src/models/ModelServer.h | 1 + .../octree/OctreeInboundPacketProcessor.cpp | 111 ++++++++++++++++-- .../src/octree/OctreeInboundPacketProcessor.h | 11 +- assignment-client/src/octree/OctreeServer.cpp | 6 +- assignment-client/src/octree/OctreeServer.h | 1 + .../src/particles/ParticleServer.h | 1 + assignment-client/src/voxels/VoxelServer.h | 1 + libraries/networking/src/PacketHeaders.h | 5 +- .../networking/src/ReceivedPacketProcessor.h | 6 +- .../octree/src/OctreeEditPacketSender.cpp | 7 ++ libraries/shared/src/GenericThread.h | 2 +- 11 files changed, 133 insertions(+), 19 deletions(-) diff --git a/assignment-client/src/models/ModelServer.h b/assignment-client/src/models/ModelServer.h index 38acc7f1e1..8a26f07f11 100644 --- a/assignment-client/src/models/ModelServer.h +++ b/assignment-client/src/models/ModelServer.h @@ -33,6 +33,7 @@ public: virtual const char* getMyServerName() const { return MODEL_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return MODEL_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_MODELS_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeModelEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 931814d4f8..683fe188b4 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -39,7 +39,6 @@ void OctreeInboundPacketProcessor::resetStats() { _singleSenderStats.clear(); } - void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { bool debugProcessPacket = _myServer->wantsVerboseDebug(); @@ -150,6 +149,76 @@ void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, uns } } +int OctreeInboundPacketProcessor::sendNackPackets() { + + printf("\t\t sendNackPackets()\n"); + + int packetsSent = 0; + + // if there are packets from _node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. + + NodeToSenderStatsMap::const_iterator begin = _singleSenderStats.begin(), end = _singleSenderStats.end(); + for (NodeToSenderStatsMap::const_iterator i = begin; i != end; i++) { + + QUuid nodeUUID = i.key(); + SingleSenderStats nodeStats = i.value(); + + if (hasPacketsToProcessFrom(nodeUUID)) { + continue; + } + + const SharedNodePointer& destinationNode = NodeList::getInstance()->getNodeHash().value(nodeUUID); + const QSet& missingSequenceNumbers = nodeStats.getMissingSequenceNumbers(); + + // check if there are any sequence numbers that need to be nacked + int numSequenceNumbersAvailable = missingSequenceNumbers.size(); + + // construct nack packet(s) for this node + + QSet::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.begin(); + char packet[MAX_PACKET_SIZE]; + + while (numSequenceNumbersAvailable > 0) { + + char* dataAt = packet; + int bytesRemaining = MAX_PACKET_SIZE; + + // pack header + int numBytesPacketHeader = populatePacketHeader(packet, _myServer->getMyEditNackType()); + dataAt += numBytesPacketHeader; + bytesRemaining -= numBytesPacketHeader; + + // calculate and pack the number of sequence numbers to nack + int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(unsigned short int); + uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor); + uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt; + *numSequenceNumbersAt = numSequenceNumbers; + dataAt += sizeof(uint16_t); + + // pack sequence numbers to nack + printf("\t\t sending NACK with %d seq numbers:\n\t\t", numSequenceNumbers); + for (uint16_t i = 0; i < numSequenceNumbers; i++) { + unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt; + *sequenceNumberAt = *missingSequenceNumberIterator; + dataAt += sizeof(unsigned short int); + printf("%d, ", *missingSequenceNumberIterator); + + missingSequenceNumberIterator++; + } + numSequenceNumbersAvailable -= numSequenceNumbers; + + // send it + qint64 bytesWritten = NodeList::getInstance()->writeDatagram(packet, dataAt - packet, destinationNode); + printf("\t\t wrote %lld bytes\n\n", bytesWritten); + + packetsSent++; + } + } + return packetsSent; +} + + SingleSenderStats::SingleSenderStats() : _totalTransitTime(0), _totalProcessTime(0), @@ -165,17 +234,19 @@ SingleSenderStats::SingleSenderStats() void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { - const int MAX_REASONABLE_SEQUENCE_GAP = 1000; +printf("\t\t tracked seq %d\n", incomingSequence); + + const int UINT16_RANGE = 65536; + + const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work const int MAX_MISSING_SEQUENCE_SIZE = 100; unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + 1; - if (incomingSequence == expectedSequence) { // on time + if (incomingSequence == expectedSequence) { // on time _incomingLastSequence = incomingSequence; } - else { // out of order - - const int UINT16_RANGE = 65536; + else { // out of order int incoming = (int)incomingSequence; int expected = (int)expectedSequence; @@ -202,6 +273,8 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, if (incoming > expected) { // early + printf("\t\t\t packet is early! %d packets were skipped\n", incoming - expected); + // add all sequence numbers that were skipped to the missing sequence numbers list for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); @@ -210,6 +283,8 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, } else { // late + printf("\t\t\t packet is late!\n"); + // remove this from missing sequence number if it's in there _missingSequenceNumbers.remove(incomingSequence); @@ -217,11 +292,27 @@ void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, } } - // prune missing sequence list if it gets too big + // prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP + // will be removed. if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { - foreach(unsigned short int missingSequence, _missingSequenceNumbers) { - if (missingSequence <= std::max(0, _incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP)) { - _missingSequenceNumbers.remove(missingSequence); + + // the acceptable range of older sequence numbers may contain a rollover point; this must be handled. + // some sequence number in this list may be larger than _incomingLastSequence, indicating that they were received + // before the most recent rollover. + int cutoff = (int)_incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP; + if (cutoff >= 0) { + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + unsigned short int nonRolloverCutoff = (unsigned short int)cutoff; + if (missingSequence > _incomingLastSequence || missingSequence <= nonRolloverCutoff) { + _missingSequenceNumbers.remove(missingSequence); + } + } + } else { + unsigned short int rolloverCutoff = (unsigned short int)(cutoff + UINT16_RANGE); + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + if (missingSequence > _incomingLastSequence && missingSequence <= rolloverCutoff) { + _missingSequenceNumbers.remove(missingSequence); + } } } } diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index 82471bcddf..9649da2d61 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -32,6 +32,7 @@ public: { return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; } quint64 getAverageLockWaitTimePerElement() const { return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; } + const QSet& getMissingSequenceNumbers() const { return _missingSequenceNumbers; } void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime); @@ -47,8 +48,9 @@ public: QSet _missingSequenceNumbers; }; -typedef std::map NodeToSenderStatsMap; -typedef std::map::iterator NodeToSenderStatsMapIterator; +typedef QHash NodeToSenderStatsMap; +typedef QHash::iterator NodeToSenderStatsMapIterator; +typedef QHash::const_iterator NodeToSenderStatsMapConstIterator; /// Handles processing of incoming network packets for the voxel-server. As with other ReceivedPacketProcessor classes @@ -75,6 +77,9 @@ public: protected: virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); +public slots: + int sendNackPackets(); + private: void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int voxelsInPacket, quint64 processTime, quint64 lockWaitTime); @@ -87,7 +92,7 @@ private: quint64 _totalLockWaitTime; quint64 _totalElementsInPacket; quint64 _totalPackets; - + NodeToSenderStatsMap _singleSenderStats; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index cb60f0816e..39f51a0ba9 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -648,10 +648,10 @@ bool OctreeServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url int senderNumber = 0; NodeToSenderStatsMap& allSenderStats = _octreeInboundPacketProcessor->getSingleSenderStats(); - for (NodeToSenderStatsMapIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) { + for (NodeToSenderStatsMapConstIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) { senderNumber++; - QUuid senderID = i->first; - SingleSenderStats& senderStats = i->second; + QUuid senderID = i.key(); + const SingleSenderStats& senderStats = i.value(); statsString += QString("\r\n Stats for sender %1 uuid: %2\r\n") .arg(senderNumber).arg(senderID.toString()); diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 5595d139be..76b39c5771 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -68,6 +68,7 @@ public: virtual const char* getMyServerName() const = 0; virtual const char* getMyLoggingServerTargetName() const = 0; virtual const char* getMyDefaultPersistFilename() const = 0; + virtual PacketType getMyEditNackType() const = 0; // subclass may implement these method virtual void beforeRun() { }; diff --git a/assignment-client/src/particles/ParticleServer.h b/assignment-client/src/particles/ParticleServer.h index d444368a9d..0b379a903e 100644 --- a/assignment-client/src/particles/ParticleServer.h +++ b/assignment-client/src/particles/ParticleServer.h @@ -33,6 +33,7 @@ public: virtual const char* getMyServerName() const { return PARTICLE_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return PARTICLE_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_PARTICLES_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeParticleEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/assignment-client/src/voxels/VoxelServer.h b/assignment-client/src/voxels/VoxelServer.h index b13f83b65f..fadcca2d19 100644 --- a/assignment-client/src/voxels/VoxelServer.h +++ b/assignment-client/src/voxels/VoxelServer.h @@ -42,6 +42,7 @@ public: virtual const char* getMyServerName() const { return VOXEL_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return VOXEL_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_VOXELS_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeVoxelEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index 8ac5333d10..5ed4110627 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -66,7 +66,10 @@ enum PacketType { PacketTypeModelAddOrEdit, PacketTypeModelErase, PacketTypeModelAddResponse, - PacketTypeOctreeDataNack + PacketTypeOctreeDataNack, // 45 + PacketTypeVoxelEditNack, + PacketTypeParticleEditNack, + PacketTypeModelEditNack, }; typedef char PacketVersion; diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 94ad2d9c41..31cbc3a487 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -35,7 +35,11 @@ public: /// Are there received packets waiting to be processed from a certain node bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const { - return _nodePacketCounts[sendingNode->getUUID()] > 0; + return hasPacketsToProcessFrom(sendingNode->getUUID()); + } + + bool hasPacketsToProcessFrom(const QUuid& nodeUUID) const { + return _nodePacketCounts[nodeUUID] > 0; } /// How many received packets waiting are to be processed diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 898c41de08..da1438136b 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -308,7 +308,14 @@ void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, Pa unsigned short int* sequenceAt = (unsigned short int*)&packetBuffer._currentBuffer[packetBuffer._currentSize]; *sequenceAt = _sequenceNumber; packetBuffer._currentSize += sizeof(unsigned short int); // nudge past sequence + if (randFloat() < 0.6f) _sequenceNumber++; + else + { + int x = randIntInRange(2, 4); + printf("\t\t seq number jumped from %d to %d\n", _sequenceNumber, _sequenceNumber + x); + _sequenceNumber += x; + } // pack in timestamp quint64 now = usecTimestampNow(); diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index bbb01894ed..b2c0eb13db 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -57,7 +57,7 @@ protected: bool isStillRunning() const { return !_stopThread; } -private: +protected: QMutex _mutex; bool _stopThread;