From f802f354caf5357abb22558ffc5ba5e73c527c4d Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Fri, 21 Aug 2015 15:36:03 +0200 Subject: [PATCH 1/9] prefer scoped locker to unlocks --- libraries/networking/src/PacketReceiver.cpp | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/libraries/networking/src/PacketReceiver.cpp b/libraries/networking/src/PacketReceiver.cpp index a086949ac8..74a8ef8ff1 100644 --- a/libraries/networking/src/PacketReceiver.cpp +++ b/libraries/networking/src/PacketReceiver.cpp @@ -206,12 +206,13 @@ void PacketReceiver::registerVerifiedListener(PacketType type, QObject* object, void PacketReceiver::unregisterListener(QObject* listener) { Q_ASSERT_X(listener, "PacketReceiver::unregisterListener", "No listener to unregister"); - QMutexLocker packetListenerLocker(&_packetListenerLock); - std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap), - [&listener](const ObjectMethodPair& pair) { - return pair.first == listener; - }); - packetListenerLocker.unlock(); + { + QMutexLocker packetListenerLocker(&_packetListenerLock); + std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap), + [&listener](const ObjectMethodPair& pair) { + return pair.first == listener; + }); + } QMutexLocker directConnectSetLocker(&_directConnectSetMutex); _directlyConnectedObjects.remove(listener); @@ -454,7 +455,6 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr packet) { // if it exists, remove the listener from _directlyConnectedObjects QMutexLocker locker(&_directConnectSetMutex); _directlyConnectedObjects.remove(listener.first); - locker.unlock(); } } else if (it == _packetListenerMap.end()) { @@ -463,7 +463,4 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr packet) { // insert a dummy listener so we don't print this again _packetListenerMap.insert(nlPacket->getType(), { nullptr, QMetaMethod() }); } - - packetListenerLocker.unlock(); - } From 373d4b894482d9651917fb4fc5d8cb8fda05c9b1 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 15:53:49 +0200 Subject: [PATCH 2/9] Remove connection from socket when innactive If innactive for more than 5 seconds, remove it from hash --- libraries/networking/src/udt/Connection.cpp | 5 + libraries/networking/src/udt/Connection.h | 2 + libraries/networking/src/udt/SendQueue.cpp | 223 ++++++++++---------- libraries/networking/src/udt/SendQueue.h | 18 +- libraries/networking/src/udt/Socket.cpp | 10 +- libraries/networking/src/udt/Socket.h | 1 + 6 files changed, 143 insertions(+), 116 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 9f8b1eb3ee..c640392263 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -72,6 +72,7 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); + QObject::connect(_sendQueue.get(), &SendQueue::queueInnactive, this, &Connection::queueInnactive); // set defaults on the send queue from our congestion control object _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); @@ -81,6 +82,10 @@ SendQueue& Connection::getSendQueue() { return *_sendQueue; } +void Connection::queueInnactive() { + emit connectionInnactive(_destination); +} + void Connection::sendReliablePacket(std::unique_ptr packet) { Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); getSendQueue().queuePacket(std::move(packet)); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 307c90eda5..fd7cd5ef88 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -73,10 +73,12 @@ public: signals: void packetSent(); + void connectionInnactive(HifiSockAddr sockAdrr); private slots: void recordSentPackets(int payload, int total); void recordRetransmission(); + void queueInnactive(); private: void sendACK(bool wasCausedBySyncTimeout = true); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 5e0e8312f2..65e5f9f7ba 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -12,6 +12,7 @@ #include "SendQueue.h" #include +#include #include #include @@ -194,8 +195,8 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, // Insert the packet we have just sent in the sent list QWriteLocker locker(&_sentLock); _sentPackets[newPacket->getSequenceNumber()].swap(newPacket); - Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list"); } + Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list"); emit packetSent(packetSize, payloadSize); } @@ -204,110 +205,18 @@ void SendQueue::run() { _isRunning = true; while (_isRunning) { - // Record timing - _lastSendTimestamp = high_resolution_clock::now(); + // Record how long the loop takes to execute + auto loopStartTimestamp = high_resolution_clock::now(); - bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs + bool sentAPacket = maybeResendPacket(); + bool flowWindowFull = false; - bool resentPacket = false; - - // the following while makes sure that we find a packet to re-send, if there is one - while (!resentPacket) { - std::unique_lock nakLocker(_naksLock); - - if (_naks.getLength() > 0) { - naksEmpty = _naks.getLength() > 1; - - // pull the sequence number we need to re-send - SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); - nakLocker.unlock(); - - // pull the packet to re-send from the sent packets list - QReadLocker sentLocker(&_sentLock); - - // see if we can find the packet to re-send - auto it = _sentPackets.find(resendNumber); - - if (it != _sentPackets.end()) { - // we found the packet - grab it - auto& resendPacket = *(it->second); - - // unlock the sent packets - sentLocker.unlock(); - - // send it off - sendPacket(resendPacket); - emit packetRetransmitted(); - - // mark that we did resend a packet - resentPacket = true; - - // break out of our while now that we have re-sent a packet - break; - } else { - // we didn't find this packet in the sentPackets queue - assume this means it was ACKed - // we'll fire the loop again to see if there is another to re-send - continue; - } - } else { - naksEmpty = true; - } - - // break from the while, we didn't resend a packet - break; - } - - bool packetsEmpty = false; // used after processing to check if we should wait for packets - bool sentPacket = false; - // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet - if (!resentPacket - && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { - - // we didn't re-send a packet, so time to send a new one - std::unique_lock locker(_packetsLock); - - if (_packets.size() > 0) { - - SequenceNumber nextNumber = getNextSequenceNumber(); - - // grab the first packet we will send - std::unique_ptr firstPacket; - firstPacket.swap(_packets.front()); - _packets.pop_front(); - - std::unique_ptr secondPacket; - - if (((uint32_t) nextNumber & 0xF) == 0) { - // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets - // pull off a second packet if we can before we unlock - if (_packets.size() > 0) { - secondPacket.swap(_packets.front()); - _packets.pop_front(); - } - } - - packetsEmpty = _packets.size() == 0; - - // unlock the packets, we're done pulling - locker.unlock(); - - sentPacket = true; - - // definitely send the first packet - sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); - - // do we have a second in a pair to send as well? - if (secondPacket) { - nextNumber = getNextSequenceNumber(); - sendNewPacketAndAddToSentList(move(secondPacket), nextNumber); - } - - } else { - packetsEmpty = true; - locker.unlock(); - } + if (!sentAPacket) { + flowWindowFull = (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > + _flowWindowSize); + sentAPacket = maybeSendNewPacket(); } // since we're a while loop, give the thread a chance to process events @@ -318,17 +227,24 @@ void SendQueue::run() { break; } - if (packetsEmpty && naksEmpty) { - // During our processing above the loss list and packet list were both empty. + if (!sentAPacket && !flowWindowFull) { + // During our processing above we didn't send any packets and the flow window is not full. // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock DoubleLock doubleLock(_packetsLock, _naksLock); // The packets queue and loss list mutexes are now both locked - check if they're still both empty - if (_packets.empty() && _naks.getLength() == 0) { + if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { // both are empty - let's use a condition_variable_any to wait - _emptyCondition.wait(doubleLock); + static const seconds CONSIDER_INNACTIVE_AFTER { 5 }; + auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INNACTIVE_AFTER); + + + // Check if we've been innactive for too long + if (cvStatus == std::cv_status::timeout) { + emit queueInnactive(); + } // we have the double lock again - it'll be unlocked once it goes out of scope // skip to the next iteration @@ -336,12 +252,99 @@ void SendQueue::run() { } } - // sleep as long as we need until next packet send, if we can - auto now = high_resolution_clock::now(); - auto microsecondDuration = duration_cast((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now); + auto loopEndTimestamp = high_resolution_clock::now(); - if (microsecondDuration.count() > 0) { - usleep(microsecondDuration.count()); + // sleep as long as we need until next packet send, if we can + auto timeToSleep = (loopStartTimestamp + microseconds(_packetSendPeriod)) - loopEndTimestamp; + if (timeToSleep > timeToSleep.zero()) { + std::this_thread::sleep_for(timeToSleep); } } } + +bool SendQueue::maybeSendNewPacket() { + // we didn't re-send a packet, so time to send a new one + std::unique_lock locker(_packetsLock); + + if (_packets.size() > 0) { + SequenceNumber nextNumber = getNextSequenceNumber(); + + // grab the first packet we will send + std::unique_ptr firstPacket; + firstPacket.swap(_packets.front()); + _packets.pop_front(); + + std::unique_ptr secondPacket; + + if (((uint32_t) nextNumber & 0xF) == 0) { + // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets + // pull off a second packet if we can before we unlock + if (_packets.size() > 0) { + secondPacket.swap(_packets.front()); + _packets.pop_front(); + } + } + + // unlock the packets, we're done pulling + locker.unlock(); + + // definitely send the first packet + sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); + + // do we have a second in a pair to send as well? + if (secondPacket) { + sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); + } + + // We sent our packet(s), return here + return true; + } + + // No packets were sent + return false; +} + +bool SendQueue::maybeResendPacket() { + // the following while makes sure that we find a packet to re-send, if there is one + while (true) { + std::unique_lock naksLocker(_naksLock); + + if (_naks.getLength() > 0) { + // pull the sequence number we need to re-send + SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); + naksLocker.unlock(); + + // pull the packet to re-send from the sent packets list + QReadLocker sentLocker(&_sentLock); + + // see if we can find the packet to re-send + auto it = _sentPackets.find(resendNumber); + + if (it != _sentPackets.end()) { + // we found the packet - grab it + auto& resendPacket = *(it->second); + + // unlock the sent packets + sentLocker.unlock(); + + // send it off + sendPacket(resendPacket); + emit packetRetransmitted(); + + // Signal that we did resend a packet + return true; + } else { + // we didn't find this packet in the sentPackets queue - assume this means it was ACKed + // we'll fire the loop again to see if there is another to re-send + continue; + } + } + + // break from the while, we didn't resend a packet + break; + } + + // No packet was resent + return false; +} + diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 618cf17009..1d3d0e4c51 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -43,17 +43,23 @@ class SendQueue : public QObject { public: + // This class is not thread-safe class DoubleLock { public: - DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); } + DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }; ~DoubleLock() { unlock(); } DoubleLock(const DoubleLock&) = delete; DoubleLock& operator=(const DoubleLock&) = delete; - void lock() { std::lock(_mutex1, _mutex2); } - void unlock() { _mutex1.unlock(); _mutex2.unlock(); } + bool locked() { return _locked; } + + bool try_lock() { _locked = (std::try_lock(_mutex1, _mutex2) == -1); return _locked; } + void lock() { std::lock(_mutex1, _mutex2); _locked = true; } + void unlock() { if (locked()) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } } + private: + std::atomic _locked { false }; std::mutex& _mutex1; std::mutex& _mutex2; }; @@ -81,6 +87,8 @@ signals: void packetSent(int dataSize, int payloadSize); void packetRetransmitted(); + void queueInnactive(); + private slots: void run(); @@ -92,6 +100,9 @@ private: void sendPacket(const Packet& packet); void sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber); + bool maybeSendNewPacket(); // Figures out what packet to send next + bool maybeResendPacket(); // Determines whether to resend a packet and wich one + // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); MessageNumber getNextMessageNumber(); @@ -109,7 +120,6 @@ private: std::atomic _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC - std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure std::atomic _isRunning { false }; std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 0aac7f8f99..f423353836 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -147,12 +147,18 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { if (it == _connectionsHash.end()) { auto connection = std::unique_ptr(new Connection(this, sockAddr, _ccFactory->create())); + QObject::connect(connection.get(), &Connection::connectionInnactive, this, &Socket::cleanupConnection); it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection))); } return *it->second; } +void Socket::cleanupConnection(HifiSockAddr sockAddr) { + qDebug() << "Cleaned up" << sockAddr; + _connectionsHash.erase(sockAddr); +} + void Socket::messageReceived(std::unique_ptr packetList) { if (_packetListHandler) { _packetListHandler(std::move(packetList)); @@ -205,8 +211,8 @@ void Socket::readPendingDatagrams() { // if this was a reliable packet then signal the matching connection with the sequence number auto& connection = findOrCreateConnection(senderSockAddr); connection.processReceivedSequenceNumber(packet->getSequenceNumber(), - packet->getDataSize(), - packet->getPayloadSize()); + packet->getDataSize(), + packet->getPayloadSize()); } if (packet->isPartOfMessage()) { diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 0c4276a767..266c0be6a5 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -76,6 +76,7 @@ public: private slots: void readPendingDatagrams(); void rateControlSync(); + void cleanupConnection(HifiSockAddr sockAddr); private: void setSystemBufferSizes(); From c96632d4b6a20f4c6deacf8bba806bcf8ddfb00b Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 16:07:47 +0200 Subject: [PATCH 3/9] Cleanup connection on node kills --- libraries/networking/src/LimitedNodeList.cpp | 4 ++++ libraries/networking/src/udt/Socket.h | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index e2bae6b5e8..b7a0cee44e 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -428,6 +428,10 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) { qCDebug(networking) << "Killed" << *node; node->stopPingTimer(); emit nodeKilled(node); + + if (auto activeSocket = node->getActiveSocket()) { + _nodeSocket.cleanupConnection(*activeSocket); + } } SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 266c0be6a5..0014a97e2b 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -73,10 +73,12 @@ public: ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination); std::vector getConnectionSockAddrs(); +public slots: + void cleanupConnection(HifiSockAddr sockAddr); + private slots: void readPendingDatagrams(); void rateControlSync(); - void cleanupConnection(HifiSockAddr sockAddr); private: void setSystemBufferSizes(); From 8049819beb14f3933278001157d5a7337af4efc2 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 17:11:32 +0200 Subject: [PATCH 4/9] Tweak DoubleLock --- libraries/networking/src/udt/SendQueue.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 1d3d0e4c51..936aa1a3f1 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -54,9 +54,9 @@ public: bool locked() { return _locked; } - bool try_lock() { _locked = (std::try_lock(_mutex1, _mutex2) == -1); return _locked; } + bool try_lock() { return _locked = (std::try_lock(_mutex1, _mutex2) == -1); } void lock() { std::lock(_mutex1, _mutex2); _locked = true; } - void unlock() { if (locked()) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } } + void unlock() { if (_locked) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } } private: std::atomic _locked { false }; From 3184dee10ad1ab87b9f08fcc4a9cc80498cb0b7a Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 17:49:17 +0200 Subject: [PATCH 5/9] Emit queueInnactive when flow window is full for too long --- libraries/networking/src/udt/SendQueue.cpp | 77 +++++++++++++++------- libraries/networking/src/udt/SendQueue.h | 27 ++------ 2 files changed, 61 insertions(+), 43 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 65e5f9f7ba..1ee8cce48a 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -25,7 +25,27 @@ #include "Socket.h" using namespace udt; -using namespace std::chrono; + +// This class is not thread-safe +class DoubleLock { +public: + DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }; + ~DoubleLock() { unlock(); } + + DoubleLock(const DoubleLock&) = delete; + DoubleLock& operator=(const DoubleLock&) = delete; + + bool locked() { return _locked; } + + bool try_lock() { return _locked = (std::try_lock(_mutex1, _mutex2) == -1); } + void lock() { std::lock(_mutex1, _mutex2); _locked = true; } + void unlock() { if (_locked) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } } + +private: + std::atomic _locked { false }; + std::mutex& _mutex1; + std::mutex& _mutex2; +}; std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination) { auto queue = std::unique_ptr(new SendQueue(socket, destination)); @@ -206,7 +226,7 @@ void SendQueue::run() { while (_isRunning) { // Record how long the loop takes to execute - auto loopStartTimestamp = high_resolution_clock::now(); + auto loopStartTimestamp = clock::now(); bool sentAPacket = maybeResendPacket(); bool flowWindowFull = false; @@ -219,6 +239,12 @@ void SendQueue::run() { sentAPacket = maybeSendNewPacket(); } + // Keep track of how long the flow window has been full for + if (flowWindowFull && !_flowWindowWasFull) { + _flowWindowFullSince = loopStartTimestamp; + } + _flowWindowWasFull = flowWindowFull; + // since we're a while loop, give the thread a chance to process events QCoreApplication::processEvents(); @@ -227,35 +253,42 @@ void SendQueue::run() { break; } - if (!sentAPacket && !flowWindowFull) { - // During our processing above we didn't send any packets and the flow window is not full. + if (!sentAPacket) { + static const std::chrono::seconds CONSIDER_INNACTIVE_AFTER { 5 }; - // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. - // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock - DoubleLock doubleLock(_packetsLock, _naksLock); - - // The packets queue and loss list mutexes are now both locked - check if they're still both empty - if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { - // both are empty - let's use a condition_variable_any to wait - static const seconds CONSIDER_INNACTIVE_AFTER { 5 }; - auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INNACTIVE_AFTER); + if (flowWindowFull && (clock::now() - _flowWindowFullSince) > CONSIDER_INNACTIVE_AFTER) { + // If the flow window has been full for over CONSIDER_INNACTIVE_AFTER, + // then signal the queue is innactive + emit queueInnactive(); + } else { + // During our processing above we didn't send any packets and the flow window is not full. + // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. + // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + DoubleLock doubleLock(_packetsLock, _naksLock); - // Check if we've been innactive for too long - if (cvStatus == std::cv_status::timeout) { - emit queueInnactive(); + // The packets queue and loss list mutexes are now both locked - check if they're still both empty + if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { + // both are empty - let's use a condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INNACTIVE_AFTER); + + + // Check if we've been innactive for too long + if (cvStatus == std::cv_status::timeout) { + emit queueInnactive(); + } + + // we have the double lock again - it'll be unlocked once it goes out of scope + // skip to the next iteration + continue; } - - // we have the double lock again - it'll be unlocked once it goes out of scope - // skip to the next iteration - continue; } } - auto loopEndTimestamp = high_resolution_clock::now(); + auto loopEndTimestamp = clock::now(); // sleep as long as we need until next packet send, if we can - auto timeToSleep = (loopStartTimestamp + microseconds(_packetSendPeriod)) - loopEndTimestamp; + auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; if (timeToSleep > timeToSleep.zero()) { std::this_thread::sleep_for(timeToSleep); } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 936aa1a3f1..cf20e03f20 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -42,27 +42,8 @@ class SendQueue : public QObject { Q_OBJECT public: - - // This class is not thread-safe - class DoubleLock { - public: - DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }; - ~DoubleLock() { unlock(); } - - DoubleLock(const DoubleLock&) = delete; - DoubleLock& operator=(const DoubleLock&) = delete; - - bool locked() { return _locked; } - - bool try_lock() { return _locked = (std::try_lock(_mutex1, _mutex2) == -1); } - void lock() { std::lock(_mutex1, _mutex2); _locked = true; } - void unlock() { if (_locked) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } } - - private: - std::atomic _locked { false }; - std::mutex& _mutex1; - std::mutex& _mutex2; - }; + using clock = std::chrono::high_resolution_clock; + using time_point = clock::time_point; static std::unique_ptr create(Socket* socket, HifiSockAddr destination); @@ -124,6 +105,10 @@ private: std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC + // Used to detect when the connection becomes innactive for too long + bool _flowWindowWasFull = false; + time_point _flowWindowFullSince; + mutable std::mutex _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend From 7a5ed244850a389ea24f6128951d970b65d0d1b8 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 19:31:33 +0200 Subject: [PATCH 6/9] typo (I can't spell inactive correctly) --- libraries/networking/src/udt/Connection.cpp | 6 +++--- libraries/networking/src/udt/Connection.h | 4 ++-- libraries/networking/src/udt/SendQueue.cpp | 16 ++++++++-------- libraries/networking/src/udt/SendQueue.h | 4 ++-- libraries/networking/src/udt/Socket.cpp | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index c640392263..c44c0f4221 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -72,7 +72,7 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); - QObject::connect(_sendQueue.get(), &SendQueue::queueInnactive, this, &Connection::queueInnactive); + QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); // set defaults on the send queue from our congestion control object _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); @@ -82,8 +82,8 @@ SendQueue& Connection::getSendQueue() { return *_sendQueue; } -void Connection::queueInnactive() { - emit connectionInnactive(_destination); +void Connection::queueInactive() { + emit connectionInactive(_destination); } void Connection::sendReliablePacket(std::unique_ptr packet) { diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index fd7cd5ef88..ef713ea0c5 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -73,12 +73,12 @@ public: signals: void packetSent(); - void connectionInnactive(HifiSockAddr sockAdrr); + void connectionInactive(HifiSockAddr sockAdrr); private slots: void recordSentPackets(int payload, int total); void recordRetransmission(); - void queueInnactive(); + void queueInactive(); private: void sendACK(bool wasCausedBySyncTimeout = true); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 1ee8cce48a..93212770e5 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -254,12 +254,12 @@ void SendQueue::run() { } if (!sentAPacket) { - static const std::chrono::seconds CONSIDER_INNACTIVE_AFTER { 5 }; + static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 }; - if (flowWindowFull && (clock::now() - _flowWindowFullSince) > CONSIDER_INNACTIVE_AFTER) { - // If the flow window has been full for over CONSIDER_INNACTIVE_AFTER, - // then signal the queue is innactive - emit queueInnactive(); + if (flowWindowFull && (clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { + // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, + // then signal the queue is inactive + emit queueInactive(); } else { // During our processing above we didn't send any packets and the flow window is not full. @@ -270,12 +270,12 @@ void SendQueue::run() { // The packets queue and loss list mutexes are now both locked - check if they're still both empty if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { // both are empty - let's use a condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INNACTIVE_AFTER); + auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER); - // Check if we've been innactive for too long + // Check if we've been inactive for too long if (cvStatus == std::cv_status::timeout) { - emit queueInnactive(); + emit queueInactive(); } // we have the double lock again - it'll be unlocked once it goes out of scope diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index cf20e03f20..8e8f3e4c89 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -68,7 +68,7 @@ signals: void packetSent(int dataSize, int payloadSize); void packetRetransmitted(); - void queueInnactive(); + void queueInactive(); private slots: void run(); @@ -105,7 +105,7 @@ private: std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC - // Used to detect when the connection becomes innactive for too long + // Used to detect when the connection becomes inactive for too long bool _flowWindowWasFull = false; time_point _flowWindowFullSince; diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index f423353836..7bdfc55398 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -147,7 +147,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { if (it == _connectionsHash.end()) { auto connection = std::unique_ptr(new Connection(this, sockAddr, _ccFactory->create())); - QObject::connect(connection.get(), &Connection::connectionInnactive, this, &Socket::cleanupConnection); + QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection); it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection))); } From cd8d6df2877d1b8e60a931e81830ac7be9589cd5 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 19:36:18 +0200 Subject: [PATCH 7/9] Use high_resolution_clock instead of clock --- libraries/networking/src/udt/SendQueue.cpp | 6 +++--- libraries/networking/src/udt/SendQueue.h | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 93212770e5..691e65eea1 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -226,7 +226,7 @@ void SendQueue::run() { while (_isRunning) { // Record how long the loop takes to execute - auto loopStartTimestamp = clock::now(); + auto loopStartTimestamp = high_resolution_clock::now(); bool sentAPacket = maybeResendPacket(); bool flowWindowFull = false; @@ -256,7 +256,7 @@ void SendQueue::run() { if (!sentAPacket) { static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 }; - if (flowWindowFull && (clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { + if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, // then signal the queue is inactive emit queueInactive(); @@ -285,7 +285,7 @@ void SendQueue::run() { } } - auto loopEndTimestamp = clock::now(); + auto loopEndTimestamp = high_resolution_clock::now(); // sleep as long as we need until next packet send, if we can auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 8e8f3e4c89..ebc6702ac5 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -42,8 +42,8 @@ class SendQueue : public QObject { Q_OBJECT public: - using clock = std::chrono::high_resolution_clock; - using time_point = clock::time_point; + using high_resolution_clock = std::chrono::high_resolution_clock; + using time_point = high_resolution_clock::time_point; static std::unique_ptr create(Socket* socket, HifiSockAddr destination); From b1fa12cc431eea38df35c80dc01a9dea4ed118c0 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 19:38:24 +0200 Subject: [PATCH 8/9] Better debug message --- libraries/networking/src/udt/Socket.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 7bdfc55398..d5e57df60b 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -155,7 +155,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { } void Socket::cleanupConnection(HifiSockAddr sockAddr) { - qDebug() << "Cleaned up" << sockAddr; + qCDebug(networking) << "Socket::cleanupConnection called for connection to" << sockAddr; _connectionsHash.erase(sockAddr); } From 5b5dc94335f67355d8c9ebe23481eb465a8f2bb6 Mon Sep 17 00:00:00 2001 From: Atlante45 Date: Wed, 26 Aug 2015 20:03:04 +0200 Subject: [PATCH 9/9] Make DoubleLock thread-safe but not self-unlocked --- libraries/networking/src/udt/SendQueue.cpp | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 691e65eea1..9ba7b76a9c 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -26,23 +26,23 @@ using namespace udt; -// This class is not thread-safe class DoubleLock { public: - DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { }; - ~DoubleLock() { unlock(); } + DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } DoubleLock(const DoubleLock&) = delete; DoubleLock& operator=(const DoubleLock&) = delete; - bool locked() { return _locked; } + // Either locks all the mutexes or none of them + bool try_lock() { return (std::try_lock(_mutex1, _mutex2) == -1); } - bool try_lock() { return _locked = (std::try_lock(_mutex1, _mutex2) == -1); } - void lock() { std::lock(_mutex1, _mutex2); _locked = true; } - void unlock() { if (_locked) { _mutex1.unlock(); _mutex2.unlock(); _locked = false; } } + // Locks all the mutexes + void lock() { std::lock(_mutex1, _mutex2); } + + // Undefined behavior if not locked + void unlock() { _mutex1.unlock(); _mutex2.unlock(); } private: - std::atomic _locked { false }; std::mutex& _mutex1; std::mutex& _mutex2; }; @@ -271,14 +271,15 @@ void SendQueue::run() { if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { // both are empty - let's use a condition_variable_any to wait auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER); - + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); // Check if we've been inactive for too long if (cvStatus == std::cv_status::timeout) { emit queueInactive(); } - // we have the double lock again - it'll be unlocked once it goes out of scope // skip to the next iteration continue; }