diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 47a45485fa..5acba518e4 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -15,10 +15,9 @@ using namespace udt; -void UdtCC::init() { - _rcInterval = _synInterval; +void DefaultCC::init() { _lastRCTime = usecTimestampNow(); - setAckTimer(_rcInterval); + setAckTimer(synInterval()); _lastAck = _sendCurrSeqNum; _lastDecSeq = SequenceNumber{ SequenceNumber::MAX }; @@ -27,7 +26,7 @@ void UdtCC::init() { _packetSendPeriod = 1.0; } -void UdtCC::onACK(SequenceNumber ackNum) { +void DefaultCC::onACK(SequenceNumber ackNum) { int64_t B = 0; double inc = 0; // Note: 1/24/2012 @@ -37,7 +36,7 @@ void UdtCC::onACK(SequenceNumber ackNum) { const double min_inc = 0.01; uint64_t currtime = usecTimestampNow(); - if (currtime - _lastRCTime < (uint64_t)_rcInterval) { + if (currtime - _lastRCTime < (uint64_t)synInterval()) { return; } @@ -52,11 +51,11 @@ void UdtCC::onACK(SequenceNumber ackNum) { if (_recvieveRate > 0) { _packetSendPeriod = 1000000.0 / _recvieveRate; } else { - _packetSendPeriod = (_rtt + _rcInterval) / _congestionWindowSize; + _packetSendPeriod = (_rtt + synInterval()) / _congestionWindowSize; } } } else { - _congestionWindowSize = _recvieveRate / 1000000.0 * (_rtt + _rcInterval) + 16; + _congestionWindowSize = _recvieveRate / 1000000.0 * (_rtt + synInterval()) + 16; } // During Slow Start, no rate increase @@ -86,10 +85,10 @@ void UdtCC::onACK(SequenceNumber ackNum) { } } - _packetSendPeriod = (_packetSendPeriod * _rcInterval) / (_packetSendPeriod * inc + _rcInterval); + _packetSendPeriod = (_packetSendPeriod * synInterval()) / (_packetSendPeriod * inc + synInterval()); } -void UdtCC::onLoss(const std::vector& losslist) { +void DefaultCC::onLoss(const std::vector& losslist) { //Slow Start stopped, if it hasn't yet if (_slowStart) { _slowStart = false; @@ -101,7 +100,7 @@ void UdtCC::onLoss(const std::vector& losslist) { // If no receiving rate is observed, we have to compute the sending // rate according to the current window size, and decrease it // using the method below. - _packetSendPeriod = _congestionWindowSize / (_rtt + _rcInterval); + _packetSendPeriod = _congestionWindowSize / (_rtt + synInterval()); } _loss = true; @@ -128,13 +127,13 @@ void UdtCC::onLoss(const std::vector& losslist) { } } -void UdtCC::onTimeout() { +void DefaultCC::onTimeout() { if (_slowStart) { _slowStart = false; if (_recvieveRate > 0) { _packetSendPeriod = 1000000.0 / _recvieveRate; } else { - _packetSendPeriod = _congestionWindowSize / (_rtt + _rcInterval); + _packetSendPeriod = _congestionWindowSize / (_rtt + synInterval()); } } else { /* diff --git a/libraries/networking/src/udt/CongestionControl.h b/libraries/networking/src/udt/CongestionControl.h index 9f61c7944f..27c1c8e8fd 100644 --- a/libraries/networking/src/udt/CongestionControl.h +++ b/libraries/networking/src/udt/CongestionControl.h @@ -17,30 +17,32 @@ #include "SequenceNumber.h" namespace udt { + +static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms +class Connection; class Packet; class CongestionControl { + friend class Connection; public: - static const int32_t DEFAULT_SYN_INTERVAL = 10000; // 10 ms - CongestionControl() {} + CongestionControl() {}; + CongestionControl(int synInterval) : _synInterval(synInterval) {} virtual ~CongestionControl() {} + + int synInterval() const { return _synInterval; } virtual void init() {} virtual void close() {} virtual void onAck(SequenceNumber ackNum) {} virtual void onLoss(const std::vector& lossList) {} - virtual void onPacketSent(const Packet& packet) {} - virtual void onPacketReceived(const Packet& packet) {} protected: - void setAckTimer(int syn) { _ackPeriod = (syn > _synInterval) ? _synInterval : syn; } - void setAckInterval(int interval) { _ackInterval = interval; } + void setAckTimer(int period) { _ackPeriod = (period > _synInterval) ? _synInterval : period; } + void setAckInterval(int ackInterval) { _ackInterval = ackInterval; } void setRto(int rto) { _userDefinedRto = true; _rto = rto; } - int32_t _synInterval = DEFAULT_SYN_INTERVAL; // UDT constant parameter, SYN - double _packetSendPeriod = 1.0; // Packet sending period, in microseconds double _congestionWindowSize = 16.0; // Congestion window size, in packets @@ -66,6 +68,8 @@ private: int _ackPeriod = 0; // Periodical timer to send an ACK, in milliseconds int _ackInterval = 0; // How many packets to send one ACK, in packets + int _synInterval { DEFAULT_SYN_INTERVAL }; + bool _userDefinedRto = false; // if the RTO value is defined by users int _rto = -1; // RTO value, microseconds }; @@ -75,6 +79,8 @@ class CongestionControlVirtualFactory { public: virtual ~CongestionControlVirtualFactory() {} + static int synInterval() { return DEFAULT_SYN_INTERVAL; } + virtual std::unique_ptr create() = 0; }; @@ -82,13 +88,12 @@ template class CongestionControlFactory: public CongestionControlVirtu { public: virtual ~CongestionControlFactory() {} - virtual std::unique_ptr create() { return std::unique_ptr(new T()); } }; -class UdtCC: public CongestionControl { +class DefaultCC: public CongestionControl { public: - UdtCC() {} + DefaultCC() {} public: virtual void init(); @@ -97,7 +102,6 @@ public: virtual void onTimeout(); private: - int _rcInterval = 0; // UDT Rate control interval uint64_t _lastRCTime = 0; // last rate increase time bool _slowStart = true; // if in slow start phase SequenceNumber _lastAck; // last ACKed seq num @@ -112,4 +116,4 @@ private: } -#endif // hifi_CongestionControl_h \ No newline at end of file +#endif // hifi_CongestionControl_h diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index dbcee6e622..d43901f705 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -11,7 +11,12 @@ #include "Connection.h" +#include + +#include + #include "../HifiSockAddr.h" +#include "CongestionControl.h" #include "ControlPacket.h" #include "Packet.h" #include "Socket.h" @@ -20,10 +25,25 @@ using namespace udt; using namespace std; using namespace std::chrono; -Connection::Connection(Socket* parentSocket, HifiSockAddr destination) : +Connection::Connection(Socket* parentSocket, HifiSockAddr destination, unique_ptr congestionControl) : _parentSocket(parentSocket), - _destination(destination) + _destination(destination), + _congestionControl(move(congestionControl)) { + +} + +Connection::~Connection() { + if (_sendQueue) { + // tell our send queue to stop and wait until its send thread is done + QThread* sendQueueThread = _sendQueue->thread(); + + _sendQueue->stop(); + _sendQueue->deleteLater(); + + sendQueueThread->quit(); + sendQueueThread->wait(); + } } void Connection::sendReliablePacket(unique_ptr packet) { @@ -37,6 +57,31 @@ void Connection::sendReliablePacket(unique_ptr packet) { _sendQueue->queuePacket(move(packet)); } +void Connection::sync() { + // we send out a periodic ACK every rate control interval + sendACK(); + + // check if we need to re-transmit a loss list + // we do this if it has been longer than the current nakInterval since we last sent + auto now = high_resolution_clock::now(); + + if (duration_cast(now - _lastNAKTime).count() >= _nakInterval) { + // construct a NAK packet that will hold all of the lost sequence numbers + auto lossListPacket = ControlPacket::create(ControlPacket::TimeoutNAK, _lossList.getLength() * sizeof(SequenceNumber)); + + // TODO: pack in the lost sequence numbers + + // have our SendQueue send off this control packet + _sendQueue->sendPacket(*lossListPacket); + + // record this as the last NAK time + _lastNAKTime = high_resolution_clock::now(); + + ++_totalSentTimeoutNAKs; + } + +} + void Connection::sendACK(bool wasCausedBySyncTimeout) { static const int ACK_PACKET_PAYLOAD_BYTES = sizeof(_lastSentACK) + sizeof(_currentACKSubSequenceNumber) + sizeof(_rtt) + sizeof(int32_t) + sizeof(int32_t); @@ -65,10 +110,10 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) { } else if (nextACKNumber == _lastSentACK) { // We already sent this ACK, but check if we should re-send it. - // We will re-send if it has been more than RTT + (4 * RTT variance) since the last ACK - milliseconds sinceLastACK = duration_cast(currentTime - lastACKSendTime); + // We will re-send if it has been more than the estimated timeout since the last ACK + microseconds sinceLastACK = duration_cast(currentTime - lastACKSendTime); - if (sinceLastACK.count() < (_rtt + (4 * _rttVariance))) { + if (sinceLastACK.count() < estimatedTimeout()) { return; } } @@ -98,9 +143,11 @@ void Connection::sendACK(bool wasCausedBySyncTimeout) { // write this ACK to the map of sent ACKs _sentACKs[_currentACKSubSequenceNumber] = { nextACKNumber, high_resolution_clock::now() }; + + ++_totalSentACKs; } -void Connection::sendLightACK() const { +void Connection::sendLightACK() { // create the light ACK packet, make it static so we can re-use it static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber); static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES); @@ -120,6 +167,8 @@ void Connection::sendLightACK() const { // have the send queue send off our packet immediately _sendQueue->sendPacket(*lightACKPacket); + + ++_totalSentLightACKs; } SequenceNumber Connection::nextACK() const { @@ -162,6 +211,25 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) { // have the send queue send off our packet immediately _sendQueue->sendPacket(*lossReport); + + // record our last NAK time + _lastNAKTime = high_resolution_clock::now(); + + ++_totalSentNAKs; + + // figure out when we should send the next loss report, if we haven't heard anything back + _nakInterval = (_rtt + 4 * _rttVariance); + + int receivedPacketsPerSecond = _receiveWindow.getPacketReceiveSpeed(); + if (receivedPacketsPerSecond > 0) { + // the NAK interval is at least the _minNAKInterval + // but might be the time required for all lost packets to be retransmitted + _nakInterval = std::max((int) (_lossList.getLength() * (USECS_PER_SECOND / receivedPacketsPerSecond)), + _minNAKInterval); + } else { + // the NAK interval is at least the _minNAKInterval but might be the estimated timeout + _nakInterval = std::max(estimatedTimeout(), _minNAKInterval); + } } if (seq > _lastReceivedSequenceNumber) { @@ -171,6 +239,8 @@ void Connection::processReceivedSequenceNumber(SequenceNumber seq) { // Otherwise, it's a resend, remove it from the loss list _lossList.remove(seq); } + + ++_totalReceivedDataPackets; } void Connection::processControl(unique_ptr controlPacket) { @@ -188,6 +258,9 @@ void Connection::processControl(unique_ptr controlPacket) { case ControlPacket::NAK: processNAK(move(controlPacket)); break; + case ControlPacket::TimeoutNAK: + processTimeoutNAK(move(controlPacket)); + break; } } @@ -202,7 +275,7 @@ void Connection::processACK(std::unique_ptr controlPacket) { auto currentTime = high_resolution_clock::now(); static high_resolution_clock::time_point lastACK2SendTime; - milliseconds sinceLastACK2 = duration_cast(currentTime - lastACK2SendTime); + microseconds sinceLastACK2 = duration_cast(currentTime - lastACK2SendTime); if (sinceLastACK2.count() > _synInterval || currentACKSubSequenceNumber == _lastSentACK2) { // setup a static ACK2 packet we will re-use @@ -218,6 +291,8 @@ void Connection::processACK(std::unique_ptr controlPacket) { // update the last sent ACK2 and the last ACK2 send time _lastSentACK2 = currentACKSubSequenceNumber; lastACK2SendTime = high_resolution_clock::now(); + + ++_totalSentACK2s; } // read the ACKed sequence number @@ -261,6 +336,7 @@ void Connection::processACK(std::unique_ptr controlPacket) { updateRTT(rtt); // set the RTT for congestion control + _congestionControl->setRtt(_rtt); if (controlPacket->getPayloadSize() > (qint64) (sizeof(SequenceNumber) + sizeof(SequenceNumber) + sizeof(rtt))) { int32_t deliveryRate, bandwidth; @@ -268,9 +344,12 @@ void Connection::processACK(std::unique_ptr controlPacket) { controlPacket->readPrimitive(&bandwidth); // set the delivery rate and bandwidth for congestion control + _congestionControl->setRcvRate(deliveryRate); + _congestionControl->setBandwidth(bandwidth); } // fire the onACK callback for congestion control + _congestionControl->onAck(ack); // update the total count of received ACKs ++_totalReceivedACKs; @@ -289,6 +368,8 @@ void Connection::processLightACK(std::unique_ptr controlPacket) { // update the last received ACK to the this one _lastReceivedACK = ack; } + + ++_totalReceivedLightACKs; } void Connection::processACK2(std::unique_ptr controlPacket) { @@ -304,26 +385,40 @@ void Connection::processACK2(std::unique_ptr controlPacket) { // calculate the RTT (time now - time ACK sent) auto now = high_resolution_clock::now(); - int rtt = duration_cast(now - pair.second).count(); + int rtt = duration_cast(now - pair.second).count(); updateRTT(rtt); // set the RTT for congestion control + _congestionControl->setRtt(_rtt); // update the last ACKed ACK if (pair.first > _lastReceivedAcknowledgedACK) { _lastReceivedAcknowledgedACK = pair.first; } } + + ++_totalReceivedACK2s; } void Connection::processNAK(std::unique_ptr controlPacket) { // read the loss report SequenceNumber start, end; controlPacket->readPrimitive(&start); + if (controlPacket->bytesLeftToRead() >= (qint64)sizeof(SequenceNumber)) { controlPacket->readPrimitive(&end); } + + + + ++_totalReceivedNAKs; +} + +void Connection::processTimeoutNAK(std::unique_ptr controlPacket) { + // read the NAKed sequence numbers from the packet + + ++_totalReceivedTimeoutNAKs; } void Connection::updateRTT(int rtt) { @@ -337,6 +432,11 @@ void Connection::updateRTT(int rtt) { // Deviation = (1 - x)(deviation) + x |sampleRTT - estimatedRTT| // (where x = 0.25 via Jacobson) - _rttVariance = (_rttVariance * 3 + abs(rtt - _rtt)) >> 2; - _rtt = (_rtt * 7 + rtt) >> 3; + static const int RTT_ESTIMATION_ALPHA_NUMERATOR = 8; + static const int RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR = 4; + + _rtt = (_rtt * (1 - RTT_ESTIMATION_ALPHA_NUMERATOR) + rtt) / RTT_ESTIMATION_ALPHA_NUMERATOR; + + _rttVariance = (_rttVariance * (1 - RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR) + + abs(rtt - _rtt)) / RTT_ESTIMATION_VARIANCE_ALPHA_NUMERATOR; } diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 11c5126023..7cc2563fc5 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -22,6 +22,7 @@ namespace udt { +class CongestionControl; class ControlPacket; class Packet; class Socket; @@ -32,12 +33,12 @@ public: using SequenceNumberTimePair = std::pair; using SentACKMap = std::unordered_map; - Connection(Socket* parentSocket, HifiSockAddr destination); + Connection(Socket* parentSocket, HifiSockAddr destination, std::unique_ptr congestionControl); + ~Connection(); void sendReliablePacket(std::unique_ptr packet); - - void sendACK(bool wasCausedBySyncTimeout = true); - void sendLightACK() const; + + void sync(); // rate control method, fired by Socket for all connections on SYN interval SequenceNumber nextACK() const; @@ -47,14 +48,24 @@ public: void processControl(std::unique_ptr controlPacket); private: + void sendACK(bool wasCausedBySyncTimeout = true); + void sendLightACK(); + void processACK(std::unique_ptr controlPacket); void processLightACK(std::unique_ptr controlPacket); void processACK2(std::unique_ptr controlPacket); void processNAK(std::unique_ptr controlPacket); + void processTimeoutNAK(std::unique_ptr controlPacket); void updateRTT(int rtt); - int _synInterval; // Periodical Rate Control Interval, defaults to 10ms + int estimatedTimeout() const { return _rtt + _rttVariance * 4; } + + int _synInterval; // Periodical Rate Control Interval, in microseconds, defaults to 10ms + + int _nakInterval; // NAK timeout interval, in microseconds + int _minNAKInterval { 100000 }; // NAK timeout interval lower bound, default of 100ms + std::chrono::high_resolution_clock::time_point _lastNAKTime; LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber { SequenceNumber::MAX }; // The largest sequence number received from the peer @@ -66,9 +77,7 @@ private: SequenceNumber _lastSentACK { SequenceNumber::MAX }; // The last sent ACK SequenceNumber _lastSentACK2; // The last sent ACK sub-sequence number in an ACK2 - int _totalReceivedACKs { 0 }; - - int32_t _rtt; // RTT, in milliseconds + int32_t _rtt; // RTT, in microseconds int32_t _rttVariance; // RTT variance int _flowWindowSize; // Flow control window size @@ -80,6 +89,25 @@ private: PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for bandwidth and receive speed std::unique_ptr _sendQueue; + + std::unique_ptr _congestionControl; + + // Control Packet stat collection + int _totalReceivedACKs { 0 }; + int _totalSentACKs { 0 }; + int _totalSentLightACKs { 0 }; + int _totalReceivedLightACKs { 0 }; + int _totalReceivedACK2s { 0 }; + int _totalSentACK2s { 0 }; + int _totalReceivedNAKs { 0 }; + int _totalSentNAKs { 0 }; + int _totalReceivedTimeoutNAKs { 0 }; + int _totalSentTimeoutNAKs { 0 }; + + // Data packet stat collection + int _totalReceivedDataPackets { 0 }; + + }; } diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index 4563b657a9..fdd0b65e36 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -29,7 +29,8 @@ public: enum Type : uint16_t { ACK, ACK2, - NAK + NAK, + TimeoutNAK }; static std::unique_ptr create(Type type, qint64 size = -1); diff --git a/libraries/networking/src/udt/PacketTimeWindow.cpp b/libraries/networking/src/udt/PacketTimeWindow.cpp index c8c49e6603..469915d311 100644 --- a/libraries/networking/src/udt/PacketTimeWindow.cpp +++ b/libraries/networking/src/udt/PacketTimeWindow.cpp @@ -18,11 +18,14 @@ using namespace udt; using namespace std::chrono; +static const int DEFAULT_PACKET_INTERVAL_MICROSECONDS = 1000000; +static const int DEFAULT_PROBE_INTERVAL_MICROSECONDS = 1000; + PacketTimeWindow::PacketTimeWindow(int numPacketIntervals, int numProbeIntervals) : _numPacketIntervals(numPacketIntervals), _numProbeIntervals(numProbeIntervals), - _packetIntervals({ _numPacketIntervals }), - _probeIntervals({ _numProbeIntervals }) + _packetIntervals({ _numPacketIntervals, DEFAULT_PACKET_INTERVAL_MICROSECONDS }), + _probeIntervals({ _numProbeIntervals, DEFAULT_PROBE_INTERVAL_MICROSECONDS }) { } @@ -40,8 +43,9 @@ int32_t meanOfMedianFilteredValues(std::vector intervals, int numValues, in int count = 0; int sum = 0; - int upperBound = median * 8; - int lowerBound = median / 8; + static const int MEDIAN_FILTERING_BOUND_MULTIPLIER = 8; + int upperBound = median * MEDIAN_FILTERING_BOUND_MULTIPLIER; + int lowerBound = median / MEDIAN_FILTERING_BOUND_MULTIPLIER; for (auto& interval : intervals) { if ((interval < upperBound) && interval > lowerBound) { diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index bc619d95bc..0931159483 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -13,6 +13,7 @@ #include +#include #include #include @@ -21,6 +22,7 @@ #include "Socket.h" using namespace udt; +using namespace std::chrono; std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr dest) { auto queue = std::unique_ptr(new SendQueue(socket, dest)); @@ -42,17 +44,7 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : _socket(socket), _destination(dest) { - _sendTimer.reset(new QTimer(this)); - _sendTimer->setSingleShot(true); - QObject::connect(_sendTimer.get(), &QTimer::timeout, this, &SendQueue::sendNextPacket); - _packetSendPeriod = DEFAULT_SEND_PERIOD; - _lastSendTimestamp = 0; -} - -SendQueue::~SendQueue() { - assert(thread() == QThread::currentThread()); - _sendTimer->stop(); } void SendQueue::queuePacket(std::unique_ptr packet) { @@ -60,24 +52,24 @@ void SendQueue::queuePacket(std::unique_ptr packet) { QWriteLocker locker(&_packetsLock); _packets.push_back(std::move(packet)); } - if (!_running) { - start(); + if (!_isRunning) { + run(); } } -void SendQueue::start() { +void SendQueue::run() { // We need to make sure this is called on the right thread if (thread() != QThread::currentThread()) { - QMetaObject::invokeMethod(this, "start", Qt::QueuedConnection); + QMetaObject::invokeMethod(this, "run", Qt::QueuedConnection); } - _running = true; + _isRunning = true; - // This will send a packet and fire the send timer - sendNextPacket(); + // This will loop and sleep to send packets + loop(); } void SendQueue::stop() { - _running = false; + _isRunning = false; } void SendQueue::sendPacket(const BasePacket& packet) { @@ -116,61 +108,68 @@ SequenceNumber SendQueue::getNextSequenceNumber() { return _currentSequenceNumber; } -void SendQueue::sendNextPacket() { - if (!_running) { - return; - } - - // Record timing - auto sendTime = msecTimestampNow(); // msec - _lastSendTimestamp = sendTime; - - if (_nextPacket) { - // Write packet's sequence number and send it off - _nextPacket->writeSequenceNumber(getNextSequenceNumber()); - sendPacket(*_nextPacket); +void SendQueue::loop() { + while (_isRunning) { + // Record timing + _lastSendTimestamp = high_resolution_clock::now(); - // Insert the packet we have just sent in the sent list - QWriteLocker locker(&_sentLock); - _sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket); - Q_ASSERT_X(!_nextPacket, - "SendQueue::sendNextPacket()", "Overriden packet in sent list"); - } - - bool hasResend = false; - SequenceNumber seqNum; - { - // Check nak list for packet to resend - QWriteLocker locker(&_naksLock); - if (_naks.getLength() > 0) { - hasResend = true; - seqNum = _naks.popFirstSequenceNumber(); + if (_nextPacket) { + // Write packet's sequence number and send it off + _nextPacket->writeSequenceNumber(getNextSequenceNumber()); + sendPacket(*_nextPacket); + + // Insert the packet we have just sent in the sent list + QWriteLocker locker(&_sentLock); + _sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket); + Q_ASSERT_X(!_nextPacket, + "SendQueue::sendNextPacket()", "Overriden packet in sent list"); + } + + bool hasResend = false; + SequenceNumber seqNum; + { + // Check nak list for packet to resend + QWriteLocker locker(&_naksLock); + if (_naks.getLength() > 0) { + hasResend = true; + seqNum = _naks.popFirstSequenceNumber(); + } + } + + // Find packet in sent list using SequenceNumber + if (hasResend) { + QWriteLocker locker(&_sentLock); + auto it = _sentPackets.find(seqNum); + Q_ASSERT_X(it != _sentPackets.end(), + "SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend"); + + if (it != _sentPackets.end()) { + it->second.swap(_nextPacket); + _sentPackets.erase(it); + } + } + + // If there is no packet to resend, grab the next one in the list + if (!_nextPacket) { + QWriteLocker locker(&_packetsLock); + _nextPacket.swap(_packets.front()); + _packets.pop_front(); + } + + // since we're a while loop, give the thread a chance to process events + QCoreApplication::processEvents(); + + // we just processed events so check now if we were just told to stop + if (!_isRunning) { + break; + } + + // sleep as long as we need until next packet send, if we can + auto now = high_resolution_clock::now(); + auto microsecondDuration = (_lastSendTimestamp + microseconds(_packetSendPeriod)) - now; + + if (microsecondDuration.count() > 0) { + usleep(microsecondDuration.count()); } } - - // Find packet in sent list using SequenceNumber - if (hasResend) { - QWriteLocker locker(&_sentLock); - auto it = _sentPackets.find(seqNum); - Q_ASSERT_X(it != _sentPackets.end(), - "SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend"); - - if (it != _sentPackets.end()) { - it->second.swap(_nextPacket); - _sentPackets.erase(it); - } - } - - // If there is no packet to resend, grab the next one in the list - if (!_nextPacket) { - QWriteLocker locker(&_packetsLock); - _nextPacket.swap(_packets.front()); - _packets.pop_front(); - } - - // check if we need to fire off a packet pair - we do this - - // How long before next packet send - auto timeToSleep = (sendTime + _packetSendPeriod) - msecTimestampNow(); // msec - _sendTimer->start(std::max((quint64)0, timeToSleep)); } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 82c569f6c5..10b35079b0 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -12,6 +12,7 @@ #ifndef hifi_SendQueue_h #define hifi_SendQueue_h +#include #include #include @@ -34,15 +35,13 @@ class SendQueue : public QObject { Q_OBJECT public: - static const int DEFAULT_SEND_PERIOD = 16; // msec + static const int DEFAULT_SEND_PERIOD = 16 * 1000; // 16ms, in microseconds static std::unique_ptr create(Socket* socket, HifiSockAddr dest); void queuePacket(std::unique_ptr packet); int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); } - quint64 getLastSendTimestamp() const { return _lastSendTimestamp; } - SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } int getPacketSendPeriod() const { return _packetSendPeriod; } @@ -52,14 +51,14 @@ public: void sendPacket(const BasePacket& packet); public slots: - void start(); + void run(); void stop(); void ack(SequenceNumber ack); void nak(SequenceNumber start, SequenceNumber end); private slots: - void sendNextPacket(); + void loop(); private: friend struct std::default_delete; @@ -67,7 +66,6 @@ private: SendQueue(Socket* socket, HifiSockAddr dest); SendQueue(SendQueue& other) = delete; SendQueue(SendQueue&& other) = delete; - ~SendQueue(); // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); @@ -83,10 +81,9 @@ private: SequenceNumber _currentSequenceNumber; // Last sequence number sent out std::atomic _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out - std::unique_ptr _sendTimer; // Send timer - std::atomic _packetSendPeriod { 0 }; // Interval between two packet send envent in msec - std::atomic _lastSendTimestamp { 0 }; // Record last time of packet departure - std::atomic _running { false }; + std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds + std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure + std::atomic _isRunning { false }; mutable QReadWriteLock _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 358fb95638..d82915c68f 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -71,7 +71,7 @@ qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) { Q_ASSERT_X(!packet.isReliable(), "Socket::writePacket", "Cannot send a reliable packet unreliably"); // write the correct sequence number to the Packet here - packet.writeSequenceNumber(_currentUnreliableSequenceNumber); + packet.writeSequenceNumber(++_unreliableSequenceNumbers[sockAddr]); return writeDatagram(packet.getData(), packet.getDataSize(), sockAddr); } @@ -80,7 +80,7 @@ qint64 Socket::writePacket(std::unique_ptr packet, const HifiSockAddr& s if (packet->isReliable()) { auto it = _connectionsHash.find(sockAddr); if (it == _connectionsHash.end()) { - it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr))); + it = _connectionsHash.insert(it, std::make_pair(sockAddr, new Connection(this, sockAddr, _ccFactory->create()))); } it->second->sendReliablePacket(std::move(packet)); return 0; @@ -146,7 +146,7 @@ void Socket::rateControlSync() { // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control for (auto& connection : _connectionsHash) { - connection.second->sendACK(); + connection.second->sync(); } if (_synTimer.interval() != _synInterval) { @@ -155,3 +155,11 @@ void Socket::rateControlSync() { _synTimer.start(_synInterval); } } + +void Socket::setCongestionControlFactory(std::unique_ptr ccFactory) { + // swap the current unique_ptr for the new factory + _ccFactory.swap(ccFactory); + + // update the _synInterval to the value from the factory + _synInterval = _ccFactory->synInterval(); +} diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index dee6af4117..7cf4283cf3 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -22,6 +22,7 @@ #include #include "../HifiSockAddr.h" +#include "CongestionControl.h" #include "Connection.h" namespace udt { @@ -59,6 +60,8 @@ public: void addUnfilteredHandler(const HifiSockAddr& senderSockAddr, BasePacketHandler handler) { _unfilteredHandlers[senderSockAddr] = handler; } + + void setCongestionControlFactory(std::unique_ptr ccFactory); private slots: void readPendingDatagrams(); @@ -69,13 +72,14 @@ private: PacketFilterOperator _packetFilterOperator; PacketHandler _packetHandler; - SequenceNumber _currentUnreliableSequenceNumber; - std::unordered_map _unfilteredHandlers; + std::unordered_map _unreliableSequenceNumbers; std::unordered_map _connectionsHash; int32_t _synInterval = 10; // 10ms QTimer _synTimer; + + std::unique_ptr _ccFactory { new CongestionControlFactory() }; }; } // namespace udt