diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 1bc5f10069..c342f660a3 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -442,6 +442,10 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) { qCDebug(networking) << "Killed" << *node; node->stopPingTimer(); emit nodeKilled(node); + + if (auto activeSocket = node->getActiveSocket()) { + _nodeSocket.cleanupConnection(*activeSocket); + } } SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, diff --git a/libraries/networking/src/PacketReceiver.cpp b/libraries/networking/src/PacketReceiver.cpp index 002ecc2c6f..c55461e8cf 100644 --- a/libraries/networking/src/PacketReceiver.cpp +++ b/libraries/networking/src/PacketReceiver.cpp @@ -208,12 +208,13 @@ void PacketReceiver::registerVerifiedListener(PacketType type, QObject* object, void PacketReceiver::unregisterListener(QObject* listener) { Q_ASSERT_X(listener, "PacketReceiver::unregisterListener", "No listener to unregister"); - QMutexLocker packetListenerLocker(&_packetListenerLock); - std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap), - [&listener](const ObjectMethodPair& pair) { - return pair.first == listener; - }); - packetListenerLocker.unlock(); + { + QMutexLocker packetListenerLocker(&_packetListenerLock); + std::remove_if(std::begin(_packetListenerMap), std::end(_packetListenerMap), + [&listener](const ObjectMethodPair& pair) { + return pair.first == listener; + }); + } QMutexLocker directConnectSetLocker(&_directConnectSetMutex); _directlyConnectedObjects.remove(listener); @@ -456,7 +457,6 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr packet) { // if it exists, remove the listener from _directlyConnectedObjects QMutexLocker locker(&_directConnectSetMutex); _directlyConnectedObjects.remove(listener.first); - locker.unlock(); } } else if (it == _packetListenerMap.end()) { @@ -465,7 +465,4 @@ void PacketReceiver::handleVerifiedPacket(std::unique_ptr packet) { // insert a dummy listener so we don't print this again _packetListenerMap.insert(nlPacket->getType(), { nullptr, QMetaMethod() }); } - - packetListenerLocker.unlock(); - } diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index f1b69e0039..1fa4111f85 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -76,6 +76,7 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); + QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); // set defaults on the send queue from our congestion control object _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); @@ -85,6 +86,10 @@ SendQueue& Connection::getSendQueue() { return *_sendQueue; } +void Connection::queueInactive() { + emit connectionInactive(_destination); +} + void Connection::sendReliablePacket(std::unique_ptr packet) { Q_ASSERT_X(packet->isReliable(), "Connection::send", "Trying to send an unreliable packet reliably."); getSendQueue().queuePacket(std::move(packet)); diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 0babfb76e1..20306b5515 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -73,10 +73,12 @@ public: signals: void packetSent(); + void connectionInactive(HifiSockAddr sockAdrr); private slots: void recordSentPackets(int payload, int total); void recordRetransmission(); + void queueInactive(); private: void sendACK(bool wasCausedBySyncTimeout = true); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 023e57bb21..da702c7166 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -12,6 +12,7 @@ #include "SendQueue.h" #include +#include #include #include @@ -24,7 +25,27 @@ #include "Socket.h" using namespace udt; -using namespace std::chrono; + +class DoubleLock { +public: + DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } + + DoubleLock(const DoubleLock&) = delete; + DoubleLock& operator=(const DoubleLock&) = delete; + + // Either locks all the mutexes or none of them + bool try_lock() { return (std::try_lock(_mutex1, _mutex2) == -1); } + + // Locks all the mutexes + void lock() { std::lock(_mutex1, _mutex2); } + + // Undefined behavior if not locked + void unlock() { _mutex1.unlock(); _mutex2.unlock(); } + +private: + std::mutex& _mutex1; + std::mutex& _mutex2; +}; std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destination) { auto queue = std::unique_ptr(new SendQueue(socket, destination)); @@ -200,8 +221,8 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, // Insert the packet we have just sent in the sent list QWriteLocker locker(&_sentLock); _sentPackets[newPacket->getSequenceNumber()].swap(newPacket); - Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list"); } + Q_ASSERT_X(!newPacket, "SendQueue::sendNewPacketAndAddToSentList()", "Overriden packet in sent list"); emit packetSent(packetSize, payloadSize); } @@ -210,8 +231,8 @@ void SendQueue::run() { _isRunning = true; while (_isRunning) { - // Record timing - _lastSendTimestamp = high_resolution_clock::now(); + // Record how long the loop takes to execute + auto loopStartTimestamp = high_resolution_clock::now(); std::unique_lock handshakeLock { _handshakeMutex }; @@ -222,12 +243,13 @@ void SendQueue::run() { // hold the time of last send in a static static auto lastSendHandshake = high_resolution_clock::time_point(); - static const int HANDSHAKE_RESEND_INTERVAL_MS = 100; + static const auto HANDSHAKE_RESEND_INTERVAL_MS = std::chrono::milliseconds(100); // calculation the duration since the last handshake send - auto sinceLastHandshake = duration_cast(high_resolution_clock::now() - lastSendHandshake); + auto sinceLastHandshake = std::chrono::duration_cast(high_resolution_clock::now() + - lastSendHandshake); - if (sinceLastHandshake.count() >= HANDSHAKE_RESEND_INTERVAL_MS) { + if (sinceLastHandshake >= HANDSHAKE_RESEND_INTERVAL_MS) { // it has been long enough since last handshake, send another static auto handshakePacket = ControlPacket::create(ControlPacket::Handshake, 0); @@ -238,7 +260,8 @@ void SendQueue::run() { // we wait for the ACK or the re-send interval to expire _handshakeACKCondition.wait_until(handshakeLock, - high_resolution_clock::now() + milliseconds(HANDSHAKE_RESEND_INTERVAL_MS)); + high_resolution_clock::now() + + HANDSHAKE_RESEND_INTERVAL_MS); // Once we're here we've either received the handshake ACK or it's going to be time to re-send a handshake. // Either way let's continue processing - no packets will be sent if no handshake ACK has been received. @@ -246,141 +269,151 @@ void SendQueue::run() { handshakeLock.unlock(); - bool naksEmpty = true; // used at the end of processing to see if we should wait for NAKs - bool resentPacket = false; + bool sentAPacket = maybeResendPacket(); + bool flowWindowFull = false; - // the following while makes sure that we find a packet to re-send, if there is one - while (!resentPacket) { - std::unique_lock nakLocker(_naksLock); - - if (_naks.getLength() > 0) { - naksEmpty = _naks.getLength() > 1; - - // pull the sequence number we need to re-send - SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); - nakLocker.unlock(); - - // pull the packet to re-send from the sent packets list - QReadLocker sentLocker(&_sentLock); - - // see if we can find the packet to re-send - auto it = _sentPackets.find(resendNumber); - - if (it != _sentPackets.end()) { - // we found the packet - grab it - auto& resendPacket = *(it->second); - - // unlock the sent packets - sentLocker.unlock(); - - // send it off - sendPacket(resendPacket); - emit packetRetransmitted(); - - // mark that we did resend a packet - resentPacket = true; - - // break out of our while now that we have re-sent a packet - break; - } else { - // we didn't find this packet in the sentPackets queue - assume this means it was ACKed - // we'll fire the loop again to see if there is another to re-send - continue; - } - } else { - naksEmpty = true; - } - - // break from the while, we didn't resend a packet - break; - } - - bool packetsEmpty = false; // used after processing to check if we should wait for packets - bool sentPacket = false; - // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet - if (_hasReceivedHandshakeACK - && !resentPacket - && seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { + if (_hasReceivedHandshakeACK && !sentAPacket) { + flowWindowFull = (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > + _flowWindowSize); + sentAPacket = maybeSendNewPacket(); + } + + // Keep track of how long the flow window has been full for + if (flowWindowFull && !_flowWindowWasFull) { + _flowWindowFullSince = loopStartTimestamp; + } + _flowWindowWasFull = flowWindowFull; + + + if (_hasReceivedHandshakeACK && !sentAPacket) { + static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 }; - // we didn't re-send a packet, so time to send a new one - std::unique_lock locker(_packetsLock); - - if (_packets.size() > 0) { - - SequenceNumber nextNumber = getNextSequenceNumber(); - - // grab the first packet we will send - std::unique_ptr firstPacket; - firstPacket.swap(_packets.front()); - _packets.pop_front(); - - std::unique_ptr secondPacket; - - if (((uint32_t) nextNumber & 0xF) == 0) { - // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets - // pull off a second packet if we can before we unlock - if (_packets.size() > 0) { - secondPacket.swap(_packets.front()); - _packets.pop_front(); - } - } - - packetsEmpty = _packets.size() == 0; - - // unlock the packets, we're done pulling - locker.unlock(); - - sentPacket = true; - - // definitely send the first packet - sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); - - // do we have a second in a pair to send as well? - if (secondPacket) { - nextNumber = getNextSequenceNumber(); - sendNewPacketAndAddToSentList(move(secondPacket), nextNumber); - } - + if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { + // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, + // then signal the queue is inactive + emit queueInactive(); } else { - packetsEmpty = true; - locker.unlock(); + // During our processing above we didn't send any packets and the flow window is not full. + + // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. + // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + DoubleLock doubleLock(_packetsLock, _naksLock); + + // The packets queue and loss list mutexes are now both locked - check if they're still both empty + if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { + // both are empty - let's use a condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER); + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + // Check if we've been inactive for too long + if (cvStatus == std::cv_status::timeout) { + emit queueInactive(); + } + + // skip to the next iteration + continue; + } } } - // since we're a while loop, give the thread a chance to process events - QCoreApplication::processEvents(); + auto loopEndTimestamp = high_resolution_clock::now(); - // we just processed events so check now if we were just told to stop - if (!_isRunning) { - break; + // sleep as long as we need until next packet send, if we can + auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; + if (timeToSleep > timeToSleep.zero()) { + std::this_thread::sleep_for(timeToSleep); + } + } +} + +bool SendQueue::maybeSendNewPacket() { + // we didn't re-send a packet, so time to send a new one + std::unique_lock locker(_packetsLock); + + if (_packets.size() > 0) { + SequenceNumber nextNumber = getNextSequenceNumber(); + + // grab the first packet we will send + std::unique_ptr firstPacket; + firstPacket.swap(_packets.front()); + _packets.pop_front(); + + std::unique_ptr secondPacket; + + if (((uint32_t) nextNumber & 0xF) == 0) { + // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets + // pull off a second packet if we can before we unlock + + if (_packets.size() > 0) { + secondPacket.swap(_packets.front()); + _packets.pop_front(); + } } - if (packetsEmpty && naksEmpty) { - // During our processing above the loss list and packet list were both empty. + // unlock the packets, we're done pulling + locker.unlock(); + + // definitely send the first packet + sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); + + // do we have a second in a pair to send as well? + if (secondPacket) { + sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); + } + + // We sent our packet(s), return here + return true; + } + + // No packets were sent + return false; +} + +bool SendQueue::maybeResendPacket() { + // the following while makes sure that we find a packet to re-send, if there is one + while (true) { + std::unique_lock naksLocker(_naksLock); + + if (_naks.getLength() > 0) { + // pull the sequence number we need to re-send + SequenceNumber resendNumber = _naks.popFirstSequenceNumber(); + naksLocker.unlock(); - // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. - // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock - DoubleLock doubleLock(_packetsLock, _naksLock); + // pull the packet to re-send from the sent packets list + QReadLocker sentLocker(&_sentLock); - // The packets queue and loss list mutexes are now both locked - check if they're still both empty - if (_packets.empty() && _naks.getLength() == 0) { - // both are empty - let's use a condition_variable_any to wait - _emptyCondition.wait(doubleLock); + // see if we can find the packet to re-send + auto it = _sentPackets.find(resendNumber); + + if (it != _sentPackets.end()) { + // we found the packet - grab it + auto& resendPacket = *(it->second); - // we have the double lock again - it'll be unlocked once it goes out of scope - // skip to the next iteration + // unlock the sent packets + sentLocker.unlock(); + + // send it off + sendPacket(resendPacket); + emit packetRetransmitted(); + + // Signal that we did resend a packet + return true; + } else { + // we didn't find this packet in the sentPackets queue - assume this means it was ACKed + // we'll fire the loop again to see if there is another to re-send continue; } } - // sleep as long as we need until next packet send, if we can - auto now = high_resolution_clock::now(); - auto microsecondDuration = duration_cast((_lastSendTimestamp + microseconds(_packetSendPeriod)) - now); - - if (microsecondDuration.count() > 0) { - usleep(microsecondDuration.count()); - } + // break from the while, we didn't resend a packet + break; } + + // No packet was resent + return false; } + diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 2a3b2bc4fd..89dc2fae8f 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -42,21 +42,8 @@ class SendQueue : public QObject { Q_OBJECT public: - - class DoubleLock { - public: - DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { lock(); } - ~DoubleLock() { unlock(); } - - DoubleLock(const DoubleLock&) = delete; - DoubleLock& operator=(const DoubleLock&) = delete; - - void lock() { std::lock(_mutex1, _mutex2); } - void unlock() { _mutex1.unlock(); _mutex2.unlock(); } - private: - std::mutex& _mutex1; - std::mutex& _mutex2; - }; + using high_resolution_clock = std::chrono::high_resolution_clock; + using time_point = high_resolution_clock::time_point; static std::unique_ptr create(Socket* socket, HifiSockAddr destination); @@ -82,6 +69,8 @@ signals: void packetSent(int dataSize, int payloadSize); void packetRetransmitted(); + void queueInactive(); + private slots: void run(); @@ -93,6 +82,9 @@ private: void sendPacket(const Packet& packet); void sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber); + bool maybeSendNewPacket(); // Figures out what packet to send next + bool maybeResendPacket(); // Determines whether to resend a packet and wich one + // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); MessageNumber getNextMessageNumber(); @@ -110,11 +102,14 @@ private: std::atomic _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC - std::chrono::high_resolution_clock::time_point _lastSendTimestamp; // Record last time of packet departure std::atomic _isRunning { false }; std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC + // Used to detect when the connection becomes inactive for too long + bool _flowWindowWasFull = false; + time_point _flowWindowFullSince; + mutable std::mutex _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 0aac7f8f99..d5e57df60b 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -147,12 +147,18 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { if (it == _connectionsHash.end()) { auto connection = std::unique_ptr(new Connection(this, sockAddr, _ccFactory->create())); + QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection); it = _connectionsHash.insert(it, std::make_pair(sockAddr, std::move(connection))); } return *it->second; } +void Socket::cleanupConnection(HifiSockAddr sockAddr) { + qCDebug(networking) << "Socket::cleanupConnection called for connection to" << sockAddr; + _connectionsHash.erase(sockAddr); +} + void Socket::messageReceived(std::unique_ptr packetList) { if (_packetListHandler) { _packetListHandler(std::move(packetList)); @@ -205,8 +211,8 @@ void Socket::readPendingDatagrams() { // if this was a reliable packet then signal the matching connection with the sequence number auto& connection = findOrCreateConnection(senderSockAddr); connection.processReceivedSequenceNumber(packet->getSequenceNumber(), - packet->getDataSize(), - packet->getPayloadSize()); + packet->getDataSize(), + packet->getPayloadSize()); } if (packet->isPartOfMessage()) { diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 0c4276a767..0014a97e2b 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -73,6 +73,9 @@ public: ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination); std::vector getConnectionSockAddrs(); +public slots: + void cleanupConnection(HifiSockAddr sockAddr); + private slots: void readPendingDatagrams(); void rateControlSync();