mirror of
https://github.com/lubosz/overte.git
synced 2025-04-24 03:53:52 +02:00
Merge pull request #7407 from birarda/udt-slow-start
fixes for udt timeout behaviour
This commit is contained in:
commit
605cc625d6
6 changed files with 81 additions and 57 deletions
|
@ -62,10 +62,10 @@ void DefaultCC::onACK(SequenceNumber ackNum) {
|
|||
|
||||
if (_slowStart) {
|
||||
// we are in slow start phase - increase the congestion window size by the number of packets just ACKed
|
||||
_congestionWindowSize += seqlen(_slowStartLastACK, ackNum);
|
||||
_congestionWindowSize += seqlen(_lastACK, ackNum);
|
||||
|
||||
// update the last ACK
|
||||
_slowStartLastACK = ackNum;
|
||||
_lastACK = ackNum;
|
||||
|
||||
// check if we can get out of slow start (is our new congestion window size bigger than the max)
|
||||
if (_congestionWindowSize > _maxCongestionWindowSize) {
|
||||
|
@ -137,19 +137,20 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {
|
|||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
_loss = true;
|
||||
|
||||
static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125;
|
||||
static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5;
|
||||
|
||||
|
||||
// check if this NAK starts a new congestion period - this will be the case if the
|
||||
// NAK received occurred for a packet sent after the last decrease
|
||||
if (rangeStart > _lastDecreaseMaxSeq) {
|
||||
|
||||
_lastDecreasePeriod = _packetSendPeriod;
|
||||
|
||||
|
||||
_packetSendPeriod = ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE);
|
||||
|
||||
|
||||
// use EWMA to update the average number of NAKs per congestion
|
||||
static const double NAK_EWMA_ALPHA = 0.125;
|
||||
_avgNAKNum = (int)ceil(_avgNAKNum * (1 - NAK_EWMA_ALPHA) + _nakCount * NAK_EWMA_ALPHA);
|
||||
|
@ -159,7 +160,7 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {
|
|||
_decreaseCount = 1;
|
||||
|
||||
_lastDecreaseMaxSeq = _sendCurrSeqNum;
|
||||
|
||||
|
||||
if (_avgNAKNum < 1) {
|
||||
_randomDecreaseThreshold = 1;
|
||||
} else {
|
||||
|
@ -179,17 +180,16 @@ void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {
|
|||
}
|
||||
}
|
||||
|
||||
// Note: This isn't currently being called by anything since we, unlike UDT, don't have TTL on our packets
|
||||
// Reacts to a timeout reported for this connection.
// - While still in slow start: leaves slow start via stopSlowStart().
// - Otherwise: saves the current _packetSendPeriod in _lastDecreasePeriod, doubles
//   _packetSendPeriod (rounded up with ceil), and sets _lastDecreaseMaxSeq to _lastACK.
void DefaultCC::onTimeout() {
|
||||
if (_slowStart) {
|
||||
stopSlowStart();
|
||||
} else {
|
||||
// UDT used to do the following on timeout if not in slow start - we should check if it could be helpful
|
||||
// _lastDecreasePeriod = _packetSendPeriod;
|
||||
// _packetSendPeriod = ceil(_packetSendPeriod * 2);
|
||||
|
||||
_lastDecreasePeriod = _packetSendPeriod;
|
||||
_packetSendPeriod = ceil(_packetSendPeriod * 2);
|
||||
|
||||
// this seems odd - the last ack they were setting _lastDecreaseMaxSeq to only applies to slow start
|
||||
// _lastDecreaseMaxSeq = _slowStartLastAck;
|
||||
_lastDecreaseMaxSeq = _lastACK;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -206,3 +206,7 @@ void DefaultCC::stopSlowStart() {
|
|||
_packetSendPeriod = _congestionWindowSize / (_rtt + synInterval());
|
||||
}
|
||||
}
|
||||
|
||||
// Seeds the ACK/decrease bookkeeping from the first sequence number that will be sent:
// both _lastACK and _lastDecreaseMaxSeq are set to the sequence number immediately before seqNum.
void DefaultCC::setInitialSendSequenceNumber(udt::SequenceNumber seqNum) {
|
||||
_lastACK = _lastDecreaseMaxSeq = seqNum - 1;
|
||||
}
|
||||
|
|
|
@ -41,7 +41,7 @@ public:
|
|||
virtual void init() {}
|
||||
virtual void onACK(SequenceNumber ackNum) {}
|
||||
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {}
|
||||
|
||||
virtual void onTimeout() {}
|
||||
protected:
|
||||
void setAckInterval(int ackInterval) { _ackInterval = ackInterval; }
|
||||
void setRTO(int rto) { _userDefinedRTO = true; _rto = rto; }
|
||||
|
@ -107,7 +107,7 @@ public:
|
|||
virtual void onTimeout();
|
||||
|
||||
protected:
|
||||
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) { _slowStartLastACK = seqNum; }
|
||||
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum);
|
||||
|
||||
private:
|
||||
void stopSlowStart(); // stops the slow start on loss or timeout
|
||||
|
@ -115,7 +115,7 @@ private:
|
|||
p_high_resolution_clock::time_point _lastRCTime = p_high_resolution_clock::now(); // last rate increase time
|
||||
|
||||
bool _slowStart { true }; // if in slow start phase
|
||||
SequenceNumber _slowStartLastACK; // last ACKed seq num from previous slow start check
|
||||
SequenceNumber _lastACK; // last ACKed sequence number from previous
|
||||
bool _loss { false }; // if loss happened since last rate increase
|
||||
SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened
|
||||
double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened
|
||||
|
|
|
@ -98,6 +98,7 @@ SendQueue& Connection::getSendQueue() {
|
|||
QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets);
|
||||
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
|
||||
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
|
||||
QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout);
|
||||
|
||||
// set defaults on the send queue from our congestion control object and estimatedTimeout()
|
||||
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
|
||||
|
@ -129,6 +130,12 @@ void Connection::queueInactive() {
|
|||
}
|
||||
}
|
||||
|
||||
// Slot connected to SendQueue::timeout in getSendQueue(). Forwards the timeout to the
// congestion control object by calling its onTimeout() inside updateCongestionControlAndSendQueue.
// NOTE(review): presumably that wrapper also pushes the updated congestion control values
// back to the send queue - confirm against its definition.
void Connection::queueTimeout() {
|
||||
updateCongestionControlAndSendQueue([this]{
|
||||
_congestionControl->onTimeout();
|
||||
});
|
||||
}
|
||||
|
||||
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
|
||||
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
|
||||
getSendQueue().queuePacket(std::move(packet));
|
||||
|
|
|
@ -84,6 +84,7 @@ private slots:
|
|||
void recordSentPackets(int payload, int total);
|
||||
void recordRetransmission();
|
||||
void queueInactive();
|
||||
void queueTimeout();
|
||||
|
||||
private:
|
||||
void sendACK(bool wasCausedBySyncTimeout = true);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include <QtCore/QThread>
|
||||
|
||||
#include <LogHandler.h>
|
||||
#include <NumericalConstants.h>
|
||||
#include <SharedUtil.h>
|
||||
|
||||
#include "../NetworkLogging.h"
|
||||
|
@ -133,7 +134,6 @@ void SendQueue::sendPacket(const Packet& packet) {
|
|||
|
||||
void SendQueue::ack(SequenceNumber ack) {
|
||||
// this is a response from the client, re-set our timeout expiry and our last response time
|
||||
_timeoutExpiryCount = 0;
|
||||
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
|
||||
|
||||
if (_lastACKSequenceNumber == (uint32_t) ack) {
|
||||
|
@ -157,11 +157,13 @@ void SendQueue::ack(SequenceNumber ack) {
|
|||
}
|
||||
|
||||
_lastACKSequenceNumber = (uint32_t) ack;
|
||||
|
||||
// call notify_one on the condition_variable_any in case the send thread is sleeping with a full congestion window
|
||||
_emptyCondition.notify_one();
|
||||
}
|
||||
|
||||
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
|
||||
// this is a response from the client, re-set our timeout expiry
|
||||
_timeoutExpiryCount = 0;
|
||||
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
|
||||
|
||||
{
|
||||
|
@ -175,7 +177,6 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
|
|||
|
||||
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
|
||||
// this is a response from the client, re-set our timeout expiry
|
||||
_timeoutExpiryCount = 0;
|
||||
_lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch());
|
||||
|
||||
{
|
||||
|
@ -314,10 +315,9 @@ void SendQueue::run() {
|
|||
}
|
||||
|
||||
bool SendQueue::maybeSendNewPacket() {
|
||||
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
|
||||
if (!isFlowWindowFull()) {
|
||||
// we didn't re-send a packet, so time to send a new one
|
||||
|
||||
|
||||
if (!_packets.isEmpty()) {
|
||||
SequenceNumber nextNumber = getNextSequenceNumber();
|
||||
|
||||
|
@ -438,28 +438,31 @@ bool SendQueue::maybeResendPacket() {
|
|||
}
|
||||
|
||||
bool SendQueue::isInactive(bool sentAPacket) {
|
||||
if (!sentAPacket) {
|
||||
// check if it is time to break this connection
|
||||
|
||||
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
|
||||
// at least 5 seconds
|
||||
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
|
||||
static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000;
|
||||
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE &&
|
||||
_lastReceiverResponse > 0 &&
|
||||
(QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) {
|
||||
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
|
||||
// then signal the queue is inactive and return so it can be cleaned up
|
||||
|
||||
// check for connection timeout first
|
||||
|
||||
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
|
||||
// at least 5 seconds
|
||||
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
|
||||
static const int MIN_MS_BEFORE_INACTIVE = 5 * 1000;
|
||||
|
||||
auto sinceLastResponse = (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse);
|
||||
|
||||
if (sinceLastResponse >= quint64(NUM_TIMEOUTS_BEFORE_INACTIVE * (_estimatedTimeout / USECS_PER_MSEC)) &&
|
||||
_lastReceiverResponse > 0 &&
|
||||
sinceLastResponse > MIN_MS_BEFORE_INACTIVE) {
|
||||
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
|
||||
// then signal the queue is inactive and return so it can be cleaned up
|
||||
|
||||
#ifdef UDT_CONNECTION_DEBUG
|
||||
qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts"
|
||||
<< "and 5s before receiving any ACK/NAK and is now inactive. Stopping.";
|
||||
qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts"
|
||||
<< "and" << MIN_MS_BEFORE_INACTIVE << "milliseconds before receiving any ACK/NAK and is now inactive. Stopping.";
|
||||
#endif
|
||||
|
||||
deactivate();
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
deactivate();
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!sentAPacket) {
|
||||
// During our processing above we didn't send any packets
|
||||
|
||||
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
|
||||
|
@ -468,7 +471,7 @@ bool SendQueue::isInactive(bool sentAPacket) {
|
|||
DoubleLock doubleLock(_packets.getLock(), _naksLock);
|
||||
DoubleLock::Lock locker(doubleLock, std::try_to_lock);
|
||||
|
||||
if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) {
|
||||
if (locker.owns_lock() && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) {
|
||||
// The packets queue and loss list mutexes are now both locked and they're both empty
|
||||
|
||||
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
|
||||
|
@ -479,21 +482,22 @@ bool SendQueue::isInactive(bool sentAPacket) {
|
|||
// use our condition_variable_any to wait
|
||||
auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT);
|
||||
|
||||
// we have the lock again - Make sure to unlock it
|
||||
locker.unlock();
|
||||
|
||||
if (cvStatus == std::cv_status::timeout) {
|
||||
if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) {
|
||||
#ifdef UDT_CONNECTION_DEBUG
|
||||
qCDebug(networking) << "SendQueue to" << _destination << "has been empty for"
|
||||
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()
|
||||
<< "seconds and receiver has ACKed all packets."
|
||||
<< "The queue is now inactive and will be stopped.";
|
||||
#endif
|
||||
|
||||
// we have the lock again - Make sure to unlock it
|
||||
locker.unlock();
|
||||
|
||||
// Deactivate queue
|
||||
deactivate();
|
||||
return true;
|
||||
}
|
||||
|
||||
} else {
|
||||
// We think the client is still waiting for data (based on the sequence number gap)
|
||||
// Let's wait either for a response from the client or until the estimated timeout
|
||||
|
@ -503,17 +507,18 @@ bool SendQueue::isInactive(bool sentAPacket) {
|
|||
// use our condition_variable_any to wait
|
||||
auto cvStatus = _emptyCondition.wait_for(locker, waitDuration);
|
||||
|
||||
if (cvStatus == std::cv_status::timeout) {
|
||||
// increase the number of timeouts
|
||||
++_timeoutExpiryCount;
|
||||
if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()
|
||||
&& SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
|
||||
// after a timeout if we still have sent packets that the client hasn't ACKed we
|
||||
// add them to the loss list
|
||||
|
||||
if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) {
|
||||
// after a timeout if we still have sent packets that the client hasn't ACKed we
|
||||
// add them to the loss list
|
||||
|
||||
// Note that thanks to the DoubleLock we have the _naksLock right now
|
||||
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
|
||||
}
|
||||
// Note that thanks to the DoubleLock we have the _naksLock right now
|
||||
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
|
||||
|
||||
// we have the lock again - time to unlock it
|
||||
locker.unlock();
|
||||
|
||||
emit timeout();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -527,4 +532,8 @@ void SendQueue::deactivate() {
|
|||
emit queueInactive();
|
||||
|
||||
_state = State::Stopped;
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true when the sequence length from the last ACKed sequence number to the current
// sequence number (as computed by seqlen) is strictly greater than _flowWindowSize.
bool SendQueue::isFlowWindowFull() const {
|
||||
return seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > _flowWindowSize;
|
||||
}
|
||||
|
|
|
@ -78,6 +78,8 @@ signals:
|
|||
void packetRetransmitted();
|
||||
|
||||
void queueInactive();
|
||||
|
||||
void timeout();
|
||||
|
||||
private slots:
|
||||
void run();
|
||||
|
@ -97,6 +99,8 @@ private:
|
|||
|
||||
bool isInactive(bool sentAPacket);
|
||||
void deactivate(); // makes the queue inactive and cleans it up
|
||||
|
||||
bool isFlowWindowFull() const;
|
||||
|
||||
// Increments current sequence number and return it
|
||||
SequenceNumber getNextSequenceNumber();
|
||||
|
@ -118,7 +122,6 @@ private:
|
|||
|
||||
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
|
||||
std::atomic<int> _syncInterval { udt::DEFAULT_SYN_INTERVAL_USECS }; // Sync interval, set from CC
|
||||
std::atomic<int> _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client
|
||||
std::atomic<uint64_t> _lastReceiverResponse { 0 }; // Timestamp for the last time we got new data from the receiver (ACK/NAK)
|
||||
|
||||
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
|
||||
|
|
Loading…
Reference in a new issue