handle priority of re-transmission of loss

This commit is contained in:
Stephen Birarda 2015-07-31 10:46:52 -07:00
parent 98a53cbd72
commit 50c0b59ba2
2 changed files with 106 additions and 89 deletions

View file

@ -85,8 +85,11 @@ void SendQueue::ack(SequenceNumber ack) {
{ // remove any sequence numbers equal to or lower than this ACK in the loss list { // remove any sequence numbers equal to or lower than this ACK in the loss list
QWriteLocker nakLocker(&_naksLock); QWriteLocker nakLocker(&_naksLock);
if (_naks.getLength() > 0) {
_naks.remove(_naks.getFirstSequenceNumber(), ack); _naks.remove(_naks.getFirstSequenceNumber(), ack);
} }
}
_lastACKSequenceNumber = (uint32_t) ack; _lastACKSequenceNumber = (uint32_t) ack;
} }
@ -118,6 +121,21 @@ SequenceNumber SendQueue::getNextSequenceNumber() {
return _currentSequenceNumber; return _currentSequenceNumber;
} }
void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber) {
// write the sequence number and send the packet
newPacket->writeSequenceNumber(sequenceNumber);
sendPacket(*newPacket);
{
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[newPacket->getSequenceNumber()].swap(newPacket);
Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list");
}
emit packetSent();
}
void SendQueue::run() { void SendQueue::run() {
_isRunning = true; _isRunning = true;
@ -125,101 +143,99 @@ void SendQueue::run() {
// Record timing // Record timing
_lastSendTimestamp = high_resolution_clock::now(); _lastSendTimestamp = high_resolution_clock::now();
// we're only allowed to send if the flow window size qDebug() << _lastACKSequenceNumber
// is greater than or equal to the gap between the last ACKed sent and the one we are about to send << (uint32_t) (_currentSequenceNumber + 1)
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1) <= _flowWindowSize) { << seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1);
bool hasResend = false;
SequenceNumber sequenceNumber; bool resentPacket = false;
{
// Check nak list for packet to resend while (!resentPacket) {
QWriteLocker locker(&_naksLock); // prioritize a loss retransmission
_naksLock.lockForWrite();
if (_naks.getLength() > 0) { if (_naks.getLength() > 0) {
hasResend = true;
sequenceNumber = _naks.popFirstSequenceNumber();
}
}
std::unique_ptr<Packet> nextPacket; // pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
_naksLock.unlock();
// Find packet in sent list using SequenceNumber // pull the packet to re-send from the sent packets list
if (hasResend) { _sentLock.lockForRead();
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(sequenceNumber); // see if we can find the packet to re-send
Q_ASSERT_X(it != _sentPackets.end(), auto it = _sentPackets.find(resendNumber);
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) { if (it != _sentPackets.end()) {
it->second.swap(nextPacket); // we found the packet - grab it
_sentPackets.erase(it); auto& resendPacket = *(it->second);
// unlock the sent packets
_sentLock.unlock();
// send it off
sendPacket(resendPacket);
// mark that we did resend a packet
resentPacket = true;
// break out of our while now that we have re-sent a packet
break;
} else {
// we didn't find this packet in the sentPackets queue - assume this means it was ACKed
// we'll fire the loop again to see if there is another to re-send
// unlock the sent packets
_sentLock.unlock();
}
} else {
// unlock the loss list, it's empty
_naksLock.unlock();
// break from the while, we didn't resend a packet
break;
} }
} }
// If there is no packet to resend, grab the next one in the list if (!resentPacket
if (!nextPacket) { && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1) <= _flowWindowSize) {
QWriteLocker locker(&_packetsLock);
// we didn't re-send a packet, so time to send a new one
_packetsLock.lockForWrite();
if (_packets.size() > 0) { if (_packets.size() > 0) {
nextPacket.swap(_packets.front()); SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send
std::unique_ptr<Packet> firstPacket;
firstPacket.swap(_packets.front());
_packets.pop_front();
std::unique_ptr<Packet> secondPacket;
if (((uint32_t) nextNumber & 0xF) == 0) {
// the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets
// pull off a second packet if we can before we unlock
if (_packets.size() > 0) {
secondPacket.swap(_packets.front());
_packets.pop_front(); _packets.pop_front();
} }
} }
if (nextPacket) { // unlock the packets, we're done pulling
bool shouldSendSecondOfPair = false; _packetsLock.unlock();
if (!hasResend) { // definitely send the first packet
// if we're not re-sending a packet then need to check if this should be a packet pair sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
sequenceNumber = getNextSequenceNumber();
// the first packet in the pair is every 16 (rightmost 16 bits = 0) packets // do we have a second in a pair to send as well?
if (((uint32_t) sequenceNumber & 0xF) == 0) { if (secondPacket) {
shouldSendSecondOfPair = true; nextNumber = getNextSequenceNumber();
} sendNewPacketAndAddToSentList(move(secondPacket), nextNumber);
} }
// Write packet's sequence number and send it off } else {
nextPacket->writeSequenceNumber(sequenceNumber); _packetsLock.unlock();
sendPacket(*nextPacket);
{
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[nextPacket->getSequenceNumber()].swap(nextPacket);
Q_ASSERT_X(!nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
emit packetSent();
if (shouldSendSecondOfPair) {
std::unique_ptr<Packet> pairedPacket;
// we've detected we should send the second packet in a pair, do that now before sleeping
{
QWriteLocker locker(&_packetsLock);
if (_packets.size() > 0) {
pairedPacket.swap(_packets.front());
_packets.pop_front();
}
}
if (pairedPacket) {
// write this packet's sequence number and send it off
pairedPacket->writeSequenceNumber(getNextSequenceNumber());
sendPacket(*pairedPacket);
{
// add the paired packet to the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[pairedPacket->getSequenceNumber()].swap(pairedPacket);
Q_ASSERT_X(!pairedPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
emit packetSent();
}
}
} }
} }

View file

@ -68,6 +68,7 @@ private:
SendQueue(SendQueue&& other) = delete; SendQueue(SendQueue&& other) = delete;
void sendPacket(const Packet& packet); void sendPacket(const Packet& packet);
void sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket, SequenceNumber sequenceNumber);
// Increments current sequence number and return it // Increments current sequence number and return it
SequenceNumber getNextSequenceNumber(); SequenceNumber getNextSequenceNumber();