Merge pull request #13632 from Atlante45/fix/udt-rtt

Fix potential reliable connection lockup.
This commit is contained in:
John Conklin II 2018-08-16 13:15:23 -07:00 committed by GitHub
commit bbf6162361
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 48 additions and 1234 deletions

View file

@ -945,22 +945,14 @@ void AssetServer::sendStatsPacket() {
upstreamStats["2. Sent Packets"] = stat.second.sentPackets;
upstreamStats["3. Recvd ACK"] = events[Events::ReceivedACK];
upstreamStats["4. Procd ACK"] = events[Events::ProcessedACK];
upstreamStats["5. Recvd LACK"] = events[Events::ReceivedLightACK];
upstreamStats["6. Recvd NAK"] = events[Events::ReceivedNAK];
upstreamStats["7. Recvd TNAK"] = events[Events::ReceivedTimeoutNAK];
upstreamStats["8. Sent ACK2"] = events[Events::SentACK2];
upstreamStats["9. Retransmitted"] = events[Events::Retransmission];
upstreamStats["5. Retransmitted"] = events[Events::Retransmission];
nodeStats["Upstream Stats"] = upstreamStats;
QJsonObject downstreamStats;
downstreamStats["1. Recvd (P/s)"] = stat.second.receiveRate;
downstreamStats["2. Recvd Packets"] = stat.second.receivedPackets;
downstreamStats["3. Sent ACK"] = events[Events::SentACK];
downstreamStats["4. Sent LACK"] = events[Events::SentLightACK];
downstreamStats["5. Sent NAK"] = events[Events::SentNAK];
downstreamStats["6. Sent TNAK"] = events[Events::SentTimeoutNAK];
downstreamStats["7. Recvd ACK2"] = events[Events::ReceivedACK2];
downstreamStats["8. Duplicates"] = events[Events::Duplicate];
downstreamStats["4. Duplicates"] = events[Events::Duplicate];
nodeStats["Downstream Stats"] = downstreamStats;
QString uuid;

View file

@ -39,191 +39,3 @@ void CongestionControl::setPacketSendPeriod(double newSendPeriod) {
_packetSendPeriod = newSendPeriod;
}
}
DefaultCC::DefaultCC() :
_lastDecreaseMaxSeq(SequenceNumber {SequenceNumber::MAX })
{
_mss = udt::MAX_PACKET_SIZE_WITH_UDP_HEADER;
_congestionWindowSize = 16;
setPacketSendPeriod(1.0);
}
bool DefaultCC::onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) {
double increase = 0;
// Note from UDT original code:
// The minimum increase parameter is increased from "1.0 / _mss" to 0.01
// because the original was too small and caused sending rate to stay at low level
// for long time.
const double minimumIncrease = 0.01;
// we will only adjust once per sync interval so check that it has been at least that long now
auto now = p_high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastRCTime).count() < synInterval()) {
return false;
}
// our last rate increase time is now
_lastRCTime = now;
if (_slowStart) {
// we are in slow start phase - increase the congestion window size by the number of packets just ACKed
_congestionWindowSize += seqlen(_lastACK, ackNum);
// update the last ACK
_lastACK = ackNum;
// check if we can get out of slow start (is our new congestion window size bigger than the max)
if (_congestionWindowSize > _maxCongestionWindowSize) {
_slowStart = false;
if (_receiveRate > 0) {
// if we have a valid receive rate we set the send period to whatever the receive rate dictates
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);
} else {
// no valid receive rate, packet send period is dictated by estimated RTT and current congestion window size
setPacketSendPeriod((_rtt + synInterval()) / _congestionWindowSize);
}
}
} else {
// not in slow start - window size should be arrival rate * (RTT + SYN) + 16
_congestionWindowSize = _receiveRate / USECS_PER_SECOND * (_rtt + synInterval()) + 16;
}
// during slow start we perform no rate increases
if (_slowStart) {
return false;
}
// if loss has happened since the last rate increase we do not perform another increase
if (_loss) {
_loss = false;
return false;
}
double capacitySpeedDelta = (_bandwidth - USECS_PER_SECOND / _packetSendPeriod);
// UDT uses what they call DAIMD - additive increase multiplicative decrease with decreasing increases
// This factor is a protocol parameter that is part of the DAIMD algorithim
static const int AIMD_DECREASING_INCREASE_FACTOR = 9;
if ((_packetSendPeriod > _lastDecreasePeriod) && ((_bandwidth / AIMD_DECREASING_INCREASE_FACTOR) < capacitySpeedDelta)) {
capacitySpeedDelta = _bandwidth / AIMD_DECREASING_INCREASE_FACTOR;
}
if (capacitySpeedDelta <= 0) {
increase = minimumIncrease;
} else {
// use UDTs DAIMD algorithm to figure out what the send period increase factor should be
// inc = max(10 ^ ceil(log10(B * MSS * 8 ) * Beta / MSS, minimumIncrease)
// B = estimated link capacity
// Beta = 1.5 * 10^(-6)
static const double BETA = 0.0000015;
static const double BITS_PER_BYTE = 8.0;
increase = pow(10.0, ceil(log10(capacitySpeedDelta * _mss * BITS_PER_BYTE))) * BETA / _mss;
if (increase < minimumIncrease) {
increase = minimumIncrease;
}
}
setPacketSendPeriod((_packetSendPeriod * synInterval()) / (_packetSendPeriod * increase + synInterval()));
return false;
}
void DefaultCC::onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {
// stop the slow start if we haven't yet
if (_slowStart) {
stopSlowStart();
// if the change to send rate was driven by a known receive rate, then we don't continue with the decrease
if (_receiveRate > 0) {
return;
}
}
_loss = true;
++_nakCount;
static const double INTER_PACKET_ARRIVAL_INCREASE = 1.125;
static const int MAX_DECREASES_PER_CONGESTION_EPOCH = 5;
// check if this NAK starts a new congestion period - this will be the case if the
// NAK received occured for a packet sent after the last decrease
if (rangeStart > _lastDecreaseMaxSeq) {
_delayedDecrease = (rangeStart == rangeEnd);
_lastDecreasePeriod = _packetSendPeriod;
if (!_delayedDecrease) {
setPacketSendPeriod(ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE));
} else {
_loss = false;
}
// use EWMA to update the average number of NAKs per congestion
static const double NAK_EWMA_ALPHA = 0.125;
_avgNAKNum = (int)ceil(_avgNAKNum * (1 - NAK_EWMA_ALPHA) + _nakCount * NAK_EWMA_ALPHA);
// update the count of NAKs and count of decreases in this interval
_nakCount = 1;
_decreaseCount = 1;
_lastDecreaseMaxSeq = _sendCurrSeqNum;
if (_avgNAKNum < 1) {
_randomDecreaseThreshold = 1;
} else {
// avoid synchronous rate decrease across connections using randomization
std::random_device rd;
std::mt19937 generator(rd());
std::uniform_int_distribution<> distribution(1, std::max(1, _avgNAKNum));
_randomDecreaseThreshold = distribution(generator);
}
} else if (_delayedDecrease && _nakCount == 2) {
setPacketSendPeriod(ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE));
} else if ((_decreaseCount++ < MAX_DECREASES_PER_CONGESTION_EPOCH) && ((_nakCount % _randomDecreaseThreshold) == 0)) {
// there have been fewer than MAX_DECREASES_PER_CONGESTION_EPOCH AND this NAK matches the random count at which we
// decided we would decrease the packet send period
setPacketSendPeriod(ceil(_packetSendPeriod * INTER_PACKET_ARRIVAL_INCREASE));
_lastDecreaseMaxSeq = _sendCurrSeqNum;
}
}
void DefaultCC::onTimeout() {
if (_slowStart) {
stopSlowStart();
} else {
// UDT used to do the following on timeout if not in slow start - we should check if it could be helpful
_lastDecreasePeriod = _packetSendPeriod;
_packetSendPeriod = ceil(_packetSendPeriod * 2);
// this seems odd - the last ack they were setting _lastDecreaseMaxSeq to only applies to slow start
_lastDecreaseMaxSeq = _lastACK;
}
}
void DefaultCC::stopSlowStart() {
_slowStart = false;
if (_receiveRate > 0) {
// Set the sending rate to the receiving rate.
setPacketSendPeriod(USECS_PER_SECOND / _receiveRate);
} else {
// If no receiving rate is observed, we have to compute the sending
// rate according to the current window size, and decrease it
// using the method below.
setPacketSendPeriod(double(_congestionWindowSize) / (_rtt + synInterval()));
}
}
void DefaultCC::setInitialSendSequenceNumber(udt::SequenceNumber seqNum) {
_lastACK = _lastDecreaseMaxSeq = seqNum - 1;
}

View file

@ -32,11 +32,9 @@ class CongestionControl {
friend class Connection;
public:
CongestionControl() {};
CongestionControl(int synInterval) : _synInterval(synInterval) {}
virtual ~CongestionControl() {}
int synInterval() const { return _synInterval; }
CongestionControl() = default;
virtual ~CongestionControl() = default;
void setMaxBandwidth(int maxBandwidth);
virtual void init() {}
@ -44,50 +42,28 @@ public:
// return value specifies if connection should perform a fast re-transmit of ACK + 1 (used in TCP style congestion control)
virtual bool onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) { return false; }
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) {}
virtual void onTimeout() {}
virtual bool shouldNAK() { return true; }
virtual bool shouldACK2() { return true; }
virtual bool shouldProbe() { return true; }
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) {}
virtual int estimatedTimeout() const = 0;
protected:
void setAckInterval(int ackInterval) { _ackInterval = ackInterval; }
void setRTO(int rto) { _userDefinedRTO = true; _rto = rto; }
void setMSS(int mss) { _mss = mss; }
void setMaxCongestionWindowSize(int window) { _maxCongestionWindowSize = window; }
void setBandwidth(int bandwidth) { _bandwidth = bandwidth; }
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) = 0;
void setSendCurrentSequenceNumber(SequenceNumber seqNum) { _sendCurrSeqNum = seqNum; }
void setReceiveRate(int rate) { _receiveRate = rate; }
void setRTT(int rtt) { _rtt = rtt; }
void setPacketSendPeriod(double newSendPeriod); // call this internally to ensure send period doesn't go past max bandwidth
double _packetSendPeriod { 1.0 }; // Packet sending period, in microseconds
int _congestionWindowSize { 16 }; // Congestion window size, in packets
int _bandwidth { 0 }; // estimated bandwidth, packets per second
std::atomic<int> _maxBandwidth { -1 }; // Maximum desired bandwidth, bits per second
int _maxCongestionWindowSize { 0 }; // maximum cwnd size, in packets
int _mss { 0 }; // Maximum Packet Size, including all packet headers
SequenceNumber _sendCurrSeqNum; // current maximum seq num sent out
int _receiveRate { 0 }; // packet arrive rate at receiver side, packets per second
int _rtt { 0 }; // current estimated RTT, microsecond
private:
CongestionControl(const CongestionControl& other) = delete;
CongestionControl& operator=(const CongestionControl& other) = delete;
int _ackInterval { 0 }; // How many packets to send one ACK, in packets
int _lightACKInterval { 64 }; // How many packets to send one light ACK, in packets
int _synInterval { DEFAULT_SYN_INTERVAL };
bool _userDefinedRTO { false }; // if the RTO value is defined by users
int _rto { -1 }; // RTO value, microseconds
};
@ -95,8 +71,6 @@ class CongestionControlVirtualFactory {
public:
virtual ~CongestionControlVirtualFactory() {}
static int synInterval() { return DEFAULT_SYN_INTERVAL; }
virtual std::unique_ptr<CongestionControl> create() = 0;
};
@ -105,35 +79,6 @@ public:
virtual ~CongestionControlFactory() {}
virtual std::unique_ptr<CongestionControl> create() override { return std::unique_ptr<T>(new T()); }
};
class DefaultCC: public CongestionControl {
public:
DefaultCC();
public:
virtual bool onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) override;
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) override;
virtual void onTimeout() override;
protected:
virtual void setInitialSendSequenceNumber(SequenceNumber seqNum) override;
private:
void stopSlowStart(); // stops the slow start on loss or timeout
p_high_resolution_clock::time_point _lastRCTime = p_high_resolution_clock::now(); // last rate increase time
bool _slowStart { true }; // if in slow start phase
SequenceNumber _lastACK; // last ACKed sequence number from previous
bool _loss { false }; // if loss happened since last rate increase
SequenceNumber _lastDecreaseMaxSeq; // max pkt seq num sent out when last decrease happened
double _lastDecreasePeriod { 1 }; // value of _packetSendPeriod when last decrease happened
int _nakCount { 0 }; // number of NAKs in congestion epoch
int _randomDecreaseThreshold { 1 }; // random threshold on decrease by number of loss events
int _avgNAKNum { 0 }; // average number of NAKs per congestion
int _decreaseCount { 0 }; // number of decreases in a congestion epoch
bool _delayedDecrease { false };
};
}

View file

@ -39,28 +39,12 @@ Connection::Connection(Socket* parentSocket, HifiSockAddr destination, std::uniq
Q_ASSERT_X(_congestionControl, "Connection::Connection", "Must be called with a valid CongestionControl object");
_congestionControl->init();
// setup default SYN, RTT and RTT Variance based on the SYN interval in CongestionControl object
_synInterval = _congestionControl->synInterval();
resetRTT();
// set the initial RTT and flow window size on congestion control object
_congestionControl->setRTT(_rtt);
_congestionControl->setMaxCongestionWindowSize(_flowWindowSize);
// Setup packets
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber)
+ sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t);
static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static const int ACK2_PAYLOAD_BYTES = sizeof(SequenceNumber);
static const int NAK_PACKET_PAYLOAD_BYTES = 2 * sizeof(SequenceNumber);
static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber);
static const int HANDSHAKE_ACK_PAYLOAD_BYTES = sizeof(SequenceNumber);
_ackPacket = ControlPacket::create(ControlPacket::ACK, ACK_PACKET_PAYLOAD_BYTES);
_lightACKPacket = ControlPacket::create(ControlPacket::LightACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES);
_ack2Packet = ControlPacket::create(ControlPacket::ACK2, ACK2_PAYLOAD_BYTES);
_lossReport = ControlPacket::create(ControlPacket::NAK, NAK_PACKET_PAYLOAD_BYTES);
_handshakeACK = ControlPacket::create(ControlPacket::HandshakeACK, HANDSHAKE_ACK_PAYLOAD_BYTES);
@ -101,11 +85,6 @@ void Connection::stopSendQueue() {
}
}
void Connection::resetRTT() {
_rtt = _synInterval * 10;
_rttVariance = _rtt / 2;
}
void Connection::setMaxBandwidth(int maxBandwidth) {
_congestionControl->setMaxBandwidth(maxBandwidth);
}
@ -135,15 +114,12 @@ SendQueue& Connection::getSendQueue() {
QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission);
QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive);
QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout);
QObject::connect(_sendQueue.get(), &SendQueue::shortCircuitLoss, this, &Connection::queueShortCircuitLoss);
// set defaults on the send queue from our congestion control object and estimatedTimeout()
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setSyncInterval(_synInterval);
_sendQueue->setEstimatedTimeout(estimatedTimeout());
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
_sendQueue->setProbePacketEnabled(_congestionControl->shouldProbe());
_sendQueue->setEstimatedTimeout(_congestionControl->estimatedTimeout());
_sendQueue->setFlowWindowSize(_congestionControl->_congestionWindowSize);
// give the randomized sequence number to the congestion control object
_congestionControl->setInitialSendSequenceNumber(_sendQueue->getCurrentSequenceNumber());
@ -167,12 +143,6 @@ void Connection::queueTimeout() {
});
}
void Connection::queueShortCircuitLoss(quint32 sequenceNumber) {
updateCongestionControlAndSendQueue([this, sequenceNumber] {
_congestionControl->onLoss(SequenceNumber { sequenceNumber }, SequenceNumber { sequenceNumber });
});
}
void Connection::sendReliablePacket(std::unique_ptr<Packet> packet) {
Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably.");
getSendQueue().queuePacket(std::move(packet));
@ -213,43 +183,6 @@ void Connection::queueReceivedMessagePacket(std::unique_ptr<Packet> packet) {
}
void Connection::sync() {
if (_isReceivingData) {
// check if we should expire the receive portion of this connection
// this occurs if it has been 16 timeouts since the last data received and at least 5 seconds
static const int NUM_TIMEOUTS_BEFORE_EXPIRY = 16;
static const int MIN_SECONDS_BEFORE_EXPIRY = 5;
auto now = p_high_resolution_clock::now();
auto sincePacketReceive = now - _lastReceiveTime;
if (duration_cast<microseconds>(sincePacketReceive).count() >= NUM_TIMEOUTS_BEFORE_EXPIRY * estimatedTimeout()
&& duration_cast<seconds>(sincePacketReceive).count() >= MIN_SECONDS_BEFORE_EXPIRY ) {
// the receive side of this connection is expired
_isReceivingData = false;
}
// reset the number of light ACKs or non SYN ACKs during this sync interval
_lightACKsDuringSYN = 1;
_acksDuringSYN = 1;
if (_congestionControl->_ackInterval > 1) {
// we send out a periodic ACK every rate control interval
sendACK();
}
if (_congestionControl->shouldNAK() && _lossList.getLength() > 0) {
// check if we need to re-transmit a loss list
// we do this if it has been longer than the current nakInterval since we last sent
auto now = p_high_resolution_clock::now();
if (duration_cast<microseconds>(now - _lastNAKTime).count() >= _nakInterval) {
// Send a timeout NAK packet
sendTimeoutNAK();
}
}
}
}
void Connection::recordSentPackets(int wireSize, int payloadSize,
@ -265,159 +198,23 @@ void Connection::recordRetransmission(int wireSize, SequenceNumber seqNum, p_hig
_congestionControl->onPacketSent(wireSize, seqNum, timePoint);
}
void Connection::sendACK(bool wasCausedBySyncTimeout) {
static p_high_resolution_clock::time_point lastACKSendTime;
auto currentTime = p_high_resolution_clock::now();
void Connection::sendACK() {
SequenceNumber nextACKNumber = nextACK();
Q_ASSERT_X(nextACKNumber >= _lastSentACK, "Connection::sendACK", "Sending lower ACK, something is wrong");
// if our congestion control doesn't want to send an ACK for every packet received
// check if we already sent this ACK
if (_congestionControl->_ackInterval > 1 && nextACKNumber == _lastSentACK) {
// if we use ACK2s, check if the receiving side already confirmed receipt of this ACK
if (_congestionControl->shouldACK2() && nextACKNumber < _lastReceivedAcknowledgedACK) {
// we already got an ACK2 for this ACK we would be sending, don't bother
return;
}
// We will re-send if it has been more than the estimated timeout since the last ACK
microseconds sinceLastACK = duration_cast<microseconds>(currentTime - lastACKSendTime);
if (sinceLastACK.count() < estimatedTimeout()) {
return;
}
}
// we have received new packets since the last sent ACK
// or our congestion control dictates that we always send ACKs
// update the last sent ACK
_lastSentACK = nextACKNumber;
_ackPacket->reset(); // We need to reset it every time.
// pack in the ACK sub-sequence number
_ackPacket->writePrimitive(++_currentACKSubSequenceNumber);
// pack in the ACK number
_ackPacket->writePrimitive(nextACKNumber);
// pack in the RTT and variance
_ackPacket->writePrimitive(_rtt);
// pack the available buffer size, in packets
// in our implementation we have no hard limit on receive buffer size, send the default value
_ackPacket->writePrimitive((int32_t) udt::MAX_PACKETS_IN_FLIGHT);
if (wasCausedBySyncTimeout) {
// grab the up to date packet receive speed and estimated bandwidth
int32_t packetReceiveSpeed = _receiveWindow.getPacketReceiveSpeed();
int32_t estimatedBandwidth = _receiveWindow.getEstimatedBandwidth();
// update those values in our connection stats
_stats.recordReceiveRate(packetReceiveSpeed);
_stats.recordEstimatedBandwidth(estimatedBandwidth);
// pack in the receive speed and estimatedBandwidth
_ackPacket->writePrimitive(packetReceiveSpeed);
_ackPacket->writePrimitive(estimatedBandwidth);
}
// record this as the last ACK send time
lastACKSendTime = p_high_resolution_clock::now();
// have the socket send off our packet
_parentSocket->writeBasePacket(*_ackPacket, _destination);
Q_ASSERT_X(_sentACKs.empty() || _sentACKs.back().first + 1 == _currentACKSubSequenceNumber,
"Connection::sendACK", "Adding an invalid ACK to _sentACKs");
// write this ACK to the map of sent ACKs
_sentACKs.push_back({ _currentACKSubSequenceNumber, { nextACKNumber, p_high_resolution_clock::now() }});
// reset the number of data packets received since last ACK
_packetsSinceACK = 0;
_stats.record(ConnectionStats::Stats::SentACK);
}
void Connection::sendLightACK() {
SequenceNumber nextACKNumber = nextACK();
if (nextACKNumber == _lastReceivedAcknowledgedACK) {
// we already got an ACK2 for this ACK we would be sending, don't bother
return;
}
// reset the lightACKPacket before we go to write the ACK to it
_lightACKPacket->reset();
// pack in the ACK
_lightACKPacket->writePrimitive(nextACKNumber);
// have the socket send off our packet immediately
_parentSocket->writeBasePacket(*_lightACKPacket, _destination);
_stats.record(ConnectionStats::Stats::SentLightACK);
}
void Connection::sendACK2(SequenceNumber currentACKSubSequenceNumber) {
// reset the ACK2 Packet before writing the sub-sequence number to it
_ack2Packet->reset();
// write the sub sequence number for this ACK2
_ack2Packet->writePrimitive(currentACKSubSequenceNumber);
// send the ACK2 packet
_parentSocket->writeBasePacket(*_ack2Packet, _destination);
// update the last sent ACK2 and the last ACK2 send time
_lastSentACK2 = currentACKSubSequenceNumber;
_stats.record(ConnectionStats::Stats::SentACK2);
}
void Connection::sendNAK(SequenceNumber sequenceNumberRecieved) {
_lossReport->reset(); // We need to reset it every time.
// pack in the loss report
_lossReport->writePrimitive(_lastReceivedSequenceNumber + 1);
if (_lastReceivedSequenceNumber + 1 != sequenceNumberRecieved - 1) {
_lossReport->writePrimitive(sequenceNumberRecieved - 1);
}
// have the parent socket send off our packet immediately
_parentSocket->writeBasePacket(*_lossReport, _destination);
// record our last NAK time
_lastNAKTime = p_high_resolution_clock::now();
_stats.record(ConnectionStats::Stats::SentNAK);
}
void Connection::sendTimeoutNAK() {
if (_lossList.getLength() > 0) {
int timeoutPayloadSize = std::min((int) (_lossList.getLength() * 2 * sizeof(SequenceNumber)),
ControlPacket::maxPayloadSize());
// construct a NAK packet that will hold all of the lost sequence numbers
auto lossListPacket = ControlPacket::create(ControlPacket::TimeoutNAK, timeoutPayloadSize);
// Pack in the lost sequence numbers
_lossList.write(*lossListPacket, timeoutPayloadSize / (2 * sizeof(SequenceNumber)));
// have our parent socket send off this control packet
_parentSocket->writeBasePacket(*lossListPacket, _destination);
// record this as the last NAK time
_lastNAKTime = p_high_resolution_clock::now();
_stats.record(ConnectionStats::Stats::SentTimeoutNAK);
}
}
SequenceNumber Connection::nextACK() const {
if (_lossList.getLength() > 0) {
return _lossList.getFirstSequenceNumber() - 1;
@ -447,27 +244,8 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
return false;
}
_isReceivingData = true;
// mark our last receive time as now (to push the potential expiry farther)
_lastReceiveTime = p_high_resolution_clock::now();
if (_congestionControl->shouldProbe()) {
// check if this is a packet pair we should estimate bandwidth from, or just a regular packet
if (((uint32_t) sequenceNumber & 0xF) == 0) {
_receiveWindow.onProbePair1Arrival();
} else if (((uint32_t) sequenceNumber & 0xF) == 1) {
// only use this packet for bandwidth estimation if we didn't just receive a control packet in its place
if (!_receivedControlProbeTail) {
_receiveWindow.onProbePair2Arrival();
} else {
// reset our control probe tail marker so the next probe that comes with data can be used
_receivedControlProbeTail = false;
}
}
}
_receiveWindow.onPacketArrival();
// If this is not the next sequence number, report loss
if (sequenceNumber > _lastReceivedSequenceNumber + 1) {
@ -476,24 +254,6 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
} else {
_lossList.append(_lastReceivedSequenceNumber + 1, sequenceNumber - 1);
}
if (_congestionControl->shouldNAK()) {
// Send a NAK packet
sendNAK(sequenceNumber);
// figure out when we should send the next loss report, if we haven't heard anything back
_nakInterval = estimatedTimeout();
int receivedPacketsPerSecond = _receiveWindow.getPacketReceiveSpeed();
if (receivedPacketsPerSecond > 0) {
// the NAK interval is at least the _minNAKInterval
// but might be the time required for all lost packets to be retransmitted
_nakInterval += (int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond));
}
// the NAK interval is at least the _minNAKInterval but might be the value calculated above, if that is larger
_nakInterval = std::max(_nakInterval, _minNAKInterval);
}
}
bool wasDuplicate = false;
@ -505,22 +265,9 @@ bool Connection::processReceivedSequenceNumber(SequenceNumber sequenceNumber, in
// Otherwise, it could be a resend, try and remove it from the loss list
wasDuplicate = !_lossList.remove(sequenceNumber);
}
// increment the counters for data packets received
++_packetsSinceACK;
// check if we need to send an ACK, according to CC params
if (_congestionControl->_ackInterval == 1) {
// using a congestion control that ACKs every packet (like TCP Vegas)
sendACK(true);
} else if (_congestionControl->_ackInterval > 0 && _packetsSinceACK >= _congestionControl->_ackInterval * _acksDuringSYN) {
_acksDuringSYN++;
sendACK(false);
} else if (_congestionControl->_lightACKInterval > 0
&& _packetsSinceACK >= _congestionControl->_lightACKInterval * _lightACKsDuringSYN) {
sendLightACK();
++_lightACKsDuringSYN;
}
// using a congestion control that ACKs every packet (like TCP Vegas)
sendACK();
if (wasDuplicate) {
_stats.record(ConnectionStats::Stats::Duplicate);
@ -544,37 +291,12 @@ void Connection::processControl(ControlPacketPointer controlPacket) {
processACK(move(controlPacket));
}
break;
case ControlPacket::LightACK:
if (_hasReceivedHandshakeACK) {
processLightACK(move(controlPacket));
}
break;
case ControlPacket::ACK2:
if (_hasReceivedHandshake) {
processACK2(move(controlPacket));
}
break;
case ControlPacket::NAK:
if (_hasReceivedHandshakeACK) {
processNAK(move(controlPacket));
}
break;
case ControlPacket::TimeoutNAK:
if (_hasReceivedHandshakeACK) {
processTimeoutNAK(move(controlPacket));
}
break;
case ControlPacket::Handshake:
processHandshake(move(controlPacket));
break;
case ControlPacket::HandshakeACK:
processHandshakeACK(move(controlPacket));
break;
case ControlPacket::ProbeTail:
if (_isReceivingData) {
processProbeTail(move(controlPacket));
}
break;
case ControlPacket::HandshakeRequest:
if (_hasReceivedHandshakeACK) {
// We're already in a state where we've received a handshake ack, so we are likely in a state
@ -591,27 +313,6 @@ void Connection::processControl(ControlPacketPointer controlPacket) {
}
void Connection::processACK(ControlPacketPointer controlPacket) {
// read the ACK sub-sequence number
SequenceNumber currentACKSubSequenceNumber;
controlPacket->readPrimitive(&currentACKSubSequenceNumber);
// Check if we need send an ACK2 for this ACK
// This will be the case if it has been longer than the sync interval OR
// it looks like they haven't received our ACK2 for this ACK
auto currentTime = p_high_resolution_clock::now();
static p_high_resolution_clock::time_point lastACK2SendTime =
p_high_resolution_clock::now() - std::chrono::microseconds(_synInterval);
microseconds sinceLastACK2 = duration_cast<microseconds>(currentTime - lastACK2SendTime);
if (_congestionControl->shouldACK2()
&& (sinceLastACK2.count() >= _synInterval || currentACKSubSequenceNumber == _lastSentACK2)) {
// Send ACK2 packet
sendACK2(currentACKSubSequenceNumber);
lastACK2SendTime = p_high_resolution_clock::now();
}
// read the ACKed sequence number
SequenceNumber ack;
controlPacket->readPrimitive(&ack);
@ -626,22 +327,9 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
return;
}
// read the RTT
int32_t rtt;
controlPacket->readPrimitive(&rtt);
if (ack < _lastReceivedACK) {
if (ack <= _lastReceivedACK) {
// this is an out of order ACK, bail
return;
}
// this is a valid ACKed sequence number - update the flow window size and the last received ACK
int32_t packedFlowWindow;
controlPacket->readPrimitive(&packedFlowWindow);
_flowWindowSize = packedFlowWindow;
if (ack == _lastReceivedACK) {
// or
// processing an already received ACK, bail
return;
}
@ -650,39 +338,7 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
// ACK the send queue so it knows what was received
getSendQueue().ack(ack);
// update the RTT
updateRTT(rtt);
// write this RTT to stats
_stats.recordRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRTT(_rtt);
if (controlPacket->bytesLeftToRead() > 0) {
int32_t receiveRate, bandwidth;
Q_ASSERT_X(controlPacket->bytesLeftToRead() == sizeof(receiveRate) + sizeof(bandwidth),
"Connection::processACK", "sync interval ACK packet does not contain expected data");
controlPacket->readPrimitive(&receiveRate);
controlPacket->readPrimitive(&bandwidth);
// set the delivery rate and bandwidth for congestion control
// these are calculated using an EWMA
static const int EMWA_ALPHA_NUMERATOR = 8;
// record these samples in connection stats
_stats.recordSendRate(receiveRate);
_stats.recordEstimatedBandwidth(bandwidth);
_deliveryRate = (_deliveryRate * (EMWA_ALPHA_NUMERATOR - 1) + receiveRate) / EMWA_ALPHA_NUMERATOR;
_bandwidth = (_bandwidth * (EMWA_ALPHA_NUMERATOR - 1) + bandwidth) / EMWA_ALPHA_NUMERATOR;
_congestionControl->setReceiveRate(_deliveryRate);
_congestionControl->setBandwidth(_bandwidth);
}
// give this ACK to the congestion control and update the send queue parameters
updateCongestionControlAndSendQueue([this, ack, &controlPacket] {
@ -695,92 +351,6 @@ void Connection::processACK(ControlPacketPointer controlPacket) {
_stats.record(ConnectionStats::Stats::ProcessedACK);
}
void Connection::processLightACK(ControlPacketPointer controlPacket) {
// read the ACKed sequence number
SequenceNumber ack;
controlPacket->readPrimitive(&ack);
// must be larger than the last received ACK to be processed
if (ack > _lastReceivedACK) {
// NOTE: the following makes sense in UDT where there is a dynamic receive buffer.
// Since we have a receive buffer that is always of a default size, we don't use this light ACK to
// drop the flow window size.
// decrease the flow window size by the offset between the last received ACK and this ACK
// _flowWindowSize -= seqoff(_lastReceivedACK, ack);
// update the last received ACK to the this one
_lastReceivedACK = ack;
// send light ACK to the send queue
getSendQueue().ack(ack);
}
_stats.record(ConnectionStats::Stats::ReceivedLightACK);
}
void Connection::processACK2(ControlPacketPointer controlPacket) {
// pull the sub sequence number from the packet
SequenceNumber subSequenceNumber;
controlPacket->readPrimitive(&subSequenceNumber);
// check if we had that subsequence number in our map
auto it = std::find_if_not(_sentACKs.begin(), _sentACKs.end(), [&subSequenceNumber](const ACKListPair& pair){
return pair.first < subSequenceNumber;
});
if (it != _sentACKs.end()) {
if (it->first == subSequenceNumber){
// update the RTT using the ACK window
// calculate the RTT (time now - time ACK sent)
auto now = p_high_resolution_clock::now();
int rtt = duration_cast<microseconds>(now - it->second.second).count();
updateRTT(rtt);
// write this RTT to stats
_stats.recordRTT(rtt);
// set the RTT for congestion control
_congestionControl->setRTT(_rtt);
// update the last ACKed ACK
if (it->second.first > _lastReceivedAcknowledgedACK) {
_lastReceivedAcknowledgedACK = it->second.first;
}
} else if (it->first < subSequenceNumber) {
Q_UNREACHABLE();
}
}
// erase this sub-sequence number and anything below it now that we've gotten our timing information
_sentACKs.erase(_sentACKs.begin(), it);
_stats.record(ConnectionStats::Stats::ReceivedACK2);
}
void Connection::processNAK(ControlPacketPointer controlPacket) {
// read the loss report
SequenceNumber start, end;
controlPacket->readPrimitive(&start);
end = start;
if (controlPacket->bytesLeftToRead() >= (qint64)sizeof(SequenceNumber)) {
controlPacket->readPrimitive(&end);
}
// send that off to the send queue so it knows there was loss
getSendQueue().nak(start, end);
// give the loss to the congestion control object and update the send queue parameters
updateCongestionControlAndSendQueue([this, start, end] {
_congestionControl->onLoss(start, end);
});
_stats.record(ConnectionStats::Stats::ReceivedNAK);
}
void Connection::processHandshake(ControlPacketPointer controlPacket) {
SequenceNumber initialSequenceNumber;
controlPacket->readPrimitive(&initialSequenceNumber);
@ -797,7 +367,6 @@ void Connection::processHandshake(ControlPacketPointer controlPacket) {
resetReceiveState();
_initialReceiveSequenceNumber = initialSequenceNumber;
_lastReceivedSequenceNumber = initialSequenceNumber - 1;
_lastSentACK = initialSequenceNumber - 1;
}
_handshakeACK->reset();
@ -829,33 +398,6 @@ void Connection::processHandshakeACK(ControlPacketPointer controlPacket) {
}
}
void Connection::processTimeoutNAK(ControlPacketPointer controlPacket) {
// Override SendQueue's LossList with the timeout NAK list
getSendQueue().overrideNAKListFromPacket(*controlPacket);
// we don't tell the congestion control object there was loss here - this matches UDTs implementation
// a possible improvement would be to tell it which new loss this timeout packet told us about
_stats.record(ConnectionStats::Stats::ReceivedTimeoutNAK);
}
void Connection::processProbeTail(ControlPacketPointer controlPacket) {
if (((uint32_t) _lastReceivedSequenceNumber & 0xF) == 0) {
// this is the second packet in a probe set so we can estimate bandwidth
// the sender sent this to us in lieu of sending new data (because they didn't have any)
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Processing second packet of probe from control packet instead of data packet";
#endif
_receiveWindow.onProbePair2Arrival();
// mark that we processed a control packet for the second in the pair and we should not mark
// the next data packet received
_receivedControlProbeTail = true;
}
}
void Connection::resetReceiveState() {
// reset all SequenceNumber member variables back to default
@ -863,35 +405,12 @@ void Connection::resetReceiveState() {
_lastReceivedSequenceNumber = defaultSequenceNumber;
_lastReceivedAcknowledgedACK = defaultSequenceNumber;
_currentACKSubSequenceNumber = defaultSequenceNumber;
_lastSentACK = defaultSequenceNumber;
// clear the sent ACKs
_sentACKs.clear();
// clear the loss list and _lastNAKTime
// clear the loss list
_lossList.clear();
_lastNAKTime = p_high_resolution_clock::now();
// the _nakInterval need not be reset, that will happen on loss
// clear sync variables
_isReceivingData = false;
_connectionStart = p_high_resolution_clock::now();
_acksDuringSYN = 1;
_lightACKsDuringSYN = 1;
_packetsSinceACK = 0;
// reset RTT to initial value
resetRTT();
// clear the intervals in the receive window
_receiveWindow.reset();
_receivedControlProbeTail = false;
// clear any pending received messages
for (auto& pendingMessage : _pendingReceivedMessages) {
_parentSocket->messageFailed(this, pendingMessage.first);
@ -899,30 +418,6 @@ void Connection::resetReceiveState() {
_pendingReceivedMessages.clear();
}
void Connection::updateRTT(int rtt) {
// This updates the RTT using exponential weighted moving average
// This is the Jacobson's forumla for RTT estimation
// http://www.mathcs.emory.edu/~cheung/Courses/455/Syllabus/7-transport/Jacobson-88.pdf
// Estimated RTT = (1 - x)(estimatedRTT) + (x)(sampleRTT)
// (where x = 0.125 via Jacobson)
// Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT|
// (where x = 0.25 via Jacobson)
static const int RTT_ESTIMATION_ALPHA_NUMERATOR = 8;
static const int RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR = 4;
_rtt = (_rtt * (RTT_ESTIMATION_ALPHA_NUMERATOR - 1) + rtt) / RTT_ESTIMATION_ALPHA_NUMERATOR;
_rttVariance = (_rttVariance * (RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR - 1)
+ abs(rtt - _rtt)) / RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR;
}
int Connection::estimatedTimeout() const {
return _congestionControl->_userDefinedRTO ? _congestionControl->_rto : _rtt + _rttVariance * 4;
}
void Connection::updateCongestionControlAndSendQueue(std::function<void ()> congestionCallback) {
// update the last sent sequence number in congestion control
_congestionControl->setSendCurrentSequenceNumber(getSendQueue().getCurrentSequenceNumber());
@ -934,8 +429,8 @@ void Connection::updateCongestionControlAndSendQueue(std::function<void ()> cong
// now that we've updated the congestion control, update the packet send period and flow window size
sendQueue.setPacketSendPeriod(_congestionControl->_packetSendPeriod);
sendQueue.setEstimatedTimeout(estimatedTimeout());
sendQueue.setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
sendQueue.setEstimatedTimeout(_congestionControl->estimatedTimeout());
sendQueue.setFlowWindowSize(_congestionControl->_congestionWindowSize);
// record connection stats
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);

View file

@ -22,7 +22,6 @@
#include "ConnectionStats.h"
#include "Constants.h"
#include "LossList.h"
#include "PacketTimeWindow.h"
#include "SendQueue.h"
#include "../HifiSockAddr.h"
@ -51,9 +50,6 @@ private:
class Connection : public QObject {
Q_OBJECT
public:
using SequenceNumberTimePair = std::pair<SequenceNumber, p_high_resolution_clock::time_point>;
using ACKListPair = std::pair<SequenceNumber, SequenceNumberTimePair>;
using SentACKList = std::list<ACKListPair>;
using ControlPacketPointer = std::unique_ptr<ControlPacket>;
Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr<CongestionControl> congestionControl);
@ -87,51 +83,29 @@ private slots:
void recordRetransmission(int wireSize, SequenceNumber sequenceNumber, p_high_resolution_clock::time_point timePoint);
void queueInactive();
void queueTimeout();
void queueShortCircuitLoss(quint32 sequenceNumber);
private:
void sendACK(bool wasCausedBySyncTimeout = true);
void sendLightACK();
void sendACK2(SequenceNumber currentACKSubSequenceNumber);
void sendNAK(SequenceNumber sequenceNumberRecieved);
void sendTimeoutNAK();
void sendACK();
void processACK(ControlPacketPointer controlPacket);
void processLightACK(ControlPacketPointer controlPacket);
void processACK2(ControlPacketPointer controlPacket);
void processNAK(ControlPacketPointer controlPacket);
void processTimeoutNAK(ControlPacketPointer controlPacket);
void processHandshake(ControlPacketPointer controlPacket);
void processHandshakeACK(ControlPacketPointer controlPacket);
void processProbeTail(ControlPacketPointer controlPacket);
void resetReceiveState();
void resetRTT();
SendQueue& getSendQueue();
SequenceNumber nextACK() const;
void updateRTT(int rtt);
int estimatedTimeout() const;
void updateCongestionControlAndSendQueue(std::function<void()> congestionCallback);
void stopSendQueue();
int _synInterval; // Periodical Rate Control Interval, in microseconds
int _nakInterval { -1 }; // NAK timeout interval, in microseconds, set on loss
int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms
p_high_resolution_clock::time_point _lastNAKTime = p_high_resolution_clock::now();
bool _hasReceivedHandshake { false }; // flag for receipt of handshake from server
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
bool _didRequestHandshake { false }; // flag for request of handshake from server
p_high_resolution_clock::time_point _connectionStart = p_high_resolution_clock::now(); // holds the time_point for creation of this connection
p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender
bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection
SequenceNumber _initialSequenceNumber; // Randomized on Connection creation, identifies connection during re-connect requests
SequenceNumber _initialReceiveSequenceNumber; // Randomized by peer Connection on creation, identifies connection during re-connect requests
@ -141,43 +115,18 @@ private:
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer
SequenceNumber _lastReceivedACK; // The last ACK received
SequenceNumber _lastReceivedAcknowledgedACK; // The last sent ACK that has been acknowledged via an ACK2 from the peer
SequenceNumber _currentACKSubSequenceNumber; // The current ACK sub-sequence number (used for Acknowledgment of ACKs)
SequenceNumber _lastSentACK; // The last sent ACK
SequenceNumber _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2
int _acksDuringSYN { 1 }; // The number of non-SYN ACKs sent during SYN
int _lightACKsDuringSYN { 1 }; // The number of lite ACKs sent during SYN interval
int32_t _rtt; // RTT, in microseconds
int32_t _rttVariance; // RTT variance
int _flowWindowSize { udt::MAX_PACKETS_IN_FLIGHT }; // Flow control window size
int _bandwidth { 1 }; // Exponential moving average for estimated bandwidth, in packets per second
int _deliveryRate { 16 }; // Exponential moving average for receiver's receive rate, in packets per second
SentACKList _sentACKs; // Map of ACK sub-sequence numbers to ACKed sequence number and sent time
Socket* _parentSocket { nullptr };
HifiSockAddr _destination;
PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for timing
bool _receivedControlProbeTail { false }; // Marker for receipt of control packet probe tail (in lieu of probe with data)
std::unique_ptr<CongestionControl> _congestionControl;
std::unique_ptr<SendQueue> _sendQueue;
std::map<MessageNumber, PendingReceivedMessage> _pendingReceivedMessages;
int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval
// Re-used control packets
ControlPacketPointer _ackPacket;
ControlPacketPointer _lightACKPacket;
ControlPacketPointer _ack2Packet;
ControlPacketPointer _lossReport;
ControlPacketPointer _handshakeACK;
ConnectionStats _stats;

View file

@ -95,11 +95,6 @@ void ConnectionStats::recordReceiveRate(int sample) {
_total.receiveRate = (int)((_total.receiveRate * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
}
void ConnectionStats::recordEstimatedBandwidth(int sample) {
_currentSample.estimatedBandwith = sample;
_total.estimatedBandwith = (int)((_total.estimatedBandwith * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
}
void ConnectionStats::recordRTT(int sample) {
_currentSample.rtt = sample;
_total.rtt = (int)((_total.rtt * EWMA_PREVIOUS_SAMPLES_WEIGHT) + (sample * EWMA_CURRENT_SAMPLE_WEIGHT));
@ -122,14 +117,6 @@ QDebug& operator<<(QDebug&& debug, const udt::ConnectionStats::Stats& stats) {
HIFI_LOG_EVENT(SentACK)
HIFI_LOG_EVENT(ReceivedACK)
HIFI_LOG_EVENT(ProcessedACK)
HIFI_LOG_EVENT(SentLightACK)
HIFI_LOG_EVENT(ReceivedLightACK)
HIFI_LOG_EVENT(SentACK2)
HIFI_LOG_EVENT(ReceivedACK2)
HIFI_LOG_EVENT(SentNAK)
HIFI_LOG_EVENT(ReceivedNAK)
HIFI_LOG_EVENT(SentTimeoutNAK)
HIFI_LOG_EVENT(ReceivedTimeoutNAK)
HIFI_LOG_EVENT(Retransmission)
HIFI_LOG_EVENT(Duplicate)
;

View file

@ -24,14 +24,6 @@ public:
SentACK,
ReceivedACK,
ProcessedACK,
SentLightACK,
ReceivedLightACK,
SentACK2,
ReceivedACK2,
SentNAK,
ReceivedNAK,
SentTimeoutNAK,
ReceivedTimeoutNAK,
Retransmission,
Duplicate,
@ -89,7 +81,6 @@ public:
void recordSendRate(int sample);
void recordReceiveRate(int sample);
void recordEstimatedBandwidth(int sample);
void recordRTT(int sample);
void recordCongestionWindowSize(int sample);
void recordPacketSendPeriod(int sample);

View file

@ -28,13 +28,8 @@ public:
enum Type : uint16_t {
ACK,
ACK2,
LightACK,
NAK,
TimeoutNAK,
Handshake,
HandshakeACK,
ProbeTail,
HandshakeRequest
};

View file

@ -95,7 +95,7 @@ PacketVersion versionForPacketType(PacketType packetType) {
case PacketType::AvatarIdentityRequest:
return 22;
default:
return 21;
return 22;
}
}

View file

@ -1,125 +0,0 @@
//
// PacketTimeWindow.cpp
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "PacketTimeWindow.h"
#include <numeric>
#include <cmath>
#include <NumericalConstants.h>
using namespace udt;
using namespace std::chrono;
static const int DEFAULT_PACKET_INTERVAL_MICROSECONDS = 1000000; // 1s
static const int DEFAULT_PROBE_INTERVAL_MICROSECONDS = 1000; // 1ms
PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals) :
_numPacketIntervals(numPacketIntervals),
_numProbeIntervals(numProbeIntervals),
_packetIntervals(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS),
_probeIntervals(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS)
{
}
void PacketTimeWindow::reset() {
_packetIntervals.assign(_numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS);
_probeIntervals.assign(_numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS);
}
template <typename Iterator>
int median(Iterator begin, Iterator end) {
// use std::nth_element to grab the middle - for an even number of elements this is the upper middle
Iterator middle = begin + (end - begin) / 2;
std::nth_element(begin, middle, end);
if ((end - begin) % 2 != 0) {
// odd number of elements, just return the middle
return *middle;
} else {
// even number of elements, return the mean of the upper middle and the lower middle
Iterator lowerMiddle = std::max_element(begin, middle);
return (*middle + *lowerMiddle) / 2;
}
}
int32_t meanOfMedianFilteredValues(std::vector<int> intervals, int numValues, int valuesRequired = 0) {
// grab the median value of the intervals vector
int intervalsMedian = median(intervals.begin(), intervals.end());
// figure out our bounds for median filtering
static const int MEDIAN_FILTERING_BOUND_MULTIPLIER = 8;
int upperBound = intervalsMedian * MEDIAN_FILTERING_BOUND_MULTIPLIER;
int lowerBound = intervalsMedian / MEDIAN_FILTERING_BOUND_MULTIPLIER;
int sum = 0;
int count = 0;
// sum the values that are inside the median filtered bounds
for (auto& interval : intervals) {
if ((interval < upperBound) && (interval > lowerBound)) {
++count;
sum += interval;
}
}
// make sure we hit our threshold of values required
if (count >= valuesRequired) {
// return the frequency (per second) for the mean interval
static const double USECS_PER_SEC = 1000000.0;
return (int32_t) ceil(USECS_PER_SEC / (((double) sum) / ((double) count)));
} else {
return 0;
}
}
int32_t PacketTimeWindow::getPacketReceiveSpeed() const {
// return the mean value of median filtered values (per second) - or zero if there are too few filtered values
return meanOfMedianFilteredValues(_packetIntervals, _numPacketIntervals, _numPacketIntervals / 2);
}
int32_t PacketTimeWindow::getEstimatedBandwidth() const {
// return mean value of median filtered values (per second)
return meanOfMedianFilteredValues(_probeIntervals, _numProbeIntervals);
}
void PacketTimeWindow::onPacketArrival() {
// take the current time
auto now = p_high_resolution_clock::now();
if (_packetIntervals.size() > 0) {
// record the interval between this packet and the last one
_packetIntervals[_currentPacketInterval++] = duration_cast<microseconds>(now - _lastPacketTime).count();
// reset the currentPacketInterval index when it wraps
_currentPacketInterval %= _numPacketIntervals;
}
// remember this as the last packet arrival time
_lastPacketTime = now;
}
void PacketTimeWindow::onProbePair1Arrival() {
// take the current time as the first probe time
_firstProbeTime = p_high_resolution_clock::now();
}
void PacketTimeWindow::onProbePair2Arrival() {
// store the interval between the two probes
auto now = p_high_resolution_clock::now();
_probeIntervals[_currentProbeInterval++] = duration_cast<microseconds>(now - _firstProbeTime).count();
// reset the currentProbeInterval index when it wraps
_currentProbeInterval %= _numProbeIntervals;
}

View file

@ -1,51 +0,0 @@
//
// PacketTimeWindow.h
// libraries/networking/src/udt
//
// Created by Stephen Birarda on 2015-07-28.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#pragma once
#ifndef hifi_PacketTimeWindow_h
#define hifi_PacketTimeWindow_h
#include <vector>
#include <PortableHighResolutionClock.h>
namespace udt {
class PacketTimeWindow {
public:
PacketTimeWindow(int numPacketIntervals = 16, int numProbeIntervals = 16);
void onPacketArrival();
void onProbePair1Arrival();
void onProbePair2Arrival();
int32_t getPacketReceiveSpeed() const;
int32_t getEstimatedBandwidth() const;
void reset();
private:
int _numPacketIntervals { 0 }; // the number of packet intervals to store
int _numProbeIntervals { 0 }; // the number of probe intervals to store
int _currentPacketInterval { 0 }; // index for the current packet interval
int _currentProbeInterval { 0 }; // index for the current probe interval
std::vector<int> _packetIntervals; // vector of microsecond intervals between packet arrivals
std::vector<int> _probeIntervals; // vector of microsecond intervals between probe pair arrivals
p_high_resolution_clock::time_point _lastPacketTime = p_high_resolution_clock::now(); // the time_point when last packet arrived
p_high_resolution_clock::time_point _firstProbeTime = p_high_resolution_clock::now(); // the time_point when first probe in pair arrived
};
}
#endif // hifi_PacketTimeWindow_h

View file

@ -164,16 +164,6 @@ void SendQueue::ack(SequenceNumber ack) {
_emptyCondition.notify_one();
}
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end);
}
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
}
void SendQueue::fastRetransmit(udt::SequenceNumber ack) {
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
@ -184,28 +174,6 @@ void SendQueue::fastRetransmit(udt::SequenceNumber ack) {
_emptyCondition.notify_one();
}
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
{
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.clear();
SequenceNumber first, second;
while (packet.bytesLeftToRead() >= (qint64)(2 * sizeof(SequenceNumber))) {
packet.readPrimitive(&first);
packet.readPrimitive(&second);
if (first == second) {
_naks.append(first);
} else {
_naks.append(first, second);
}
}
}
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
}
void SendQueue::sendHandshake() {
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
@ -268,8 +236,6 @@ bool SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
_naks.append(sequenceNumber);
}
emit shortCircuitLoss(quint32(sequenceNumber));
return false;
} else {
return true;
@ -385,10 +351,6 @@ void SendQueue::run() {
}
}
void SendQueue::setProbePacketEnabled(bool enabled) {
_shouldSendProbes = enabled;
}
int SendQueue::maybeSendNewPacket() {
if (!isFlowWindowFull()) {
// we didn't re-send a packet, so time to send a new one
@ -397,40 +359,15 @@ int SendQueue::maybeSendNewPacket() {
SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket = _packets.takePacket();
Q_ASSERT(firstPacket);
std::unique_ptr<Packet> packet = _packets.takePacket();
Q_ASSERT(packet);
// attempt to send the first packet
if (sendNewPacketAndAddToSentList(move(firstPacket), nextNumber)) {
std::unique_ptr<Packet> secondPacket;
bool shouldSendPairTail = false;
// attempt to send the packet
sendNewPacketAndAddToSentList(move(packet), nextNumber);
if (_shouldSendProbes && ((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
shouldSendPairTail = true;
secondPacket = _packets.takePacket();
}
// do we have a second in a pair to send as well?
if (secondPacket) {
sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber());
} else if (shouldSendPairTail) {
// we didn't get a second packet to send in the probe pair
// send a control packet of type ProbePairTail so the receiver can still do
// proper bandwidth estimation
static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail);
_socket->writeBasePacket(*pairTailPacket, _destination);
}
// return the number of attempted packet sends
return shouldSendPairTail ? 2 : 1;
} else {
// we attempted to send a single packet, return 1
return 1;
}
// we attempted to send a packet, return 1
return 1;
}
}

View file

@ -69,16 +69,12 @@ public:
void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; }
void setSyncInterval(int syncInterval) { _syncInterval = syncInterval; }
void setProbePacketEnabled(bool enabled);
public slots:
void stop();
void ack(SequenceNumber ack);
void nak(SequenceNumber start, SequenceNumber end);
void fastRetransmit(SequenceNumber ack);
void overrideNAKListFromPacket(ControlPacket& packet);
void handshakeACK();
signals:
@ -87,7 +83,6 @@ signals:
void queueInactive();
void shortCircuitLoss(quint32 sequenceNumber);
void timeout();
private slots:
@ -145,9 +140,6 @@ private:
std::condition_variable _handshakeACKCondition;
std::condition_variable_any _emptyCondition;
std::atomic<bool> _shouldSendProbes { true };
};
}

View file

@ -33,18 +33,11 @@ using namespace udt;
Socket::Socket(QObject* parent, bool shouldChangeSocketOptions) :
QObject(parent),
_synTimer(new QTimer(this)),
_readyReadBackupTimer(new QTimer(this)),
_shouldChangeSocketOptions(shouldChangeSocketOptions)
{
connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams);
// make sure our synchronization method is called every SYN interval
connect(_synTimer, &QTimer::timeout, this, &Socket::rateControlSync);
// start our timer for the synchronization time interval
_synTimer->start(_synInterval);
// make sure we hear about errors and state changes from the underlying socket
connect(&_udpSocket, SIGNAL(error(QAbstractSocket::SocketError)),
this, SLOT(handleSocketError(QAbstractSocket::SocketError)));
@ -427,49 +420,9 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r
}
}
void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
// the way we do this is a little funny looking - we need to avoid the case where we call sync and
// (because of our Qt direct connection to the Connection's signal that it has been deactivated)
// an iterator on _connectionsHash would be invalidated by our own call to cleanupConnection
// collect the sockets for all connections in a vector
std::vector<HifiSockAddr> sockAddrVector;
sockAddrVector.reserve(_connectionsHash.size());
for (auto& connection : _connectionsHash) {
sockAddrVector.emplace_back(connection.first);
}
// enumerate that vector of HifiSockAddr objects
for (auto& sockAddr : sockAddrVector) {
// pull out the respective connection via a quick find on the unordered_map
auto it = _connectionsHash.find(sockAddr);
if (it != _connectionsHash.end()) {
// if the connection is erased while calling sync since we are re-using the iterator that was invalidated
// we're good to go
auto& connection = _connectionsHash[sockAddr];
connection->sync();
}
}
if (_synTimer->interval() != _synInterval) {
// if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed)
// then restart it now with the right interval
_synTimer->start(_synInterval);
}
}
void Socket::setCongestionControlFactory(std::unique_ptr<CongestionControlVirtualFactory> ccFactory) {
// swap the current unique_ptr for the new factory
_ccFactory.swap(ccFactory);
// update the _synInterval to the value from the factory
_synInterval = _ccFactory->synInterval();
}

View file

@ -102,7 +102,6 @@ public slots:
private slots:
void readPendingDatagrams();
void checkForReadyReadBackup();
void rateControlSync();
void handleSocketError(QAbstractSocket::SocketError socketError);
void handleStateChanged(QAbstractSocket::SocketState socketState);
@ -133,9 +132,6 @@ private:
std::unordered_map<HifiSockAddr, BasePacketHandler> _unfilteredHandlers;
std::unordered_map<HifiSockAddr, SequenceNumber> _unreliableSequenceNumbers;
std::unordered_map<HifiSockAddr, std::unique_ptr<Connection>> _connectionsHash;
int _synInterval { 10 }; // 10ms
QTimer* _synTimer { nullptr };
QTimer* _readyReadBackupTimer { nullptr };

View file

@ -21,8 +21,6 @@ TCPVegasCC::TCPVegasCC() {
_packetSendPeriod = 0.0;
_congestionWindowSize = 2;
setAckInterval(1); // TCP sends an ACK for every packet received
// set our minimum RTT variables to the maximum possible value
// we can't do this as a member initializer until our VS has support for constexpr
_currentMinRTT = std::numeric_limits<int>::max();
@ -103,12 +101,11 @@ bool TCPVegasCC::onACK(SequenceNumber ack, p_high_resolution_clock::time_point r
auto it = _sentPacketTimes.find(ack + 1);
if (it != _sentPacketTimes.end()) {
auto estimatedTimeout = _ewmaRTT + _rttVariance * 4;
auto now = p_high_resolution_clock::now();
auto sinceSend = duration_cast<microseconds>(now - it->second).count();
if (sinceSend >= estimatedTimeout) {
if (sinceSend >= estimatedTimeout()) {
// break out of slow start, we've decided this is loss
_slowStart = false;
@ -215,6 +212,11 @@ void TCPVegasCC::performCongestionAvoidance(udt::SequenceNumber ack) {
_numACKs = 0;
}
int TCPVegasCC::estimatedTimeout() const {
return _ewmaRTT == -1 ? DEFAULT_SYN_INTERVAL : _ewmaRTT + _rttVariance * 4;
}
bool TCPVegasCC::isCongestionWindowLimited() {
if (_slowStart) {
return true;

View file

@ -27,14 +27,11 @@ public:
TCPVegasCC();
virtual bool onACK(SequenceNumber ackNum, p_high_resolution_clock::time_point receiveTime) override;
virtual void onLoss(SequenceNumber rangeStart, SequenceNumber rangeEnd) override {};
virtual void onTimeout() override {};
virtual bool shouldNAK() override { return false; }
virtual bool shouldACK2() override { return false; }
virtual bool shouldProbe() override { return false; }
virtual void onPacketSent(int wireSize, SequenceNumber seqNum, p_high_resolution_clock::time_point timePoint) override;
virtual int estimatedTimeout() const override;
protected:
virtual void performCongestionAvoidance(SequenceNumber ack);
@ -65,7 +62,6 @@ private:
int _duplicateACKCount { 0 }; // Counter for duplicate ACKs received
int _slowStartOddAdjust { 0 }; // Marker for every window adjustment every other RTT in slow-start
};
}

View file

@ -26,9 +26,6 @@ local f_hmac_hash = ProtoField.bytes("hfudt.hmac_hash", "HMAC Hash")
local f_control_type = ProtoField.uint16("hfudt.control_type", "Control Type", base.DEC)
local f_control_type_text = ProtoField.string("hfudt.control_type_text", "Control Type Text", base.ASCII)
local f_ack_sequence_number = ProtoField.uint32("hfudt.ack_sequence_number", "ACKed Sequence Number", base.DEC)
local f_control_sub_sequence = ProtoField.uint32("hfudt.control_sub_sequence", "Control Sub-Sequence Number", base.DEC)
local f_nak_sequence_number = ProtoField.uint32("hfudt.nak_sequence_number", "NAKed Sequence Number", base.DEC)
local f_nak_range_end = ProtoField.uint32("hfudt.nak_range_end", "NAK Range End", base.DEC)
local SEQUENCE_NUMBER_MASK = 0x07FFFFFF
@ -37,19 +34,13 @@ p_hfudt.fields = {
f_control_bit, f_reliable_bit, f_message_bit, f_sequence_number, f_type, f_type_text, f_version,
f_sender_id, f_hmac_hash,
f_message_position, f_message_number, f_message_part_number, f_obfuscation_level,
f_control_type, f_control_type_text, f_control_sub_sequence, f_ack_sequence_number, f_nak_sequence_number, f_nak_range_end,
f_data
f_control_type, f_control_type_text, f_ack_sequence_number, f_data
}
local control_types = {
[0] = { "ACK", "Acknowledgement" },
[1] = { "ACK2", "Acknowledgement of acknowledgement" },
[2] = { "LightACK", "Light Acknowledgement" },
[3] = { "NAK", "Loss report (NAK)" },
[4] = { "TimeoutNAK", "Loss report re-transmission (TimeoutNAK)" },
[5] = { "Handshake", "Handshake" },
[6] = { "HandshakeACK", "Acknowledgement of Handshake" },
[7] = { "ProbeTail", "Probe tail" },
[8] = { "HandshakeRequest", "Request a Handshake" }
}
@ -205,51 +196,18 @@ function p_hfudt.dissector(buf, pinfo, tree)
subtree:add(f_control_type_text, control_types[shifted_type][1])
end
if shifted_type == 0 or shifted_type == 1 then
if shifted_type == 0 then
local data_index = 4
-- this has a sub-sequence number
local second_word = buf(4, 4):le_uint()
subtree:add(f_control_sub_sequence, bit32.band(second_word, SEQUENCE_NUMBER_MASK))
local data_index = 8
if shifted_type == 0 then
-- if this is an ACK let's read out the sequence number
local sequence_number = buf(8, 4):le_uint()
subtree:add(f_ack_sequence_number, bit32.band(sequence_number, SEQUENCE_NUMBER_MASK))
data_index = data_index + 4
end
-- This is an ACK let's read out the sequence number
local sequence_number = buf(data_index, 4):le_uint()
subtree:add(f_ack_sequence_number, bit32.band(sequence_number, SEQUENCE_NUMBER_MASK))
data_index = data_index + 4
data_length = buf:len() - data_index
-- set the data from whatever is left in the packet
subtree:add(f_data, buf(data_index, data_length))
elseif shifted_type == 2 then
-- this is a Light ACK let's read out the sequence number
local sequence_number = buf(4, 4):le_uint()
subtree:add(f_ack_sequence_number, bit32.band(sequence_number, SEQUENCE_NUMBER_MASK))
data_length = buf:len() - 4
-- set the data from whatever is left in the packet
subtree:add(f_data, buf(4, data_length))
elseif shifted_type == 3 or shifted_type == 4 then
if buf:len() <= 12 then
-- this is a NAK pull the sequence number or range
local sequence_number = buf(4, 4):le_uint()
subtree:add(f_nak_sequence_number, bit32.band(sequence_number, SEQUENCE_NUMBER_MASK))
data_length = buf:len() - 4
if buf:len() > 8 then
local range_end = buf(8, 4):le_uint()
subtree:add(f_nak_range_end, bit32.band(range_end, SEQUENCE_NUMBER_MASK))
data_length = data_length - 4
end
end
else
data_length = buf:len() - 4

View file

@ -58,14 +58,12 @@ const QCommandLineOption STATS_INTERVAL {
const QStringList CLIENT_STATS_TABLE_HEADERS {
"Send (Mb/s)", "Est. Max (Mb/s)", "RTT (ms)", "CW (P)", "Period (us)",
"Recv ACK", "Procd ACK", "Recv LACK", "Recv NAK", "Recv TNAK",
"Sent ACK2", "Sent Packets", "Re-sent Packets"
"Recv ACK", "Procd ACK", "Sent Packets", "Re-sent Packets"
};
const QStringList SERVER_STATS_TABLE_HEADERS {
" Mb/s ", "Recv Mb/s", "Est. Max (Mb/s)", "RTT (ms)", "CW (P)",
"Sent ACK", "Sent LACK", "Sent NAK", "Sent TNAK",
"Recv ACK2", "Duplicates (P)"
"Sent ACK", "Duplicates (P)"
};
UDTTest::UDTTest(int& argc, char** argv) :
@ -387,10 +385,6 @@ void UDTTest::sampleStats() {
QString::number(stats.packetSendPeriod).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ProcessedACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedLightACK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedNAK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedTimeoutNAK]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK2]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.sentPackets).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Retransmission]).rightJustified(CLIENT_STATS_TABLE_HEADERS[++headerIndex].size())
};
@ -420,10 +414,6 @@ void UDTTest::sampleStats() {
QString::number(stats.rtt / USECS_PER_MSEC, 'f', 2).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.congestionWindowSize).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentACK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentLightACK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentNAK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::SentTimeoutNAK]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::ReceivedACK2]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size()),
QString::number(stats.events[udt::ConnectionStats::Stats::Duplicate]).rightJustified(SERVER_STATS_TABLE_HEADERS[++headerIndex].size())
};

View file

@ -15,7 +15,7 @@
#include "UDTTest.h"
int main(int argc, char* argv[]) {
setupHifiApplication("UDT Test);
setupHifiApplication("UDT Test");
UDTTest app(argc, argv);
return app.exec();