change to timeout behaviour to re-send unACKed packets

This commit is contained in:
Stephen Birarda 2015-08-28 09:10:06 -07:00
parent cf98d4a8f7
commit 77aeae7dc0
3 changed files with 73 additions and 28 deletions

View file

@ -727,9 +727,12 @@ void Connection::updateCongestionControlAndSendQueue(std::function<void ()> cong
// fire congestion control callback
congestionCallback();
auto& sendQueue = getSendQueue();
// now that we've updated the congestion control, update the packet send period and flow window size
getSendQueue().setPacketSendPeriod(_congestionControl->_packetSendPeriod);
getSendQueue().setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
sendQueue.setPacketSendPeriod(_congestionControl->_packetSendPeriod);
sendQueue.setEstimatedTimeout(estimatedTimeout());
sendQueue.setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize));
// record connection stats
_stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod);

View file

@ -154,6 +154,9 @@ void SendQueue::sendPacket(const Packet& packet) {
}
void SendQueue::ack(SequenceNumber ack) {
// this is a response from the client, re-set our timeout expiry
_timeoutExpiryCount = 0;
if (_lastACKSequenceNumber == (uint32_t) ack) {
return;
}
@ -177,6 +180,9 @@ void SendQueue::ack(SequenceNumber ack) {
}
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
// this is a response from the client, re-set our timeout expiry
_timeoutExpiryCount = 0;
std::unique_lock<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end);
@ -189,6 +195,9 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
}
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
// this is a response from the client, re-set our timeout expiry
_timeoutExpiryCount = 0;
std::unique_lock<std::mutex> nakLocker(_naksLock);
_naks.clear();
@ -212,7 +221,7 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
}
void SendQueue::handshakeACK() {
std::unique_lock<std::mutex> locker(_handshakeMutex);
std::unique_lock<std::mutex> locker { _handshakeMutex };
_hasReceivedHandshakeACK = true;
@ -258,7 +267,7 @@ void SendQueue::run() {
while (_isRunning) {
// Record how long the loop takes to execute
auto loopStartTimestamp = high_resolution_clock::now();
std::unique_lock<std::mutex> handshakeLock { _handshakeMutex };
if (!_hasReceivedHandshakeACK) {
@ -305,12 +314,6 @@ void SendQueue::run() {
sentAPacket = maybeSendNewPacket();
}
// Keep track of how long the flow window has been full for
if (flowWindowFull && !_flowWindowWasFull) {
_flowWindowFullSince = loopStartTimestamp;
}
_flowWindowWasFull = flowWindowFull;
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
@ -320,15 +323,22 @@ void SendQueue::run() {
}
if (_hasReceivedHandshakeACK && !sentAPacket) {
static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 };
// check if it is time to break this connection
if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) {
// that will be the case if we have had 16 timeouts since hearing back from the client, and it has been
// at least 10 seconds
static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16;
if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE) {
// If the flow window has been full for over CONSIDER_INACTIVE_AFTER,
// then signal the queue is inactive and return so it can be cleaned up
qDebug() << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts and is"
<< "considered inactive. It is now being stopped.";
emit queueInactive();
return;
} else {
// During our processing above we didn't send any packets and the flow window is not full.
// During our processing above we didn't send any packets
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
@ -337,21 +347,49 @@ void SendQueue::run() {
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER);
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) {
// we've sent the client as much data as we have (and they've ACKed it)
// either wait for new data to send or 5 seconds before cleaning up the queue
static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT_US = std::chrono::microseconds(5 * 1000 * 1000);
if (cvStatus == std::cv_status::timeout) {
// the wait_for released because we've been inactive for too long
// so emit our inactive signal and return so the send queue can be cleaned up
emit queueInactive();
return;
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT_US);
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
if (cvStatus == std::cv_status::timeout) {
qDebug() << "SendQueue to" << _destination << "has been empty for"
<< std::chrono::duration_cast<std::chrono::seconds>(EMPTY_QUEUES_INACTIVE_TIMEOUT_US).count()
<< "and receiver has ACKed all packets. The queue is considered inactive and will be stopped";
// this queue is inactive - emit that signal and stop the while
emit queueInactive();
return;
}
} else {
// We think the client is still waiting for data (based on the sequence number gap)
// Let's wait either for a response from the client or until the estimated timeout
auto waitDuration = std::chrono::microseconds(_estimatedTimeout);
// use our condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration);
if (cvStatus == std::cv_status::timeout) {
// increase the number of timeouts
++_timeoutExpiryCount;
// Add all of the packets above the last received ACKed sequence number to the loss list
// Note that thanks to the DoubleLock we have the _naksLock right now
_naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber);
}
// we have the double lock again - Make sure to unlock it
doubleLock.unlock();
// skip to the next iteration
continue;
}
// skip to the next iteration
continue;
}
}
}
@ -452,4 +490,3 @@ bool SendQueue::maybeResendPacket() {
// No packet was resent
return false;
}

View file

@ -57,6 +57,8 @@ public:
int getPacketSendPeriod() const { return _packetSendPeriod; }
void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; }
void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; }
public slots:
void stop();
@ -104,6 +106,9 @@ private:
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::atomic<bool> _isRunning { false };
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
std::atomic<int> _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
// Used to detect when the connection becomes inactive for too long
@ -117,7 +122,7 @@ private:
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable
bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
std::atomic<bool> _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client
std::condition_variable _handshakeACKCondition;
std::condition_variable_any _emptyCondition;