Merge pull request #14314 from birarda/bug/reliable-through-loss

avoid ambiguous RTT calcs and enable udt fast retransmit
This commit is contained in:
John Conklin II 2018-10-31 15:49:23 -07:00 committed by GitHub
commit 25be635b76
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 177 additions and 113 deletions

View file

@ -45,8 +45,10 @@ public:
virtual void onTimeout() {}
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {}
virtual void onPacketReSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {}
virtual int estimatedTimeout() const = 0;
protected:
void setMSS(int mss) { _mss = mss; }
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) = 0;

View file

@ -195,7 +195,7 @@ void Connection::recordSentPackets(int wireSize, int payloadSize,
void Connection::recordRetransmission(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {
_stats.record(ConnectionStats::Stats::Retransmission);
_congestionControl->onPacketSent(wireSize, seqNum, timePoint);
_congestionControl->onPacketReSent(wireSize, seqNum, timePoint);
}
void Connection::sendACK() {
@ -303,7 +303,7 @@ void Connection::processControl(ControlPacketPointer controlPacket) {
// where the other end expired our connection. Let's reset.
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Got HandshakeRequest from" << _destination << ", stopping SendQueue";
qCDebug(networking) << "Got HandshakeRequest from" << _destination << ", stopping SendQueue";
#endif
_hasReceivedHandshakeACK = false;
stopSendQueue();
@ -327,19 +327,19 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
return;
}
if (ack <= _lastReceivedACK) {
if (ack < _lastReceivedACK) {
// this is an out of order ACK, bail
// or
// processing an already received ACK, bail
return;
}
_lastReceivedACK = ack;
// ACK the send queue so it knows what was received
getSendQueue().ack(ack);
if (ack > _lastReceivedACK) {
// this is not a repeated ACK, so update our member and tell the send queue
_lastReceivedACK = ack;
// ACK the send queue so it knows what was received
getSendQueue().ack(ack);
}
// give this ACK to the congestion control and update the send queue parameters
updateCongestionControlAndSendQueue([this, ack, &controlPacket] {
if (_congestionControl->onACK(ack, controlPacket->getReceiveTime())) {

View file

@ -481,6 +481,7 @@ bool SendQueue::isInactive(bool attemptedToSendPacket) {
auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT);
if (cvStatus == std::cv_status::timeout && (_packets.isEmpty() || isFlowWindowFull()) && _naks.isEmpty()) {
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "SendQueue to" << _destination << "has been empty for"
<< EMPTY_QUEUES_INACTIVE_TIMEOUT.count()

View file

@ -27,112 +27,106 @@ TCPVegasCC::TCPVegasCC() {
_baseRTT = std::numeric_limits<int>::max();
}
bool TCPVegasCC::calculateRTT(p_high_resolution_clock::time_point sendTime, p_high_resolution_clock::time_point receiveTime) {
// calculate the RTT (receive time - time ACK sent)
int lastRTT = duration_cast<microseconds>(receiveTime - sendTime).count();
const int MAX_RTT_SAMPLE_MICROSECONDS = 10000000;
if (lastRTT < 0) {
Q_ASSERT_X(false, __FUNCTION__, "calculated an RTT that is not > 0");
return false;
} else if (lastRTT == 0) {
// we do not allow a zero microsecond RTT (as per the UNIX kernel implementation of TCP Vegas)
lastRTT = 1;
} else if (lastRTT > MAX_RTT_SAMPLE_MICROSECONDS) {
// we cap the lastRTT to MAX_RTT_SAMPLE_MICROSECONDS to avoid overflows in window size calculations
lastRTT = MAX_RTT_SAMPLE_MICROSECONDS;
}
if (_ewmaRTT == -1) {
// first RTT sample - set _ewmaRTT to the value and set the variance to half the value
_ewmaRTT = lastRTT;
_rttVariance = lastRTT / 2;
} else {
// This updates the RTT using exponential weighted moving average
// This is the Jacobson's forumla for RTT estimation
// http://www.mathcs.emory.edu/~cheung/Courses/455/Syllabus/7-transport/Jacobson-88.pdf
// Estimated RTT = (1 - x)(estimatedRTT) + (x)(sampleRTT)
// (where x = 0.125 via Jacobson)
// Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT|
// (where x = 0.25 via Jacobson)
static const int RTT_ESTIMATION_ALPHA = 8;
static const int RTT_ESTIMATION_VARIANCE_ALPHA = 4;
_ewmaRTT = (_ewmaRTT * (RTT_ESTIMATION_ALPHA - 1) + lastRTT) / RTT_ESTIMATION_ALPHA;
_rttVariance = (_rttVariance * (RTT_ESTIMATION_VARIANCE_ALPHA- 1)
+ abs(lastRTT - _ewmaRTT)) / RTT_ESTIMATION_VARIANCE_ALPHA;
}
// keep track of the lowest RTT during connection
_baseRTT = std::min(_baseRTT, lastRTT);
// find the min RTT during the last RTT
_currentMinRTT = std::min(_currentMinRTT, lastRTT);
// add 1 to the number of RTT samples collected during this RTT window
++_numRTTs;
return true;
}
bool TCPVegasCC::onACK(SequenceNumber ack, p_high_resolution_clock::time_point receiveTime) {
auto it = _sentPacketTimes.find(ack);
auto previousAck = _lastACK;
_lastACK = ack;
if (it != _sentPacketTimes.end()) {
bool wasDuplicateACK = (ack == previousAck);
// calculate the RTT (receive time - time ACK sent)
int lastRTT = duration_cast<microseconds>(receiveTime - it->second).count();
auto it = std::find_if(_sentPacketDatas.begin(), _sentPacketDatas.end(), [ack](SentPacketData& packetTime){
return packetTime.sequenceNumber == ack;
});
const int MAX_RTT_SAMPLE_MICROSECONDS = 10000000;
if (!wasDuplicateACK && it != _sentPacketDatas.end()) {
// check if we can unambigiously calculate an RTT from this ACK
if (lastRTT < 0) {
Q_ASSERT_X(false, __FUNCTION__, "calculated an RTT that is not > 0");
// for that to be the case,
// any of the packets this ACK covers (from the current ACK back to our previous ACK)
// must not have been re-sent
bool canBeUsedForRTT = std::none_of(_sentPacketDatas.begin(), _sentPacketDatas.end(),
[ack, previousAck](SentPacketData& sentPacketData)
{
return sentPacketData.sequenceNumber > previousAck
&& sentPacketData.sequenceNumber <= ack
&& sentPacketData.wasResent;
});
auto sendTime = it->timePoint;
// remove all sent packet times up to this sequence number
it = _sentPacketDatas.erase(_sentPacketDatas.begin(), it + 1);
// if we can use this ACK for an RTT calculation then do so
// returning false if we calculate an invalid RTT
if (canBeUsedForRTT && !calculateRTT(sendTime, receiveTime)) {
return false;
} else if (lastRTT == 0) {
// we do not allow a zero microsecond RTT (as per the UNIX kernel implementation of TCP Vegas)
lastRTT = 1;
} else if (lastRTT > MAX_RTT_SAMPLE_MICROSECONDS) {
// we cap the lastRTT to MAX_RTT_SAMPLE_MICROSECONDS to avoid overflows in window size calculations
lastRTT = MAX_RTT_SAMPLE_MICROSECONDS;
}
}
if (_ewmaRTT == -1) {
// first RTT sample - set _ewmaRTT to the value and set the variance to half the value
_ewmaRTT = lastRTT;
_rttVariance = lastRTT / 2;
} else {
// This updates the RTT using exponential weighted moving average
// This is the Jacobson's forumla for RTT estimation
// http://www.mathcs.emory.edu/~cheung/Courses/455/Syllabus/7-transport/Jacobson-88.pdf
// Estimated RTT = (1 - x)(estimatedRTT) + (x)(sampleRTT)
// (where x = 0.125 via Jacobson)
// Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT|
// (where x = 0.25 via Jacobson)
static const int RTT_ESTIMATION_ALPHA = 8;
static const int RTT_ESTIMATION_VARIANCE_ALPHA = 4;
_ewmaRTT = (_ewmaRTT * (RTT_ESTIMATION_ALPHA - 1) + lastRTT) / RTT_ESTIMATION_ALPHA;
_rttVariance = (_rttVariance * (RTT_ESTIMATION_VARIANCE_ALPHA- 1)
+ abs(lastRTT - _ewmaRTT)) / RTT_ESTIMATION_VARIANCE_ALPHA;
}
// add 1 to the number of ACKs during this RTT
++_numACKs;
// keep track of the lowest RTT during connection
_baseRTT = std::min(_baseRTT, lastRTT);
// find the min RTT during the last RTT
_currentMinRTT = std::min(_currentMinRTT, lastRTT);
auto sinceLastAdjustment = duration_cast<microseconds>(p_high_resolution_clock::now() - _lastAdjustmentTime).count();
if (sinceLastAdjustment >= _ewmaRTT) {
performCongestionAvoidance(ack);
}
// remove this sent packet time from the hash
_sentPacketTimes.erase(it);
auto sinceLastAdjustment = duration_cast<microseconds>(p_high_resolution_clock::now() - _lastAdjustmentTime).count();
if (sinceLastAdjustment >= _ewmaRTT) {
performCongestionAvoidance(ack);
}
++_numACKSinceFastRetransmit;
// perform the fast re-transmit check if this is a duplicate ACK or if this is the first or second ACK
// after a previous fast re-transmit
if (ack == previousAck || _numACKSinceFastRetransmit < 3) {
// we may need to re-send ackNum + 1 if it has been more than our estimated timeout since it was sent
auto it = _sentPacketTimes.find(ack + 1);
if (it != _sentPacketTimes.end()) {
auto now = p_high_resolution_clock::now();
auto sinceSend = duration_cast<microseconds>(now - it->second).count();
if (sinceSend >= estimatedTimeout()) {
// break out of slow start, we've decided this is loss
_slowStart = false;
// reset the fast re-transmit counter
_numACKSinceFastRetransmit = 0;
// return true so the caller knows we needed a fast re-transmit
return true;
}
}
// if this is the 3rd duplicate ACK, we fallback to Reno's fast re-transmit
static const int RENO_FAST_RETRANSMIT_DUPLICATE_COUNT = 3;
++_duplicateACKCount;
if (ack == previousAck && _duplicateACKCount == RENO_FAST_RETRANSMIT_DUPLICATE_COUNT) {
// break out of slow start, we just hit loss
_slowStart = false;
// reset our fast re-transmit counters
_numACKSinceFastRetransmit = 0;
_duplicateACKCount = 0;
// return true so the caller knows we needed a fast re-transmit
return true;
}
if (wasDuplicateACK || _numACKSinceFastRetransmit < 3) {
return needsFastRetransmit(ack, wasDuplicateACK);
} else {
_duplicateACKCount = 0;
}
@ -141,6 +135,49 @@ bool TCPVegasCC::onACK(SequenceNumber ack, p_high_resolution_clock::time_point r
return false;
}
bool TCPVegasCC::needsFastRetransmit(SequenceNumber ack, bool wasDuplicateACK) {
// we may need to re-send ackNum + 1 if it has been more than our estimated timeout since it was sent
auto nextIt = std::find_if(_sentPacketDatas.begin(), _sentPacketDatas.end(), [ack](SentPacketData& packetTime){
return packetTime.sequenceNumber == ack + 1;
});
if (nextIt != _sentPacketDatas.end()) {
auto now = p_high_resolution_clock::now();
auto sinceSend = duration_cast<microseconds>(now - nextIt->timePoint).count();
if (sinceSend >= estimatedTimeout()) {
// break out of slow start, we've decided this is loss
_slowStart = false;
// reset the fast re-transmit counter
_numACKSinceFastRetransmit = 0;
// return true so the caller knows we needed a fast re-transmit
return true;
}
}
// if this is the 3rd duplicate ACK, we fallback to Reno's fast re-transmit
static const int RENO_FAST_RETRANSMIT_DUPLICATE_COUNT = 3;
++_duplicateACKCount;
if (wasDuplicateACK && _duplicateACKCount == RENO_FAST_RETRANSMIT_DUPLICATE_COUNT) {
// break out of slow start, we just hit loss
_slowStart = false;
// reset our fast re-transmit counters
_numACKSinceFastRetransmit = 0;
_duplicateACKCount = 0;
// return true so the caller knows we needed a fast re-transmit
return true;
}
return false;
}
void TCPVegasCC::performCongestionAvoidance(udt::SequenceNumber ack) {
static int VEGAS_ALPHA_SEGMENTS = 4;
static int VEGAS_BETA_SEGMENTS = 6;
@ -158,7 +195,7 @@ void TCPVegasCC::performCongestionAvoidance(udt::SequenceNumber ack) {
int64_t windowSizeDiff = (int64_t) _congestionWindowSize * (rtt - _baseRTT) / _baseRTT;
if (_numACKs <= 2) {
if (_numRTTs <= 2) {
performRenoCongestionAvoidance(ack);
} else {
if (_slowStart) {
@ -209,7 +246,7 @@ void TCPVegasCC::performCongestionAvoidance(udt::SequenceNumber ack) {
_currentMinRTT = std::numeric_limits<int>::max();
// reset our count of collected RTT samples
_numACKs = 0;
_numRTTs = 0;
}
@ -230,29 +267,29 @@ void TCPVegasCC::performRenoCongestionAvoidance(SequenceNumber ack) {
return;
}
int numAcked = _numACKs;
int numRTTCollected = _numRTTs;
if (_slowStart) {
// while in slow start we grow the congestion window by the number of ACKed packets
// allowing it to grow as high as the slow start threshold
int congestionWindow = _congestionWindowSize + numAcked;
int congestionWindow = _congestionWindowSize + numRTTCollected;
if (congestionWindow > udt::MAX_PACKETS_IN_FLIGHT) {
// we're done with slow start, set the congestion window to the slow start threshold
_congestionWindowSize = udt::MAX_PACKETS_IN_FLIGHT;
// figure out how many left over ACKs we should apply using the regular reno congestion avoidance
numAcked = congestionWindow - udt::MAX_PACKETS_IN_FLIGHT;
numRTTCollected = congestionWindow - udt::MAX_PACKETS_IN_FLIGHT;
} else {
_congestionWindowSize = congestionWindow;
numAcked = 0;
numRTTCollected = 0;
}
}
// grab the size of the window prior to reno additive increase
int preAIWindowSize = _congestionWindowSize;
if (numAcked > 0) {
if (numRTTCollected > 0) {
// Once we are out of slow start, we use additive increase to grow the window slowly.
// We grow the congestion window by a single packet everytime the entire congestion window is sent.
@ -263,7 +300,7 @@ void TCPVegasCC::performRenoCongestionAvoidance(SequenceNumber ack) {
}
// increase the window size by (1 / window size) for every ACK received
_ackAICount += numAcked;
_ackAICount += numRTTCollected;
if (_ackAICount >= preAIWindowSize) {
// when _ackAICount % preAIWindowSize == 0 then _ackAICount is 0
// when _ackAICount % preAIWindowSize != 0 then _ackAICount is _ackAICount - (_ackAICount % preAIWindowSize)
@ -277,8 +314,19 @@ void TCPVegasCC::performRenoCongestionAvoidance(SequenceNumber ack) {
}
void TCPVegasCC::onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {
if (_sentPacketTimes.find(seqNum) == _sentPacketTimes.end()) {
_sentPacketTimes[seqNum] = timePoint;
_sentPacketDatas.emplace_back(seqNum, timePoint);
}
void TCPVegasCC::onPacketReSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {
// look for our information for this sent packet
auto it = std::find_if(_sentPacketDatas.begin(), _sentPacketDatas.end(), [seqNum](SentPacketData& sentPacketInfo){
return sentPacketInfo.sequenceNumber == seqNum;
});
// if we found information for this packet (it hasn't been erased because it hasn't yet been ACKed)
// then mark it as re-sent so we know it cannot be used for RTT calculations
if (it != _sentPacketDatas.end()) {
it->wasResent = true;
}
}

View file

@ -30,6 +30,7 @@ public:
virtual void onTimeout() override {};
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) override;
virtual void onPacketReSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) override;
virtual int estimatedTimeout() const override;
@ -37,11 +38,23 @@ protected:
virtual void performCongestionAvoidance(SequenceNumber ack);
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) override { _lastACK = seqNum - 1; }
private:
bool calculateRTT(p_high_resolution_clock::time_point sendTime, p_high_resolution_clock::time_point receiveTime);
bool needsFastRetransmit(SequenceNumber ack, bool wasDuplicateACK);
bool isCongestionWindowLimited();
void performRenoCongestionAvoidance(SequenceNumber ack);
using PacketTimeList = std::map<SequenceNumber, p_high_resolution_clock::time_point>;
PacketTimeList _sentPacketTimes; // Map of sequence numbers to sent time
struct SentPacketData {
SentPacketData(SequenceNumber seqNum, p_high_resolution_clock::time_point tPoint)
: sequenceNumber(seqNum), timePoint(tPoint) {};
SequenceNumber sequenceNumber;
p_high_resolution_clock::time_point timePoint;
bool wasResent { false };
};
using PacketTimeList = std::vector<SentPacketData>;
PacketTimeList _sentPacketDatas; // association of sequence numbers to sent time, for RTT calc
p_high_resolution_clock::time_point _lastAdjustmentTime; // Time of last congestion control adjustment
@ -56,7 +69,7 @@ private:
int _ewmaRTT { -1 }; // Exponential weighted moving average RTT
int _rttVariance { 0 }; // Variance in collected RTT values
int _numACKs { 0 }; // Number of ACKs received during the last RTT (since last performed congestion avoidance)
int _numRTTs { 0 }; // Number of RTTs calculated during the last RTT (since last performed congestion avoidance)
int _ackAICount { 0 }; // Counter for number of ACKs received for Reno additive increase
int _duplicateACKCount { 0 }; // Counter for duplicate ACKs received