From 93a00c3d5d94d1203f566045dff453edf2f73501 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 18 Mar 2016 14:03:56 -0700 Subject: [PATCH 01/13] don't perform a decrease during single packet loss events --- .../networking/src/udt/CongestionControl.cpp | 19 +++++++++++++------ .../networking/src/udt/CongestionControl.h | 1 + 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 0f8a9f24f6..76ac93781b 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -138,18 +138,23 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { } } - _loss = true; - static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125; static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5; - + // check if this NAK starts a new congestion period - this will be the case if the // NAK received occured for a packet sent after the last decrease if (rangeStart > _lastDecreaseMaxSeq) { + + // check if we should skip handling of this loss event + // we do this if this congestion event represents only a single packet loss + if (rangeStart == rangeEnd) { + return; + } + _lastDecreasePeriod = _packetSendPeriod; - + _packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE); - + // use EWMA to update the average number of NAKs per congestion static const double NAK_EWMA_ALPHA = 0.125; _avgNAKNum = (int)ceil(_avgNAKNum * (1 - NAK_EWMA_ALPHA) + _nakCount * NAK_EWMA_ALPHA); @@ -159,7 +164,7 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { _decreaseCount = 1; _lastDecreaseMaxSeq = _sendCurrSeqNum; - + if (_avgNAKNum < 1) { _randomDecreaseThreshold = 1; } else { @@ -177,6 +182,8 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { _packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE); _lastDecreaseMaxSeq = _sendCurrSeqNum; } + + _loss = true; } // Note: This isn't currently being called by anything since we, unlike UDT, don't have TTL on our packets diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index 69b7a32d2d..e3990f511a 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -118,6 +118,7 @@ private: SequenceNumber _slowStartLastACK; // last ACKed seq num from previous slow start check bool _loss { false }; // if loss happened since last rate increase SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened + SequenceNumber _firstLossFromEvent; // sequence number of first packet ignored for last congestion event double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened int _nakCount { 0 }; // number of NAKs in congestion epoch int _randomDecreaseThreshold { 1 }; // random threshold on decrease by number of loss events From cbcc6e3ef28b1818f7589be8c1bfc27efbb51c72 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 18 Mar 2016 14:30:06 -0700 Subject: [PATCH 02/13] fix for last decrease sequence number --- libraries/networking/src/udt/CongestionControl.cpp | 6 ++++++ libraries/networking/src/udt/CongestionControl.h | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 76ac93781b..6f447e8233 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -148,6 +148,7 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { // check if we should skip handling of this loss event // we do this if this congestion event represents only a single packet loss if (rangeStart == rangeEnd) { + qDebug() << "Skipping a first loss event"; return; } @@ -213,3 +214,8 @@ void DefaultCC::stopSlowStart() { _packetSendPeriod = _congestionWindowSize / (_rtt + synInterval()); } } + +void DefaultCC::setInitialSendSequenceNumber(udt::SequenceNumber seqNum) { + _slowStartLastACK = seqNum; + _lastDecreaseMaxSeq = seqNum - 1; +} diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index e3990f511a..2b64796bc5 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -107,7 +107,7 @@ public: virtual void onTimeout(); protected: - virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) { _slowStartLastACK = seqNum; } + virtual void setInitialSendSequenceNumber(SequenceNumber seqNum); private: void stopSlowStart(); // stops the slow start on loss or timeout From 08dff9c7aca7a14d692fc50a6551d3aed755c1d8 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 09:06:39 -0700 Subject: [PATCH 03/13] handle expiry check even if a packet was sent --- libraries/networking/src/udt/SendQueue.cpp | 49 ++++++++++------------ libraries/networking/src/udt/SendQueue.h | 1 - 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index bda12b2e4d..90f6cf9813 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -133,7 +133,6 @@ void SendQueue::sendPacket(const Packet& packet) { void SendQueue::ack(SequenceNumber ack) { // this is a response from the client, re-set our timeout expiry and our last response time - _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); if (_lastACKSequenceNumber == (uint32_t) ack) { @@ -161,7 +160,6 @@ void SendQueue::ack(SequenceNumber ack) { void SendQueue::nak(SequenceNumber start, SequenceNumber end) { // this is a response from the client, re-set our timeout expiry - _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); { @@ -175,7 +173,6 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) { void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { // this is a response from the client, re-set our timeout expiry - _timeoutExpiryCount = 0; _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); { @@ -438,28 +435,31 @@ bool SendQueue::maybeResendPacket() { } bool SendQueue::isInactive(bool sentAPacket) { - if (!sentAPacket) { - // check if it is time to break this connection - - // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been - // at least 5 seconds - static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; - static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; - if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE && - _lastReceiverResponse > 0 && - (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) { - // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, - // then signal the queue is inactive and return so it can be cleaned up - + // check for connection timeout first + + // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been + // at least 5 seconds + static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; + static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; + + auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse); + + if (sinceLastResponse >= quint64(NUM_TIMEOUTS_BEFORE_INACTIVE * _estimatedTimeout) && + _lastReceiverResponse > 0 && + sinceLastResponse > MIN_SECONDS_BEFORE_INACTIVE_MS) { + // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, + // then signal the queue is inactive and return so it can be cleaned up + #ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" - << "and 5s before receiving any ACK/NAK and is now inactive. Stopping."; + qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" + << "and 5s before receiving any ACK/NAK and is now inactive. Stopping."; #endif - - deactivate(); - return true; - } - + + deactivate(); + return true; + } + + if (!sentAPacket) { // During our processing above we didn't send any packets // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. @@ -504,9 +504,6 @@ bool SendQueue::isInactive(bool sentAPacket) { auto cvStatus = _emptyCondition.wait_for(locker, waitDuration); if (cvStatus == std::cv_status::timeout) { - // increase the number of timeouts - ++_timeoutExpiryCount; - if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { // after a timeout if we still have sent packets that the client hasn't ACKed we // add them to the loss list diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 0390f2ff1f..0c8dfd8766 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -118,7 +118,6 @@ private: std::atomic _estimatedTimeout { 0 }; // Estimated timeout, set from CC std::atomic _syncInterval { udt::DEFAULT_SYN_INTERVAL_USECS }; // Sync interval, set from CC - std::atomic _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client std::atomic _lastReceiverResponse { 0 }; // Timestamp for the last time we got new data from the receiver (ACK/NAK) std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC From 4c5ad8a03e41098bf6d43d65230f2e50e101efc1 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 09:08:11 -0700 Subject: [PATCH 04/13] remove first congestion event drops --- libraries/networking/src/udt/CongestionControl.cpp | 11 ++--------- libraries/networking/src/udt/CongestionControl.h | 1 - 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 6f447e8233..e8afdc5fe2 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -137,6 +137,8 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { return; } } + + _loss = true; static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125; static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5; @@ -145,13 +147,6 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { // NAK received occured for a packet sent after the last decrease if (rangeStart > _lastDecreaseMaxSeq) { - // check if we should skip handling of this loss event - // we do this if this congestion event represents only a single packet loss - if (rangeStart == rangeEnd) { - qDebug() << "Skipping a first loss event"; - return; - } - _lastDecreasePeriod = _packetSendPeriod; _packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE); @@ -183,8 +178,6 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { _packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE); _lastDecreaseMaxSeq = _sendCurrSeqNum; } - - _loss = true; } // Note: This isn't currently being called by anything since we, unlike UDT, don't have TTL on our packets diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index 2b64796bc5..c2b4227667 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -118,7 +118,6 @@ private: SequenceNumber _slowStartLastACK; // last ACKed seq num from previous slow start check bool _loss { false }; // if loss happened since last rate increase SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened - SequenceNumber _firstLossFromEvent; // sequence number of first packet ignored for last congestion event double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened int _nakCount { 0 }; // number of NAKs in congestion epoch int _randomDecreaseThreshold { 1 }; // random threshold on decrease by number of loss events From b7ff94e20d9eeb781ab644e58b415cac194d067b Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 10:09:28 -0700 Subject: [PATCH 05/13] put packets back in the queue if flow window is full --- libraries/networking/src/udt/SendQueue.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 90f6cf9813..6bace3ba21 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -314,7 +314,6 @@ bool SendQueue::maybeSendNewPacket() { if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { // we didn't re-send a packet, so time to send a new one - if (!_packets.isEmpty()) { SequenceNumber nextNumber = getNextSequenceNumber(); @@ -467,8 +466,11 @@ bool SendQueue::isInactive(bool sentAPacket) { using DoubleLock = DoubleLock; DoubleLock doubleLock(_packets.getLock(), _naksLock); DoubleLock::Lock locker(doubleLock, std::try_to_lock); + + auto packetsOnWire = seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber); + bool congestionWindowFull = (packetsOnWire > _flowWindowSize); - if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) { + if (locker.owns_lock() && (_packets.isEmpty() || congestionWindowFull) && _naks.isEmpty()) { // The packets queue and loss list mutexes are now both locked and they're both empty if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { From b059e98ff54dc0ed6a7d4c3ef344b7db2c2e1313 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 10:15:55 -0700 Subject: [PATCH 06/13] fix units for timeout check on expiry --- libraries/networking/src/udt/SendQueue.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 6bace3ba21..5b4fe1f7a5 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include "../NetworkLogging.h" @@ -439,19 +440,19 @@ bool SendQueue::isInactive(bool sentAPacket) { // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been // at least 5 seconds static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; - static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; + static const int MIN_MS_BEFORE_INACTIVE = 5 * 1000; auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse); - if (sinceLastResponse >= quint64(NUM_TIMEOUTS_BEFORE_INACTIVE * _estimatedTimeout) && + if (sinceLastResponse >= quint64(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) && _lastReceiverResponse > 0 && - sinceLastResponse > MIN_SECONDS_BEFORE_INACTIVE_MS) { + sinceLastResponse > MIN_MS_BEFORE_INACTIVE) { // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, // then signal the queue is inactive and return so it can be cleaned up #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" - << "and 5s before receiving any ACK/NAK and is now inactive. Stopping."; + << "and" << MIN_MS_BEFORE_INACTIVE << "milliseconds before receiving any ACK/NAK and is now inactive. Stopping."; #endif deactivate(); From 74ae18e51431c56797cc48968675f41f2ece03dd Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 11:15:43 -0700 Subject: [PATCH 07/13] replace append with insert to work around assert --- libraries/networking/src/udt/SendQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 5b4fe1f7a5..d67f053bad 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -512,7 +512,7 @@ bool SendQueue::isInactive(bool sentAPacket) { // add them to the loss list // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + _naks.insert(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); } } } From 4fe9ad94f5b6eea7783780edcb7c3948956e7d8a Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 11:20:39 -0700 Subject: [PATCH 08/13] notify on the emptyCondition if an ACK is received --- libraries/networking/src/udt/SendQueue.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index d67f053bad..2de23c5d0a 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -157,6 +157,9 @@ void SendQueue::ack(SequenceNumber ack) { } _lastACKSequenceNumber = (uint32_t) ack; + + // call notify_one on the condition_variable_any in case the send thread is sleeping with a full congestion window + _emptyCondition.notify_one(); } void SendQueue::nak(SequenceNumber start, SequenceNumber end) { From 24fd39dfa31f88323f7b7c8077d9794b3ca97458 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 11:36:23 -0700 Subject: [PATCH 09/13] make sure NAKs is empty before append from timeout --- libraries/networking/src/udt/SendQueue.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 2de23c5d0a..9ffc3587ae 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -510,12 +510,12 @@ bool SendQueue::isInactive(bool sentAPacket) { auto cvStatus = _emptyCondition.wait_for(locker, waitDuration); if (cvStatus == std::cv_status::timeout) { - if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + if (_naks.isEmpty() && SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { // after a timeout if we still have sent packets that the client hasn't ACKed we // add them to the loss list // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.insert(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); } } } From c65ab5f1ad5650365b884776a1ca1fa4b0b383af Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 11:40:27 -0700 Subject: [PATCH 10/13] re-check the timeout guards before acting on timeout --- libraries/networking/src/udt/SendQueue.cpp | 30 +++++++++++----------- libraries/networking/src/udt/SendQueue.h | 2 ++ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 9ffc3587ae..06e82d439c 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -315,7 +315,7 @@ void SendQueue::run() { } bool SendQueue::maybeSendNewPacket() { - if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { + if (!isFlowWindowFull()) { // we didn't re-send a packet, so time to send a new one if (!_packets.isEmpty()) { @@ -470,11 +470,8 @@ bool SendQueue::isInactive(bool sentAPacket) { using DoubleLock = DoubleLock; DoubleLock doubleLock(_packets.getLock(), _naksLock); DoubleLock::Lock locker(doubleLock, std::try_to_lock); - - auto packetsOnWire = seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber); - bool congestionWindowFull = (packetsOnWire > _flowWindowSize); - if (locker.owns_lock() && (_packets.isEmpty() || congestionWindowFull) && _naks.isEmpty()) { + if (locker.owns_lock() && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) { // The packets queue and loss list mutexes are now both locked and they're both empty if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { @@ -488,7 +485,7 @@ bool SendQueue::isInactive(bool sentAPacket) { // we have the lock again - Make sure to unlock it locker.unlock(); - if (cvStatus == std::cv_status::timeout) { + if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) { #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() @@ -509,14 +506,13 @@ bool SendQueue::isInactive(bool sentAPacket) { // use our condition_variable_any to wait auto cvStatus = _emptyCondition.wait_for(locker, waitDuration); - if (cvStatus == std::cv_status::timeout) { - if (_naks.isEmpty() && SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { - // after a timeout if we still have sent packets that the client hasn't ACKed we - // add them to the loss list - - // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); - } + if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty() + && SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + // after a timeout if we still have sent packets that the client hasn't ACKed we + // add them to the loss list + + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); } } } @@ -530,4 +526,8 @@ void SendQueue::deactivate() { emit queueInactive(); _state = State::Stopped; -} \ No newline at end of file +} + +bool SendQueue::isFlowWindowFull() const { + return seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > _flowWindowSize; +} diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 0c8dfd8766..0f646c3d9c 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -97,6 +97,8 @@ private: bool isInactive(bool sentAPacket); void deactivate(); // makes the queue inactive and cleans it up + + bool isFlowWindowFull() const; // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); From 8e97a5095764ba40aa1f4ff51b9a8a3015a44fb5 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 11:56:36 -0700 Subject: [PATCH 11/13] unlock before de-activating the queue --- libraries/networking/src/udt/SendQueue.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 06e82d439c..87f2f0a5bc 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -482,9 +482,6 @@ bool SendQueue::isInactive(bool sentAPacket) { // use our condition_variable_any to wait auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT); - // we have the lock again - Make sure to unlock it - locker.unlock(); - if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) { #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" @@ -492,11 +489,15 @@ bool SendQueue::isInactive(bool sentAPacket) { << "seconds and receiver has ACKed all packets." << "The queue is now inactive and will be stopped."; #endif + + // we have the lock again - Make sure to unlock it + locker.unlock(); // Deactivate queue deactivate(); return true; } + } else { // We think the client is still waiting for data (based on the sequence number gap) // Let's wait either for a response from the client or until the estimated timeout From 0956649cdbc2989b1fd9b078fad40d160feb2283 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 12:12:15 -0700 Subject: [PATCH 12/13] give CongestionControl timeout event from SendQueue --- .../networking/src/udt/CongestionControl.cpp | 15 +++++++-------- libraries/networking/src/udt/CongestionControl.h | 4 ++-- libraries/networking/src/udt/Connection.cpp | 7 +++++++ libraries/networking/src/udt/Connection.h | 1 + libraries/networking/src/udt/SendQueue.cpp | 5 +++++ libraries/networking/src/udt/SendQueue.h | 2 ++ 6 files changed, 24 insertions(+), 10 deletions(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index e8afdc5fe2..2b71ea5355 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -62,10 +62,10 @@ void DefaultCC::onACK(SequenceNumber ackNum) { if (_slowStart) { // we are in slow start phase - increase the congestion window size by the number of packets just ACKed - _congestionWindowSize += seqlen(_slowStartLastACK, ackNum); + _congestionWindowSize += seqlen(_lastACK, ackNum); // update the last ACK - _slowStartLastACK = ackNum; + _lastACK = ackNum; // check if we can get out of slow start (is our new congestion window size bigger than the max) if (_congestionWindowSize > _maxCongestionWindowSize) { @@ -186,11 +186,11 @@ void DefaultCC::onTimeout() { stopSlowStart(); } else { // UDT used to do the following on timeout if not in slow start - we should check if it could be helpful - // _lastDecreasePeriod = _packetSendPeriod; - // _packetSendPeriod = ceil(_packetSendPeriod * 2); - + _lastDecreasePeriod = _packetSendPeriod; + _packetSendPeriod = ceil(_packetSendPeriod * 2); + // this seems odd - the last ack they were setting _lastDecreaseMaxSeq to only applies to slow start - // _lastDecreaseMaxSeq = _slowStartLastAck; + _lastDecreaseMaxSeq = _lastACK; } } @@ -209,6 +209,5 @@ void DefaultCC::stopSlowStart() { } void DefaultCC::setInitialSendSequenceNumber(udt::SequenceNumber seqNum) { - _slowStartLastACK = seqNum; - _lastDecreaseMaxSeq = seqNum - 1; + _lastACK = _lastDecreaseMaxSeq = seqNum - 1; } diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index c2b4227667..3a5c8d0d00 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -41,7 +41,7 @@ public: virtual void init() {} virtual void onACK(SequenceNumber ackNum) {} virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {} - + virtual void onTimeout() {} protected: void setAckInterval(int ackInterval) { _ackInterval = ackInterval; } void setRTO(int rto) { _userDefinedRTO = true; _rto = rto; } @@ -115,7 +115,7 @@ private: p_high_resolution_clock::time_point _lastRCTime = p_high_resolution_clock::now(); // last rate increase time bool _slowStart { true }; // if in slow start phase - SequenceNumber _slowStartLastACK; // last ACKed seq num from previous slow start check + SequenceNumber _lastACK; // last ACKed sequence number from previous bool _loss { false }; // if loss happened since last rate increase SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 96d4b65aec..f75a9535f5 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -98,6 +98,7 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); + QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout); // set defaults on the send queue from our congestion control object and estimatedTimeout() _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); @@ -129,6 +130,12 @@ void Connection::queueInactive() { } } +void Connection::queueTimeout() { + updateCongestionControlAndSendQueue([this]{ + _congestionControl->onTimeout(); + }); +} + void Connection::sendReliablePacket(std::unique_ptr packet) { Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); getSendQueue().queuePacket(std::move(packet)); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index bf56a468aa..8d80e736af 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -84,6 +84,7 @@ private slots: void recordSentPackets(int payload, int total); void recordRetransmission(); void queueInactive(); + void queueTimeout(); private: void sendACK(bool wasCausedBySyncTimeout = true); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 87f2f0a5bc..9701561ec7 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -514,6 +514,11 @@ bool SendQueue::isInactive(bool sentAPacket) { // Note that thanks to the DoubleLock we have the _naksLock right now _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + + // we have the lock again - time to unlock it + locker.unlock(); + + emit timeout(); } } } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 0f646c3d9c..9400ae8352 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -78,6 +78,8 @@ signals: void packetRetransmitted(); void queueInactive(); + + void timeout(); private slots: void run(); From 884df5739d4388ce269d4d7448c8ef8b670c610c Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 21 Mar 2016 12:14:23 -0700 Subject: [PATCH 13/13] remove comment that indicates that onTimeout is not used --- libraries/networking/src/udt/CongestionControl.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 2b71ea5355..bac178377e 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -180,7 +180,6 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) { } } -// Note: This isn't currently being called by anything since we, unlike UDT, don't have TTL on our packets void DefaultCC::onTimeout() { if (_slowStart) { stopSlowStart();