diff --git a/libraries/networking/src/udt/PacketList.h b/libraries/networking/src/udt/PacketList.h index 7978e77ad7..c873e53711 100644 --- a/libraries/networking/src/udt/PacketList.h +++ b/libraries/networking/src/udt/PacketList.h @@ -70,6 +70,7 @@ protected: private: friend class ::LimitedNodeList; + friend class PacketQueue; friend class SendQueue; friend class Socket; diff --git a/libraries/networking/src/udt/PacketQueue.cpp b/libraries/networking/src/udt/PacketQueue.cpp new file mode 100644 index 0000000000..f5c1b4a9a0 --- /dev/null +++ b/libraries/networking/src/udt/PacketQueue.cpp @@ -0,0 +1,71 @@ +// +// PacketQueue.cpp +// libraries/networking/src/udt +// +// Created by Clement on 9/16/15. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include "PacketQueue.h" + +#include "Packet.h" +#include "PacketList.h" + +using namespace udt; + + +MessageNumber PacketQueue::getNextMessageNumber() { + static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; + _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; + return _currentMessageNumber; +} + +bool PacketQueue::isEmpty() const { + LockGuard locker(_packetsLock); + return _packets.empty(); +} + +PacketQueue::PacketPointer PacketQueue::takeFront() { + LockGuard locker(_packetsLock); + if (!_packets.empty()) { + auto packet = std::move(_packets.front()); + _packets.pop_front(); + return std::move(packet); + } + + return PacketPointer(); +} + +void PacketQueue::queuePacket(PacketPointer packet) { + LockGuard locker(_packetsLock); + _packets.push_back(std::move(packet)); +} + +void PacketQueue::queuePacketList(PacketListPointer packetList) { + Q_ASSERT(packetList->_packets.size() > 0); + + auto messageNumber = getNextMessageNumber(); + auto markPacket = [&messageNumber](const PacketPointer& packet, Packet::PacketPosition position) { + packet->setPacketPosition(position); + packet->writeMessageNumber(messageNumber); + }; + + if (packetList->_packets.size() == 1) { + markPacket(packetList->_packets.front(), Packet::PacketPosition::ONLY); + } else { + const auto second = ++packetList->_packets.begin(); + const auto last = --packetList->_packets.end(); + std::for_each(second, last, [&](const PacketPointer& packet) { + markPacket(packet, Packet::PacketPosition::MIDDLE); + }); + + markPacket(packetList->_packets.front(), Packet::PacketPosition::FIRST); + markPacket(packetList->_packets.back(), Packet::PacketPosition::LAST); + } + + LockGuard locker(_packetsLock); + _packets.splice(_packets.end(), packetList->_packets); +} \ No newline at end of file diff --git a/libraries/networking/src/udt/PacketQueue.h b/libraries/networking/src/udt/PacketQueue.h new file mode 100644 index 0000000000..b305e9b560 --- /dev/null +++ b/libraries/networking/src/udt/PacketQueue.h @@ -0,0 +1,53 @@ +// +// PacketQueue.h +// libraries/networking/src/udt +// +// Created by Clement on 9/16/15. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_PacketQueue_h +#define hifi_PacketQueue_h + +#include +#include +#include + +namespace udt { + +class Packet; +class PacketList; + +using MessageNumber = uint32_t; + +class PacketQueue { + using Mutex = std::recursive_mutex; + using LockGuard = std::lock_guard; + using PacketPointer = std::unique_ptr; + using PacketListPointer = std::unique_ptr; + using PacketList = std::list; + +public: + void queuePacket(PacketPointer packet); + void queuePacketList(PacketListPointer packetList); + + bool isEmpty() const; + PacketPointer takeFront(); + + MessageNumber getNextMessageNumber(); + Mutex& getLock() { return _packetsLock; } + +private: + MessageNumber _currentMessageNumber { 0 }; + + mutable Mutex _packetsLock; // Protects the packets to be sent list. + PacketList _packets; // List of packets to be sent +}; + +} + + +#endif // hifi_PacketQueue_h \ No newline at end of file diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index de056b6850..44fa51bca8 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -30,7 +30,7 @@ using namespace udt; class DoubleLock { public: - DoubleLock(std::mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } + DoubleLock(std::recursive_mutex& mutex1, std::mutex& mutex2) : _mutex1(mutex1), _mutex2(mutex2) { } DoubleLock(const DoubleLock&) = delete; DoubleLock& operator=(const DoubleLock&) = delete; @@ -45,7 +45,7 @@ public: void unlock() { _mutex1.unlock(); _mutex2.unlock(); } private: - std::mutex& _mutex1; + std::recursive_mutex& _mutex1; std::mutex& _mutex2; }; @@ -76,17 +76,10 @@ SendQueue::SendQueue(Socket* socket, HifiSockAddr dest) : } void SendQueue::queuePacket(std::unique_ptr packet) { - { - std::unique_lock locker(_packetsLock); - - _packets.push_back(std::move(packet)); - - // unlock the mutex before we notify - locker.unlock(); - - // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets - _emptyCondition.notify_one(); - } + _packets.queuePacket(std::move(packet)); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); if (!this->thread()->isRunning() && _state == State::NotStarted) { this->thread()->start(); @@ -94,46 +87,10 @@ void SendQueue::queuePacket(std::unique_ptr packet) { } void SendQueue::queuePacketList(std::unique_ptr packetList) { - Q_ASSERT(packetList->_packets.size() > 0); - - { - auto messageNumber = getNextMessageNumber(); - - if (packetList->_packets.size() == 1) { - auto& packet = packetList->_packets.front(); - - packet->setPacketPosition(Packet::PacketPosition::ONLY); - packet->writeMessageNumber(messageNumber); - } else { - bool haveMarkedFirstPacket = false; - auto end = packetList->_packets.end(); - auto lastElement = --packetList->_packets.end(); - for (auto it = packetList->_packets.begin(); it != end; ++it) { - auto& packet = *it; - - if (!haveMarkedFirstPacket) { - packet->setPacketPosition(Packet::PacketPosition::FIRST); - haveMarkedFirstPacket = true; - } else if (it == lastElement) { - packet->setPacketPosition(Packet::PacketPosition::LAST); - } else { - packet->setPacketPosition(Packet::PacketPosition::MIDDLE); - } - - packet->writeMessageNumber(messageNumber); - } - } - - std::unique_lock locker(_packetsLock); - - _packets.splice(_packets.end(), packetList->_packets); - - // unlock the mutex so we can notify - locker.unlock(); - - // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets - _emptyCondition.notify_one(); - } + _packets.queuePacketList(std::move(packetList)); + + // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets + _emptyCondition.notify_one(); if (!this->thread()->isRunning() && _state == State::NotStarted) { this->thread()->start(); @@ -144,10 +101,8 @@ void SendQueue::stop() { _state = State::Stopped; - // in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner + // Notify all conditions in case we're waiting somewhere _handshakeACKCondition.notify_one(); - - // in case the empty condition is waiting for packets/loss release it now so that the queue is cleaned up _emptyCondition.notify_one(); } @@ -254,12 +209,6 @@ SequenceNumber SendQueue::getNextSequenceNumber() { return _currentSequenceNumber; } -uint32_t SendQueue::getNextMessageNumber() { - static const MessageNumber MAX_MESSAGE_NUMBER = MessageNumber(1) << MESSAGE_NUMBER_BITS; - _currentMessageNumber = (_currentMessageNumber + 1) % MAX_MESSAGE_NUMBER; - return _currentMessageNumber; -} - void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, SequenceNumber sequenceNumber) { // write the sequence number and send the packet newPacket->writeSequenceNumber(sequenceNumber); @@ -313,167 +262,67 @@ void SendQueue::run() { // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet if (!sentAPacket) { - if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { - sentAPacket = maybeSendNewPacket(); - } + sentAPacket = maybeSendNewPacket(); } // since we're a while loop, give the thread a chance to process events - QCoreApplication::sendPostedEvents(this, 0); + QCoreApplication::sendPostedEvents(this); // we just processed events so check now if we were just told to stop - if (_state != State::Running) { - return; + // If the send queue has been innactive, skip the sleep for + // Either _isRunning will have been set to false and we'll break + // Or something happened and we'll keep going + if (_state != State::Running || isInactive(sentAPacket)) { + continue; } - if (!sentAPacket) { - // check if it is time to break this connection - - // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been - // at least 5 seconds - - static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; - static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; - - auto sinceEpochNow = QDateTime::currentMSecsSinceEpoch(); - - if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE - && (sinceEpochNow - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) { - // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, - // then signal the queue is inactive and return so it can be cleaned up - -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" - << "and 10s before receiving any ACK/NAK and is now inactive. Stopping."; -#endif - - deactivate(); - - return; - } else { - // During our processing above we didn't send any packets - - // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. - // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock - DoubleLock doubleLock(_packetsLock, _naksLock); - - if (doubleLock.try_lock()) { - // The packets queue and loss list mutexes are now both locked - check if they're still both empty - - if (_packets.empty() && _naks.getLength() == 0) { - if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { - // we've sent the client as much data as we have (and they've ACKed it) - // either wait for new data to send or 5 seconds before cleaning up the queue - static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); - - // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT); - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); - - if (cvStatus == std::cv_status::timeout) { -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" - << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() - << "seconds and receiver has ACKed all packets." - << "The queue is now inactive and will be stopped."; -#endif - - deactivate(); - - return; - } - } else { - // We think the client is still waiting for data (based on the sequence number gap) - // Let's wait either for a response from the client or until the estimated timeout - auto waitDuration = std::chrono::microseconds(_estimatedTimeout); - - // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration); - - if (cvStatus == std::cv_status::timeout) { - // increase the number of timeouts - ++_timeoutExpiryCount; - - if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { - // after a timeout if we still have sent packets that the client hasn't ACKed we - // add them to the loss list - - // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); - } - } - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); - - // skip to the next iteration - continue; - } - } else { - // we got the try_lock but failed the other conditionals so we need to unlock - doubleLock.unlock(); - } - } - } - } - - const auto loopEndTimestamp = p_high_resolution_clock::now(); - // sleep as long as we need until next packet send, if we can + const auto loopEndTimestamp = p_high_resolution_clock::now(); const auto timeToSleep = (loopStartTimestamp + std::chrono::microseconds(_packetSendPeriod)) - loopEndTimestamp; std::this_thread::sleep_for(timeToSleep); } } bool SendQueue::maybeSendNewPacket() { - // we didn't re-send a packet, so time to send a new one - std::unique_lock locker(_packetsLock); - - if (_packets.size() > 0) { - SequenceNumber nextNumber = getNextSequenceNumber(); + if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { + // we didn't re-send a packet, so time to send a new one - // grab the first packet we will send - std::unique_ptr firstPacket; - firstPacket.swap(_packets.front()); - _packets.pop_front(); - std::unique_ptr secondPacket; - bool shouldSendPairTail = false; - - if (((uint32_t) nextNumber & 0xF) == 0) { - // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets - // pull off a second packet if we can before we unlock - shouldSendPairTail = true; + if (!_packets.isEmpty()) { + SequenceNumber nextNumber = getNextSequenceNumber(); - if (_packets.size() > 0) { - secondPacket.swap(_packets.front()); - _packets.pop_front(); + // grab the first packet we will send + std::unique_ptr firstPacket = _packets.takeFront(); + + std::unique_ptr secondPacket; + bool shouldSendPairTail = false; + + if (((uint32_t) nextNumber & 0xF) == 0) { + // the first packet is the first in a probe pair - every 16 (rightmost 16 bits = 0) packets + // pull off a second packet if we can before we unlock + shouldSendPairTail = true; + + secondPacket = _packets.takeFront(); } + + // definitely send the first packet + sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); + + // do we have a second in a pair to send as well? + if (secondPacket) { + sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); + } else if (shouldSendPairTail) { + // we didn't get a second packet to send in the probe pair + // send a control packet of type ProbePairTail so the receiver can still do + // proper bandwidth estimation + static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail); + _socket->writeBasePacket(*pairTailPacket, _destination); + } + + // We sent our packet(s), return here + return true; } - - // unlock the packets, we're done pulling - locker.unlock(); - - // definitely send the first packet - sendNewPacketAndAddToSentList(move(firstPacket), nextNumber); - - // do we have a second in a pair to send as well? - if (secondPacket) { - sendNewPacketAndAddToSentList(move(secondPacket), getNextSequenceNumber()); - } else if (shouldSendPairTail) { - // we didn't get a second packet to send in the probe pair - // send a control packet of type ProbePairTail so the receiver can still do - // proper bandwidth estimation - static auto pairTailPacket = ControlPacket::create(ControlPacket::ProbeTail); - _socket->writeBasePacket(*pairTailPacket, _destination); - } - - // We sent our packet(s), return here - return true; } - // No packets were sent return false; } @@ -524,6 +373,92 @@ bool SendQueue::maybeResendPacket() { return false; } +bool SendQueue::isInactive(bool sentAPacket) { + if (!sentAPacket) { + // check if it is time to break this connection + + // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been + // at least 5 seconds + static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; + static const int MIN_SECONDS_BEFORE_INACTIVE_MS = 5 * 1000; + if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE && + (QDateTime::currentMSecsSinceEpoch() - _lastReceiverResponse) > MIN_SECONDS_BEFORE_INACTIVE_MS) { + // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, + // then signal the queue is inactive and return so it can be cleaned up + +#ifdef UDT_CONNECTION_DEBUG + qCDebug(networking) << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts" + << "and 5s before receiving any ACK/NAK and is now inactive. Stopping."; +#endif + + deactivate(); + + return true; + } + + // During our processing above we didn't send any packets + + // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. + // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock + DoubleLock doubleLock(_packets.getLock(), _naksLock); + std::unique_lock locker(doubleLock, std::try_to_lock); + + if (locker.owns_lock() && _packets.isEmpty() && _naks.isEmpty()) { + // The packets queue and loss list mutexes are now both locked and they're both empty + + if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { + // we've sent the client as much data as we have (and they've ACKed it) + // either wait for new data to send or 5 seconds before cleaning up the queue + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(locker, EMPTY_QUEUES_INACTIVE_TIMEOUT); + + // we have the lock again - Make sure to unlock it + locker.unlock(); + + if (cvStatus == std::cv_status::timeout) { +#ifdef UDT_CONNECTION_DEBUG + qCDebug(networking) << "SendQueue to" << _destination << "has been empty for" + << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() + << "seconds and receiver has ACKed all packets." + << "The queue is now inactive and will be stopped."; +#endif + + // Deactivate queue + deactivate(); + return true; + } + } else { + // We think the client is still waiting for data (based on the sequence number gap) + // Let's wait either for a response from the client or until the estimated timeout + auto waitDuration = std::chrono::microseconds(_estimatedTimeout); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(locker, waitDuration); + + if (cvStatus == std::cv_status::timeout) { + // increase the number of timeouts + ++_timeoutExpiryCount; + + if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + // after a timeout if we still have sent packets that the client hasn't ACKed we + // add them to the loss list + + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + } + } + + // skip to the next iteration + return true; + } + } + } + + return false; +} + void SendQueue::deactivate() { // this queue is inactive - emit that signal and stop the while emit queueInactive(); diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 39d18a544d..96de12a971 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -28,6 +28,7 @@ #include "../HifiSockAddr.h" #include "Constants.h" +#include "PacketQueue.h" #include "SequenceNumber.h" #include "LossList.h" @@ -38,8 +39,6 @@ class ControlPacket; class Packet; class PacketList; class Socket; - -using MessageNumber = uint32_t; class SendQueue : public QObject { Q_OBJECT @@ -95,21 +94,19 @@ private: bool maybeSendNewPacket(); // Figures out what packet to send next bool maybeResendPacket(); // Determines whether to resend a packet and which one + bool isInactive(bool sentAPacket); void deactivate(); // makes the queue inactive and cleans it up // Increments current sequence number and return it SequenceNumber getNextSequenceNumber(); - MessageNumber getNextMessageNumber(); - mutable std::mutex _packetsLock; // Protects the packets to be sent list. - std::list> _packets; // List of packets to be sent + PacketQueue _packets; Socket* _socket { nullptr }; // Socket to send packet on HifiSockAddr _destination; // Destination addr std::atomic _lastACKSequenceNumber { 0 }; // Last ACKed sequence number - MessageNumber _currentMessageNumber { 0 }; SequenceNumber _currentSequenceNumber; // Last sequence number sent out std::atomic _atomicCurrentSequenceNumber { 0 }; // Atomic for last sequence number sent out