From 50c0b59ba2e2a113fe75705f2386ba4c938c0c32 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 31 Jul 2015 10:46:52 -0700 Subject: [PATCH] handle priority of re-transmission of loss --- libraries/networking/src/udt/SendQueue.cpp | 194 +++++++++++---------- libraries/networking/src/udt/SendQueue.h | 1 + 2 files changed, 106 insertions(+), 89 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index c092d089c4..d65cdf951c 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -85,7 +85,10 @@ void SendQueue::ack(SequenceNumber ack) { { // remove any sequence numbers equal to or lower than this ACK in the loss list QWriteLocker nakLocker(&_naksLock); - _naks.remove(_naks.getFirstSequenceNumber(), ack); + + if (_naks.getLength() > 0) { + _naks.remove(_naks.getFirstSequenceNumber(), ack); + } } _lastACKSequenceNumber = (uint32_t) ack; @@ -118,6 +121,21 @@ SequenceNumber SendQueue::getNextSequenceNumber() { return _currentSequenceNumber; } +void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber) { + // write the sequence number and send the packet + newPacket->writeSequenceNumber(sequenceNumber); + sendPacket(*newPacket); + + { + // Insert the packet we have just sent in the sent list + QWriteLocker locker(&_sentLock); + _sentPackets[newPacket->getSequenceNumber()].swap(newPacket); + Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list"); + } + + emit packetSent(); +} + void SendQueue::run() { _isRunning = true; @@ -125,104 +143,102 @@ void SendQueue::run() { // Record timing _lastSendTimestamp = high_resolution_clock::now(); - // we're only allowed to send if the flow window size - // is greater than or equal to the gap between the last ACKed sent and the one we are about to send - if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1) <= _flowWindowSize) { - bool hasResend = false; - SequenceNumber sequenceNumber; - { - // Check nak list for packet to resend - QWriteLocker locker(&_naksLock); - if (_naks.getLength() > 0) { - hasResend = true; - sequenceNumber = _naks.popFirstSequenceNumber(); - } - } + qDebug() << _lastACKSequenceNumber + << (uint32_t) (_currentSequenceNumber + 1) + << seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1); + + bool resentPacket = false; + + while (!resentPacket) { + // prioritize a loss retransmission + _naksLock.lockForWrite(); - std::unique_ptr nextPacket; - - // Find packet in sent list using SequenceNumber - if (hasResend) { - QWriteLocker locker(&_sentLock); - auto it = _sentPackets.find(sequenceNumber); - Q_ASSERT_X(it != _sentPackets.end(), - "SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend"); + if (_naks.getLength() > 0) { + + // pull the sequence number we need to re-send + SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); + _naksLock.unlock(); + + // pull the packet to re-send from the sent packets list + _sentLock.lockForRead(); + + // see if we can find the packet to re-send + auto it = _sentPackets.find(resendNumber); if (it != _sentPackets.end()) { - it->second.swap(nextPacket); - _sentPackets.erase(it); - } - } - - // If there is no packet to resend, grab the next one in the list - if (!nextPacket) { - QWriteLocker locker(&_packetsLock); - - if (_packets.size() > 0) { - nextPacket.swap(_packets.front()); - _packets.pop_front(); - } - } - - if (nextPacket) { - bool shouldSendSecondOfPair = false; - - if (!hasResend) { - // if we're not re-sending a packet then need to check if this should be a packet pair - sequenceNumber = getNextSequenceNumber(); + // we found the packet - grab it + auto& resendPacket = *(it->second); - // the first packet in the pair is every 16 (rightmost 16 bits = 0) packets - if (((uint32_t) sequenceNumber & 0xF) == 0) { - shouldSendSecondOfPair = true; - } - } - - // Write packet's sequence number and send it off - nextPacket->writeSequenceNumber(sequenceNumber); - sendPacket(*nextPacket); - - { - // Insert the packet we have just sent in the sent list - QWriteLocker locker(&_sentLock); - _sentPackets[nextPacket->getSequenceNumber()].swap(nextPacket); - Q_ASSERT_X(!nextPacket, - "SendQueue::sendNextPacket()", "Overriden packet in sent list"); - } - - emit packetSent(); - - if (shouldSendSecondOfPair) { - std::unique_ptr pairedPacket; + // unlock the sent packets + _sentLock.unlock(); - // we've detected we should send the second packet in a pair, do that now before sleeping - { - QWriteLocker locker(&_packetsLock); - - if (_packets.size() > 0) { - pairedPacket.swap(_packets.front()); - _packets.pop_front(); - } - } + // send it off + sendPacket(resendPacket); - if (pairedPacket) { - // write this packet's sequence number and send it off - pairedPacket->writeSequenceNumber(getNextSequenceNumber()); - sendPacket(*pairedPacket); - - { - // add the paired packet to the sent list - QWriteLocker locker(&_sentLock); - _sentPackets[pairedPacket->getSequenceNumber()].swap(pairedPacket); - Q_ASSERT_X(!pairedPacket, - "SendQueue::sendNextPacket()", "Overriden packet in sent list"); - } - - emit packetSent(); - } + // mark that we did resend a packet + resentPacket = true; + + // break out of our while now that we have re-sent a packet + break; + } else { + // we didn't find this packet in the sentPackets queue - assume this means it was ACKed + // we'll fire the loop again to see if there is another to re-send + + // unlock the sent packets + _sentLock.unlock(); } + + } else { + // unlock the loss list, it's empty + _naksLock.unlock(); + + // break from the while, we didn't resend a packet + break; } } + if (!resentPacket + && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1) <= _flowWindowSize) { + + // we didn't re-send a packet, so time to send a new one + _packetsLock.lockForWrite(); + + if (_packets.size() > 0) { + SequenceNumber nextNumber = getNextSequenceNumber(); + + // grab the first packet we will send + std::unique_ptr firstPacket; + firstPacket.swap(_packets.front()); + _packets.pop_front(); + + std::unique_ptr secondPacket; + + if (((uint32_t) nextNumber & 0xF) == 0) { + // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets + // pull off a second packet if we can before we unlock + if (_packets.size() > 0) { + secondPacket.swap(_packets.front()); + _packets.pop_front(); + } + } + + // unlock the packets, we're done pulling + _packetsLock.unlock(); + + // definitely send the first packet + sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); + + // do we have a second in a pair to send as well? + if (secondPacket) { + nextNumber = getNextSequenceNumber(); + sendNewPacketAndAddToSentList(move(secondPacket), nextNumber); + } + + } else { + _packetsLock.unlock(); + } + } + // since we're a while loop, give the thread a chance to process events QCoreApplication::processEvents(); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 4a4cf1ffea..a438c5635f 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -68,6 +68,7 @@ private: SendQueue(SendQueue&& other) = delete; void sendPacket(const Packet& packet); + void sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber); // Increments current sequence number and return it SequenceNumber getNextSequenceNumber();