diff --git a/assignment-client/src/models/ModelServer.h b/assignment-client/src/models/ModelServer.h index 38acc7f1e1..8a26f07f11 100644 --- a/assignment-client/src/models/ModelServer.h +++ b/assignment-client/src/models/ModelServer.h @@ -33,6 +33,7 @@ public: virtual const char* getMyServerName() const { return MODEL_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return MODEL_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_MODELS_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeModelEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index 545c502036..78ab9259fd 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -9,6 +9,7 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // +#include #include #include @@ -17,6 +18,7 @@ #include "OctreeInboundPacketProcessor.h" static QUuid DEFAULT_NODE_ID_REF; +const quint64 TOO_LONG_SINCE_LAST_NACK = 1 * USECS_PER_SECOND; OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServer) : _myServer(myServer), @@ -25,7 +27,8 @@ OctreeInboundPacketProcessor::OctreeInboundPacketProcessor(OctreeServer* myServe _totalProcessTime(0), _totalLockWaitTime(0), _totalElementsInPacket(0), - _totalPackets(0) + _totalPackets(0), + _lastNackTime(usecTimestampNow()) { } @@ -35,10 +38,38 @@ void OctreeInboundPacketProcessor::resetStats() { _totalLockWaitTime = 0; _totalElementsInPacket = 0; _totalPackets = 0; + _lastNackTime = usecTimestampNow(); _singleSenderStats.clear(); } +unsigned long OctreeInboundPacketProcessor::getMaxWait() const { + // calculate time until next sendNackPackets() + quint64 nextNackTime = _lastNackTime + TOO_LONG_SINCE_LAST_NACK; + quint64 now = usecTimestampNow(); + if (now >= nextNackTime) { + return 0; + } + return (nextNackTime - now) / USECS_PER_MSEC + 1; +} + +void OctreeInboundPacketProcessor::preProcess() { + // check if it's time to send a nack. If yes, do so + quint64 now = usecTimestampNow(); + if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } +} + +void OctreeInboundPacketProcessor::midProcess() { + // check if it's time to send a nack. If yes, do so + quint64 now = usecTimestampNow(); + if (now - _lastNackTime >= TOO_LONG_SINCE_LAST_NACK) { + _lastNackTime = now; + sendNackPackets(); + } +} void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { @@ -123,13 +154,13 @@ void OctreeInboundPacketProcessor::processPacket(const SharedNodePointer& sendin qDebug() << "sender has no known nodeUUID."; } } - trackInboundPackets(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); + trackInboundPacket(nodeUUID, sequence, transitTime, editsInPacket, processTime, lockWaitTime); } else { qDebug("unknown packet ignored... packetType=%d", packetType); } } -void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, +void OctreeInboundPacketProcessor::trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int editsInPacket, quint64 processTime, quint64 lockWaitTime) { _totalTransitTime += transitTime; @@ -142,31 +173,174 @@ void OctreeInboundPacketProcessor::trackInboundPackets(const QUuid& nodeUUID, in // see if this is the first we've heard of this node... if (_singleSenderStats.find(nodeUUID) == _singleSenderStats.end()) { SingleSenderStats stats; - - stats._totalTransitTime += transitTime; - stats._totalProcessTime += processTime; - stats._totalLockWaitTime += lockWaitTime; - stats._totalElementsInPacket += editsInPacket; - stats._totalPackets++; - + stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); _singleSenderStats[nodeUUID] = stats; } else { SingleSenderStats& stats = _singleSenderStats[nodeUUID]; - stats._totalTransitTime += transitTime; - stats._totalProcessTime += processTime; - stats._totalLockWaitTime += lockWaitTime; - stats._totalElementsInPacket += editsInPacket; - stats._totalPackets++; + stats.trackInboundPacket(sequence, transitTime, editsInPacket, processTime, lockWaitTime); } } +int OctreeInboundPacketProcessor::sendNackPackets() { -SingleSenderStats::SingleSenderStats() { - _totalTransitTime = 0; - _totalProcessTime = 0; - _totalLockWaitTime = 0; - _totalElementsInPacket = 0; - _totalPackets = 0; + int packetsSent = 0; + + NodeToSenderStatsMapIterator i = _singleSenderStats.begin(); + while (i != _singleSenderStats.end()) { + + QUuid nodeUUID = i.key(); + SingleSenderStats nodeStats = i.value(); + + // check if this node is still alive. Remove its stats if it's dead. + if (!isAlive(nodeUUID)) { + i = _singleSenderStats.erase(i); + continue; + } + + // if there are packets from _node that are waiting to be processed, + // don't send a NACK since the missing packets may be among those waiting packets. + if (hasPacketsToProcessFrom(nodeUUID)) { + i++; + continue; + } + + const SharedNodePointer& destinationNode = NodeList::getInstance()->getNodeHash().value(nodeUUID); + const QSet& missingSequenceNumbers = nodeStats.getMissingSequenceNumbers(); + + // check if there are any sequence numbers that need to be nacked + int numSequenceNumbersAvailable = missingSequenceNumbers.size(); + + // construct nack packet(s) for this node + + QSet::const_iterator missingSequenceNumberIterator = missingSequenceNumbers.begin(); + char packet[MAX_PACKET_SIZE]; + + while (numSequenceNumbersAvailable > 0) { + + char* dataAt = packet; + int bytesRemaining = MAX_PACKET_SIZE; + + // pack header + int numBytesPacketHeader = populatePacketHeader(packet, _myServer->getMyEditNackType()); + dataAt += numBytesPacketHeader; + bytesRemaining -= numBytesPacketHeader; + + // calculate and pack the number of sequence numbers to nack + int numSequenceNumbersRoomFor = (bytesRemaining - sizeof(uint16_t)) / sizeof(unsigned short int); + uint16_t numSequenceNumbers = std::min(numSequenceNumbersAvailable, numSequenceNumbersRoomFor); + uint16_t* numSequenceNumbersAt = (uint16_t*)dataAt; + *numSequenceNumbersAt = numSequenceNumbers; + dataAt += sizeof(uint16_t); + + // pack sequence numbers to nack + for (uint16_t i = 0; i < numSequenceNumbers; i++) { + unsigned short int* sequenceNumberAt = (unsigned short int*)dataAt; + *sequenceNumberAt = *missingSequenceNumberIterator; + dataAt += sizeof(unsigned short int); + + missingSequenceNumberIterator++; + } + numSequenceNumbersAvailable -= numSequenceNumbers; + + // send it + qint64 bytesWritten = NodeList::getInstance()->writeUnverifiedDatagram(packet, dataAt - packet, destinationNode); + + packetsSent++; + } + i++; + } + return packetsSent; } +SingleSenderStats::SingleSenderStats() + : _totalTransitTime(0), + _totalProcessTime(0), + _totalLockWaitTime(0), + _totalElementsInPacket(0), + _totalPackets(0), + _incomingLastSequence(0), + _missingSequenceNumbers() +{ + +} + +void SingleSenderStats::trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, + int editsInPacket, quint64 processTime, quint64 lockWaitTime) { + + const int UINT16_RANGE = std::numeric_limits::max() + 1; + const int MAX_REASONABLE_SEQUENCE_GAP = 1000; // this must be less than UINT16_RANGE / 2 for rollover handling to work + const int MAX_MISSING_SEQUENCE_SIZE = 100; + + unsigned short int expectedSequence = _totalPackets == 0 ? incomingSequence : _incomingLastSequence + (unsigned short int)1; + + if (incomingSequence == expectedSequence) { // on time + _incomingLastSequence = incomingSequence; + } else { // out of order + int incoming = (int)incomingSequence; + int expected = (int)expectedSequence; + + // check if the gap between incoming and expected is reasonable, taking possible rollover into consideration + int absGap = std::abs(incoming - expected); + if (absGap >= UINT16_RANGE - MAX_REASONABLE_SEQUENCE_GAP) { + // rollover likely occurred between incoming and expected. + // correct the larger of the two so that it's within [-UINT16_RANGE, -1] while the other remains within [0, UINT16_RANGE-1] + if (incoming > expected) { + incoming -= UINT16_RANGE; + } else { + expected -= UINT16_RANGE; + } + } else if (absGap > MAX_REASONABLE_SEQUENCE_GAP) { + // ignore packet if gap is unreasonable + qDebug() << "ignoring unreasonable packet... sequence:" << incomingSequence + << "_incomingLastSequence:" << _incomingLastSequence; + return; + } + + // now that rollover has been corrected for (if it occurred), incoming and expected can be + // compared to each other directly, though one of them might be negative + if (incoming > expected) { // early + // add all sequence numbers that were skipped to the missing sequence numbers list + for (int missingSequence = expected; missingSequence < incoming; missingSequence++) { + _missingSequenceNumbers.insert(missingSequence < 0 ? missingSequence + UINT16_RANGE : missingSequence); + } + _incomingLastSequence = incomingSequence; + } else { // late + // remove this from missing sequence number if it's in there + _missingSequenceNumbers.remove(incomingSequence); + + // do not update _incomingLastSequence; it shouldn't become smaller + } + } + + // prune missing sequence list if it gets too big; sequence numbers that are older than MAX_REASONABLE_SEQUENCE_GAP + // will be removed. + if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE_SIZE) { + // some older sequence numbers may be from before a rollover point; this must be handled. + // some sequence numbers in this list may be larger than _incomingLastSequence, indicating that they were received + // before the most recent rollover. + int cutoff = (int)_incomingLastSequence - MAX_REASONABLE_SEQUENCE_GAP; + if (cutoff >= 0) { + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + unsigned short int nonRolloverCutoff = (unsigned short int)cutoff; + if (missingSequence > _incomingLastSequence || missingSequence <= nonRolloverCutoff) { + _missingSequenceNumbers.remove(missingSequence); + } + } + } else { + unsigned short int rolloverCutoff = (unsigned short int)(cutoff + UINT16_RANGE); + foreach(unsigned short int missingSequence, _missingSequenceNumbers) { + if (missingSequence > _incomingLastSequence && missingSequence <= rolloverCutoff) { + _missingSequenceNumbers.remove(missingSequence); + } + } + } + } + + // update other stats + _totalTransitTime += transitTime; + _totalProcessTime += processTime; + _totalLockWaitTime += lockWaitTime; + _totalElementsInPacket += editsInPacket; + _totalPackets++; +} diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.h b/assignment-client/src/octree/OctreeInboundPacketProcessor.h index f637a9e7c9..46a57205cb 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.h +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.h @@ -32,16 +32,24 @@ public: { return _totalElementsInPacket == 0 ? 0 : _totalProcessTime / _totalElementsInPacket; } quint64 getAverageLockWaitTimePerElement() const { return _totalElementsInPacket == 0 ? 0 : _totalLockWaitTime / _totalElementsInPacket; } - + const QSet& getMissingSequenceNumbers() const { return _missingSequenceNumbers; } + + void trackInboundPacket(unsigned short int incomingSequence, quint64 transitTime, + int editsInPacket, quint64 processTime, quint64 lockWaitTime); + quint64 _totalTransitTime; quint64 _totalProcessTime; quint64 _totalLockWaitTime; quint64 _totalElementsInPacket; quint64 _totalPackets; + + unsigned short int _incomingLastSequence; + QSet _missingSequenceNumbers; }; -typedef std::map NodeToSenderStatsMap; -typedef std::map::iterator NodeToSenderStatsMapIterator; +typedef QHash NodeToSenderStatsMap; +typedef QHash::iterator NodeToSenderStatsMapIterator; +typedef QHash::const_iterator NodeToSenderStatsMapConstIterator; /// Handles processing of incoming network packets for the voxel-server. As with other ReceivedPacketProcessor classes @@ -66,10 +74,18 @@ public: NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; } protected: + virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); + virtual unsigned long getMaxWait() const; + virtual void preProcess(); + virtual void midProcess(); + private: - void trackInboundPackets(const QUuid& nodeUUID, int sequence, quint64 transitTime, + int sendNackPackets(); + +private: + void trackInboundPacket(const QUuid& nodeUUID, unsigned short int sequence, quint64 transitTime, int voxelsInPacket, quint64 processTime, quint64 lockWaitTime); OctreeServer* _myServer; @@ -82,5 +98,7 @@ private: quint64 _totalPackets; NodeToSenderStatsMap _singleSenderStats; + + quint64 _lastNackTime; }; #endif // hifi_OctreeInboundPacketProcessor_h diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index cb60f0816e..c6ef4aa5aa 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -648,10 +648,10 @@ bool OctreeServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url int senderNumber = 0; NodeToSenderStatsMap& allSenderStats = _octreeInboundPacketProcessor->getSingleSenderStats(); - for (NodeToSenderStatsMapIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) { + for (NodeToSenderStatsMapConstIterator i = allSenderStats.begin(); i != allSenderStats.end(); i++) { senderNumber++; - QUuid senderID = i->first; - SingleSenderStats& senderStats = i->second; + QUuid senderID = i.key(); + const SingleSenderStats& senderStats = i.value(); statsString += QString("\r\n Stats for sender %1 uuid: %2\r\n") .arg(senderNumber).arg(senderID.toString()); @@ -1060,6 +1060,9 @@ void OctreeServer::nodeAdded(SharedNodePointer node) { void OctreeServer::nodeKilled(SharedNodePointer node) { quint64 start = usecTimestampNow(); + // calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!! + _octreeInboundPacketProcessor->nodeKilled(node); + qDebug() << qPrintable(_safeServerName) << "server killed node:" << *node; OctreeQueryNode* nodeData = static_cast(node->getLinkedData()); if (nodeData) { diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 5595d139be..76b39c5771 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -68,6 +68,7 @@ public: virtual const char* getMyServerName() const = 0; virtual const char* getMyLoggingServerTargetName() const = 0; virtual const char* getMyDefaultPersistFilename() const = 0; + virtual PacketType getMyEditNackType() const = 0; // subclass may implement these method virtual void beforeRun() { }; diff --git a/assignment-client/src/octree/SentPacketHistory.cpp b/assignment-client/src/octree/SentPacketHistory.cpp deleted file mode 100644 index 0ea7fd8b69..0000000000 --- a/assignment-client/src/octree/SentPacketHistory.cpp +++ /dev/null @@ -1,44 +0,0 @@ -// -// SentPacketHistory.cpp -// assignement-client/src/octree -// -// Created by Yixin Wang on 6/5/2014 -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include "SentPacketHistory.h" - -SentPacketHistory::SentPacketHistory(int size) - : _sentPackets(size), - _newestPacketAt(0), - _numExistingPackets(0), - _newestSequenceNumber(0) -{ -} - -void SentPacketHistory::packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet) { - _newestSequenceNumber = sequenceNumber; - - // increment _newestPacketAt cyclically, insert new packet there. - // this will overwrite the oldest packet in the buffer - _newestPacketAt = (_newestPacketAt == _sentPackets.size() - 1) ? 0 : _newestPacketAt + 1; - _sentPackets[_newestPacketAt] = packet; - if (_numExistingPackets < _sentPackets.size()) { - _numExistingPackets++; - } -} - - -const QByteArray* SentPacketHistory::getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const { - OCTREE_PACKET_SEQUENCE seqDiff = _newestSequenceNumber - sequenceNumber; - if (!(seqDiff >= 0 && seqDiff < _numExistingPackets)) { - return NULL; - } - int packetAt = _newestPacketAt - seqDiff; - if (packetAt < 0) { - packetAt += _sentPackets.size(); - } - return &_sentPackets.at(packetAt); -} diff --git a/assignment-client/src/particles/ParticleServer.h b/assignment-client/src/particles/ParticleServer.h index d444368a9d..0b379a903e 100644 --- a/assignment-client/src/particles/ParticleServer.h +++ b/assignment-client/src/particles/ParticleServer.h @@ -33,6 +33,7 @@ public: virtual const char* getMyServerName() const { return PARTICLE_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return PARTICLE_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_PARTICLES_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeParticleEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/assignment-client/src/voxels/VoxelServer.h b/assignment-client/src/voxels/VoxelServer.h index b13f83b65f..fadcca2d19 100644 --- a/assignment-client/src/voxels/VoxelServer.h +++ b/assignment-client/src/voxels/VoxelServer.h @@ -42,6 +42,7 @@ public: virtual const char* getMyServerName() const { return VOXEL_SERVER_NAME; } virtual const char* getMyLoggingServerTargetName() const { return VOXEL_SERVER_LOGGING_TARGET_NAME; } virtual const char* getMyDefaultPersistFilename() const { return LOCAL_VOXELS_PERSIST_FILE; } + virtual PacketType getMyEditNackType() const { return PacketTypeVoxelEditNack; } // subclass may implement these method virtual void beforeRun(); diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 5507089798..afd6793edf 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -3270,10 +3270,16 @@ void Application::nodeAdded(SharedNodePointer node) { void Application::nodeKilled(SharedNodePointer node) { - // this is here because connecting NodeList::nodeKilled to OctreePacketProcessor::nodeKilled doesn't work: - // OctreePacketProcessor::nodeKilled is not called when NodeList::nodeKilled is emitted for some reason. + // These are here because connecting NodeList::nodeKilled to OctreePacketProcessor::nodeKilled doesn't work: + // OctreePacketProcessor::nodeKilled is not being called when NodeList::nodeKilled is emitted. + // This may have to do with GenericThread::threadRoutine() blocking the QThread event loop + _octreeProcessor.nodeKilled(node); + _voxelEditSender.nodeKilled(node); + _particleEditSender.nodeKilled(node); + _modelEditSender.nodeKilled(node); + if (node->getType() == NodeType::VoxelServer) { QUuid nodeUUID = node->getUUID(); // see if this is the first we've heard of this node... diff --git a/interface/src/DatagramProcessor.cpp b/interface/src/DatagramProcessor.cpp index cdd7e7ef0f..29528da126 100644 --- a/interface/src/DatagramProcessor.cpp +++ b/interface/src/DatagramProcessor.cpp @@ -145,6 +145,15 @@ void DatagramProcessor::processDatagrams() { } break; } + case PacketTypeVoxelEditNack: + application->_voxelEditSender.processNackPacket(incomingPacket); + break; + case PacketTypeParticleEditNack: + application->_particleEditSender.processNackPacket(incomingPacket); + break; + case PacketTypeModelEditNack: + application->_modelEditSender.processNackPacket(incomingPacket); + break; default: nodeList->processNodeData(senderSockAddr, incomingPacket); break; diff --git a/libraries/networking/src/PacketHeaders.h b/libraries/networking/src/PacketHeaders.h index 8ac5333d10..0f87b0e607 100644 --- a/libraries/networking/src/PacketHeaders.h +++ b/libraries/networking/src/PacketHeaders.h @@ -66,7 +66,10 @@ enum PacketType { PacketTypeModelAddOrEdit, PacketTypeModelErase, PacketTypeModelAddResponse, - PacketTypeOctreeDataNack + PacketTypeOctreeDataNack, // 45 + PacketTypeVoxelEditNack, + PacketTypeParticleEditNack, + PacketTypeModelEditNack, }; typedef char PacketVersion; @@ -76,7 +79,7 @@ const QSet NON_VERIFIED_PACKETS = QSet() << PacketTypeDomainList << PacketTypeDomainListRequest << PacketTypeDomainOAuthRequest << PacketTypeCreateAssignment << PacketTypeRequestAssignment << PacketTypeStunResponse << PacketTypeNodeJsonStats << PacketTypeVoxelQuery << PacketTypeParticleQuery << PacketTypeModelQuery - << PacketTypeOctreeDataNack; + << PacketTypeOctreeDataNack << PacketTypeVoxelEditNack << PacketTypeParticleEditNack << PacketTypeModelEditNack; const int NUM_BYTES_MD5_HASH = 16; const int NUM_STATIC_HEADER_BYTES = sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID; diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index 3ef518bbc2..59e1ecd456 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -35,9 +35,10 @@ bool ReceivedPacketProcessor::process() { if (_packets.size() == 0) { _waitingOnPacketsMutex.lock(); - _hasPackets.wait(&_waitingOnPacketsMutex); + _hasPackets.wait(&_waitingOnPacketsMutex, getMaxWait()); _waitingOnPacketsMutex.unlock(); } + preProcess(); while (_packets.size() > 0) { lock(); // lock to make sure nothing changes on us NetworkPacket& packet = _packets.front(); // get the oldest packet @@ -46,7 +47,9 @@ bool ReceivedPacketProcessor::process() { _nodePacketCounts[temporary.getNode()->getUUID()]--; unlock(); // let others add to the packets processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy + midProcess(); } + postProcess(); return isStillRunning(); // keep running till they terminate us } diff --git a/libraries/networking/src/ReceivedPacketProcessor.h b/libraries/networking/src/ReceivedPacketProcessor.h index 94ad2d9c41..607f9e54c2 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.h +++ b/libraries/networking/src/ReceivedPacketProcessor.h @@ -33,9 +33,19 @@ public: /// Are there received packets waiting to be processed bool hasPacketsToProcess() const { return _packets.size() > 0; } - /// Are there received packets waiting to be processed from a certain node + /// Is a specified node still alive? + bool isAlive(const QUuid& nodeUUID) const { + return _nodePacketCounts.contains(nodeUUID); + } + + /// Are there received packets waiting to be processed from a specified node bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const { - return _nodePacketCounts[sendingNode->getUUID()] > 0; + return hasPacketsToProcessFrom(sendingNode->getUUID()); + } + + /// Are there received packets waiting to be processed from a specified node + bool hasPacketsToProcessFrom(const QUuid& nodeUUID) const { + return _nodePacketCounts[nodeUUID] > 0; } /// How many received packets waiting are to be processed @@ -53,9 +63,21 @@ protected: /// Implements generic processing behavior for this thread. virtual bool process(); + /// Determines the timeout of the wait when there are no packets to process. Default value means no timeout + virtual unsigned long getMaxWait() const { return ULONG_MAX; } + + /// Override to do work before the packets processing loop. Default does nothing. + virtual void preProcess() { } + + /// Override to do work inside the packet processing loop after a packet is processed. Default does nothing. + virtual void midProcess() { } + + /// Override to do work after the packets processing loop. Default does nothing. + virtual void postProcess() { } + virtual void terminating(); -private: +protected: QVector _packets; QHash _nodePacketCounts; diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp new file mode 100644 index 0000000000..841b5e909c --- /dev/null +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -0,0 +1,63 @@ +// +// SentPacketHistory.cpp +// libraries/networking/src +// +// Created by Yixin Wang on 6/5/2014 +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include +#include "SentPacketHistory.h" +#include + +SentPacketHistory::SentPacketHistory(int size) + : _sentPackets(size), + _newestPacketAt(0), + _numExistingPackets(0), + _newestSequenceNumber(std::numeric_limits::max()) +{ +} + +void SentPacketHistory::packetSent(uint16_t sequenceNumber, const QByteArray& packet) { + + // check if given seq number has the expected value. if not, something's wrong with + // the code calling this function + uint16_t expectedSequenceNumber = _newestSequenceNumber + (uint16_t)1; + if (sequenceNumber != expectedSequenceNumber) { + qDebug() << "Unexpected sequence number passed to SentPacketHistory::packetSent()!" + << "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber; + } + + _newestSequenceNumber = sequenceNumber; + + // increment _newestPacketAt cyclically, insert new packet there. + // this will overwrite the oldest packet in the buffer + _newestPacketAt = (_newestPacketAt == _sentPackets.size() - 1) ? 0 : _newestPacketAt + 1; + _sentPackets[_newestPacketAt] = packet; + if (_numExistingPackets < _sentPackets.size()) { + _numExistingPackets++; + } +} + +const QByteArray* SentPacketHistory::getPacket(uint16_t sequenceNumber) const { + + const int UINT16_RANGE = std::numeric_limits::max() + 1; + + // if sequenceNumber > _newestSequenceNumber, assume sequenceNumber is from before the most recent rollover + // correct the diff so that it correctly represents how far back in the history sequenceNumber is + int seqDiff = (int)_newestSequenceNumber - (int)sequenceNumber; + if (seqDiff < 0) { + seqDiff += UINT16_RANGE; + } + // if desired sequence number is too old to be found in the history, return null + if (seqDiff >= _numExistingPackets) { + return NULL; + } + int packetAt = _newestPacketAt - seqDiff; + if (packetAt < 0) { + packetAt += _sentPackets.size(); + } + return &_sentPackets.at(packetAt); +} diff --git a/assignment-client/src/octree/SentPacketHistory.h b/libraries/networking/src/SentPacketHistory.h similarity index 61% rename from assignment-client/src/octree/SentPacketHistory.h rename to libraries/networking/src/SentPacketHistory.h index 4231400ac1..53a6919c42 100644 --- a/assignment-client/src/octree/SentPacketHistory.h +++ b/libraries/networking/src/SentPacketHistory.h @@ -1,6 +1,6 @@ // // SentPacketHistory.h -// assignement-client/src/octree +// libraries/networking/src // // Created by Yixin Wang on 6/5/2014 // @@ -11,25 +11,24 @@ #ifndef hifi_SentPacketHistory_h #define hifi_SentPacketHistory_h +#include #include #include -#include "OctreePacketData.h" - class SentPacketHistory { public: - SentPacketHistory(int size); + SentPacketHistory(int size = 1000); - void packetSent(OCTREE_PACKET_SEQUENCE sequenceNumber, const QByteArray& packet); - const QByteArray* getPacket(OCTREE_PACKET_SEQUENCE sequenceNumber) const; + void packetSent(uint16_t sequenceNumber, const QByteArray& packet); + const QByteArray* getPacket(uint16_t sequenceNumber) const; private: QVector _sentPackets; // circular buffer int _newestPacketAt; int _numExistingPackets; - OCTREE_PACKET_SEQUENCE _newestSequenceNumber; + uint16_t _newestSequenceNumber; }; #endif diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 898c41de08..43e253b2da 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -17,7 +17,6 @@ #include #include "OctreeEditPacketSender.h" - EditPacketBuffer::EditPacketBuffer(PacketType type, unsigned char* buffer, ssize_t length, QUuid nodeUUID) : _nodeUUID(nodeUUID), _currentType(type), @@ -89,7 +88,7 @@ bool OctreeEditPacketSender::serversExist() const { // This method is called when the edit packet layer has determined that it has a fully formed packet destined for // a known nodeID. -void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, ssize_t length) { +void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, const unsigned char* buffer, ssize_t length) { NodeList* nodeList = NodeList::getInstance(); foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { @@ -97,12 +96,19 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c if (node->getType() == getMyNodeType() && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull()))) { if (node->getActiveSocket()) { - queuePacketForSending(node, QByteArray(reinterpret_cast(buffer), length)); + QByteArray packet(reinterpret_cast(buffer), length); + queuePacketForSending(node, packet); + + // extract sequence number and add packet to history + int numBytesPacketHeader = numBytesForPacketHeader(packet); + const char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; + unsigned short int sequence = *((unsigned short int*)dataAt); + _sentPacketHistories[nodeUUID].packetSent(sequence, packet); // debugging output... bool wantDebugging = false; if (wantDebugging) { - int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast(buffer)); + int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast(buffer)); unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader))); quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence)))); quint64 queuedAt = usecTimestampNow(); @@ -287,18 +293,20 @@ void OctreeEditPacketSender::releaseQueuedMessages() { if (!serversExist()) { _releaseQueuedMessagesPending = true; } else { - for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - releaseQueuedPacket(i->second); + for (QHash::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { + releaseQueuedPacket(i.value()); } } } void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { + _releaseQueuedPacketMutex.lock(); if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketTypeUnknown) { queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); + packetBuffer._currentSize = 0; + packetBuffer._currentType = PacketTypeUnknown; } - packetBuffer._currentSize = 0; - packetBuffer._currentType = PacketTypeUnknown; + _releaseQueuedPacketMutex.unlock(); } void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PacketType type) { @@ -329,3 +337,41 @@ bool OctreeEditPacketSender::process() { // base class does most of the work. return PacketSender::process(); } + +void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { + // parse sending node from packet, retrieve packet history for that node + QUuid sendingNodeUUID = uuidFromPacketHeader(packet); + + // if packet history doesn't exist for the sender node (somehow), bail + if (!_sentPacketHistories.contains(sendingNodeUUID)) { + return; + } + const SentPacketHistory& sentPacketHistory = _sentPacketHistories.value(sendingNodeUUID); + + int numBytesPacketHeader = numBytesForPacketHeader(packet); + const unsigned char* dataAt = reinterpret_cast(packet.data()) + numBytesPacketHeader; + + // read number of sequence numbers + uint16_t numSequenceNumbers = (*(uint16_t*)dataAt); + dataAt += sizeof(uint16_t); + + // read sequence numbers and queue packets for resend + for (int i = 0; i < numSequenceNumbers; i++) { + unsigned short int sequenceNumber = (*(unsigned short int*)dataAt); + dataAt += sizeof(unsigned short int); + + // retrieve packet from history + const QByteArray* packet = sentPacketHistory.getPacket(sequenceNumber); + if (packet) { + const SharedNodePointer& node = NodeList::getInstance()->getNodeHash().value(sendingNodeUUID); + queuePacketForSending(node, *packet); + } + } +} + +void OctreeEditPacketSender::nodeKilled(SharedNodePointer node) { + // TODO: add locks + QUuid nodeUUID = node->getUUID(); + _pendingEditPackets.remove(nodeUUID); + _sentPacketHistories.remove(nodeUUID); +} diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index 0dc628c433..c16c0a2d4b 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -12,9 +12,11 @@ #ifndef hifi_OctreeEditPacketSender_h #define hifi_OctreeEditPacketSender_h +#include #include #include #include "JurisdictionMap.h" +#include "SentPacketHistory.h" /// Used for construction of edit packets class EditPacketBuffer { @@ -89,10 +91,16 @@ public: // you must override these... virtual char getMyNodeType() const = 0; virtual void adjustEditPacketForClockSkew(unsigned char* codeColorBuffer, ssize_t length, int clockSkew) { }; - + +public slots: + void nodeKilled(SharedNodePointer node); + +public: + void processNackPacket(const QByteArray& packet); + protected: bool _shouldSend; - void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, ssize_t length); + void queuePacketToNode(const QUuid& nodeID, const unsigned char* buffer, ssize_t length); void queuePendingPacketToNodes(PacketType type, unsigned char* buffer, ssize_t length); void queuePacketToNodes(unsigned char* buffer, ssize_t length); void initializePacket(EditPacketBuffer& packetBuffer, PacketType type); @@ -101,7 +109,7 @@ protected: void processPreServerExistsPackets(); // These are packets which are destined from know servers but haven't been released because they're still too small - std::map _pendingEditPackets; + QHash _pendingEditPackets; // These are packets that are waiting to be processed because we don't yet know if there are servers int _maxPendingMessages; @@ -114,5 +122,10 @@ protected: unsigned short int _sequenceNumber; int _maxPacketSize; + + QMutex _releaseQueuedPacketMutex; + + // TODO: add locks for this and _pendingEditPackets + QHash _sentPacketHistories; }; #endif // hifi_OctreeEditPacketSender_h diff --git a/libraries/octree/src/OctreeSceneStats.cpp b/libraries/octree/src/OctreeSceneStats.cpp index 868ef29886..d037ec79ad 100644 --- a/libraries/octree/src/OctreeSceneStats.cpp +++ b/libraries/octree/src/OctreeSceneStats.cpp @@ -9,6 +9,7 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // +#include #include #include @@ -842,7 +843,7 @@ const char* OctreeSceneStats::getItemValue(Item item) { } void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, - bool wasStatsPacket, int nodeClockSkewUsec) { + bool wasStatsPacket, int nodeClockSkewUsec) { const bool wantExtraDebugging = false; int numBytesPacketHeader = numBytesForPacketHeader(packet); @@ -852,10 +853,10 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, dataAt += sizeof(OCTREE_PACKET_FLAGS); OCTREE_PACKET_SEQUENCE sequence = (*(OCTREE_PACKET_SEQUENCE*)dataAt); dataAt += sizeof(OCTREE_PACKET_SEQUENCE); - + OCTREE_PACKET_SENT_TIME sentAt = (*(OCTREE_PACKET_SENT_TIME*)dataAt); dataAt += sizeof(OCTREE_PACKET_SENT_TIME); - + //bool packetIsColored = oneAtBit(flags, PACKET_IS_COLOR_BIT); //bool packetIsCompressed = oneAtBit(flags, PACKET_IS_COMPRESSED_BIT); @@ -877,29 +878,15 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, return; // ignore any packets that are unreasonable } + const int UINT16_RANGE = std::numeric_limits::max() + 1; + // determine our expected sequence number... handle rollover appropriately - OCTREE_PACKET_SEQUENCE expected = _incomingPacket > 0 ? _incomingLastSequence + 1 : sequence; - - // Guard against possible corrupted packets... with bad sequence numbers - const int MAX_RESONABLE_SEQUENCE_OFFSET = 2000; - const int MIN_RESONABLE_SEQUENCE_OFFSET = -2000; - int sequenceOffset = (sequence - expected); - if (sequenceOffset > MAX_RESONABLE_SEQUENCE_OFFSET || sequenceOffset < MIN_RESONABLE_SEQUENCE_OFFSET) { - qDebug() << "ignoring unreasonable packet... sequence:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; - return; // ignore any packets that are unreasonable - } - - // track packets here... - _incomingPacket++; - _incomingBytes += packet.size(); - if (!wasStatsPacket) { - _incomingWastedBytes += (MAX_PACKET_SIZE - packet.size()); - } + OCTREE_PACKET_SEQUENCE expected = _incomingPacket > 0 ? _incomingLastSequence + (quint16)1 : sequence; const int USECS_PER_MSEC = 1000; float flightTimeMsecs = flightTime / USECS_PER_MSEC; _incomingFlightTimeAverage.updateAverage(flightTimeMsecs); - + // track out of order and possibly lost packets... if (sequence == _incomingLastSequence) { if (wantExtraDebugging) { @@ -911,15 +898,46 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, qDebug() << "out of order... got:" << sequence << "expected:" << expected; } + int sequenceInt = (int)sequence; + int expectedInt = (int)expected; + + // if distance between sequence and expected are more than half of the total range of possible seq numbers, + // assume that a rollover occurred between the two. + // correct the larger one so it's in the range [-UINT16_RANGE, -1] while the other remains in [0, UINT16_RANGE-1] + // after doing so, sequenceInt and expectedInt can be correctly compared to each other, though one may be negative + if (std::abs(sequenceInt - expectedInt) > UINT16_RANGE / 2) { + if (sequenceInt > expectedInt) { + sequenceInt -= UINT16_RANGE; + } + else { + expectedInt -= UINT16_RANGE; + } + } + + // Guard against possible corrupted packets... with bad sequence numbers + const int MAX_RESONABLE_SEQUENCE_OFFSET = 2000; + const int MIN_RESONABLE_SEQUENCE_OFFSET = -2000; + + int sequenceOffset = (sequenceInt - expectedInt); + if (sequenceOffset > MAX_RESONABLE_SEQUENCE_OFFSET || sequenceOffset < MIN_RESONABLE_SEQUENCE_OFFSET) { + qDebug() << "ignoring unreasonable packet... sequence:" << sequence << "_incomingLastSequence:" << _incomingLastSequence; + return; // ignore any packets that are unreasonable + } + // if the sequence is less than our expected, then this might be a packet // that was delayed and so we should find it in our lostSequence list - if (sequence < expected) { + if (sequenceInt < expectedInt) { + + // if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1] + // if rollover between them: sequenceInt in [-UINT16_RANGE, -1], expectedInt in [0, UINT16_RANGE-1] + if (wantExtraDebugging) { qDebug() << "this packet is later than expected..."; } - if (sequence < std::max(0, (expected - MAX_MISSING_SEQUENCE_OLD_AGE))) { + if (sequenceInt < expectedInt - MAX_MISSING_SEQUENCE_OLD_AGE) { _incomingReallyLate++; - } else { + } + else { _incomingLate++; } @@ -931,57 +949,78 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, _sequenceNumbersToNack.remove(sequence); _incomingLikelyLost--; _incomingRecovered++; - } else { + } + else { // if we're still in our pruning window, and we didn't find it in our missing list, // than this is really unexpected and can probably only happen if the packet was a // duplicate - if (sequence >= std::max(0, (expected - MAX_MISSING_SEQUENCE_OLD_AGE))) { + if (sequenceInt >= expectedInt - MAX_MISSING_SEQUENCE_OLD_AGE) { if (wantExtraDebugging) { - qDebug() << "sequence:" << sequence << "WAS NOT found in _missingSequenceNumbers, and not that old... (expected - MAX_MISSING_SEQUENCE_OLD_AGE):" << (expected - MAX_MISSING_SEQUENCE_OLD_AGE); + qDebug() << "sequence:" << sequence << "WAS NOT found in _missingSequenceNumbers, and not that old... (expected - MAX_MISSING_SEQUENCE_OLD_AGE):" + << (uint16_t)(expectedInt - MAX_MISSING_SEQUENCE_OLD_AGE); } _incomingPossibleDuplicate++; } } - } - - if (sequence > expected) { + + // don't update _incomingLastSequence in this case. + // only bump the last sequence if it was greater than our expected sequence, this will keep us from + // accidentally going backwards when an out of order (recovered) packet comes in + + } else { // sequenceInt > expectedInt + + // if no rollover between them: sequenceInt, expectedInt are both in range [0, UINT16_RANGE-1] + // if rollover between them: sequenceInt in [0, UINT16_RANGE-1], expectedInt in [-UINT16_RANGE, -1] + if (wantExtraDebugging) { qDebug() << "this packet is earlier than expected..."; } _incomingEarly++; // hmm... so, we either didn't get some packets, or this guy came early... - unsigned int missing = sequence - expected; + int missing = sequenceInt - expectedInt; if (wantExtraDebugging) { qDebug() << ">>>>>>>> missing gap=" << missing; } _incomingLikelyLost += missing; - for(unsigned int missingSequence = expected; missingSequence < sequence; missingSequence++) { + for (int missingSequenceInt = expectedInt; missingSequenceInt < sequenceInt; missingSequenceInt++) { + OCTREE_PACKET_SEQUENCE missingSequence = missingSequenceInt >= 0 ? missingSequenceInt : missingSequenceInt + UINT16_RANGE; _missingSequenceNumbers << missingSequence; _sequenceNumbersToNack << missingSequence; } + + _incomingLastSequence = sequence; } + } else { // sequence = expected + + _incomingLastSequence = sequence; } } - // only bump the last sequence if it was greater than our expected sequence, this will keep us from - // accidentally going backwards when an out of order (recovered) packet comes in - if (sequence >= expected) { - _incomingLastSequence = sequence; - } - // do some garbage collecting on our _missingSequenceNumbers if (_missingSequenceNumbers.size() > MAX_MISSING_SEQUENCE) { if (wantExtraDebugging) { qDebug() << "too many _missingSequenceNumbers:" << _missingSequenceNumbers.size(); } + + int oldAgeCutoff = (int)_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE; + foreach(uint16_t missingItem, _missingSequenceNumbers) { if (wantExtraDebugging) { qDebug() << "checking item:" << missingItem << "is it in need of pruning?"; - qDebug() << "(_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE):" - << (_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE); + qDebug() << "(_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE):" + << (uint16_t)((int)_incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE); } - if (missingItem <= std::max(0, _incomingLastSequence - MAX_MISSING_SEQUENCE_OLD_AGE)) { + + bool prune; + if (oldAgeCutoff >= 0) { + prune = (missingItem <= oldAgeCutoff || missingItem > _incomingLastSequence); + } + else { + prune = (missingItem <= oldAgeCutoff + UINT16_RANGE && missingItem > _incomingLastSequence); + } + + if (prune) { if (wantExtraDebugging) { qDebug() << "pruning really old missing sequence:" << missingItem; } @@ -991,6 +1030,12 @@ void OctreeSceneStats::trackIncomingOctreePacket(const QByteArray& packet, } } + // track packets here... + _incomingPacket++; + _incomingBytes += packet.size(); + if (!wasStatsPacket) { + _incomingWastedBytes += (MAX_PACKET_SIZE - packet.size()); + } } int OctreeSceneStats::getNumSequenceNumbersToNack() const { diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index bbb01894ed..b2c0eb13db 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -57,7 +57,7 @@ protected: bool isStillRunning() const { return !_stopThread; } -private: +protected: QMutex _mutex; bool _stopThread;