diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 0f8a9f24f6..bac178377e 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -62,10 +62,10 @@ void DefaultCC::onACK(SequenceNumber ackNum) { if (_slowStart) { // we are in slow start phase - increase the congestion window size by the number of packets just ACKed - _congestionWindowSize += seqlen(_slowStartLastACK, ackNum); + _congestionWindowSize += seqlen(_lastACK, ackNum); // update the last ACK - _slowStartLastACK = ackNum; + _lastACK = ackNum; // check if we can get out of slow start (is our new congestion window size bigger than the max) if (_congestionWindowSize > _maxCongestionWindowSize) { @@ -137,19 +137,20 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { return; } } - + _loss = true; static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125; static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5; - + // check if this NAK starts a new congestion period - this will be the case if the // NAK received occured for a packet sent after the last decrease if (rangeStart > _lastDecreaseMaxSeq) { + _lastDecreasePeriod = _packetSendPeriod; - + _packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE); - + // use EWMA to update the average number of NAKs per congestion static const double NAK_EWMA_ALPHA = 0.125; _avgNAKNum = (int)ceil(_avgNAKNum * (1 - NAK_EWMA_ALPHA) + _nakCount * NAK_EWMA_ALPHA); @@ -159,7 +160,7 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { _decreaseCount = 1; _lastDecreaseMaxSeq = _sendCurrSeqNum; - + if (_avgNAKNum < 1) { _randomDecreaseThreshold = 1; } else { @@ -179,17 +180,16 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { } } -// Note: This isn't currently being called by anything since we, unlike UDT, don't have TTL on our packets void DefaultCC::onTimeout() { if (_slowStart) { stopSlowStart(); } else { // UDT used to do the following on timeout if not in slow start - we should check if it could be helpful - // _lastDecreasePeriod = _packetSendPeriod; - // _packetSendPeriod = ceil(_packetSendPeriod * 2); - + _lastDecreasePeriod = _packetSendPeriod; + _packetSendPeriod = ceil(_packetSendPeriod * 2); + // this seems odd - the last ack they were setting _lastDecreaseMaxSeq to only applies to slow start - // _lastDecreaseMaxSeq = _slowStartLastAck; + _lastDecreaseMaxSeq = _lastACK; } } @@ -206,3 +206,7 @@ void DefaultCC::stopSlowStart() { _packetSendPeriod = _congestionWindowSize / (_rtt + synInterval()); } } + +void DefaultCC::setInitialSendSequenceNumber(udt::SequenceNumber seqNum) { + _lastACK = _lastDecreaseMaxSeq = seqNum - 1; +} diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index 69b7a32d2d..3a5c8d0d00 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -41,7 +41,7 @@ public: virtual void init() {} virtual void onACK(SequenceNumber ackNum) {} virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {} - + virtual void onTimeout() {} protected: void setAckInterval(int ackInterval) { _ackInterval = ackInterval; } void setRTO(int rto) { _userDefinedRTO = true; _rto = rto; } @@ -107,7 +107,7 @@ public: virtual void onTimeout(); protected: - virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) { _slowStartLastACK = seqNum; } + virtual void setInitialSendSequenceNumber(SequenceNumber seqNum); private: void stopSlowStart(); // stops the slow start on loss or timeout @@ -115,7 +115,7 @@ private: p_high_resolution_clock::time_point _lastRCTime = p_high_resolution_clock::now(); // last rate increase time bool _slowStart { true }; // if in slow start phase - SequenceNumber _slowStartLastACK; // last ACKed seq num from previous slow start check + SequenceNumber _lastACK; // last ACKed sequence number from previous bool _loss { false }; // if loss happened since last rate increase SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 96d4b65aec..f75a9535f5 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -98,6 +98,7 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); + QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout); // set defaults on the send queue from our congestion control object and estimatedTimeout() _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); @@ -129,6 +130,12 @@ void Connection::queueInactive() { } } +void Connection::queueTimeout() { + updateCongestionControlAndSendQueue([this]{ + _congestionControl->onTimeout(); + }); +} + void Connection::sendReliablePacket(std::unique_ptr packet) { Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); getSendQueue().queuePacket(std::move(packet)); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index bf56a468aa..8d80e736af 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -84,6 +84,7 @@ private slots: void recordSentPackets(int payload, int total); void recordRetransmission(); void queueInactive(); + void queueTimeout(); private: void sendACK(bool wasCausedBySyncTimeout = true); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index bda12b2e4d..9701561ec7 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include "../NetworkLogging.h" @@ -133,7 +134,6 @@ void SendQueue::sendPacket(const Packet& packet) { void SendQueue::ack(SequenceNumber ack) { // this is a response from the client, re-set our timeout expiry and our last response time - _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); if (_lastACKSequenceNumber == (uint32_t) ack) { @@ -157,11 +157,13 @@ void SendQueue::ack(SequenceNumber ack) { } _lastACKSequenceNumber = (uint32_t) ack; + + // call notify_one on the condition_variable_any in case the send thread is sleeping with a full congestion window + _emptyCondition.notify_one(); } void SendQueue::nak(SequenceNumber start, SequenceNumber end) { // this is a response from the client, re-set our timeout expiry - _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); { @@ -175,7 +177,6 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) { void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { // this is a response from the client, re-set our timeout expiry - _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); { @@ -314,10 +315,9 @@ void SendQueue::run() { } bool SendQueue::maybeSendNewPacket() { - if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { + if (!isFlowWindowFull()) { // we didn't re-send a packet, so time to send a new one - if (!_packets.isEmpty()) { SequenceNumber nextNumber = getNextSequenceNumber(); @@ -438,28 +438,31 @@ bool SendQueue::maybeResendPacket() { } bool SendQueue::isInactive(bool sentAPacket) { - if (!sentAPacket) { - // check if it is time to break this connection - - // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been - // at least 5 seconds - static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; - static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; - if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE && - _lastReceiverResponse > 0 && - (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) { - // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, - // then signal the queue is inactive and return so it can be cleaned up - + // check for connection timeout first + + // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been + // at least 5 seconds + static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; + static const int MIN_MS_BEFORE_INACTIVE = 5 * 1000; + + auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse); + + if (sinceLastResponse >= quint64(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) && + _lastReceiverResponse > 0 && + sinceLastResponse > MIN_MS_BEFORE_INACTIVE) { + // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, + // then signal the queue is inactive and return so it can be cleaned up + #ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" - << "and 5s before receiving any ACK/NAK and is now inactive. Stopping."; + qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" + << "and" << MIN_MS_BEFORE_INACTIVE << "milliseconds before receiving any ACK/NAK and is now inactive. Stopping."; #endif - - deactivate(); - return true; - } - + + deactivate(); + return true; + } + + if (!sentAPacket) { // During our processing above we didn't send any packets // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. @@ -468,7 +471,7 @@ bool SendQueue::isInactive(bool sentAPacket) { DoubleLock doubleLock(_packets.getLock(), _naksLock); DoubleLock::Lock locker(doubleLock, std::try_to_lock); - if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) { + if (locker.owns_lock() && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) { // The packets queue and loss list mutexes are now both locked and they're both empty if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { @@ -479,21 +482,22 @@ bool SendQueue::isInactive(bool sentAPacket) { // use our condition_variable_any to wait auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT); - // we have the lock again - Make sure to unlock it - locker.unlock(); - - if (cvStatus == std::cv_status::timeout) { + if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) { #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() << "seconds and receiver has ACKed all packets." << "The queue is now inactive and will be stopped."; #endif + + // we have the lock again - Make sure to unlock it + locker.unlock(); // Deactivate queue deactivate(); return true; } + } else { // We think the client is still waiting for data (based on the sequence number gap) // Let's wait either for a response from the client or until the estimated timeout @@ -503,17 +507,18 @@ bool SendQueue::isInactive(bool sentAPacket) { // use our condition_variable_any to wait auto cvStatus = _emptyCondition.wait_for(locker, waitDuration); - if (cvStatus == std::cv_status::timeout) { - // increase the number of timeouts - ++_timeoutExpiryCount; + if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty() + && SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + // after a timeout if we still have sent packets that the client hasn't ACKed we + // add them to the loss list - if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { - // after a timeout if we still have sent packets that the client hasn't ACKed we - // add them to the loss list - - // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); - } + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + + // we have the lock again - time to unlock it + locker.unlock(); + + emit timeout(); } } } @@ -527,4 +532,8 @@ void SendQueue::deactivate() { emit queueInactive(); _state = State::Stopped; -} \ No newline at end of file +} + +bool SendQueue::isFlowWindowFull() const { + return seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > _flowWindowSize; +} diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 0390f2ff1f..9400ae8352 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -78,6 +78,8 @@ signals: void packetRetransmitted(); void queueInactive(); + + void timeout(); private slots: void run(); @@ -97,6 +99,8 @@ private: bool isInactive(bool sentAPacket); void deactivate(); // makes the queue inactive and cleans it up + + bool isFlowWindowFull() const; // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); @@ -118,7 +122,6 @@ private: std::atomic _estimatedTimeout { 0 }; // Estimated timeout, set from CC std::atomic _syncInterval { udt::DEFAULT_SYN_INTERVAL_USECS }; // Sync interval, set from CC - std::atomic _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client std::atomic _lastReceiverResponse { 0 }; // Timestamp for the last time we got new data from the receiver (ACK/NAK) std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC