From d908cd4a530b9298307a279e0d88e471b0f22f10 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 06:35:02 -0700 Subject: [PATCH 01/13] use NotStarted instead of Unsent, return NotFound for Files --- .../networking/src/FileResourceRequest.cpp | 17 ++++++++++++----- .../networking/src/HTTPResourceRequest.cpp | 2 +- libraries/networking/src/ResourceRequest.cpp | 2 +- libraries/networking/src/ResourceRequest.h | 4 ++-- 4 files changed, 16 insertions(+), 9 deletions(-) diff --git a/libraries/networking/src/FileResourceRequest.cpp b/libraries/networking/src/FileResourceRequest.cpp index 261b1028f3..1c5e0a1977 100644 --- a/libraries/networking/src/FileResourceRequest.cpp +++ b/libraries/networking/src/FileResourceRequest.cpp @@ -15,14 +15,21 @@ void FileResourceRequest::doSend() { QString filename = _url.toLocalFile(); + QFile file(filename); + _state = Finished; - if (file.open(QFile::ReadOnly)) { - _data = file.readAll(); - _result = ResourceRequest::Success; - emit finished(); + if (file.exists()) { + if (file.open(QFile::ReadOnly)) { + _data = file.readAll(); + _result = ResourceRequest::Success; + emit finished(); + } else { + _result = ResourceRequest::AccessDenied; + emit finished(); + } } else { - _result = ResourceRequest::AccessDenied; + _result = ResourceRequest::NotFound; emit finished(); } } diff --git a/libraries/networking/src/HTTPResourceRequest.cpp b/libraries/networking/src/HTTPResourceRequest.cpp index c61fe50f23..b122369c30 100644 --- a/libraries/networking/src/HTTPResourceRequest.cpp +++ b/libraries/networking/src/HTTPResourceRequest.cpp @@ -84,7 +84,7 @@ void HTTPResourceRequest::onDownloadProgress(qint64 bytesReceived, qint64 bytesT } void HTTPResourceRequest::onTimeout() { - Q_ASSERT(_state != Unsent); + Q_ASSERT(_state != NotStarted); if (_state == InProgress) { qCDebug(networking) << "Timed out loading " << _url; diff --git a/libraries/networking/src/ResourceRequest.cpp b/libraries/networking/src/ResourceRequest.cpp index d10f50264f..c6880636ea 100644 --- a/libraries/networking/src/ResourceRequest.cpp +++ b/libraries/networking/src/ResourceRequest.cpp @@ -17,7 +17,7 @@ ResourceRequest::ResourceRequest(QObject* parent, const QUrl& url) : } void ResourceRequest::send() { - Q_ASSERT(_state == Unsent); + Q_ASSERT(_state == NotStarted); _state = InProgress; doSend(); diff --git a/libraries/networking/src/ResourceRequest.h b/libraries/networking/src/ResourceRequest.h index 055d32c1a5..c5e56e4a85 100644 --- a/libraries/networking/src/ResourceRequest.h +++ b/libraries/networking/src/ResourceRequest.h @@ -21,7 +21,7 @@ public: ResourceRequest(QObject* parent, const QUrl& url); enum State { - Unsent = 0, + NotStarted = 0, InProgress, Finished }; @@ -51,7 +51,7 @@ protected: virtual void doSend() = 0; QUrl _url; - State _state { Unsent }; + State _state { NotStarted }; Result _result; QByteArray _data; bool _cacheEnabled { true }; From 28d9610bd494c07edb23a18bf9284d94e5695e83 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 07:31:17 -0700 Subject: [PATCH 02/13] add a listen port option to ACs --- assignment-client/src/AssignmentClient.cpp | 6 +++--- assignment-client/src/AssignmentClient.h | 1 + assignment-client/src/AssignmentClientApp.cpp | 16 +++++++++++++--- assignment-client/src/AssignmentClientApp.h | 6 +++--- .../src/AssignmentClientMonitor.cpp | 4 ++-- assignment-client/src/AssignmentClientMonitor.h | 2 +- 6 files changed, 23 insertions(+), 12 deletions(-) diff --git a/assignment-client/src/AssignmentClient.cpp b/assignment-client/src/AssignmentClient.cpp index 3780655536..708589c32f 100644 --- a/assignment-client/src/AssignmentClient.cpp +++ b/assignment-client/src/AssignmentClient.cpp @@ -42,8 +42,8 @@ const long long ASSIGNMENT_REQUEST_INTERVAL_MSECS = 1 * 1000; int hifiSockAddrMeta = qRegisterMetaType("HifiSockAddr"); AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool, - QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort, - quint16 assignmentMonitorPort) : + quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname, + quint16 assignmentServerPort, quint16 assignmentMonitorPort) : _assignmentServerHostname(DEFAULT_ASSIGNMENT_SERVER_HOSTNAME) { LogUtils::init(); @@ -53,7 +53,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri auto addressManager = DependencyManager::set(); // create a NodeList as an unassigned client, must be after addressManager - auto nodeList = DependencyManager::set(NodeType::Unassigned); + auto nodeList = DependencyManager::set(NodeType::Unassigned, listenPort); auto animationCache = DependencyManager::set(); auto entityScriptingInterface = DependencyManager::set(); diff --git a/assignment-client/src/AssignmentClient.h b/assignment-client/src/AssignmentClient.h index 73eaa3dc9f..9d2c816861 100644 --- a/assignment-client/src/AssignmentClient.h +++ b/assignment-client/src/AssignmentClient.h @@ -23,6 +23,7 @@ class AssignmentClient : public QObject { Q_OBJECT public: AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool, + quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort, quint16 assignmentMonitorPort); ~AssignmentClient(); diff --git a/assignment-client/src/AssignmentClientApp.cpp b/assignment-client/src/AssignmentClientApp.cpp index 2edae340c3..3b9f8af868 100644 --- a/assignment-client/src/AssignmentClientApp.cpp +++ b/assignment-client/src/AssignmentClientApp.cpp @@ -59,6 +59,10 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) : const QCommandLineOption poolOption(ASSIGNMENT_POOL_OPTION, "set assignment pool", "pool-name"); parser.addOption(poolOption); + + const QCommandLineOption portOption(ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION, + "UDP port for this assignment client (or monitor)", "port"); + parser.addOption(portOption); const QCommandLineOption walletDestinationOption(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION, "set wallet destination", "wallet-uuid"); @@ -158,12 +162,18 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) : // check for an overriden assignment server port quint16 assignmentServerPort = DEFAULT_DOMAIN_SERVER_PORT; if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) { - assignmentServerPort = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toString().toUInt(); + assignmentServerPort = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toUInt(); } if (parser.isSet(assignmentServerPortOption)) { assignmentServerPort = parser.value(assignmentServerPortOption).toInt(); } + + // check for an overidden listen port + quint16 listenPort = 0; + if (argumentVariantMap.contains(ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION)) { + listenPort = argumentVariantMap.value(ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION).toUInt(); + } if (parser.isSet(numChildsOption)) { if (minForks && minForks > numForks) { @@ -185,12 +195,12 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) : if (numForks || minForks || maxForks) { AssignmentClientMonitor* monitor = new AssignmentClientMonitor(numForks, minForks, maxForks, requestAssignmentType, assignmentPool, - walletUUID, assignmentServerHostname, + listenPort, walletUUID, assignmentServerHostname, assignmentServerPort); monitor->setParent(this); connect(this, &QCoreApplication::aboutToQuit, monitor, &AssignmentClientMonitor::aboutToQuit); } else { - AssignmentClient* client = new AssignmentClient(requestAssignmentType, assignmentPool, + AssignmentClient* client = new AssignmentClient(requestAssignmentType, assignmentPool, listenPort, walletUUID, assignmentServerHostname, assignmentServerPort, monitorPort); client->setParent(this); diff --git a/assignment-client/src/AssignmentClientApp.h b/assignment-client/src/AssignmentClientApp.h index bbc60256a7..ab7e8ed304 100644 --- a/assignment-client/src/AssignmentClientApp.h +++ b/assignment-client/src/AssignmentClientApp.h @@ -17,15 +17,15 @@ const QString ASSIGNMENT_TYPE_OVERRIDE_OPTION = "t"; const QString ASSIGNMENT_POOL_OPTION = "pool"; +const QString ASSIGNMENT_CLIENT_LISTEN_PORT_OPTION = "p"; const QString ASSIGNMENT_WALLET_DESTINATION_ID_OPTION = "wallet"; -const QString CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION = "a"; -const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "p"; +const QString CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION = "i"; +const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "server-port"; const QString ASSIGNMENT_NUM_FORKS_OPTION = "n"; const QString ASSIGNMENT_MIN_FORKS_OPTION = "min"; const QString ASSIGNMENT_MAX_FORKS_OPTION = "max"; const QString ASSIGNMENT_CLIENT_MONITOR_PORT_OPTION = "monitor-port"; - class AssignmentClientApp : public QCoreApplication { Q_OBJECT public: diff --git a/assignment-client/src/AssignmentClientMonitor.cpp b/assignment-client/src/AssignmentClientMonitor.cpp index ddea6cc702..5bb31c8c6f 100644 --- a/assignment-client/src/AssignmentClientMonitor.cpp +++ b/assignment-client/src/AssignmentClientMonitor.cpp @@ -28,7 +28,7 @@ AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmen const unsigned int minAssignmentClientForks, const unsigned int maxAssignmentClientForks, Assignment::Type requestAssignmentType, QString assignmentPool, - QUuid walletUUID, QString assignmentServerHostname, + quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort) : _numAssignmentClientForks(numAssignmentClientForks), _minAssignmentClientForks(minAssignmentClientForks), @@ -50,7 +50,7 @@ AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmen // create a NodeList so we can receive stats from children DependencyManager::registerInheritance(); auto addressManager = DependencyManager::set(); - auto nodeList = DependencyManager::set(); + auto nodeList = DependencyManager::set(listenPort); auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); packetReceiver.registerListener(PacketType::AssignmentClientStatus, this, "handleChildStatusPacket"); diff --git a/assignment-client/src/AssignmentClientMonitor.h b/assignment-client/src/AssignmentClientMonitor.h index 8463498d7d..93fc9361ad 100644 --- a/assignment-client/src/AssignmentClientMonitor.h +++ b/assignment-client/src/AssignmentClientMonitor.h @@ -28,7 +28,7 @@ class AssignmentClientMonitor : public QObject { public: AssignmentClientMonitor(const unsigned int numAssignmentClientForks, const unsigned int minAssignmentClientForks, const unsigned int maxAssignmentClientForks, Assignment::Type requestAssignmentType, - QString assignmentPool, QUuid walletUUID, QString assignmentServerHostname, + QString assignmentPool, quint16 listenPort, QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort); ~AssignmentClientMonitor(); From cf98d4a8f74616ca08f55eceb80e1e01d9decb00 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 07:31:44 -0700 Subject: [PATCH 03/13] add a control packet type for LightACK --- libraries/networking/src/udt/Connection.cpp | 12 ++++++------ libraries/networking/src/udt/ControlPacket.h | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index de4b76a972..a502c714e3 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -237,7 +237,7 @@ void Connection::sendLightACK() { // create the light ACK packet, make it static so we can re-use it static const int LIGHT_ACK_PACKET_PAYLOAD_BYTES = sizeof(SequenceNumber); - static auto lightACKPacket = ControlPacket::create(ControlPacket::ACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES); + static auto lightACKPacket = ControlPacket::create(ControlPacket::LightACK, LIGHT_ACK_PACKET_PAYLOAD_BYTES); // reset the lightACKPacket before we go to write the ACK to it lightACKPacket->reset(); @@ -407,13 +407,13 @@ void Connection::processControl(std::unique_ptr controlPacket) { switch (controlPacket->getType()) { case ControlPacket::ACK: if (_hasReceivedHandshakeACK) { - if (controlPacket->getPayloadSize() == sizeof(SequenceNumber)) { - processLightACK(move(controlPacket)); - } else { - processACK(move(controlPacket)); - } + processACK(move(controlPacket)); } break; + case ControlPacket::LightACK: + if (_hasReceivedHandshakeACK) { + processLightACK(move(controlPacket)); + } case ControlPacket::ACK2: if (_hasReceivedHandshake) { processACK2(move(controlPacket)); diff --git a/libraries/networking/src/udt/ControlPacket.h b/libraries/networking/src/udt/ControlPacket.h index bcc559f4f6..1976899c14 100644 --- a/libraries/networking/src/udt/ControlPacket.h +++ b/libraries/networking/src/udt/ControlPacket.h @@ -29,6 +29,7 @@ public: enum Type : uint16_t { ACK, ACK2, + LightACK, NAK, TimeoutNAK, Handshake, From 77aeae7dc03a5dd2d83c24dc3750e714de4086c6 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:10:06 -0700 Subject: [PATCH 04/13] change to timeout behaviour to re-send unACKed packets --- libraries/networking/src/udt/Connection.cpp | 7 +- libraries/networking/src/udt/SendQueue.cpp | 87 +++++++++++++++------ libraries/networking/src/udt/SendQueue.h | 7 +- 3 files changed, 73 insertions(+), 28 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index a502c714e3..74ad707b11 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -727,9 +727,12 @@ void Connection::updateCongestionControlAndSendQueue(std::function cong // fire congestion control callback congestionCallback(); + auto& sendQueue = getSendQueue(); + // now that we've updated the congestion control, update the packet send period and flow window size - getSendQueue().setPacketSendPeriod(_congestionControl->_packetSendPeriod); - getSendQueue().setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); + sendQueue.setPacketSendPeriod(_congestionControl->_packetSendPeriod); + sendQueue.setEstimatedTimeout(estimatedTimeout()); + sendQueue.setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); // record connection stats _stats.recordPacketSendPeriod(_congestionControl->_packetSendPeriod); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 36ebd2a6b0..7db24a63a3 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -154,6 +154,9 @@ void SendQueue::sendPacket(const Packet& packet) { } void SendQueue::ack(SequenceNumber ack) { + // this is a response from the client, re-set our timeout expiry + _timeoutExpiryCount = 0; + if (_lastACKSequenceNumber == (uint32_t) ack) { return; } @@ -177,6 +180,9 @@ void SendQueue::ack(SequenceNumber ack) { } void SendQueue::nak(SequenceNumber start, SequenceNumber end) { + // this is a response from the client, re-set our timeout expiry + _timeoutExpiryCount = 0; + std::unique_lock nakLocker(_naksLock); _naks.insert(start, end); @@ -189,6 +195,9 @@ void SendQueue::nak(SequenceNumber start, SequenceNumber end) { } void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { + // this is a response from the client, re-set our timeout expiry + _timeoutExpiryCount = 0; + std::unique_lock nakLocker(_naksLock); _naks.clear(); @@ -212,7 +221,7 @@ void SendQueue::overrideNAKListFromPacket(ControlPacket& packet) { } void SendQueue::handshakeACK() { - std::unique_lock locker(_handshakeMutex); + std::unique_lock locker { _handshakeMutex }; _hasReceivedHandshakeACK = true; @@ -258,7 +267,7 @@ void SendQueue::run() { while (_isRunning) { // Record how long the loop takes to execute auto loopStartTimestamp = high_resolution_clock::now(); - + std::unique_lock handshakeLock { _handshakeMutex }; if (!_hasReceivedHandshakeACK) { @@ -305,12 +314,6 @@ void SendQueue::run() { sentAPacket = maybeSendNewPacket(); } - // Keep track of how long the flow window has been full for - if (flowWindowFull && !_flowWindowWasFull) { - _flowWindowFullSince = loopStartTimestamp; - } - _flowWindowWasFull = flowWindowFull; - // since we're a while loop, give the thread a chance to process events QCoreApplication::processEvents(); @@ -320,15 +323,22 @@ void SendQueue::run() { } if (_hasReceivedHandshakeACK && !sentAPacket) { - static const std::chrono::seconds CONSIDER_INACTIVE_AFTER { 5 }; + // check if it is time to break this connection - if (flowWindowFull && (high_resolution_clock::now() - _flowWindowFullSince) > CONSIDER_INACTIVE_AFTER) { + // that will be the case if we have had 16 timeouts since hearing back from the client, and it has been + // at least 10 seconds + + static const int NUM_TIMEOUTS_BEFORE_INACTIVE = 16; + + if (_timeoutExpiryCount >= NUM_TIMEOUTS_BEFORE_INACTIVE) { // If the flow window has been full for over CONSIDER_INACTIVE_AFTER, // then signal the queue is inactive and return so it can be cleaned up + qDebug() << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts and is" + << "considered inactive. It is now being stopped."; emit queueInactive(); return; } else { - // During our processing above we didn't send any packets and the flow window is not full. + // During our processing above we didn't send any packets // If that is still the case we should use a condition_variable_any to sleep until we have data to handle. // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock @@ -337,21 +347,49 @@ void SendQueue::run() { // The packets queue and loss list mutexes are now both locked - check if they're still both empty if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { - // both are empty - let's use a condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, CONSIDER_INACTIVE_AFTER); - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); + if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { + // we've sent the client as much data as we have (and they've ACKed it) + // either wait for new data to send or 5 seconds before cleaning up the queue + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT_US = std::chrono::microseconds(5 * 1000 * 1000); - if (cvStatus == std::cv_status::timeout) { - // the wait_for released because we've been inactive for too long - // so emit our inactive signal and return so the send queue can be cleaned up - emit queueInactive(); - return; + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT_US); + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + if (cvStatus == std::cv_status::timeout) { + qDebug() << "SendQueue to" << _destination << "has been empty for" + << std::chrono::duration_cast(EMPTY_QUEUES_INACTIVE_TIMEOUT_US).count() + << "and receiver has ACKed all packets. The queue is considered inactive and will be stopped"; + + // this queue is inactive - emit that signal and stop the while + emit queueInactive(); + return; + } + } else { + // We think the client is still waiting for data (based on the sequence number gap) + // Let's wait either for a response from the client or until the estimated timeout + auto waitDuration = std::chrono::microseconds(_estimatedTimeout); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration); + + if (cvStatus == std::cv_status::timeout) { + // increase the number of timeouts + ++_timeoutExpiryCount; + + // Add all of the packets above the last received ACKed sequence number to the loss list + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + } + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + // skip to the next iteration + continue; } - - // skip to the next iteration - continue; } } } @@ -452,4 +490,3 @@ bool SendQueue::maybeResendPacket() { // No packet was resent return false; } - diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 70493f2054..22d52d28f0 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -57,6 +57,8 @@ public: int getPacketSendPeriod() const { return _packetSendPeriod; } void setPacketSendPeriod(int newPeriod) { _packetSendPeriod = newPeriod; } + void setEstimatedTimeout(int estimatedTimeout) { _estimatedTimeout = estimatedTimeout; } + public slots: void stop(); @@ -104,6 +106,9 @@ private: std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC std::atomic _isRunning { false }; + std::atomic _estimatedTimeout { 0 }; // Estimated timeout, set from CC + std::atomic _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client + std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC // Used to detect when the connection becomes inactive for too long @@ -117,7 +122,7 @@ private: std::unordered_map> _sentPackets; // Packets waiting for ACK. std::mutex _handshakeMutex; // Protects the handshake ACK condition_variable - bool _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client + std::atomic _hasReceivedHandshakeACK { false }; // flag for receipt of handshake ACK from client std::condition_variable _handshakeACKCondition; std::condition_variable_any _emptyCondition; From 539108dd454a13cfb8ed345b2a47683aca42cfac Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:26:32 -0700 Subject: [PATCH 05/13] repairs for new timeout code --- libraries/networking/src/udt/Connection.cpp | 3 ++- libraries/networking/src/udt/SendQueue.cpp | 13 +++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 74ad707b11..93c13874ea 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -78,8 +78,9 @@ SendQueue& Connection::getSendQueue() { QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); - // set defaults on the send queue from our congestion control object + // set defaults on the send queue from our congestion control object and estimatedTimeout() _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); + _sendQueue->setEstimatedTimeout(estimatedTimeout()); _sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); } diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 7db24a63a3..d627937761 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -361,7 +361,8 @@ void SendQueue::run() { if (cvStatus == std::cv_status::timeout) { qDebug() << "SendQueue to" << _destination << "has been empty for" << std::chrono::duration_cast(EMPTY_QUEUES_INACTIVE_TIMEOUT_US).count() - << "and receiver has ACKed all packets. The queue is considered inactive and will be stopped"; + << "seconds and receiver has ACKed all packets." + << "The queue is considered inactive and will be stopped."; // this queue is inactive - emit that signal and stop the while emit queueInactive(); @@ -379,9 +380,13 @@ void SendQueue::run() { // increase the number of timeouts ++_timeoutExpiryCount; - // Add all of the packets above the last received ACKed sequence number to the loss list - // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + // after a timeout if we still have sent packets that the client hasn't ACKed we + // add them to the loss list + + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + } } // we have the double lock again - Make sure to unlock it From 21c80e45c2ef9823ccc9ae5ed0813095f19ada96 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:42:52 -0700 Subject: [PATCH 06/13] guard connection addition by a mutex in Socket --- libraries/networking/src/udt/SendQueue.cpp | 7 +++++++ libraries/networking/src/udt/Socket.cpp | 6 ++++++ libraries/networking/src/udt/Socket.h | 2 ++ 3 files changed, 15 insertions(+) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index d627937761..3ee4d48d1d 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -335,7 +335,11 @@ void SendQueue::run() { // then signal the queue is inactive and return so it can be cleaned up qDebug() << "SendQueue to" << _destination << "reached" << NUM_TIMEOUTS_BEFORE_INACTIVE << "timeouts and is" << "considered inactive. It is now being stopped."; + emit queueInactive(); + + _isRunning = false; + return; } else { // During our processing above we didn't send any packets @@ -366,6 +370,9 @@ void SendQueue::run() { // this queue is inactive - emit that signal and stop the while emit queueInactive(); + + _isRunning = false; + return; } } else { diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 4b1f5f8a83..c9a681abfc 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -143,6 +143,8 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc } Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { + QWriteLocker locker(&_connectionsMutex); + auto it = _connectionsHash.find(sockAddr); if (it == _connectionsHash.end()) { @@ -160,12 +162,16 @@ void Socket::clearConnections() { return; } + QWriteLocker locker(&_connectionsMutex); + // clear all of the current connections in the socket qDebug() << "Clearing all remaining connections in Socket."; _connectionsHash.clear(); } void Socket::cleanupConnection(HifiSockAddr sockAddr) { + QWriteLocker locker(&_connectionsMutex); + qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; _connectionsHash.erase(sockAddr); } diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index f421b98288..fa6bf874a8 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -94,6 +94,8 @@ private: std::unordered_map _unreliableSequenceNumbers; std::unordered_map> _connectionsHash; + QReadWriteLock _connectionsMutex; // guards concurrent access to connections hashs + int _synInterval = 10; // 10ms QTimer _synTimer; From dcd5a4aec2e9efe9f0ae9301d40d260d61431217 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:47:25 -0700 Subject: [PATCH 07/13] address comments in code review --- libraries/networking/src/FileResourceRequest.cpp | 5 ++--- libraries/networking/src/udt/SendQueue.cpp | 13 ++++++------- libraries/networking/src/udt/SendQueue.h | 4 ---- 3 files changed, 8 insertions(+), 14 deletions(-) diff --git a/libraries/networking/src/FileResourceRequest.cpp b/libraries/networking/src/FileResourceRequest.cpp index 1c5e0a1977..ffaaa0d193 100644 --- a/libraries/networking/src/FileResourceRequest.cpp +++ b/libraries/networking/src/FileResourceRequest.cpp @@ -23,13 +23,12 @@ void FileResourceRequest::doSend() { if (file.open(QFile::ReadOnly)) { _data = file.readAll(); _result = ResourceRequest::Success; - emit finished(); } else { _result = ResourceRequest::AccessDenied; - emit finished(); } } else { _result = ResourceRequest::NotFound; - emit finished(); } + + emit finished(); } diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 3ee4d48d1d..9a371749e2 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -304,14 +304,13 @@ void SendQueue::run() { handshakeLock.unlock(); bool sentAPacket = maybeResendPacket(); - bool flowWindowFull = false; // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet if (_hasReceivedHandshakeACK && !sentAPacket) { - flowWindowFull = (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > - _flowWindowSize); - sentAPacket = maybeSendNewPacket(); + if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > _flowWindowSize) { + sentAPacket = maybeSendNewPacket(); + } } // since we're a while loop, give the thread a chance to process events @@ -354,17 +353,17 @@ void SendQueue::run() { if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { // we've sent the client as much data as we have (and they've ACKed it) // either wait for new data to send or 5 seconds before cleaning up the queue - static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT_US = std::chrono::microseconds(5 * 1000 * 1000); + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5 * 1000 * 1000); // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT_US); + auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT); // we have the double lock again - Make sure to unlock it doubleLock.unlock(); if (cvStatus == std::cv_status::timeout) { qDebug() << "SendQueue to" << _destination << "has been empty for" - << std::chrono::duration_cast(EMPTY_QUEUES_INACTIVE_TIMEOUT_US).count() + << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() << "seconds and receiver has ACKed all packets." << "The queue is considered inactive and will be stopped."; diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 22d52d28f0..c6668f1d09 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -111,10 +111,6 @@ private: std::atomic _flowWindowSize { 0 }; // Flow control window size (number of packets that can be on wire) - set from CC - // Used to detect when the connection becomes inactive for too long - bool _flowWindowWasFull = false; - time_point _flowWindowFullSince; - mutable std::mutex _naksLock; // Protects the naks list. LossList _naks; // Sequence numbers of packets to resend From e662209754f3f7f7463f75ce322a841306e245e9 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:48:55 -0700 Subject: [PATCH 08/13] use a QMutex instead of QReadWriteMutex --- libraries/networking/src/udt/Socket.cpp | 6 +++--- libraries/networking/src/udt/Socket.h | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index c9a681abfc..f3ce095b50 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -143,7 +143,7 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc } Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { - QWriteLocker locker(&_connectionsMutex); + QMutexLocker locker(&_connectionsMutex); auto it = _connectionsHash.find(sockAddr); @@ -162,7 +162,7 @@ void Socket::clearConnections() { return; } - QWriteLocker locker(&_connectionsMutex); + QMutexLocker locker(&_connectionsMutex); // clear all of the current connections in the socket qDebug() << "Clearing all remaining connections in Socket."; @@ -170,7 +170,7 @@ void Socket::clearConnections() { } void Socket::cleanupConnection(HifiSockAddr sockAddr) { - QWriteLocker locker(&_connectionsMutex); + QMutexLocker locker(&_connectionsMutex); qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; _connectionsHash.erase(sockAddr); diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index fa6bf874a8..23dd313462 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -94,7 +94,7 @@ private: std::unordered_map _unreliableSequenceNumbers; std::unordered_map> _connectionsHash; - QReadWriteLock _connectionsMutex; // guards concurrent access to connections hashs + QMutex _connectionsMutex; // guards concurrent access to connections hashs int _synInterval = 10; // 10ms QTimer _synTimer; From 577b6bf62f5b966f02fc7435c54a2747eb9e0941 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:52:33 -0700 Subject: [PATCH 09/13] correct the check for flow window size --- libraries/networking/src/udt/SendQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 9a371749e2..686701be9c 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -308,7 +308,7 @@ void SendQueue::run() { // if we didn't find a packet to re-send AND we think we can fit a new packet on the wire // (this is according to the current flow window size) then we send out a new packet if (_hasReceivedHandshakeACK && !sentAPacket) { - if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) > _flowWindowSize) { + if (seqlen(SequenceNumber { (uint32_t) _lastACKSequenceNumber }, _currentSequenceNumber) <= _flowWindowSize) { sentAPacket = maybeSendNewPacket(); } } From d66375bb0983ee28efff0a2c81852671d3acd7d6 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 09:54:51 -0700 Subject: [PATCH 10/13] fix queue timeout for empty queue --- libraries/networking/src/udt/SendQueue.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 686701be9c..7fcf5517b1 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -353,7 +353,7 @@ void SendQueue::run() { if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { // we've sent the client as much data as we have (and they've ACKed it) // either wait for new data to send or 5 seconds before cleaning up the queue - static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5 * 1000 * 1000); + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); // use our condition_variable_any to wait auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT); From ac3a1d54b1d5dc1e4ace971802b94aceb1176098 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 10:07:01 -0700 Subject: [PATCH 11/13] don't lock in while for maybeResendPacket --- libraries/networking/src/udt/SendQueue.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 7fcf5517b1..1de2b6febc 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -459,9 +459,10 @@ bool SendQueue::maybeSendNewPacket() { } bool SendQueue::maybeResendPacket() { + std::unique_lock naksLocker(_naksLock); + // the following while makes sure that we find a packet to re-send, if there is one while (true) { - std::unique_lock naksLocker(_naksLock); if (_naks.getLength() > 0) { // pull the sequence number we need to re-send From 54cd430be99d3c51e05da7d77f1ee49b8fd63c40 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 10:35:29 -0700 Subject: [PATCH 12/13] unlock if double lock succeeds but queues not empty --- libraries/networking/src/udt/SendQueue.cpp | 97 ++++++++++++---------- 1 file changed, 51 insertions(+), 46 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 1de2b6febc..1bfefa12e1 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -347,60 +347,65 @@ void SendQueue::run() { // To confirm that the queue of packets and the NAKs list are still both empty we'll need to use the DoubleLock DoubleLock doubleLock(_packetsLock, _naksLock); - // The packets queue and loss list mutexes are now both locked - check if they're still both empty - if (doubleLock.try_lock() && _packets.empty() && _naks.getLength() == 0) { + if (doubleLock.try_lock()) { + // The packets queue and loss list mutexes are now both locked - check if they're still both empty - if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { - // we've sent the client as much data as we have (and they've ACKed it) - // either wait for new data to send or 5 seconds before cleaning up the queue - static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); - - // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT); - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); - - if (cvStatus == std::cv_status::timeout) { - qDebug() << "SendQueue to" << _destination << "has been empty for" + if (_packets.empty() && _naks.getLength() == 0) { + if (uint32_t(_lastACKSequenceNumber) == uint32_t(_currentSequenceNumber)) { + // we've sent the client as much data as we have (and they've ACKed it) + // either wait for new data to send or 5 seconds before cleaning up the queue + static const auto EMPTY_QUEUES_INACTIVE_TIMEOUT = std::chrono::seconds(5); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, EMPTY_QUEUES_INACTIVE_TIMEOUT); + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + if (cvStatus == std::cv_status::timeout) { + qDebug() << "SendQueue to" << _destination << "has been empty for" << EMPTY_QUEUES_INACTIVE_TIMEOUT.count() << "seconds and receiver has ACKed all packets." << "The queue is considered inactive and will be stopped."; - - // this queue is inactive - emit that signal and stop the while - emit queueInactive(); - - _isRunning = false; - - return; - } - } else { - // We think the client is still waiting for data (based on the sequence number gap) - // Let's wait either for a response from the client or until the estimated timeout - auto waitDuration = std::chrono::microseconds(_estimatedTimeout); - - // use our condition_variable_any to wait - auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration); - - if (cvStatus == std::cv_status::timeout) { - // increase the number of timeouts - ++_timeoutExpiryCount; - - if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { - // after a timeout if we still have sent packets that the client hasn't ACKed we - // add them to the loss list - // Note that thanks to the DoubleLock we have the _naksLock right now - _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + // this queue is inactive - emit that signal and stop the while + emit queueInactive(); + + _isRunning = false; + + return; } + } else { + // We think the client is still waiting for data (based on the sequence number gap) + // Let's wait either for a response from the client or until the estimated timeout + auto waitDuration = std::chrono::microseconds(_estimatedTimeout); + + // use our condition_variable_any to wait + auto cvStatus = _emptyCondition.wait_for(doubleLock, waitDuration); + + if (cvStatus == std::cv_status::timeout) { + // increase the number of timeouts + ++_timeoutExpiryCount; + + if (SequenceNumber(_lastACKSequenceNumber) < _currentSequenceNumber) { + // after a timeout if we still have sent packets that the client hasn't ACKed we + // add them to the loss list + + // Note that thanks to the DoubleLock we have the _naksLock right now + _naks.append(SequenceNumber(_lastACKSequenceNumber) + 1, _currentSequenceNumber); + } + } + + // we have the double lock again - Make sure to unlock it + doubleLock.unlock(); + + // skip to the next iteration + continue; } - - // we have the double lock again - Make sure to unlock it - doubleLock.unlock(); - - // skip to the next iteration - continue; } + + // we got the try_lock but failed the other conditionals so we need to unlock + doubleLock.unlock(); } } } From 9575b47e4e7f8a6e7ca7e2d94e02bd8d53b1ae2d Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 28 Aug 2015 10:38:15 -0700 Subject: [PATCH 13/13] don't unlock double lock outside conditional --- libraries/networking/src/udt/SendQueue.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index 1bfefa12e1..e088cb8d8b 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -402,10 +402,10 @@ void SendQueue::run() { // skip to the next iteration continue; } - } - - // we got the try_lock but failed the other conditionals so we need to unlock - doubleLock.unlock(); + } else { + // we got the try_lock but failed the other conditionals so we need to unlock + doubleLock.unlock(); + } } } }