update the flow window size for the send queue

This commit is contained in:
Stephen Birarda 2015-07-29 15:53:44 -07:00
parent 55555cf13e
commit 4907418587
3 changed files with 63 additions and 44 deletions

View file

@ -368,10 +368,15 @@ void Connection::processACK(std::unique_ptr<ControlPacket> controlPacket) {
_congestionControl->setBandwidth(_bandwidth);
}
// fire the onACK callback for congestion control
// update the last sent sequence number in congestion control
_congestionControl->setSendCurrentSequenceNumber(_sendQueue->getCurrentSequenceNumber());
// fire the onACK callback for congestion control
_congestionControl->onAck(ack);
// now that we've updated the congestion control, update the packet send period and flow window size
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
// update the total count of received ACKs
++_totalReceivedACKs;
@ -437,10 +442,15 @@ void Connection::processNAK(std::unique_ptr<ControlPacket> controlPacket) {
// send that off to the send queue so it knows there was loss
_sendQueue->nak(start, end);
// Tell the congestion control object that there was loss
// update the last sent sequence number in congestion control
_congestionControl->setSendCurrentSequenceNumber(_sendQueue->getCurrentSequenceNumber());
// give the loss to the congestion control object
_congestionControl->onLoss(start, end);
// now that we've updated the congestion control, update the packet send period and flow window size
_sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod);
_sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
++_totalReceivedNAKs;
}

View file

@ -80,13 +80,13 @@ void SendQueue::sendPacket(const BasePacket& packet) {
}
void SendQueue::ack(SequenceNumber ack) {
if (_lastAck == ack) {
if (_lastACKSequenceNumber == (uint32_t) ack) {
return;
}
{ // remove any ACKed packets from the map of sent packets
QWriteLocker locker(&_sentLock);
for (auto seq = _lastAck; seq <= ack; ++seq) {
for (auto seq = SequenceNumber { (uint32_t) _lastACKSequenceNumber }; seq <= ack; ++seq) {
_sentPackets.erase(seq);
}
}
@ -96,7 +96,7 @@ void SendQueue::ack(SequenceNumber ack) {
_naks.remove(_naks.getFirstSequenceNumber(), ack);
}
_lastAck = ack;
_lastACKSequenceNumber = (uint32_t) ack;
}
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
@ -131,48 +131,52 @@ void SendQueue::loop() {
// Record timing
_lastSendTimestamp = high_resolution_clock::now();
if (_nextPacket) {
// we're only allowed to send if the flow window size
// is greater than or equal to the gap between the last ACKed sent and the one we are about to send
if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber + 1) <= _flowWindowSize) {
bool hasResend = false;
SequenceNumber seqNum;
{
// Check nak list for packet to resend
QWriteLocker locker(&_naksLock);
if (_naks.getLength() > 0) {
hasResend = true;
seqNum = _naks.popFirstSequenceNumber();
}
}
std::unique_ptr<Packet> nextPacket;
// Find packet in sent list using SequenceNumber
if (hasResend) {
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(seqNum);
Q_ASSERT_X(it != _sentPackets.end(),
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) {
it->second.swap(nextPacket);
_sentPackets.erase(it);
}
}
// If there is no packet to resend, grab the next one in the list
if (!nextPacket) {
QWriteLocker locker(&_packetsLock);
nextPacket.swap(_packets.front());
_packets.pop_front();
}
// Write packet's sequence number and send it off
_nextPacket->writeSequenceNumber(getNextSequenceNumber());
sendPacket(*_nextPacket);
nextPacket->writeSequenceNumber(getNextSequenceNumber());
sendPacket(*nextPacket);
// Insert the packet we have just sent in the sent list
QWriteLocker locker(&_sentLock);
_sentPackets[_nextPacket->getSequenceNumber()].swap(_nextPacket);
Q_ASSERT_X(!_nextPacket,
_sentPackets[nextPacket->getSequenceNumber()].swap(nextPacket);
Q_ASSERT_X(!nextPacket,
"SendQueue::sendNextPacket()", "Overriden packet in sent list");
}
bool hasResend = false;
SequenceNumber seqNum;
{
// Check nak list for packet to resend
QWriteLocker locker(&_naksLock);
if (_naks.getLength() > 0) {
hasResend = true;
seqNum = _naks.popFirstSequenceNumber();
}
}
// Find packet in sent list using SequenceNumber
if (hasResend) {
QWriteLocker locker(&_sentLock);
auto it = _sentPackets.find(seqNum);
Q_ASSERT_X(it != _sentPackets.end(),
"SendQueue::sendNextPacket()", "Couldn't find NAKed packet to resend");
if (it != _sentPackets.end()) {
it->second.swap(_nextPacket);
_sentPackets.erase(it);
}
}
// If there is no packet to resend, grab the next one in the list
if (!_nextPacket) {
QWriteLocker locker(&_packetsLock);
_nextPacket.swap(_packets.front());
_packets.pop_front();
}
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();

View file

@ -22,6 +22,7 @@
#include "../HifiSockAddr.h"
#include "Constants.h"
#include "SequenceNumber.h"
#include "LossList.h"
@ -45,6 +46,8 @@ public:
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
void setFlowWindowSize(int flowWindowSize) { _flowWindowSize = flowWindowSize; }
int getPacketSendPeriod() const { return _packetSendPeriod; }
void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
@ -74,11 +77,11 @@ private:
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
std::unique_ptr<Packet> _nextPacket; // Next packet to be sent
Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr
SequenceNumber _lastAck; // Last ACKed sequence number
std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber; // Atomic for last sequence number sent out
@ -87,6 +90,8 @@ private:
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false };
std::atomic<int> _flowWindowSize { udt::MAX_PACKETS_IN_FLIGHT }; // Flow control window size (number of packets that can be on wire)
mutable QReadWriteLock _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend