mirror of
https://github.com/lubosz/overte.git
synced 2025-04-09 09:44:33 +02:00
Cleanup udt dead code
This commit is contained in:
parent
db8846796f
commit
0bfe2671dd
14 changed files with 34 additions and 810 deletions
|
@ -945,22 +945,14 @@ void AssetServer::sendStatsPacket() {
|
|||
upstreamStats["2. Sent Packets"] = stat.second.sentPackets;
|
||||
upstreamStats["3. Recvd ACK"] = events[Events::ReceivedACK];
|
||||
upstreamStats["4. Procd ACK"] = events[Events::ProcessedACK];
|
||||
upstreamStats["5. Recvd LACK"] = events[Events::ReceivedLightACK];
|
||||
upstreamStats["6. Recvd NAK"] = events[Events::ReceivedNAK];
|
||||
upstreamStats["7. Recvd TNAK"] = events[Events::ReceivedTimeoutNAK];
|
||||
upstreamStats["8. Sent ACK2"] = events[Events::SentACK2];
|
||||
upstreamStats["9. Retransmitted"] = events[Events::Retransmission];
|
||||
upstreamStats["5. Retransmitted"] = events[Events::Retransmission];
|
||||
nodeStats["Upstream Stats"] = upstreamStats;
|
||||
|
||||
QJsonObject downstreamStats;
|
||||
downstreamStats["1. Recvd (P/s)"] = stat.second.receiveRate;
|
||||
downstreamStats["2. Recvd Packets"] = stat.second.receivedPackets;
|
||||
downstreamStats["3. Sent ACK"] = events[Events::SentACK];
|
||||
downstreamStats["4. Sent LACK"] = events[Events::SentLightACK];
|
||||
downstreamStats["5. Sent NAK"] = events[Events::SentNAK];
|
||||
downstreamStats["6. Sent TNAK"] = events[Events::SentTimeoutNAK];
|
||||
downstreamStats["7. Recvd ACK2"] = events[Events::ReceivedACK2];
|
||||
downstreamStats["8. Duplicates"] = events[Events::Duplicate];
|
||||
downstreamStats["4. Duplicates"] = events[Events::Duplicate];
|
||||
nodeStats["Downstream Stats"] = downstreamStats;
|
||||
|
||||
QString uuid;
|
||||
|
|
|
@ -39,191 +39,3 @@ void CongestionControl::setPacketSendPeriod(double newSendPeriod) {
|
|||
_packetSendPeriod = newSendPeriod;
|
||||
}
|
||||
}
|
||||
|
||||
DefaultCC::DefaultCC() :
|
||||
_lastDecreaseMaxSeq(SequenceNumber {SequenceNumber::MAX })
|
||||
{
|
||||
_mss = udt::MAX_PACKET_SIZE_WITH_UDP_HEADER;
|
||||
|
||||
_congestionWindowSize = 16;
|
||||
setPacketSendPeriod(1.0);
|
||||
}
|
||||
|
||||
bool DefaultCC::onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) {
|
||||
double increase = 0;
|
||||
|
||||
// Note from UDT original code:
|
||||
// The minimum increase parameter is increased from "1.0 / _mss" to 0.01
|
||||
// because the original was too small and caused sending rate to stay at low level
|
||||
// for long time.
|
||||
const double minimumIncrease = 0.01;
|
||||
|
||||
// we will only adjust once per sync interval so check that it has been at least that long now
|
||||
auto now = p_high_resolution_clock::now();
|
||||
if (duration_cast<microseconds>(now - _lastRCTime).count() < synInterval()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// our last rate increase time is now
|
||||
_lastRCTime = now;
|
||||
|
||||
if (_slowStart) {
|
||||
// we are in slow start phase - increase the congestion window size by the number of packets just ACKed
|
||||
_congestionWindowSize += seqlen(_lastACK, ackNum);
|
||||
|
||||
// update the last ACK
|
||||
_lastACK = ackNum;
|
||||
|
||||
// check if we can get out of slow start (is our new congestion window size bigger than the max)
|
||||
if (_congestionWindowSize > _maxCongestionWindowSize) {
|
||||
_slowStart = false;
|
||||
|
||||
if (_receiveRate > 0) {
|
||||
// if we have a valid receive rate we set the send period to whatever the receive rate dictates
|
||||
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);
|
||||
} else {
|
||||
// no valid receive rate, packet send period is dictated by estimated RTT and current congestion window size
|
||||
setPacketSendPeriod((_rtt + synInterval()) / _congestionWindowSize);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not in slow start - window size should be arrival rate * (RTT + SYN) + 16
|
||||
_congestionWindowSize = _receiveRate / USECS_PER_SECOND * (_rtt + synInterval()) + 16;
|
||||
}
|
||||
|
||||
// during slow start we perform no rate increases
|
||||
if (_slowStart) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// if loss has happened since the last rate increase we do not perform another increase
|
||||
if (_loss) {
|
||||
_loss = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
double capacitySpeedDelta = (_bandwidth - USECS_PER_SECOND / _packetSendPeriod);
|
||||
|
||||
// UDT uses what they call DAIMD - additive increase multiplicative decrease with decreasing increases
|
||||
// This factor is a protocol parameter that is part of the DAIMD algorithim
|
||||
static const int AIMD_DECREASING_INCREASE_FACTOR = 9;
|
||||
|
||||
if ((_packetSendPeriod > _lastDecreasePeriod) && ((_bandwidth / AIMD_DECREASING_INCREASE_FACTOR) < capacitySpeedDelta)) {
|
||||
capacitySpeedDelta = _bandwidth / AIMD_DECREASING_INCREASE_FACTOR;
|
||||
}
|
||||
|
||||
if (capacitySpeedDelta <= 0) {
|
||||
increase = minimumIncrease;
|
||||
} else {
|
||||
// use UDTs DAIMD algorithm to figure out what the send period increase factor should be
|
||||
|
||||
// inc = max(10 ^ ceil(log10(B * MSS * 8 ) * Beta / MSS, minimumIncrease)
|
||||
// B = estimated link capacity
|
||||
// Beta = 1.5 * 10^(-6)
|
||||
|
||||
static const double BETA = 0.0000015;
|
||||
static const double BITS_PER_BYTE = 8.0;
|
||||
|
||||
increase = pow(10.0, ceil(log10(capacitySpeedDelta * _mss * BITS_PER_BYTE))) * BETA / _mss;
|
||||
|
||||
if (increase < minimumIncrease) {
|
||||
increase = minimumIncrease;
|
||||
}
|
||||
}
|
||||
|
||||
setPacketSendPeriod((_packetSendPeriod * synInterval()) / (_packetSendPeriod * increase + synInterval()));
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {
|
||||
// stop the slow start if we haven't yet
|
||||
if (_slowStart) {
|
||||
stopSlowStart();
|
||||
|
||||
// if the change to send rate was driven by a known receive rate, then we don't continue with the decrease
|
||||
if (_receiveRate > 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
_loss = true;
|
||||
++_nakCount;
|
||||
|
||||
static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125;
|
||||
static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5;
|
||||
|
||||
// check if this NAK starts a new congestion period - this will be the case if the
|
||||
// NAK received occured for a packet sent after the last decrease
|
||||
if (rangeStart > _lastDecreaseMaxSeq) {
|
||||
_delayedDecrease = (rangeStart == rangeEnd);
|
||||
|
||||
_lastDecreasePeriod = _packetSendPeriod;
|
||||
|
||||
if (!_delayedDecrease) {
|
||||
setPacketSendPeriod(ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE));
|
||||
} else {
|
||||
_loss = false;
|
||||
}
|
||||
|
||||
// use EWMA to update the average number of NAKs per congestion
|
||||
static const double NAK_EWMA_ALPHA = 0.125;
|
||||
_avgNAKNum = (int)ceil(_avgNAKNum * (1 - NAK_EWMA_ALPHA) + _nakCount * NAK_EWMA_ALPHA);
|
||||
|
||||
// update the count of NAKs and count of decreases in this interval
|
||||
_nakCount = 1;
|
||||
_decreaseCount = 1;
|
||||
|
||||
_lastDecreaseMaxSeq = _sendCurrSeqNum;
|
||||
|
||||
if (_avgNAKNum < 1) {
|
||||
_randomDecreaseThreshold = 1;
|
||||
} else {
|
||||
// avoid synchronous rate decrease across connections using randomization
|
||||
std::random_device rd;
|
||||
std::mt19937 generator(rd());
|
||||
std::uniform_int_distribution<> distribution(1, std::max(1, _avgNAKNum));
|
||||
|
||||
_randomDecreaseThreshold = distribution(generator);
|
||||
}
|
||||
} else if (_delayedDecrease && _nakCount == 2) {
|
||||
setPacketSendPeriod(ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE));
|
||||
} else if ((_decreaseCount++ < MAX_DECREASES_PER_CONGESTION_EPOCH) && ((_nakCount % _randomDecreaseThreshold) == 0)) {
|
||||
// there have been fewer than MAX_DECREASES_PER_CONGESTION_EPOCH AND this NAK matches the random count at which we
|
||||
// decided we would decrease the packet send period
|
||||
|
||||
setPacketSendPeriod(ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE));
|
||||
_lastDecreaseMaxSeq = _sendCurrSeqNum;
|
||||
}
|
||||
}
|
||||
|
||||
void DefaultCC::onTimeout() {
|
||||
if (_slowStart) {
|
||||
stopSlowStart();
|
||||
} else {
|
||||
// UDT used to do the following on timeout if not in slow start - we should check if it could be helpful
|
||||
_lastDecreasePeriod = _packetSendPeriod;
|
||||
_packetSendPeriod = ceil(_packetSendPeriod * 2);
|
||||
|
||||
// this seems odd - the last ack they were setting _lastDecreaseMaxSeq to only applies to slow start
|
||||
_lastDecreaseMaxSeq = _lastACK;
|
||||
}
|
||||
}
|
||||
|
||||
void DefaultCC::stopSlowStart() {
|
||||
_slowStart = false;
|
||||
|
||||
if (_receiveRate > 0) {
|
||||
// Set the sending rate to the receiving rate.
|
||||
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);
|
||||
} else {
|
||||
// If no receiving rate is observed, we have to compute the sending
|
||||
// rate according to the current window size, and decrease it
|
||||
// using the method below.
|
||||
setPacketSendPeriod(double(_congestionWindowSize) / (_rtt + synInterval()));
|
||||
}
|
||||
}
|
||||
|
||||
void DefaultCC::setInitialSendSequenceNumber(udt::SequenceNumber seqNum) {
|
||||
_lastACK = _lastDecreaseMaxSeq = seqNum - 1;
|
||||
}
|
||||
|
|
|
@ -44,21 +44,12 @@ public:
|
|||
// return value specifies if connection should perform a fast re-transmit of ACK + 1 (used in TCP style congestion control)
|
||||
virtual bool onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) { return false; }
|
||||
|
||||
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {}
|
||||
virtual void onTimeout() {}
|
||||
|
||||
virtual bool shouldNAK() { return true; }
|
||||
virtual bool shouldACK2() { return true; }
|
||||
virtual bool shouldProbe() { return true; }
|
||||
|
||||
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {}
|
||||
protected:
|
||||
void setAckInterval(int ackInterval) { _ackInterval = ackInterval; }
|
||||
void setRTO(int rto) { _userDefinedRTO = true; _rto = rto; }
|
||||
|
||||
void setMSS(int mss) { _mss = mss; }
|
||||
void setMaxCongestionWindowSize(int window) { _maxCongestionWindowSize = window; }
|
||||
void setBandwidth(int bandwidth) { _bandwidth = bandwidth; }
|
||||
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) = 0;
|
||||
void setSendCurrentSequenceNumber(SequenceNumber seqNum) { _sendCurrSeqNum = seqNum; }
|
||||
void setReceiveRate(int rate) { _receiveRate = rate; }
|
||||
|
@ -67,8 +58,7 @@ protected:
|
|||
|
||||
double _packetSendPeriod { 1.0 }; // Packet sending period, in microseconds
|
||||
int _congestionWindowSize { 16 }; // Congestion window size, in packets
|
||||
|
||||
int _bandwidth { 0 }; // estimated bandwidth, packets per second
|
||||
|
||||
std::atomic<int> _maxBandwidth { -1 }; // Maximum desired bandwidth, bits per second
|
||||
int _maxCongestionWindowSize { 0 }; // maximum cwnd size, in packets
|
||||
|
||||
|
@ -81,13 +71,7 @@ private:
|
|||
CongestionControl(const CongestionControl& other) = delete;
|
||||
CongestionControl& operator=(const CongestionControl& other) = delete;
|
||||
|
||||
int _ackInterval { 0 }; // How many packets to send one ACK, in packets
|
||||
int _lightACKInterval { 64 }; // How many packets to send one light ACK, in packets
|
||||
|
||||
int _synInterval { DEFAULT_SYN_INTERVAL };
|
||||
|
||||
bool _userDefinedRTO { false }; // if the RTO value is defined by users
|
||||
int _rto { -1 }; // RTO value, microseconds
|
||||
};
|
||||
|
||||
|
||||
|
@ -105,35 +89,6 @@ public:
|
|||
virtual ~CongestionControlFactory() {}
|
||||
virtual std::unique_ptr<CongestionControl> create() override { return std::unique_ptr<T>(new T()); }
|
||||
};
|
||||
|
||||
class DefaultCC: public CongestionControl {
|
||||
public:
|
||||
DefaultCC();
|
||||
|
||||
public:
|
||||
virtual bool onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) override;
|
||||
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) override;
|
||||
virtual void onTimeout() override;
|
||||
|
||||
protected:
|
||||
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) override;
|
||||
|
||||
private:
|
||||
void stopSlowStart(); // stops the slow start on loss or timeout
|
||||
|
||||
p_high_resolution_clock::time_point _lastRCTime = p_high_resolution_clock::now(); // last rate increase time
|
||||
|
||||
bool _slowStart { true }; // if in slow start phase
|
||||
SequenceNumber _lastACK; // last ACKed sequence number from previous
|
||||
bool _loss { false }; // if loss happened since last rate increase
|
||||
SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened
|
||||
double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened
|
||||
int _nakCount { 0 }; // number of NAKs in congestion epoch
|
||||
int _randomDecreaseThreshold { 1 }; // random threshold on decrease by number of loss events
|
||||
int _avgNAKNum { 0 }; // average number of NAKs per congestion
|
||||
int _decreaseCount { 0 }; // number of decreases in a congestion epoch
|
||||
bool _delayedDecrease { false };
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -50,17 +50,11 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq
|
|||
_congestionControl->setMaxCongestionWindowSize(_flowWindowSize);
|
||||
|
||||
// Setup packets
|
||||
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber)
|
||||
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK)
|
||||
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t);
|
||||
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
|
||||
static const int ACK2_PAYLOAD_BYTES = sizeof(SequenceNumber);
|
||||
static const int NAK_PACKET_PAYLOAD_BYTES = 2 * sizeof(SequenceNumber);
|
||||
static const int HANDSHAKE_ACK_PAYLOAD_BYTES = sizeof(SequenceNumber);
|
||||
|
||||
_ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
|
||||
_lightACKPacket = ControlPacket::create(ControlPacket::LightACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
|
||||
_ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES);
|
||||
_lossReport = ControlPacket::create(ControlPacket::NAK, NAK_PACKET_PAYLOAD_BYTES);
|
||||
_handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, HANDSHAKE_ACK_PAYLOAD_BYTES);
|
||||
|
||||
|
||||
|
@ -135,7 +129,6 @@ SendQueue& Connection::getSendQueue() {
|
|||
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
|
||||
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
|
||||
QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout);
|
||||
QObject::connect(_sendQueue.get(), &SendQueue::shortCircuitLoss, this, &Connection::queueShortCircuitLoss);
|
||||
|
||||
|
||||
// set defaults on the send queue from our congestion control object and estimatedTimeout()
|
||||
|
@ -143,7 +136,6 @@ SendQueue& Connection::getSendQueue() {
|
|||
_sendQueue->setSyncInterval(_synInterval);
|
||||
_sendQueue->setEstimatedTimeout(estimatedTimeout());
|
||||
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
|
||||
_sendQueue->setProbePacketEnabled(_congestionControl->shouldProbe());
|
||||
|
||||
// give the randomized sequence number to the congestion control object
|
||||
_congestionControl->setInitialSendSequenceNumber(_sendQueue->getCurrentSequenceNumber());
|
||||
|
@ -167,12 +159,6 @@ void Connection::queueTimeout() {
|
|||
});
|
||||
}
|
||||
|
||||
void Connection::queueShortCircuitLoss(quint32 sequenceNumber) {
|
||||
updateCongestionControlAndSendQueue([this, sequenceNumber] {
|
||||
_congestionControl->onLoss(SequenceNumber { sequenceNumber }, SequenceNumber { sequenceNumber });
|
||||
});
|
||||
}
|
||||
|
||||
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
|
||||
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
|
||||
getSendQueue().queuePacket(std::move(packet));
|
||||
|
@ -229,26 +215,6 @@ void Connection::sync() {
|
|||
// the receive side of this connection is expired
|
||||
_isReceivingData = false;
|
||||
}
|
||||
|
||||
// reset the number of light ACKs or non SYN ACKs during this sync interval
|
||||
_lightACKsDuringSYN = 1;
|
||||
_acksDuringSYN = 1;
|
||||
|
||||
if (_congestionControl->_ackInterval > 1) {
|
||||
// we send out a periodic ACK every rate control interval
|
||||
sendACK();
|
||||
}
|
||||
|
||||
if (_congestionControl->shouldNAK() && _lossList.getLength() > 0) {
|
||||
// check if we need to re-transmit a loss list
|
||||
// we do this if it has been longer than the current nakInterval since we last sent
|
||||
auto now = p_high_resolution_clock::now();
|
||||
|
||||
if (duration_cast<microseconds>(now - _lastNAKTime).count() >= _nakInterval) {
|
||||
// Send a timeout NAK packet
|
||||
sendTimeoutNAK();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -267,28 +233,10 @@ void Connection::recordRetransmission(int wireSize, SequenceNumber seqNum, p_hig
|
|||
|
||||
void Connection::sendACK(bool wasCausedBySyncTimeout) {
|
||||
static p_high_resolution_clock::time_point lastACKSendTime;
|
||||
auto currentTime = p_high_resolution_clock::now();
|
||||
|
||||
SequenceNumber nextACKNumber = nextACK();
|
||||
Q_ASSERT_X(nextACKNumber >= _lastSentACK, "Connection::sendACK", "Sending lower ACK, something is wrong");
|
||||
|
||||
// if our congestion control doesn't want to send an ACK for every packet received
|
||||
// check if we already sent this ACK
|
||||
if (_congestionControl->_ackInterval > 1 && nextACKNumber == _lastSentACK) {
|
||||
|
||||
// if we use ACK2s, check if the receiving side already confirmed receipt of this ACK
|
||||
if (_congestionControl->shouldACK2() && nextACKNumber < _lastReceivedAcknowledgedACK) {
|
||||
// we already got an ACK2 for this ACK we would be sending, don't bother
|
||||
return;
|
||||
}
|
||||
|
||||
// We will re-send if it has been more than the estimated timeout since the last ACK
|
||||
microseconds sinceLastACK = duration_cast<microseconds>(currentTime - lastACKSendTime);
|
||||
|
||||
if (sinceLastACK.count() < estimatedTimeout()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
// we have received new packets since the last sent ACK
|
||||
// or our congestion control dictates that we always send ACKs
|
||||
|
||||
|
@ -296,10 +244,7 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
|
|||
_lastSentACK = nextACKNumber;
|
||||
|
||||
_ackPacket->reset(); // We need to reset it every time.
|
||||
|
||||
// pack in the ACK sub-sequence number
|
||||
_ackPacket->writePrimitive(++_currentACKSubSequenceNumber);
|
||||
|
||||
|
||||
// pack in the ACK number
|
||||
_ackPacket->writePrimitive(nextACKNumber);
|
||||
|
||||
|
@ -311,17 +256,14 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
|
|||
_ackPacket->writePrimitive((int32_t) udt::MAX_PACKETS_IN_FLIGHT);
|
||||
|
||||
if (wasCausedBySyncTimeout) {
|
||||
// grab the up to date packet receive speed and estimated bandwidth
|
||||
// grab the up to date packet receive speed
|
||||
int32_t packetReceiveSpeed = _receiveWindow.getPacketReceiveSpeed();
|
||||
int32_t estimatedBandwidth = _receiveWindow.getEstimatedBandwidth();
|
||||
|
||||
// update those values in our connection stats
|
||||
_stats.recordReceiveRate(packetReceiveSpeed);
|
||||
_stats.recordEstimatedBandwidth(estimatedBandwidth);
|
||||
|
||||
// pack in the receive speed and estimatedBandwidth
|
||||
// pack in the receive speed
|
||||
_ackPacket->writePrimitive(packetReceiveSpeed);
|
||||
_ackPacket->writePrimitive(estimatedBandwidth);
|
||||
}
|
||||
|
||||
// record this as the last ACK send time
|
||||
|
@ -330,94 +272,9 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) {
|
|||
// have the socket send off our packet
|
||||
_parentSocket->writeBasePacket(*_ackPacket, _destination);
|
||||
|
||||
Q_ASSERT_X(_sentACKs.empty() || _sentACKs.back().first + 1 == _currentACKSubSequenceNumber,
|
||||
"Connection::sendACK", "Adding an invalid ACK to _sentACKs");
|
||||
|
||||
// write this ACK to the map of sent ACKs
|
||||
_sentACKs.push_back({ _currentACKSubSequenceNumber, { nextACKNumber, p_high_resolution_clock::now() }});
|
||||
|
||||
// reset the number of data packets received since last ACK
|
||||
_packetsSinceACK = 0;
|
||||
|
||||
_stats.record(ConnectionStats::Stats::SentACK);
|
||||
}
|
||||
|
||||
void Connection::sendLightACK() {
|
||||
SequenceNumber nextACKNumber = nextACK();
|
||||
|
||||
if (nextACKNumber == _lastReceivedAcknowledgedACK) {
|
||||
// we already got an ACK2 for this ACK we would be sending, don't bother
|
||||
return;
|
||||
}
|
||||
|
||||
// reset the lightACKPacket before we go to write the ACK to it
|
||||
_lightACKPacket->reset();
|
||||
|
||||
// pack in the ACK
|
||||
_lightACKPacket->writePrimitive(nextACKNumber);
|
||||
|
||||
// have the socket send off our packet immediately
|
||||
_parentSocket->writeBasePacket(*_lightACKPacket, _destination);
|
||||
|
||||
_stats.record(ConnectionStats::Stats::SentLightACK);
|
||||
}
|
||||
|
||||
void Connection::sendACK2(SequenceNumber currentACKSubSequenceNumber) {
|
||||
// reset the ACK2 Packet before writing the sub-sequence number to it
|
||||
_ack2Packet->reset();
|
||||
|
||||
// write the sub sequence number for this ACK2
|
||||
_ack2Packet->writePrimitive(currentACKSubSequenceNumber);
|
||||
|
||||
// send the ACK2 packet
|
||||
_parentSocket->writeBasePacket(*_ack2Packet, _destination);
|
||||
|
||||
// update the last sent ACK2 and the last ACK2 send time
|
||||
_lastSentACK2 = currentACKSubSequenceNumber;
|
||||
|
||||
_stats.record(ConnectionStats::Stats::SentACK2);
|
||||
}
|
||||
|
||||
void Connection::sendNAK(SequenceNumber sequenceNumberRecieved) {
|
||||
_lossReport->reset(); // We need to reset it every time.
|
||||
|
||||
// pack in the loss report
|
||||
_lossReport->writePrimitive(_lastReceivedSequenceNumber + 1);
|
||||
if (_lastReceivedSequenceNumber + 1 != sequenceNumberRecieved - 1) {
|
||||
_lossReport->writePrimitive(sequenceNumberRecieved - 1);
|
||||
}
|
||||
|
||||
// have the parent socket send off our packet immediately
|
||||
_parentSocket->writeBasePacket(*_lossReport, _destination);
|
||||
|
||||
// record our last NAK time
|
||||
_lastNAKTime = p_high_resolution_clock::now();
|
||||
|
||||
_stats.record(ConnectionStats::Stats::SentNAK);
|
||||
}
|
||||
|
||||
void Connection::sendTimeoutNAK() {
|
||||
if (_lossList.getLength() > 0) {
|
||||
|
||||
int timeoutPayloadSize = std::min((int) (_lossList.getLength() * 2 * sizeof(SequenceNumber)),
|
||||
ControlPacket::maxPayloadSize());
|
||||
|
||||
// construct a NAK packet that will hold all of the lost sequence numbers
|
||||
auto lossListPacket = ControlPacket::create(ControlPacket::TimeoutNAK, timeoutPayloadSize);
|
||||
|
||||
// Pack in the lost sequence numbers
|
||||
_lossList.write(*lossListPacket, timeoutPayloadSize / (2 * sizeof(SequenceNumber)));
|
||||
|
||||
// have our parent socket send off this control packet
|
||||
_parentSocket->writeBasePacket(*lossListPacket, _destination);
|
||||
|
||||
// record this as the last NAK time
|
||||
_lastNAKTime = p_high_resolution_clock::now();
|
||||
|
||||
_stats.record(ConnectionStats::Stats::SentTimeoutNAK);
|
||||
}
|
||||
}
|
||||
|
||||
SequenceNumber Connection::nextACK() const {
|
||||
if (_lossList.getLength() > 0) {
|
||||
return _lossList.getFirstSequenceNumber() - 1;
|
||||
|
@ -452,21 +309,6 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
|
|||
// mark our last receive time as now (to push the potential expiry farther)
|
||||
_lastReceiveTime = p_high_resolution_clock::now();
|
||||
|
||||
if (_congestionControl->shouldProbe()) {
|
||||
// check if this is a packet pair we should estimate bandwidth from, or just a regular packet
|
||||
if (((uint32_t) sequenceNumber & 0xF) == 0) {
|
||||
_receiveWindow.onProbePair1Arrival();
|
||||
} else if (((uint32_t) sequenceNumber & 0xF) == 1) {
|
||||
// only use this packet for bandwidth estimation if we didn't just receive a control packet in its place
|
||||
if (!_receivedControlProbeTail) {
|
||||
_receiveWindow.onProbePair2Arrival();
|
||||
} else {
|
||||
// reset our control probe tail marker so the next probe that comes with data can be used
|
||||
_receivedControlProbeTail = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_receiveWindow.onPacketArrival();
|
||||
|
||||
// If this is not the next sequence number, report loss
|
||||
|
@ -476,24 +318,6 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
|
|||
} else {
|
||||
_lossList.append(_lastReceivedSequenceNumber + 1, sequenceNumber - 1);
|
||||
}
|
||||
|
||||
if (_congestionControl->shouldNAK()) {
|
||||
// Send a NAK packet
|
||||
sendNAK(sequenceNumber);
|
||||
|
||||
// figure out when we should send the next loss report, if we haven't heard anything back
|
||||
_nakInterval = estimatedTimeout();
|
||||
|
||||
int receivedPacketsPerSecond = _receiveWindow.getPacketReceiveSpeed();
|
||||
if (receivedPacketsPerSecond > 0) {
|
||||
// the NAK interval is at least the _minNAKInterval
|
||||
// but might be the time required for all lost packets to be retransmitted
|
||||
_nakInterval += (int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond));
|
||||
}
|
||||
|
||||
// the NAK interval is at least the _minNAKInterval but might be the value calculated above, if that is larger
|
||||
_nakInterval = std::max(_nakInterval, _minNAKInterval);
|
||||
}
|
||||
}
|
||||
|
||||
bool wasDuplicate = false;
|
||||
|
@ -505,22 +329,9 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
|
|||
// Otherwise, it could be a resend, try and remove it from the loss list
|
||||
wasDuplicate = !_lossList.remove(sequenceNumber);
|
||||
}
|
||||
|
||||
// increment the counters for data packets received
|
||||
++_packetsSinceACK;
|
||||
|
||||
// check if we need to send an ACK, according to CC params
|
||||
if (_congestionControl->_ackInterval == 1) {
|
||||
// using a congestion control that ACKs every packet (like TCP Vegas)
|
||||
sendACK(true);
|
||||
} else if (_congestionControl->_ackInterval > 0 && _packetsSinceACK >= _congestionControl->_ackInterval * _acksDuringSYN) {
|
||||
_acksDuringSYN++;
|
||||
sendACK(false);
|
||||
} else if (_congestionControl->_lightACKInterval > 0
|
||||
&& _packetsSinceACK >= _congestionControl->_lightACKInterval * _lightACKsDuringSYN) {
|
||||
sendLightACK();
|
||||
++_lightACKsDuringSYN;
|
||||
}
|
||||
|
||||
// using a congestion control that ACKs every packet (like TCP Vegas)
|
||||
sendACK(true);
|
||||
|
||||
if (wasDuplicate) {
|
||||
_stats.record(ConnectionStats::Stats::Duplicate);
|
||||
|
@ -544,37 +355,12 @@ void Connection::processControl(ControlPacketPointer controlPacket) {
|
|||
processACK(move(controlPacket));
|
||||
}
|
||||
break;
|
||||
case ControlPacket::LightACK:
|
||||
if (_hasReceivedHandshakeACK) {
|
||||
processLightACK(move(controlPacket));
|
||||
}
|
||||
break;
|
||||
case ControlPacket::ACK2:
|
||||
if (_hasReceivedHandshake) {
|
||||
processACK2(move(controlPacket));
|
||||
}
|
||||
break;
|
||||
case ControlPacket::NAK:
|
||||
if (_hasReceivedHandshakeACK) {
|
||||
processNAK(move(controlPacket));
|
||||
}
|
||||
break;
|
||||
case ControlPacket::TimeoutNAK:
|
||||
if (_hasReceivedHandshakeACK) {
|
||||
processTimeoutNAK(move(controlPacket));
|
||||
}
|
||||
break;
|
||||
case ControlPacket::Handshake:
|
||||
processHandshake(move(controlPacket));
|
||||
break;
|
||||
case ControlPacket::HandshakeACK:
|
||||
processHandshakeACK(move(controlPacket));
|
||||
break;
|
||||
case ControlPacket::ProbeTail:
|
||||
if (_isReceivingData) {
|
||||
processProbeTail(move(controlPacket));
|
||||
}
|
||||
break;
|
||||
case ControlPacket::HandshakeRequest:
|
||||
if (_hasReceivedHandshakeACK) {
|
||||
// We're already in a state where we've received a handshake ack, so we are likely in a state
|
||||
|
@ -587,31 +373,16 @@ void Connection::processControl(ControlPacketPointer controlPacket) {
|
|||
stopSendQueue();
|
||||
}
|
||||
break;
|
||||
case ControlPacket::LightACK:
|
||||
case ControlPacket::ACK2:
|
||||
case ControlPacket::NAK:
|
||||
case ControlPacket::TimeoutNAK:
|
||||
case ControlPacket::ProbeTail:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void Connection::processACK(ControlPacketPointer controlPacket) {
|
||||
// read the ACK sub-sequence number
|
||||
SequenceNumber currentACKSubSequenceNumber;
|
||||
controlPacket->readPrimitive(¤tACKSubSequenceNumber);
|
||||
|
||||
// Check if we need send an ACK2 for this ACK
|
||||
// This will be the case if it has been longer than the sync interval OR
|
||||
// it looks like they haven't received our ACK2 for this ACK
|
||||
auto currentTime = p_high_resolution_clock::now();
|
||||
static p_high_resolution_clock::time_point lastACK2SendTime =
|
||||
p_high_resolution_clock::now() - std::chrono::microseconds(_synInterval);
|
||||
|
||||
microseconds sinceLastACK2 = duration_cast<microseconds>(currentTime - lastACK2SendTime);
|
||||
|
||||
if (_congestionControl->shouldACK2()
|
||||
&& (sinceLastACK2.count() >= _synInterval || currentACKSubSequenceNumber == _lastSentACK2)) {
|
||||
// Send ACK2 packet
|
||||
sendACK2(currentACKSubSequenceNumber);
|
||||
|
||||
lastACK2SendTime = p_high_resolution_clock::now();
|
||||
}
|
||||
|
||||
// read the ACKed sequence number
|
||||
SequenceNumber ack;
|
||||
controlPacket->readPrimitive(&ack);
|
||||
|
@ -661,27 +432,23 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
|
|||
_congestionControl->setRTT(_rtt);
|
||||
|
||||
if (controlPacket->bytesLeftToRead() > 0) {
|
||||
int32_t receiveRate, bandwidth;
|
||||
int32_t receiveRate;
|
||||
|
||||
Q_ASSERT_X(controlPacket->bytesLeftToRead() == sizeof(receiveRate) + sizeof(bandwidth),
|
||||
Q_ASSERT_X(controlPacket->bytesLeftToRead() == sizeof(receiveRate),
|
||||
"Connection::processACK", "sync interval ACK packet does not contain expected data");
|
||||
|
||||
controlPacket->readPrimitive(&receiveRate);
|
||||
controlPacket->readPrimitive(&bandwidth);
|
||||
|
||||
// set the delivery rate and bandwidth for congestion control
|
||||
// set the delivery rate for congestion control
|
||||
// these are calculated using an EWMA
|
||||
static const int EMWA_ALPHA_NUMERATOR = 8;
|
||||
|
||||
// record these samples in connection stats
|
||||
_stats.recordSendRate(receiveRate);
|
||||
_stats.recordEstimatedBandwidth(bandwidth);
|
||||
|
||||
_deliveryRate = (_deliveryRate * (EMWA_ALPHA_NUMERATOR - 1) + receiveRate) / EMWA_ALPHA_NUMERATOR;
|
||||
_bandwidth = (_bandwidth * (EMWA_ALPHA_NUMERATOR - 1) + bandwidth) / EMWA_ALPHA_NUMERATOR;
|
||||
|
||||
_congestionControl->setReceiveRate(_deliveryRate);
|
||||
_congestionControl->setBandwidth(_bandwidth);
|
||||
}
|
||||
|
||||
// give this ACK to the congestion control and update the send queue parameters
|
||||
|
@ -695,92 +462,6 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
|
|||
_stats.record(ConnectionStats::Stats::ProcessedACK);
|
||||
}
|
||||
|
||||
void Connection::processLightACK(ControlPacketPointer controlPacket) {
|
||||
// read the ACKed sequence number
|
||||
SequenceNumber ack;
|
||||
controlPacket->readPrimitive(&ack);
|
||||
|
||||
// must be larger than the last received ACK to be processed
|
||||
if (ack > _lastReceivedACK) {
|
||||
// NOTE: the following makes sense in UDT where there is a dynamic receive buffer.
|
||||
// Since we have a receive buffer that is always of a default size, we don't use this light ACK to
|
||||
// drop the flow window size.
|
||||
|
||||
// decrease the flow window size by the offset between the last received ACK and this ACK
|
||||
// _flowWindowSize -= seqoff(_lastReceivedACK, ack);
|
||||
|
||||
// update the last received ACK to the this one
|
||||
_lastReceivedACK = ack;
|
||||
|
||||
// send light ACK to the send queue
|
||||
getSendQueue().ack(ack);
|
||||
}
|
||||
|
||||
_stats.record(ConnectionStats::Stats::ReceivedLightACK);
|
||||
}
|
||||
|
||||
void Connection::processACK2(ControlPacketPointer controlPacket) {
|
||||
// pull the sub sequence number from the packet
|
||||
SequenceNumber subSequenceNumber;
|
||||
controlPacket->readPrimitive(&subSequenceNumber);
|
||||
|
||||
// check if we had that subsequence number in our map
|
||||
auto it = std::find_if_not(_sentACKs.begin(), _sentACKs.end(), [&subSequenceNumber](const ACKListPair& pair){
|
||||
return pair.first < subSequenceNumber;
|
||||
});
|
||||
|
||||
if (it != _sentACKs.end()) {
|
||||
if (it->first == subSequenceNumber){
|
||||
// update the RTT using the ACK window
|
||||
|
||||
// calculate the RTT (time now - time ACK sent)
|
||||
auto now = p_high_resolution_clock::now();
|
||||
int rtt = duration_cast<microseconds>(now - it->second.second).count();
|
||||
|
||||
updateRTT(rtt);
|
||||
// write this RTT to stats
|
||||
_stats.recordRTT(rtt);
|
||||
|
||||
// set the RTT for congestion control
|
||||
_congestionControl->setRTT(_rtt);
|
||||
|
||||
// update the last ACKed ACK
|
||||
if (it->second.first > _lastReceivedAcknowledgedACK) {
|
||||
_lastReceivedAcknowledgedACK = it->second.first;
|
||||
}
|
||||
} else if (it->first < subSequenceNumber) {
|
||||
Q_UNREACHABLE();
|
||||
}
|
||||
}
|
||||
|
||||
// erase this sub-sequence number and anything below it now that we've gotten our timing information
|
||||
_sentACKs.erase(_sentACKs.begin(), it);
|
||||
|
||||
_stats.record(ConnectionStats::Stats::ReceivedACK2);
|
||||
}
|
||||
|
||||
void Connection::processNAK(ControlPacketPointer controlPacket) {
|
||||
// read the loss report
|
||||
SequenceNumber start, end;
|
||||
controlPacket->readPrimitive(&start);
|
||||
|
||||
end = start;
|
||||
|
||||
if (controlPacket->bytesLeftToRead() >= (qint64)sizeof(SequenceNumber)) {
|
||||
controlPacket->readPrimitive(&end);
|
||||
}
|
||||
|
||||
// send that off to the send queue so it knows there was loss
|
||||
getSendQueue().nak(start, end);
|
||||
|
||||
// give the loss to the congestion control object and update the send queue parameters
|
||||
updateCongestionControlAndSendQueue([this, start, end] {
|
||||
_congestionControl->onLoss(start, end);
|
||||
});
|
||||
|
||||
_stats.record(ConnectionStats::Stats::ReceivedNAK);
|
||||
}
|
||||
|
||||
void Connection::processHandshake(ControlPacketPointer controlPacket) {
|
||||
SequenceNumber initialSequenceNumber;
|
||||
controlPacket->readPrimitive(&initialSequenceNumber);
|
||||
|
@ -829,68 +510,27 @@ void Connection::processHandshakeACK(ControlPacketPointer controlPacket) {
|
|||
}
|
||||
}
|
||||
|
||||
void Connection::processTimeoutNAK(ControlPacketPointer controlPacket) {
|
||||
// Override SendQueue's LossList with the timeout NAK list
|
||||
getSendQueue().overrideNAKListFromPacket(*controlPacket);
|
||||
|
||||
// we don't tell the congestion control object there was loss here - this matches UDTs implementation
|
||||
// a possible improvement would be to tell it which new loss this timeout packet told us about
|
||||
|
||||
_stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK);
|
||||
}
|
||||
|
||||
void Connection::processProbeTail(ControlPacketPointer controlPacket) {
|
||||
if (((uint32_t) _lastReceivedSequenceNumber & 0xF) == 0) {
|
||||
// this is the second packet in a probe set so we can estimate bandwidth
|
||||
// the sender sent this to us in lieu of sending new data (because they didn't have any)
|
||||
|
||||
#ifdef UDT_CONNECTION_DEBUG
|
||||
qCDebug(networking) << "Processing second packet of probe from control packet instead of data packet";
|
||||
#endif
|
||||
|
||||
_receiveWindow.onProbePair2Arrival();
|
||||
|
||||
// mark that we processed a control packet for the second in the pair and we should not mark
|
||||
// the next data packet received
|
||||
_receivedControlProbeTail = true;
|
||||
}
|
||||
}
|
||||
|
||||
void Connection::resetReceiveState() {
|
||||
|
||||
// reset all SequenceNumber member variables back to default
|
||||
SequenceNumber defaultSequenceNumber;
|
||||
|
||||
_lastReceivedSequenceNumber = defaultSequenceNumber;
|
||||
|
||||
_lastReceivedAcknowledgedACK = defaultSequenceNumber;
|
||||
_currentACKSubSequenceNumber = defaultSequenceNumber;
|
||||
|
||||
|
||||
_lastSentACK = defaultSequenceNumber;
|
||||
|
||||
// clear the sent ACKs
|
||||
_sentACKs.clear();
|
||||
|
||||
// clear the loss list and _lastNAKTime
|
||||
// clear the loss list
|
||||
_lossList.clear();
|
||||
_lastNAKTime = p_high_resolution_clock::now();
|
||||
|
||||
// the _nakInterval need not be reset, that will happen on loss
|
||||
|
||||
// clear sync variables
|
||||
_isReceivingData = false;
|
||||
_connectionStart = p_high_resolution_clock::now();
|
||||
|
||||
_acksDuringSYN = 1;
|
||||
_lightACKsDuringSYN = 1;
|
||||
_packetsSinceACK = 0;
|
||||
|
||||
// reset RTT to initial value
|
||||
resetRTT();
|
||||
|
||||
// clear the intervals in the receive window
|
||||
_receiveWindow.reset();
|
||||
_receivedControlProbeTail = false;
|
||||
|
||||
// clear any pending received messages
|
||||
for (auto& pendingMessage : _pendingReceivedMessages) {
|
||||
|
@ -920,7 +560,7 @@ void Connection::updateRTT(int rtt) {
|
|||
}
|
||||
|
||||
int Connection::estimatedTimeout() const {
|
||||
return _congestionControl->_userDefinedRTO ? _congestionControl->_rto : _rtt + _rttVariance * 4;
|
||||
return _rtt + _rttVariance * 4;
|
||||
}
|
||||
|
||||
void Connection::updateCongestionControlAndSendQueue(std::function<void ()> congestionCallback) {
|
||||
|
|
|
@ -51,9 +51,6 @@ private:
|
|||
class Connection : public QObject {
|
||||
Q_OBJECT
|
||||
public:
|
||||
using SequenceNumberTimePair = std::pair<SequenceNumber, p_high_resolution_clock::time_point>;
|
||||
using ACKListPair = std::pair<SequenceNumber, SequenceNumberTimePair>;
|
||||
using SentACKList = std::list<ACKListPair>;
|
||||
using ControlPacketPointer = std::unique_ptr<ControlPacket>;
|
||||
|
||||
Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl);
|
||||
|
@ -87,23 +84,13 @@ private slots:
|
|||
void recordRetransmission(int wireSize, SequenceNumber sequenceNumber, p_high_resolution_clock::time_point timePoint);
|
||||
void queueInactive();
|
||||
void queueTimeout();
|
||||
void queueShortCircuitLoss(quint32 sequenceNumber);
|
||||
|
||||
private:
|
||||
void sendACK(bool wasCausedBySyncTimeout = true);
|
||||
void sendLightACK();
|
||||
void sendACK2(SequenceNumber currentACKSubSequenceNumber);
|
||||
void sendNAK(SequenceNumber sequenceNumberRecieved);
|
||||
void sendTimeoutNAK();
|
||||
|
||||
void processACK(ControlPacketPointer controlPacket);
|
||||
void processLightACK(ControlPacketPointer controlPacket);
|
||||
void processACK2(ControlPacketPointer controlPacket);
|
||||
void processNAK(ControlPacketPointer controlPacket);
|
||||
void processTimeoutNAK(ControlPacketPointer controlPacket);
|
||||
void processHandshake(ControlPacketPointer controlPacket);
|
||||
void processHandshakeACK(ControlPacketPointer controlPacket);
|
||||
void processProbeTail(ControlPacketPointer controlPacket);
|
||||
|
||||
void resetReceiveState();
|
||||
void resetRTT();
|
||||
|
@ -120,10 +107,6 @@ private:
|
|||
|
||||
int _synInterval; // Periodical Rate Control Interval, in microseconds
|
||||
|
||||
int _nakInterval { -1 }; // NAK timeout interval, in microseconds, set on loss
|
||||
int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms
|
||||
p_high_resolution_clock::time_point _lastNAKTime = p_high_resolution_clock::now();
|
||||
|
||||
bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server
|
||||
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
|
||||
bool _didRequestHandshake { false }; // flag for request of handshake from server
|
||||
|
@ -141,43 +124,28 @@ private:
|
|||
LossList _lossList; // List of all missing packets
|
||||
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
|
||||
SequenceNumber _lastReceivedACK; // The last ACK received
|
||||
SequenceNumber _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer
|
||||
SequenceNumber _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs)
|
||||
|
||||
SequenceNumber _lastSentACK; // The last sent ACK
|
||||
SequenceNumber _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2
|
||||
|
||||
int _acksDuringSYN { 1 }; // The number of non-SYN ACKs sent during SYN
|
||||
int _lightACKsDuringSYN { 1 }; // The number of lite ACKs sent during SYN interval
|
||||
|
||||
int32_t _rtt; // RTT, in microseconds
|
||||
int32_t _rttVariance; // RTT variance
|
||||
int _flowWindowSize { udt::MAX_PACKETS_IN_FLIGHT }; // Flow control window size
|
||||
|
||||
int _bandwidth { 1 }; // Exponential moving average for estimated bandwidth, in packets per second
|
||||
int _deliveryRate { 16 }; // Exponential moving average for receiver's receive rate, in packets per second
|
||||
|
||||
SentACKList _sentACKs; // Map of ACK sub-sequence numbers to ACKed sequence number and sent time
|
||||
|
||||
Socket* _parentSocket { nullptr };
|
||||
HifiSockAddr _destination;
|
||||
|
||||
PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for timing
|
||||
bool _receivedControlProbeTail { false }; // Marker for receipt of control packet probe tail (in lieu of probe with data)
|
||||
PacketTimeWindow _receiveWindow { 16 }; // Window of interval between packets (16)
|
||||
|
||||
std::unique_ptr<CongestionControl> _congestionControl;
|
||||
|
||||
std::unique_ptr<SendQueue> _sendQueue;
|
||||
|
||||
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;
|
||||
|
||||
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval
|
||||
|
||||
// Re-used control packets
|
||||
ControlPacketPointer _ackPacket;
|
||||
ControlPacketPointer _lightACKPacket;
|
||||
ControlPacketPointer _ack2Packet;
|
||||
ControlPacketPointer _lossReport;
|
||||
ControlPacketPointer _handshakeACK;
|
||||
|
||||
ConnectionStats _stats;
|
||||
|
|
|
@ -95,11 +95,6 @@ void ConnectionStats::recordReceiveRate(int sample) {
|
|||
_total.receiveRate = (int)((_total.receiveRate * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
|
||||
}
|
||||
|
||||
void ConnectionStats::recordEstimatedBandwidth(int sample) {
|
||||
_currentSample.estimatedBandwith = sample;
|
||||
_total.estimatedBandwith = (int)((_total.estimatedBandwith * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
|
||||
}
|
||||
|
||||
void ConnectionStats::recordRTT(int sample) {
|
||||
_currentSample.rtt = sample;
|
||||
_total.rtt = (int)((_total.rtt * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
|
||||
|
@ -122,14 +117,6 @@ QDebug& operator<<(QDebug&& debug, const udt::ConnectionStats::Stats& stats) {
|
|||
HIFI_LOG_EVENT(SentACK)
|
||||
HIFI_LOG_EVENT(ReceivedACK)
|
||||
HIFI_LOG_EVENT(ProcessedACK)
|
||||
HIFI_LOG_EVENT(SentLightACK)
|
||||
HIFI_LOG_EVENT(ReceivedLightACK)
|
||||
HIFI_LOG_EVENT(SentACK2)
|
||||
HIFI_LOG_EVENT(ReceivedACK2)
|
||||
HIFI_LOG_EVENT(SentNAK)
|
||||
HIFI_LOG_EVENT(ReceivedNAK)
|
||||
HIFI_LOG_EVENT(SentTimeoutNAK)
|
||||
HIFI_LOG_EVENT(ReceivedTimeoutNAK)
|
||||
HIFI_LOG_EVENT(Retransmission)
|
||||
HIFI_LOG_EVENT(Duplicate)
|
||||
;
|
||||
|
|
|
@ -24,14 +24,6 @@ public:
|
|||
SentACK,
|
||||
ReceivedACK,
|
||||
ProcessedACK,
|
||||
SentLightACK,
|
||||
ReceivedLightACK,
|
||||
SentACK2,
|
||||
ReceivedACK2,
|
||||
SentNAK,
|
||||
ReceivedNAK,
|
||||
SentTimeoutNAK,
|
||||
ReceivedTimeoutNAK,
|
||||
Retransmission,
|
||||
Duplicate,
|
||||
|
||||
|
@ -89,7 +81,6 @@ public:
|
|||
|
||||
void recordSendRate(int sample);
|
||||
void recordReceiveRate(int sample);
|
||||
void recordEstimatedBandwidth(int sample);
|
||||
void recordRTT(int sample);
|
||||
void recordCongestionWindowSize(int sample);
|
||||
void recordPacketSendPeriod(int sample);
|
||||
|
|
|
@ -20,20 +20,16 @@ using namespace udt;
|
|||
using namespace std::chrono;
|
||||
|
||||
static const int DEFAULT_PACKET_INTERVAL_MICROSECONDS = 1000000; // 1s
|
||||
static const int DEFAULT_PROBE_INTERVAL_MICROSECONDS = 1000; // 1ms
|
||||
|
||||
PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals) :
|
||||
PacketTimeWindow::PacketTimeWindow(int numPacketIntervals) :
|
||||
_numPacketIntervals(numPacketIntervals),
|
||||
_numProbeIntervals(numProbeIntervals),
|
||||
_packetIntervals(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS),
|
||||
_probeIntervals(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS)
|
||||
_packetIntervals(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void PacketTimeWindow::reset() {
|
||||
_packetIntervals.assign(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS);
|
||||
_probeIntervals.assign(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS);
|
||||
}
|
||||
|
||||
template <typename Iterator>
|
||||
|
@ -87,11 +83,6 @@ int32_t PacketTimeWindow::getPacketReceiveSpeed() const {
|
|||
return meanOfMedianFilteredValues(_packetIntervals, _numPacketIntervals, _numPacketIntervals / 2);
|
||||
}
|
||||
|
||||
int32_t PacketTimeWindow::getEstimatedBandwidth() const {
|
||||
// return mean value of median filtered values (per second)
|
||||
return meanOfMedianFilteredValues(_probeIntervals, _numProbeIntervals);
|
||||
}
|
||||
|
||||
void PacketTimeWindow::onPacketArrival() {
|
||||
|
||||
// take the current time
|
||||
|
@ -108,18 +99,3 @@ void PacketTimeWindow::onPacketArrival() {
|
|||
// remember this as the last packet arrival time
|
||||
_lastPacketTime = now;
|
||||
}
|
||||
|
||||
void PacketTimeWindow::onProbePair1Arrival() {
|
||||
// take the current time as the first probe time
|
||||
_firstProbeTime = p_high_resolution_clock::now();
|
||||
}
|
||||
|
||||
void PacketTimeWindow::onProbePair2Arrival() {
|
||||
// store the interval between the two probes
|
||||
auto now = p_high_resolution_clock::now();
|
||||
|
||||
_probeIntervals[_currentProbeInterval++] = duration_cast<microseconds>(now - _firstProbeTime).count();
|
||||
|
||||
// reset the currentProbeInterval index when it wraps
|
||||
_currentProbeInterval %= _numProbeIntervals;
|
||||
}
|
||||
|
|
|
@ -22,28 +22,21 @@ namespace udt {
|
|||
|
||||
class PacketTimeWindow {
|
||||
public:
|
||||
PacketTimeWindow(int numPacketIntervals = 16, int numProbeIntervals = 16);
|
||||
PacketTimeWindow(int numPacketIntervals = 16);
|
||||
|
||||
void onPacketArrival();
|
||||
void onProbePair1Arrival();
|
||||
void onProbePair2Arrival();
|
||||
|
||||
int32_t getPacketReceiveSpeed() const;
|
||||
int32_t getEstimatedBandwidth() const;
|
||||
|
||||
void reset();
|
||||
private:
|
||||
int _numPacketIntervals { 0 }; // the number of packet intervals to store
|
||||
int _numProbeIntervals { 0 }; // the number of probe intervals to store
|
||||
|
||||
int _currentPacketInterval { 0 }; // index for the current packet interval
|
||||
int _currentProbeInterval { 0 }; // index for the current probe interval
|
||||
|
||||
std::vector<int> _packetIntervals; // vector of microsecond intervals between packet arrivals
|
||||
std::vector<int> _probeIntervals; // vector of microsecond intervals between probe pair arrivals
|
||||
|
||||
p_high_resolution_clock::time_point _lastPacketTime = p_high_resolution_clock::now(); // the time_point when last packet arrived
|
||||
p_high_resolution_clock::time_point _firstProbeTime = p_high_resolution_clock::now(); // the time_point when first probe in pair arrived
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -164,16 +164,6 @@ void SendQueue::ack(SequenceNumber ack) {
|
|||
_emptyCondition.notify_one();
|
||||
}
|
||||
|
||||
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
|
||||
{
|
||||
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
||||
_naks.insert(start, end);
|
||||
}
|
||||
|
||||
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
|
||||
_emptyCondition.notify_one();
|
||||
}
|
||||
|
||||
void SendQueue::fastRetransmit(udt::SequenceNumber ack) {
|
||||
{
|
||||
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
||||
|
@ -184,28 +174,6 @@ void SendQueue::fastRetransmit(udt::SequenceNumber ack) {
|
|||
_emptyCondition.notify_one();
|
||||
}
|
||||
|
||||
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
|
||||
{
|
||||
std::lock_guard<std::mutex> nakLocker(_naksLock);
|
||||
_naks.clear();
|
||||
|
||||
SequenceNumber first, second;
|
||||
while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) {
|
||||
packet.readPrimitive(&first);
|
||||
packet.readPrimitive(&second);
|
||||
|
||||
if (first == second) {
|
||||
_naks.append(first);
|
||||
} else {
|
||||
_naks.append(first, second);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
|
||||
_emptyCondition.notify_one();
|
||||
}
|
||||
|
||||
void SendQueue::sendHandshake() {
|
||||
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
|
||||
if (!_hasReceivedHandshakeACK) {
|
||||
|
@ -268,8 +236,6 @@ bool SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
|
|||
_naks.append(sequenceNumber);
|
||||
}
|
||||
|
||||
emit shortCircuitLoss(quint32(sequenceNumber));
|
||||
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
|
@ -385,10 +351,6 @@ void SendQueue::run() {
|
|||
}
|
||||
}
|
||||
|
||||
void SendQueue::setProbePacketEnabled(bool enabled) {
|
||||
_shouldSendProbes = enabled;
|
||||
}
|
||||
|
||||
int SendQueue::maybeSendNewPacket() {
|
||||
if (!isFlowWindowFull()) {
|
||||
// we didn't re-send a packet, so time to send a new one
|
||||
|
@ -397,40 +359,15 @@ int SendQueue::maybeSendNewPacket() {
|
|||
SequenceNumber nextNumber = getNextSequenceNumber();
|
||||
|
||||
// grab the first packet we will send
|
||||
std::unique_ptr<Packet> firstPacket = _packets.takePacket();
|
||||
Q_ASSERT(firstPacket);
|
||||
std::unique_ptr<Packet> packet = _packets.takePacket();
|
||||
Q_ASSERT(packet);
|
||||
|
||||
|
||||
// attempt to send the first packet
|
||||
if (sendNewPacketAndAddToSentList(move(firstPacket), nextNumber)) {
|
||||
std::unique_ptr<Packet> secondPacket;
|
||||
bool shouldSendPairTail = false;
|
||||
// attempt to send the packet
|
||||
sendNewPacketAndAddToSentList(move(packet), nextNumber);
|
||||
|
||||
if (_shouldSendProbes && ((uint32_t) nextNumber & 0xF) == 0) {
|
||||
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
|
||||
// pull off a second packet if we can before we unlock
|
||||
shouldSendPairTail = true;
|
||||
|
||||
secondPacket = _packets.takePacket();
|
||||
}
|
||||
|
||||
// do we have a second in a pair to send as well?
|
||||
if (secondPacket) {
|
||||
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
|
||||
} else if (shouldSendPairTail) {
|
||||
// we didn't get a second packet to send in the probe pair
|
||||
// send a control packet of type ProbePairTail so the receiver can still do
|
||||
// proper bandwidth estimation
|
||||
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
|
||||
_socket->writeBasePacket(*pairTailPacket, _destination);
|
||||
}
|
||||
|
||||
// return the number of attempted packet sends
|
||||
return shouldSendPairTail ? 2 : 1;
|
||||
} else {
|
||||
// we attempted to send a single packet, return 1
|
||||
return 1;
|
||||
}
|
||||
// we attempted to send a packet, return 1
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -69,16 +69,12 @@ public:
|
|||
|
||||
void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; }
|
||||
void setSyncInterval(int syncInterval) { _syncInterval = syncInterval; }
|
||||
|
||||
void setProbePacketEnabled(bool enabled);
|
||||
|
||||
public slots:
|
||||
void stop();
|
||||
|
||||
void ack(SequenceNumber ack);
|
||||
void nak(SequenceNumber start, SequenceNumber end);
|
||||
void fastRetransmit(SequenceNumber ack);
|
||||
void overrideNAKListFromPacket(ControlPacket& packet);
|
||||
void handshakeACK();
|
||||
|
||||
signals:
|
||||
|
@ -87,7 +83,6 @@ signals:
|
|||
|
||||
void queueInactive();
|
||||
|
||||
void shortCircuitLoss(quint32 sequenceNumber);
|
||||
void timeout();
|
||||
|
||||
private slots:
|
||||
|
@ -145,9 +140,6 @@ private:
|
|||
std::condition_variable _handshakeACKCondition;
|
||||
|
||||
std::condition_variable_any _emptyCondition;
|
||||
|
||||
|
||||
std::atomic<bool> _shouldSendProbes { true };
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -21,8 +21,6 @@ TCPVegasCC::TCPVegasCC() {
|
|||
_packetSendPeriod = 0.0;
|
||||
_congestionWindowSize = 2;
|
||||
|
||||
setAckInterval(1); // TCP sends an ACK for every packet received
|
||||
|
||||
// set our minimum RTT variables to the maximum possible value
|
||||
// we can't do this as a member initializer until our VS has support for constexpr
|
||||
_currentMinRTT = std::numeric_limits<int>::max();
|
||||
|
|
|
@ -27,13 +27,8 @@ public:
|
|||
TCPVegasCC();
|
||||
|
||||
virtual bool onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) override;
|
||||
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) override {};
|
||||
virtual void onTimeout() override {};
|
||||
|
||||
virtual bool shouldNAK() override { return false; }
|
||||
virtual bool shouldACK2() override { return false; }
|
||||
virtual bool shouldProbe() override { return false; }
|
||||
|
||||
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) override;
|
||||
|
||||
protected:
|
||||
|
@ -65,7 +60,6 @@ private:
|
|||
int _duplicateACKCount { 0 }; // Counter for duplicate ACKs received
|
||||
|
||||
int _slowStartOddAdjust { 0 }; // Marker for every window adjustment every other RTT in slow-start
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -58,14 +58,12 @@ const QCommandLineOption STATS_INTERVAL {
|
|||
|
||||
const QStringList CLIENT_STATS_TABLE_HEADERS {
|
||||
"Send (Mb/s)", "Est. Max (Mb/s)", "RTT (ms)", "CW (P)", "Period (us)",
|
||||
"Recv ACK", "Procd ACK", "Recv LACK", "Recv NAK", "Recv TNAK",
|
||||
"Sent ACK2", "Sent Packets", "Re-sent Packets"
|
||||
"Recv ACK", "Procd ACK", "Sent Packets", "Re-sent Packets"
|
||||
};
|
||||
|
||||
const QStringList SERVER_STATS_TABLE_HEADERS {
|
||||
" Mb/s ", "Recv Mb/s", "Est. Max (Mb/s)", "RTT (ms)", "CW (P)",
|
||||
"Sent ACK", "Sent LACK", "Sent NAK", "Sent TNAK",
|
||||
"Recv ACK2", "Duplicates (P)"
|
||||
"Sent ACK", "Duplicates (P)"
|
||||
};
|
||||
|
||||
UDTTest::UDTTest(int& argc, char** argv) :
|
||||
|
@ -387,11 +385,6 @@ void UDTTest::sampleStats() {
|
|||
QString::number(stats.packetSendPeriod).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::ProcessedACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedLightACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedNAK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedTimeoutNAK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK2]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.sentPackets).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::Retransmission]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size())
|
||||
};
|
||||
|
||||
|
@ -420,10 +413,6 @@ void UDTTest::sampleStats() {
|
|||
QString::number(stats.rtt / USECS_PER_MSEC, 'f', 2).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.congestionWindowSize).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::SentLightACK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::SentNAK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::SentTimeoutNAK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK2]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
|
||||
QString::number(stats.events[udt::ConnectionStats::Stats::Duplicate]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size())
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue