From 57da7c2ba71e848dba7b785e970ff3c005c45625 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 23 Mar 2016 17:29:46 -0700 Subject: [PATCH] add handling for short circuit loss in SendQueue --- .../networking/src/udt/CongestionControl.cpp | 2 +- libraries/networking/src/udt/Connection.cpp | 7 ++ libraries/networking/src/udt/Connection.h | 1 + libraries/networking/src/udt/SendQueue.cpp | 99 +++++++++++-------- libraries/networking/src/udt/SendQueue.h | 7 +- tools/udt-test/src/UDTTest.cpp | 2 +- 6 files changed, 73 insertions(+), 45 deletions(-) diff --git a/libraries/networking/src/udt/CongestionControl.cpp b/libraries/networking/src/udt/CongestionControl.cpp index 1d1a6628fe..d30be2c139 100644 --- a/libraries/networking/src/udt/CongestionControl.cpp +++ b/libraries/networking/src/udt/CongestionControl.cpp @@ -201,7 +201,7 @@ void DefaultCC::onTimeout() { void DefaultCC::stopSlowStart() { _slowStart = false; - + if (_receiveRate > 0) { // Set the sending rate to the receiving rate. setPacketSendPeriod(USECS_PER_SECOND / _receiveRate); diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index e5f3508b81..af70295840 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -103,6 +103,7 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); QObject::connect(_sendQueue.get(), &SendQueue::timeout, this, &Connection::queueTimeout); + QObject::connect(_sendQueue.get(), &SendQueue::shortCircuitLoss, this, &Connection::queueShortCircuitLoss); // set defaults on the send queue from our congestion control object and estimatedTimeout() _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); @@ -140,6 +141,12 @@ void Connection::queueTimeout() { }); } +void Connection::queueShortCircuitLoss(quint32 sequenceNumber) { + updateCongestionControlAndSendQueue([this, sequenceNumber]{ + _congestionControl->onLoss(SequenceNumber { sequenceNumber }, SequenceNumber { sequenceNumber }); + }); +} + void Connection::sendReliablePacket(std::unique_ptr packet) { Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); getSendQueue().queuePacket(std::move(packet)); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 4f5a8793e7..08a2df9b97 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -87,6 +87,7 @@ private slots: void recordRetransmission(); void queueInactive(); void queueTimeout(); + void queueShortCircuitLoss(quint32 sequenceNumber); private: void sendACK(bool wasCausedBySyncTimeout = true); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 9701561ec7..2c3303537c 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -128,8 +128,8 @@ void SendQueue::stop() { _emptyCondition.notify_one(); } -void SendQueue::sendPacket(const Packet& packet) { - _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination); +int SendQueue::sendPacket(const Packet& packet) { + return _socket->writeDatagram(packet.getData(), packet.getDataSize(), _destination); } void SendQueue::ack(SequenceNumber ack) { @@ -178,7 +178,7 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) { void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { // this is a response from the client, re-set our timeout expiry _lastReceiverResponse = uint64_t(QDateTime::currentMSecsSinceEpoch()); - + { std::lock_guard nakLocker(_naksLock); _naks.clear(); @@ -232,15 +232,16 @@ SequenceNumber SendQueue::getNextSequenceNumber() { return _currentSequenceNumber; } -void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber) { +bool SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber) { // write the sequence number and send the packet newPacket->writeSequenceNumber(sequenceNumber); - sendPacket(*newPacket); - + // Save packet/payload size before we move it auto packetSize = newPacket->getDataSize(); auto payloadSize = newPacket->getPayloadSize(); + auto bytesWritten = sendPacket(*newPacket); + { // Insert the packet we have just sent in the sent list QWriteLocker locker(&_sentLock); @@ -249,8 +250,24 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, entry.second.swap(newPacket); } Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list"); - + emit packetSent(packetSize, payloadSize); + + if (bytesWritten < 0) { + // this is a short-circuit loss - we failed to put this packet on the wire + // so immediately add it to the loss list + + { + std::lock_guard nakLocker(_naksLock); + _naks.append(sequenceNumber); + } + + emit shortCircuitLoss(quint32(sequenceNumber)); + + return false; + } else { + return true; + } } void SendQueue::run() { @@ -285,12 +302,12 @@ void SendQueue::run() { auto nextPacketTimestamp = p_high_resolution_clock::now(); while (_state == State::Running) { - bool sentAPacket = maybeResendPacket(); + bool attemptedToSendPacket = maybeResendPacket(); // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet - if (!sentAPacket) { - sentAPacket = maybeSendNewPacket(); + if (!attemptedToSendPacket) { + attemptedToSendPacket = maybeSendNewPacket(); } // since we're a while loop, give the thread a chance to process events @@ -300,7 +317,7 @@ void SendQueue::run() { // If the send queue has been innactive, skip the sleep for // Either _isRunning will have been set to false and we'll break // Or something happened and we'll keep going - if (_state != State::Running || isInactive(sentAPacket)) { + if (_state != State::Running || isInactive(attemptedToSendPacket)) { return; } @@ -324,33 +341,34 @@ bool SendQueue::maybeSendNewPacket() { // grab the first packet we will send std::unique_ptr firstPacket = _packets.takePacket(); Q_ASSERT(firstPacket); - - std::unique_ptr secondPacket; - bool shouldSendPairTail = false; - - if (((uint32_t) nextNumber & 0xF) == 0) { - // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets - // pull off a second packet if we can before we unlock - shouldSendPairTail = true; - - secondPacket = _packets.takePacket(); + + + // attempt to send the first packet + if (sendNewPacketAndAddToSentList(move(firstPacket), nextNumber)) { + std::unique_ptr secondPacket; + bool shouldSendPairTail = false; + + if (((uint32_t) nextNumber & 0xF) == 0) { + // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets + // pull off a second packet if we can before we unlock + shouldSendPairTail = true; + + secondPacket = _packets.takePacket(); + } + + // do we have a second in a pair to send as well? + if (secondPacket) { + sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); + } else if (shouldSendPairTail) { + // we didn't get a second packet to send in the probe pair + // send a control packet of type ProbePairTail so the receiver can still do + // proper bandwidth estimation + static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail); + _socket->writeBasePacket(*pairTailPacket, _destination); + } } - - // definitely send the first packet - sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); - - // do we have a second in a pair to send as well? - if (secondPacket) { - sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); - } else if (shouldSendPairTail) { - // we didn't get a second packet to send in the probe pair - // send a control packet of type ProbePairTail so the receiver can still do - // proper bandwidth estimation - static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail); - _socket->writeBasePacket(*pairTailPacket, _destination); - } - - // We sent our packet(s), return here + + // We attempted to send packet(s), return here return true; } } @@ -375,8 +393,9 @@ bool SendQueue::maybeResendPacket() { // see if we can find the packet to re-send auto it = _sentPackets.find(resendNumber); - + if (it != _sentPackets.end()) { + auto& entry = it->second; // we found the packet - grab it auto& resendPacket = *(entry.second); @@ -437,7 +456,7 @@ bool SendQueue::maybeResendPacket() { return false; } -bool SendQueue::isInactive(bool sentAPacket) { +bool SendQueue::isInactive(bool attemptedToSendPacket) { // check for connection timeout first // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been @@ -462,7 +481,7 @@ bool SendQueue::isInactive(bool sentAPacket) { return true; } - if (!sentAPacket) { + if (!attemptedToSendPacket) { // During our processing above we didn't send any packets // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 9400ae8352..dc151e9f4d 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -79,6 +79,7 @@ signals: void queueInactive(); + void shortCircuitLoss(quint32 sequenceNumber); void timeout(); private slots: @@ -91,13 +92,13 @@ private: void sendHandshake(); - void sendPacket(const Packet& packet); - void sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber); + int sendPacket(const Packet& packet); + bool sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber); bool maybeSendNewPacket(); // Figures out what packet to send next bool maybeResendPacket(); // Determines whether to resend a packet and which one - bool isInactive(bool sentAPacket); + bool isInactive(bool attemptedToSendPacket); void deactivate(); // makes the queue inactive and cleans it up bool isFlowWindowFull() const; diff --git a/tools/udt-test/src/UDTTest.cpp b/tools/udt-test/src/UDTTest.cpp index 533e6371e9..2b5e306b09 100644 --- a/tools/udt-test/src/UDTTest.cpp +++ b/tools/udt-test/src/UDTTest.cpp @@ -77,7 +77,7 @@ UDTTest::UDTTest(int& argc, char** argv) : // randomize the seed for packet size randomization srand(time(NULL)); - + _socket.bind(QHostAddress::AnyIPv4, _argumentParser.value(PORT_OPTION).toUInt()); qDebug() << "Test socket is listening on" << _socket.localPort();