Emit queueInnactive when flow window is full for too long

This commit is contained in:
Atlante45 2015-08-26 17:49:17 +02:00
parent 8049819beb
commit 3184dee10a
2 changed files with 61 additions and 43 deletions

View file

@ -25,7 +25,27 @@
#include "Socket.h"
using namespace udt;
using namespace std::chrono;
// This class is not thread-safe
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { };
~DoubleLock() { unlock(); }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
bool locked() { return _locked; }
bool try_lock() { return _locked = (std::try_lock(_mutex1, _mutex2) == -1); }
void lock() { std::lock(_mutex1, _mutex2); _locked = true; }
void unlock() { if (_locked) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } }
private:
std::atomic<bool> _locked { false };
std::mutex& _mutex1;
std::mutex& _mutex2;
};
std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destination) {
auto queue = std::unique_ptr<SendQueue>(new SendQueue(socket, destination));
@ -206,7 +226,7 @@ void SendQueue::run() {
while (_isRunning) {
// Record how long the loop takes to execute
auto loopStartTimestamp = high_resolution_clock::now();
auto loopStartTimestamp = clock::now();
bool sentAPacket = maybeResendPacket();
bool flowWindowFull = false;
@ -219,6 +239,12 @@ void SendQueue::run() {
sentAPacket = maybeSendNewPacket();
}
// Keep track of how long the flow window has been full for
if (flowWindowFull && !_flowWindowWasFull) {
_flowWindowFullSince = loopStartTimestamp;
}
_flowWindowWasFull = flowWindowFull;
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
@ -227,35 +253,42 @@ void SendQueue::run() {
break;
}
if (!sentAPacket && !flowWindowFull) {
// During our processing above we didn't send any packets and the flow window is not full.
if (!sentAPacket) {
static const std::chrono::seconds CONSIDER_INNACTIVE_AFTER { 5 };
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
static const seconds CONSIDER_INNACTIVE_AFTER { 5 };
auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INNACTIVE_AFTER);
if (flowWindowFull && (clock::now() - _flowWindowFullSince) > CONSIDER_INNACTIVE_AFTER) {
// If the flow window has been full for over CONSIDER_INNACTIVE_AFTER,
// then signal the queue is innactive
emit queueInnactive();
} else {
// During our processing above we didn't send any packets and the flow window is not full.
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// Check if we've been innactive for too long
if (cvStatus == std::cv_status::timeout) {
emit queueInnactive();
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INNACTIVE_AFTER);
// Check if we've been innactive for too long
if (cvStatus == std::cv_status::timeout) {
emit queueInnactive();
}
// we have the double lock again - it'll be unlocked once it goes out of scope
// skip to the next iteration
continue;
}
// we have the double lock again - it'll be unlocked once it goes out of scope
// skip to the next iteration
continue;
}
}
auto loopEndTimestamp = high_resolution_clock::now();
auto loopEndTimestamp = clock::now();
// sleep as long as we need until next packet send, if we can
auto timeToSleep = (loopStartTimestamp + microseconds(_packetSendPeriod)) - loopEndTimestamp;
auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp;
if (timeToSleep > timeToSleep.zero()) {
std::this_thread::sleep_for(timeToSleep);
}

View file

@ -42,27 +42,8 @@ class SendQueue : public QObject {
Q_OBJECT
public:
// This class is not thread-safe
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { };
~DoubleLock() { unlock(); }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
bool locked() { return _locked; }
bool try_lock() { return _locked = (std::try_lock(_mutex1, _mutex2) == -1); }
void lock() { std::lock(_mutex1, _mutex2); _locked = true; }
void unlock() { if (_locked) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } }
private:
std::atomic<bool> _locked { false };
std::mutex& _mutex1;
std::mutex& _mutex2;
};
using clock = std::chrono::high_resolution_clock;
using time_point = clock::time_point;
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
@ -124,6 +105,10 @@ private:
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
// Used to detect when the connection becomes innactive for too long
bool _flowWindowWasFull = false;
time_point _flowWindowFullSince;
mutable std::mutex _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend