From 4758dd2a53975315d8a4f3212ffd92ef4b6d1440 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 17 Sep 2015 14:13:24 -0700 Subject: [PATCH] correct locking races in SendQueue/Connection cleanup --- libraries/networking/src/udt/Connection.cpp | 30 ++++++++++------ libraries/networking/src/udt/Connection.h | 5 +++ libraries/networking/src/udt/SendQueue.cpp | 35 +++++++++++++----- libraries/networking/src/udt/SendQueue.h | 8 ++++- libraries/networking/src/udt/Socket.cpp | 40 +++++++++++++++++---- 5 files changed, 90 insertions(+), 28 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 2fb28f81ee..1bda840a6c 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -56,14 +56,15 @@ void Connection::stopSendQueue() { // grab the send queue thread so we can wait on it QThread* sendQueueThread = _sendQueue->thread(); - // since we're stopping the send queue we should consider our handshake ACK not receieved - _hasReceivedHandshakeACK = false; - // tell the send queue to stop and be deleted + _sendQueue->stop(); _sendQueue->deleteLater(); _sendQueue.release(); + // since we're stopping the send queue we should consider our handshake ACK not receieved + _hasReceivedHandshakeACK = false; + // wait on the send queue thread so we know the send queue is gone sendQueueThread->quit(); sendQueueThread->wait(); @@ -111,7 +112,7 @@ void Connection::queueInactive() { qCDebug(networking) << "Connection SendQueue to" << _destination << "stopped and no data is being received - stopping connection."; #endif - emit connectionInactive(_destination); + deactivate(); } } @@ -170,7 +171,9 @@ void Connection::sync() { qCDebug(networking) << "Connection to" << _destination << "no longer receiving any data and there is currently no send queue - stopping connection."; #endif - emit connectionInactive(_destination); + deactivate(); + + return; } } @@ -207,7 +210,9 @@ void Connection::sync() { << CONNECTION_NOT_USED_EXPIRY_SECONDS << "seconds - stopping connection."; #endif - emit connectionInactive(_destination); + deactivate(); + + return; } } } @@ -728,11 +733,14 @@ void Connection::processHandshake(std::unique_ptr controlPacket) } void Connection::processHandshakeACK(std::unique_ptr controlPacket) { - // hand off this handshake ACK to the send queue so it knows it can start sending - getSendQueue().handshakeACK(); - - // indicate that handshake ACK was received - _hasReceivedHandshakeACK = true; + // if we've decided to clean up the send queue then this handshake ACK should be ignored, it's useless + if (_sendQueue) { + // hand off this handshake ACK to the send queue so it knows it can start sending + getSendQueue().handshakeACK(); + + // indicate that handshake ACK was received + _hasReceivedHandshakeACK = true; + } } void Connection::processTimeoutNAK(std::unique_ptr controlPacket) { diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 2b1dec1ae9..13756c12f9 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -71,6 +71,8 @@ public: void queueReceivedMessagePacket(std::unique_ptr packet); ConnectionStats::Stats sampleStats() { return _stats.sample(); } + + bool isActive() const { return _isActive; } signals: void packetSent(); @@ -100,6 +102,8 @@ private: void resetReceiveState(); void resetRTT(); + void deactivate() { _isActive = false; emit connectionInactive(_destination); } + SendQueue& getSendQueue(); SequenceNumber nextACK() const; void updateRTT(int rtt); @@ -123,6 +127,7 @@ private: p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection + bool _isActive { true }; // flag used for inactivity of connection LossList _lossList; // List of all missing packets SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index a09ea6ca9a..31c2f41259 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -65,6 +65,7 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin // Move queue to private thread and start it queue->moveToThread(thread); + thread->start(); return std::move(queue); @@ -89,7 +90,8 @@ void SendQueue::queuePacket(std::unique_ptr packet) { // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets _emptyCondition.notify_one(); } - if (!this->thread()->isRunning()) { + + if (!this->thread()->isRunning() && _state == State::NotStarted) { this->thread()->start(); } } @@ -135,14 +137,15 @@ void SendQueue::queuePacketList(std::unique_ptr packetList) { // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets _emptyCondition.notify_one(); } - + if (!this->thread()->isRunning()) { this->thread()->start(); } } void SendQueue::stop() { - _isRunning = false; + + _state = State::Stopped; // in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner _handshakeACKCondition.notify_one(); @@ -268,9 +271,23 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr newPacket, } void SendQueue::run() { - _isRunning = true; + if (_state == State::Stopped) { + // we've already been asked to stop before we even got a chance to start + // don't start now +#ifdef UDT_CONNECTION_DEBUG + qDebug() << "SendQueue asked to run after being told to stop. Will not run."; +#endif + return; + } else if (_state == State::Running) { +#ifdef UDT_CONNECTION_DEBUG + qDebug() << "SendQueue asked to run but is already running (according to state). Will not re-run."; +#endif + return; + } - while (_isRunning) { + _state = State::Running; + + while (_state == State::Running) { // Record how long the loop takes to execute auto loopStartTimestamp = p_high_resolution_clock::now(); @@ -314,11 +331,11 @@ void SendQueue::run() { } // since we're a while loop, give the thread a chance to process events - QCoreApplication::processEvents(); + QCoreApplication::sendPostedEvents(this, 0); // we just processed events so check now if we were just told to stop - if (!_isRunning) { - break; + if (_state != State::Running) { + return; } if (_hasReceivedHandshakeACK && !sentAPacket) { @@ -525,5 +542,5 @@ void SendQueue::deactivate() { // this queue is inactive - emit that signal and stop the while emit queueInactive(); - _isRunning = false; + _state = State::Stopped; } diff --git a/libraries/networking/src/udt/SendQueue.h b/libraries/networking/src/udt/SendQueue.h index 2e7ec90c45..88b6b045b0 100644 --- a/libraries/networking/src/udt/SendQueue.h +++ b/libraries/networking/src/udt/SendQueue.h @@ -45,6 +45,12 @@ class SendQueue : public QObject { Q_OBJECT public: + enum class State { + NotStarted, + Running, + Stopped + }; + static std::unique_ptr create(Socket* socket, HifiSockAddr destination); void queuePacket(std::unique_ptr packet); @@ -106,7 +112,7 @@ private: std::atomic _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out std::atomic _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC - std::atomic _isRunning { false }; + std::atomic _state { State::NotStarted }; std::atomic _estimatedTimeout { 0 }; // Estimated timeout, set from CC std::atomic _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 56a00c6808..c148374edb 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -183,8 +183,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { auto connection = std::unique_ptr(new Connection(this, sockAddr, _ccFactory->create())); // we queue the connection to cleanup connection in case it asks for it during its own rate control sync - QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection, - Qt::QueuedConnection); + QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection); #ifdef UDT_CONNECTION_DEBUG qCDebug(networking) << "Creating new connection to" << sockAddr; @@ -208,11 +207,15 @@ void Socket::clearConnections() { } void Socket::cleanupConnection(HifiSockAddr sockAddr) { -#ifdef UDT_CONNECTION_DEBUG - qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; -#endif + auto it = _connectionsHash.find(sockAddr); - _connectionsHash.erase(sockAddr); + if (it != _connectionsHash.end()) { +#ifdef UDT_CONNECTION_DEBUG + qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; +#endif + + _connectionsHash.erase(sockAddr); + } } void Socket::messageReceived(std::unique_ptr packetList) { @@ -297,8 +300,31 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r void Socket::rateControlSync() { // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control + + // this way we do this is a little funny looking - we need to avoid the case where we call sync and + // (because of our Qt direct connection to the Connection's signal that it has been deactivated) + // an iterator on _connectionsHash would be invalidated by our own call to cleanupConnection + + // collect the sockets for all connections in a vector + + std::vector sockAddrVector; + sockAddrVector.reserve(_connectionsHash.size()); + for (auto& connection : _connectionsHash) { - connection.second->sync(); + sockAddrVector.emplace_back(connection.first); + } + + // enumerate that vector of HifiSockAddr objects + for (auto& sockAddr : sockAddrVector) { + // pull out the respective connection via a quick find on the unordered_map + auto it = _connectionsHash.find(sockAddr); + + if (it != _connectionsHash.end()) { + // if the connection is erased while calling sync since we are not holding an iterator that was invalidated + // we're good to go + auto& connection = _connectionsHash[sockAddr]; + connection->sync(); + } } if (_synTimer->interval() != _synInterval) {