diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index c1d0cc2215..1d8908845f 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -666,7 +666,7 @@ void AudioMixer::run() { connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit); domainHandler.requestDomainSettings(); loop.exec(); - + if (domainHandler.getSettingsObject().isEmpty()) { qDebug() << "Failed to retreive settings object from domain-server. Bailing on assignment."; setFinished(true); diff --git a/libraries/networking/src/DomainHandler.cpp b/libraries/networking/src/DomainHandler.cpp index 62b00a8c98..df024b361d 100644 --- a/libraries/networking/src/DomainHandler.cpp +++ b/libraries/networking/src/DomainHandler.cpp @@ -227,6 +227,9 @@ void DomainHandler::setIsConnected(bool isConnected) { } void DomainHandler::requestDomainSettings() { + // TODO: the nodes basically lock if they don't get a response - add a timeout to this so that they at least restart + // if they can't get settings + NodeType_t owningNodeType = DependencyManager::get<NodeList>()->getOwnerType(); if (owningNodeType == NodeType::Agent) { // for now the agent nodes don't need any settings - this allows local assignment-clients @@ -248,40 +251,6 @@ void DomainHandler::requestDomainSettings() { } } -const int MAX_SETTINGS_REQUEST_FAILED_ATTEMPTS = 5; - -void DomainHandler::settingsRequestFinished() { - QNetworkReply* settingsReply = reinterpret_cast<QNetworkReply*>(sender()); - - int replyCode = settingsReply->attribute(QNetworkRequest::HttpStatusCodeAttribute).toInt(); - - if (settingsReply->error() == QNetworkReply::NoError && replyCode != 301 && replyCode != 302) { - // parse the JSON to a QJsonObject and save it - _settingsObject = QJsonDocument::fromJson(settingsReply->readAll()).object(); - - qCDebug(networking) << "Received domain settings."; - emit settingsReceived(_settingsObject); - - // reset failed settings requests to 0, we got them - _failedSettingsRequests = 0; - } else { - // error grabbing the settings - in some cases this means we are stuck - // so we should retry until we get it - qCDebug(networking) << "Error getting domain settings -" << settingsReply->errorString() << "- retrying"; - - if (++_failedSettingsRequests >= MAX_SETTINGS_REQUEST_FAILED_ATTEMPTS) { - qCDebug(networking) << "Failed to retreive domain-server settings" << MAX_SETTINGS_REQUEST_FAILED_ATTEMPTS - << "times. Re-setting connection to domain."; - clearSettings(); - clearConnectionInfo(); - emit settingsReceiveFail(); - } else { - requestDomainSettings(); - } - } - settingsReply->deleteLater(); -} - void DomainHandler::processSettingsPacketList(QSharedPointer<NLPacketList> packetList) { auto data = packetList->getMessage(); diff --git a/libraries/networking/src/DomainHandler.h b/libraries/networking/src/DomainHandler.h index 349b3934eb..9dd4254c30 100644 --- a/libraries/networking/src/DomainHandler.h +++ b/libraries/networking/src/DomainHandler.h @@ -95,7 +95,6 @@ public slots: private slots: void completedHostnameLookup(const QHostInfo& hostInfo); void completedIceServerHostnameLookup(); - void settingsRequestFinished(); signals: void hostnameChanged(const QString& hostname); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index cfb969a186..5e0e8312f2 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -56,8 +56,11 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : void SendQueue::queuePacket(std::unique_ptr<Packet> packet) { { - QWriteLocker locker(&_packetsLock); + std::lock_guard<std::mutex> locker(_packetsLock); _packets.push_back(std::move(packet)); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); } if (!this->thread()->isRunning()) { this->thread()->start(); @@ -95,9 +98,12 @@ void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) { } } - QWriteLocker locker(&_packetsLock); + std::lock_guard<std::mutex> locker(_packetsLock); _packets.splice(_packets.end(), packetList->_packets); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); } if (!this->thread()->isRunning()) { @@ -126,7 +132,7 @@ void SendQueue::ack(SequenceNumber ack) { } { // remove any sequence numbers equal to or lower than this ACK in the loss list - QWriteLocker nakLocker(&_naksLock); + std::lock_guard<std::mutex> nakLocker(_naksLock); if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) { _naks.remove(_naks.getFirstSequenceNumber(), ack); @@ -137,12 +143,15 @@ void SendQueue::ack(SequenceNumber ack) { } void SendQueue::nak(SequenceNumber start, SequenceNumber end) { - QWriteLocker locker(&_naksLock); + std::lock_guard<std::mutex> nakLocker(_naksLock); _naks.insert(start, end); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send + _emptyCondition.notify_one(); } void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { - QWriteLocker locker(&_naksLock); + std::lock_guard<std::mutex> nakLocker(_naksLock); _naks.clear(); SequenceNumber first, second; @@ -156,6 +165,9 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { _naks.append(first, second); } } + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send + _emptyCondition.notify_one(); } SequenceNumber SendQueue::getNextSequenceNumber() { @@ -195,16 +207,20 @@ void SendQueue::run() { // Record timing _lastSendTimestamp = high_resolution_clock::now(); + bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs + bool resentPacket = false; // the following while makes sure that we find a packet to re-send, if there is one while (!resentPacket) { - QWriteLocker naksLocker(&_naksLock); + std::unique_lock<std::mutex> nakLocker(_naksLock); if (_naks.getLength() > 0) { + naksEmpty = _naks.getLength() > 1; + // pull the sequence number we need to re-send SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); - naksLocker.unlock(); + nakLocker.unlock(); // pull the packet to re-send from the sent packets list QReadLocker sentLocker(&_sentLock); @@ -233,21 +249,27 @@ void SendQueue::run() { // we'll fire the loop again to see if there is another to re-send continue; } + } else { + naksEmpty = true; } // break from the while, we didn't resend a packet break; } + bool packetsEmpty = false; // used after processing to check if we should wait for packets + bool sentPacket = false; + // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet if (!resentPacket && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { // we didn't re-send a packet, so time to send a new one - QWriteLocker locker(&_packetsLock); + std::unique_lock<std::mutex> locker(_packetsLock); if (_packets.size() > 0) { + SequenceNumber nextNumber = getNextSequenceNumber(); // grab the first packet we will send @@ -266,9 +288,13 @@ void SendQueue::run() { } } + packetsEmpty = _packets.size() == 0; + // unlock the packets, we're done pulling locker.unlock(); + sentPacket = true; + // definitely send the first packet sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); @@ -279,6 +305,7 @@ void SendQueue::run() { } } else { + packetsEmpty = true; locker.unlock(); } } @@ -291,6 +318,24 @@ void SendQueue::run() { break; } + if (packetsEmpty && naksEmpty) { + // During our processing above the loss list and packet list were both empty. + + // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. + // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + DoubleLock doubleLock(_packetsLock, _naksLock); + + // The packets queue and loss list mutexes are now both locked - check if they're still both empty + if (_packets.empty() && _naks.getLength() == 0) { + // both are empty - let's use a condition_variable_any to wait + _emptyCondition.wait(doubleLock); + + // we have the double lock again - it'll be unlocked once it goes out of scope + // skip to the next iteration + continue; + } + } + // sleep as long as we need until next packet send, if we can auto now = high_resolution_clock::now(); auto microsecondDuration = duration_cast<microseconds>((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 901a9f7a87..618cf17009 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -13,7 +13,9 @@ #define hifi_SendQueue_h #include <chrono> +#include <condition_variable> #include <list> +#include <mutex> #include <unordered_map> #include <QtCore/QObject> @@ -40,11 +42,26 @@ class SendQueue : public QObject { Q_OBJECT public: + + class DoubleLock { + public: + DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); } + ~DoubleLock() { unlock(); } + + DoubleLock(const DoubleLock&) = delete; + DoubleLock& operator=(const DoubleLock&) = delete; + + void lock() { std::lock(_mutex1, _mutex2); } + void unlock() { _mutex1.unlock(); _mutex2.unlock(); } + private: + std::mutex& _mutex1; + std::mutex& _mutex2; + }; + static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination); void queuePacket(std::unique_ptr<Packet> packet); void queuePacketList(std::unique_ptr<PacketList> packetList); - int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); } SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } @@ -79,29 +96,31 @@ private: SequenceNumber getNextSequenceNumber(); MessageNumber getNextMessageNumber(); - mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list. + mutable std::mutex _packetsLock; // Protects the packets to be sent list. std::list<std::unique_ptr<Packet>> _packets; // List of packets to be sent Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr - std::atomic<uint32_t> _lastACKSequenceNumber; // Last ACKed sequence number + std::atomic<uint32_t> _lastACKSequenceNumber { 0 }; // Last ACKed sequence number MessageNumber _currentMessageNumber { 0 }; SequenceNumber _currentSequenceNumber; // Last sequence number sent out - std::atomic<uint32_t> _atomicCurrentSequenceNumber;// Atomic for last sequence number sent out + std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out - std::atomic<int> _packetSendPeriod; // Interval between two packet send event in microseconds, set from CC + std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure std::atomic<bool> _isRunning { false }; - std::atomic<int> _flowWindowSize; // Flow control window size (number of packets that can be on wire) - set from CC + std::atomic<int> _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC - mutable QReadWriteLock _naksLock; // Protects the naks list. + mutable std::mutex _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend mutable QReadWriteLock _sentLock; // Protects the sent packet list std::unordered_map<SequenceNumber, std::unique_ptr<Packet>> _sentPackets; // Packets waiting for ACK. + + std::condition_variable_any _emptyCondition; }; }