From e84595af4936327aae823b3984f4ffbdc8686616 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 24 Aug 2015 16:50:54 -0700 Subject: [PATCH] add a wait condition for empty queue and loss list --- libraries/networking/src/udt/SendQueue.cpp | 61 +++++++++++++++++++--- libraries/networking/src/udt/SendQueue.h | 27 ++++++++-- 2 files changed, 76 insertions(+), 12 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index cfb969a186..fcede6fe03 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -56,8 +56,11 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : void SendQueue::queuePacket(std::unique_ptr packet) { { - QWriteLocker locker(&_packetsLock); + std::lock_guard locker(_packetsLock); _packets.push_back(std::move(packet)); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); } if (!this->thread()->isRunning()) { this->thread()->start(); @@ -95,9 +98,12 @@ void SendQueue::queuePacketList(std::unique_ptr packetList) { } } - QWriteLocker locker(&_packetsLock); + std::lock_guard locker(_packetsLock); _packets.splice(_packets.end(), packetList->_packets); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); } if (!this->thread()->isRunning()) { @@ -126,7 +132,7 @@ void SendQueue::ack(SequenceNumber ack) { } { // remove any sequence numbers equal to or lower than this ACK in the loss list - QWriteLocker nakLocker(&_naksLock); + std::lock_guard nakLocker(_naksLock); if (_naks.getLength() > 0 && _naks.getFirstSequenceNumber() <= ack) { _naks.remove(_naks.getFirstSequenceNumber(), ack); @@ -137,12 +143,15 @@ void SendQueue::ack(SequenceNumber ack) { } void SendQueue::nak(SequenceNumber start, SequenceNumber end) { - QWriteLocker locker(&_naksLock); + std::lock_guard nakLocker(_naksLock); _naks.insert(start, end); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send + _emptyCondition.notify_one(); } void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { - QWriteLocker locker(&_naksLock); + std::lock_guard nakLocker(_naksLock); _naks.clear(); SequenceNumber first, second; @@ -156,6 +165,9 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { _naks.append(first, second); } } + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for losses to re-send + _emptyCondition.notify_one(); } SequenceNumber SendQueue::getNextSequenceNumber() { @@ -195,16 +207,20 @@ void SendQueue::run() { // Record timing _lastSendTimestamp = high_resolution_clock::now(); + bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs + bool resentPacket = false; // the following while makes sure that we find a packet to re-send, if there is one while (!resentPacket) { - QWriteLocker naksLocker(&_naksLock); + std::unique_lock nakLocker(_naksLock); if (_naks.getLength() > 0) { + naksEmpty = _naks.getLength() > 1; + // pull the sequence number we need to re-send SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); - naksLocker.unlock(); + nakLocker.unlock(); // pull the packet to re-send from the sent packets list QReadLocker sentLocker(&_sentLock); @@ -233,21 +249,27 @@ void SendQueue::run() { // we'll fire the loop again to see if there is another to re-send continue; } + } else { + naksEmpty = true; } // break from the while, we didn't resend a packet break; } + bool packetsEmpty = false; // used after processing to check if we should wait for packets + bool sentPacket = false; + // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet if (!resentPacket && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { // we didn't re-send a packet, so time to send a new one - QWriteLocker locker(&_packetsLock); + std::unique_lock locker(_packetsLock); if (_packets.size() > 0) { + SequenceNumber nextNumber = getNextSequenceNumber(); // grab the first packet we will send @@ -266,9 +288,13 @@ void SendQueue::run() { } } + packetsEmpty = _packets.size() == 0; + // unlock the packets, we're done pulling locker.unlock(); + sentPacket = true; + // definitely send the first packet sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); @@ -279,6 +305,7 @@ void SendQueue::run() { } } else { + packetsEmpty = true; locker.unlock(); } } @@ -291,6 +318,24 @@ void SendQueue::run() { break; } + if (packetsEmpty && naksEmpty) { + // During our processing above the loss list and packet list were both empty. + + // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. + // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + DoubleLock doubleLock(_packetsLock, _naksLock); + + // The packets queue and loss list mutexes are now both locked - check if they're still both empty + if (_packets.empty() && _naks.getLength() == 0) { + // both are empty - let's use a condition_variable_any to wait + _emptyCondition.wait(doubleLock); + + // we have the double lock again - it'll be unlock once it goes out of scope + // skip to the next iteration + continue; + } + } + // sleep as long as we need until next packet send, if we can auto now = high_resolution_clock::now(); auto microsecondDuration = duration_cast((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 901a9f7a87..af53ef79ec 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -13,7 +13,9 @@ #define hifi_SendQueue_h #include +#include #include +#include #include #include @@ -40,11 +42,26 @@ class SendQueue : public QObject { Q_OBJECT public: + + class DoubleLock { + public: + DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); } + ~DoubleLock() { unlock(); } + + DoubleLock(const DoubleLock&) = delete; + DoubleLock& operator=(const DoubleLock&) = delete; + + void lock() { std::lock(_mutex1, _mutex2); } + void unlock() { _mutex1.unlock(); _mutex2.unlock(); } + private: + std::mutex& _mutex1; + std::mutex& _mutex2; + }; + static std::unique_ptr create(Socket* socket, HifiSockAddr destination); void queuePacket(std::unique_ptr packet); void queuePacketList(std::unique_ptr packetList); - int getQueueSize() const { QReadLocker locker(&_packetsLock); return _packets.size(); } SequenceNumber getCurrentSequenceNumber() const { return SequenceNumber(_atomicCurrentSequenceNumber); } @@ -79,13 +96,13 @@ private: SequenceNumber getNextSequenceNumber(); MessageNumber getNextMessageNumber(); - mutable QReadWriteLock _packetsLock; // Protects the packets to be sent list. + mutable std::mutex _packetsLock; // Protects the packets to be sent list. std::list> _packets; // List of packets to be sent Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr - std::atomic _lastACKSequenceNumber; // Last ACKed sequence number + std::atomic _lastACKSequenceNumber { 0 }; // Last ACKed sequence number MessageNumber _currentMessageNumber { 0 }; SequenceNumber _currentSequenceNumber; // Last sequence number sent out @@ -97,11 +114,13 @@ private: std::atomic _flowWindowSize; // Flow control window size (number of packets that can be on wire) - set from CC - mutable QReadWriteLock _naksLock; // Protects the naks list. + mutable std::mutex _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend mutable QReadWriteLock _sentLock; // Protects the sent packet list std::unordered_map> _sentPackets; // Packets waiting for ACK. + + std::condition_variable_any _emptyCondition; }; }