Merge pull request #15 from birarda/wait-condition

add a wait condition in SendQueue if there's nothing to do
This commit is contained in:
Clément Brisset 2015-08-26 14:57:16 +02:00
commit 2745cc5d12
5 changed files with 83 additions and 51 deletions

View file

@ -666,7 +666,7 @@ void AudioMixer::run() {
connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit);
domainHandler.requestDomainSettings();
loop.exec();
if (domainHandler.getSettingsObject().isEmpty()) {
qDebug() << "Failed to retreive settings object from domain-server. Bailing on assignment.";
setFinished(true);

View file

@ -227,6 +227,9 @@ void DomainHandler::setIsConnected(bool isConnected) {
}
void DomainHandler::requestDomainSettings() {
// TODO: the nodes basically lock if they don't get a response - add a timeout to this so that they at least restart
// if they can't get settings
NodeType_t owningNodeType = DependencyManager::get<NodeList>()->getOwnerType();
if (owningNodeType == NodeType::Agent) {
// for now the agent nodes don't need any settings - this allows local assignment-clients
@ -248,40 +251,6 @@ void DomainHandler::requestDomainSettings() {
}
}
const int MAX_SETTINGS_REQUEST_FAILED_ATTEMPTS = 5;
void DomainHandler::settingsRequestFinished() {
QNetworkReply* settingsReply = reinterpret_cast<QNetworkReply*>(sender());
int replyCode = settingsReply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt();
if (settingsReply->error() == QNetworkReply::NoError && replyCode != 301 && replyCode != 302) {
// parse the JSON to a QJsonObject and save it
_settingsObject = QJsonDocument::fromJson(settingsReply->readAll()).object();
qCDebug(networking) << "Received domain settings.";
emit settingsReceived(_settingsObject);
// reset failed settings requests to 0, we got them
_failedSettingsRequests = 0;
} else {
// error grabbing the settings - in some cases this means we are stuck
// so we should retry until we get it
qCDebug(networking) << "Error getting domain settings -" << settingsReply->errorString() << "- retrying";
if (++_failedSettingsRequests >= MAX_SETTINGS_REQUEST_FAILED_ATTEMPTS) {
qCDebug(networking) << "Failed to retreive domain-server settings" << MAX_SETTINGS_REQUEST_FAILED_ATTEMPTS
<< "times. Re-setting connection to domain.";
clearSettings();
clearConnectionInfo();
emit settingsReceiveFail();
} else {
requestDomainSettings();
}
}
settingsReply->deleteLater();
}
void DomainHandler::processSettingsPacketList(QSharedPointer<NLPacketList> packetList) {
auto data = packetList->getMessage();

View file

@ -95,7 +95,6 @@ public slots:
private slots:
void completedHostnameLookup(const QHostInfo& hostInfo);
void completedIceServerHostnameLookup();
void settingsRequestFinished();
signals:
void hostnameChanged(const QString& hostname);

View file

@ -56,8 +56,11 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) :
void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
{
QWriteLocker locker(&_packetsLock);
std::lock_guard<std::mutex> locker(_packetsLock);
_packets.push_back(std::move(packet));
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
}
if (!this->thread()->isRunning()) {
this->thread()->start();
@ -95,9 +98,12 @@ void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
}
}
QWriteLocker locker(&_packetsLock);
std::lock_guard<std::mutex> locker(_packetsLock);
_packets.splice(_packets.end(), packetList->_packets);
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
}
if (!this->thread()->isRunning()) {
@ -126,7 +132,7 @@ void SendQueue::ack(SequenceNumber ack) {
}
{ // remove any sequence numbers equal to or lower than this ACK in the loss list
QWriteLocker nakLocker(&_naksLock);
std::lock_guard<std::mutex> nakLocker(_naksLock);
if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) {
_naks.remove(_naks.getFirstSequenceNumber(), ack);
@ -137,12 +143,15 @@ void SendQueue::ack(SequenceNumber ack) {
}
void SendQueue::nak(SequenceNumber start, SequenceNumber end) {
QWriteLocker locker(&_naksLock);
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.insert(start, end);
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
}
void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
QWriteLocker locker(&_naksLock);
std::lock_guard<std::mutex> nakLocker(_naksLock);
_naks.clear();
SequenceNumber first, second;
@ -156,6 +165,9 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) {
_naks.append(first, second);
}
}
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send
_emptyCondition.notify_one();
}
SequenceNumber SendQueue::getNextSequenceNumber() {
@ -195,16 +207,20 @@ void SendQueue::run() {
// Record timing
_lastSendTimestamp = high_resolution_clock::now();
bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs
bool resentPacket = false;
// the following while makes sure that we find a packet to re-send, if there is one
while (!resentPacket) {
QWriteLocker naksLocker(&_naksLock);
std::unique_lock<std::mutex> nakLocker(_naksLock);
if (_naks.getLength() > 0) {
naksEmpty = _naks.getLength() > 1;
// pull the sequence number we need to re-send
SequenceNumber resendNumber = _naks.popFirstSequenceNumber();
naksLocker.unlock();
nakLocker.unlock();
// pull the packet to re-send from the sent packets list
QReadLocker sentLocker(&_sentLock);
@ -233,21 +249,27 @@ void SendQueue::run() {
// we'll fire the loop again to see if there is another to re-send
continue;
}
} else {
naksEmpty = true;
}
// break from the while, we didn't resend a packet
break;
}
bool packetsEmpty = false; // used after processing to check if we should wait for packets
bool sentPacket = false;
// if we didn't find a packet to re-send AND we think we can fit a new packet on the wire
// (this is according to the current flow window size) then we send out a new packet
if (!resentPacket
&& seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) {
// we didn't re-send a packet, so time to send a new one
QWriteLocker locker(&_packetsLock);
std::unique_lock<std::mutex> locker(_packetsLock);
if (_packets.size() > 0) {
SequenceNumber nextNumber = getNextSequenceNumber();
// grab the first packet we will send
@ -266,9 +288,13 @@ void SendQueue::run() {
}
}
packetsEmpty = _packets.size() == 0;
// unlock the packets, we're done pulling
locker.unlock();
sentPacket = true;
// definitely send the first packet
sendNewPacketAndAddToSentList(move(firstPacket), nextNumber);
@ -279,6 +305,7 @@ void SendQueue::run() {
}
} else {
packetsEmpty = true;
locker.unlock();
}
}
@ -291,6 +318,24 @@ void SendQueue::run() {
break;
}
if (packetsEmpty && naksEmpty) {
// During our processing above the loss list and packet list were both empty.
// If that is still the case we should use a condition_variable_any to sleep until we have data to handle.
// To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock
DoubleLock doubleLock(_packetsLock, _naksLock);
// The packets queue and loss list mutexes are now both locked - check if they're still both empty
if (_packets.empty() && _naks.getLength() == 0) {
// both are empty - let's use a condition_variable_any to wait
_emptyCondition.wait(doubleLock);
// we have the double lock again - it'll be unlocked once it goes out of scope
// skip to the next iteration
continue;
}
}
// sleep as long as we need until next packet send, if we can
auto now = high_resolution_clock::now();
auto microsecondDuration = duration_cast<microseconds>((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now);

View file

@ -13,7 +13,9 @@
#define hifi_SendQueue_h
#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>
#include <unordered_map>
#include <QtCore/QObject>
@ -40,11 +42,26 @@ class SendQueue : public QObject {
Q_OBJECT
public:
class DoubleLock {
public:
DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); }
~DoubleLock() { unlock(); }
DoubleLock(const DoubleLock&) = delete;
DoubleLock& operator=(const DoubleLock&) = delete;
void lock() { std::lock(_mutex1, _mutex2); }
void unlock() { _mutex1.unlock(); _mutex2.unlock(); }
private:
std::mutex& _mutex1;
std::mutex& _mutex2;
};
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet);
void queuePacketList(std::unique_ptr<PacketList> packetList);
int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); }
SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); }
@ -79,29 +96,31 @@ private:
SequenceNumber getNextSequenceNumber();
MessageNumber getNextMessageNumber();
mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list.
mutable std::mutex _packetsLock; // Protects the packets to be sent list.
std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent
Socket* _socket { nullptr }; // Socket to send packet on
HifiSockAddr _destination; // Destination addr
std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number
std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number
MessageNumber _currentMessageNumber { 0 };
SequenceNumber _currentSequenceNumber; // Last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber;// Atomic for last sequence number sent out
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod; // Interval between two packet send event in microseconds, set from CC
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure
std::atomic<bool> _isRunning { false };
std::atomic<int> _flowWindowSize; // Flow control window size (number of packets that can be on wire) - set from CC
std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC
mutable QReadWriteLock _naksLock; // Protects the naks list.
mutable std::mutex _naksLock; // Protects the naks list.
LossList _naks; // Sequence numbers of packets to resend
mutable QReadWriteLock _sentLock; // Protects the sent packet list
std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK.
std::condition_variable_any _emptyCondition;
};
}