diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index a502c714e3..74ad707b11 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -727,9 +727,12 @@ void Connection::updateCongestionControlAndSendQueue(std::function cong // fire congestion control callback congestionCallback(); + auto& sendQueue = getSendQueue(); + // now that we've updated the congestion control, update the packet send period and flow window size - getSendQueue().setPacketSendPeriod(_congestionControl->_packetSendPeriod); - getSendQueue().setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); + sendQueue.setPacketSendPeriod(_congestionControl->_packetSendPeriod); + sendQueue.setEstimatedTimeout(estimatedTimeout()); + sendQueue.setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); // record connection stats _stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 36ebd2a6b0..7db24a63a3 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -154,6 +154,9 @@ void SendQueue::sendPacket(const Packet& packet) { } void SendQueue::ack(SequenceNumber ack) { + // this is a response from the client, re-set our timeout expiry + _timeoutExpiryCount = 0; + if (_lastACKSequenceNumber == (uint32_t) ack) { return; } @@ -177,6 +180,9 @@ void SendQueue::ack(SequenceNumber ack) { } void SendQueue::nak(SequenceNumber start, SequenceNumber end) { + // this is a response from the client, re-set our timeout expiry + _timeoutExpiryCount = 0; + std::unique_lock nakLocker(_naksLock); _naks.insert(start, end); @@ -189,6 +195,9 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) { } void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { + // this is a response from the client, re-set our timeout expiry + _timeoutExpiryCount = 0; + std::unique_lock nakLocker(_naksLock); _naks.clear(); @@ -212,7 +221,7 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { } void SendQueue::handshakeACK() { - std::unique_lock locker(_handshakeMutex); + std::unique_lock locker { _handshakeMutex }; _hasReceivedHandshakeACK = true; @@ -258,7 +267,7 @@ void SendQueue::run() { while (_isRunning) { // Record how long the loop takes to execute auto loopStartTimestamp = high_resolution_clock::now(); - + std::unique_lock handshakeLock { _handshakeMutex }; if (!_hasReceivedHandshakeACK) { @@ -305,12 +314,6 @@ void SendQueue::run() { sentAPacket = maybeSendNewPacket(); } - // Keep track of how long the flow window has been full for - if (flowWindowFull && !_flowWindowWasFull) { - _flowWindowFullSince = loopStartTimestamp; - } - _flowWindowWasFull = flowWindowFull; - // since we're a while loop, give the thread a chance to process events QCoreApplication::processEvents(); @@ -320,15 +323,22 @@ void SendQueue::run() { } if (_hasReceivedHandshakeACK && !sentAPacket) { - static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 }; + // check if it is time to break this connection - if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { + // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been + // at least 10 seconds + + static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; + + if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE) { // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, // then signal the queue is inactive and return so it can be cleaned up + qDebug() << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts and is" + << "considered inactive. It is now being stopped."; emit queueInactive(); return; } else { - // During our processing above we didn't send any packets and the flow window is not full. + // During our processing above we didn't send any packets // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock @@ -337,21 +347,49 @@ void SendQueue::run() { // The packets queue and loss list mutexes are now both locked - check if they're still both empty if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { - // both are empty - let's use a condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER); - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); + if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { + // we've sent the client as much data as we have (and they've ACKed it) + // either wait for new data to send or 5 seconds before cleaning up the queue + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT_US = std::chrono::microseconds(5 * 1000 * 1000); - if (cvStatus == std::cv_status::timeout) { - // the wait_for released because we've been inactive for too long - // so emit our inactive signal and return so the send queue can be cleaned up - emit queueInactive(); - return; + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT_US); + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + if (cvStatus == std::cv_status::timeout) { + qDebug() << "SendQueue to" << _destination << "has been empty for" + << std::chrono::duration_cast(EMPTY_QUEUES_INACTIVE_TIMEOUT_US).count() + << "and receiver has ACKed all packets. The queue is considered inactive and will be stopped"; + + // this queue is inactive - emit that signal and stop the while + emit queueInactive(); + return; + } + } else { + // We think the client is still waiting for data (based on the sequence number gap) + // Let's wait either for a response from the client or until the estimated timeout + auto waitDuration = std::chrono::microseconds(_estimatedTimeout); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration); + + if (cvStatus == std::cv_status::timeout) { + // increase the number of timeouts + ++_timeoutExpiryCount; + + // Add all of the packets above the last received ACKed sequence number to the loss list + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + } + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + // skip to the next iteration + continue; } - - // skip to the next iteration - continue; } } } @@ -452,4 +490,3 @@ bool SendQueue::maybeResendPacket() { // No packet was resent return false; } - diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 70493f2054..22d52d28f0 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -57,6 +57,8 @@ public: int getPacketSendPeriod() const { return _packetSendPeriod; } void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; } + void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; } + public slots: void stop(); @@ -104,6 +106,9 @@ private: std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC std::atomic _isRunning { false }; + std::atomic _estimatedTimeout { 0 }; // Estimated timeout, set from CC + std::atomic _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client + std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC // Used to detect when the connection becomes inactive for too long @@ -117,7 +122,7 @@ private: std::unordered_map> _sentPackets; // Packets waiting for ACK. std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable - bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client + std::atomic _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client std::condition_variable _handshakeACKCondition; std::condition_variable_any _emptyCondition;