Merge pull request #5833 from birarda/master

fix for some locking races in SendQueue/Connection cleanup
This commit is contained in:
Ryan Huffman 2015-09-17 15:04:52 -07:00
commit d0acd74861
5 changed files with 89 additions and 29 deletions

View file

@ -56,14 +56,15 @@ void Connection::stopSendQueue() {
// grab the send queue thread so we can wait on it // grab the send queue thread so we can wait on it
QThread* sendQueueThread = _sendQueue->thread(); QThread* sendQueueThread = _sendQueue->thread();
// since we're stopping the send queue we should consider our handshake ACK not receieved
_hasReceivedHandshakeACK = false;
// tell the send queue to stop and be deleted // tell the send queue to stop and be deleted
_sendQueue->stop(); _sendQueue->stop();
_sendQueue->deleteLater(); _sendQueue->deleteLater();
_sendQueue.release(); _sendQueue.release();
// since we're stopping the send queue we should consider our handshake ACK not receieved
_hasReceivedHandshakeACK = false;
// wait on the send queue thread so we know the send queue is gone // wait on the send queue thread so we know the send queue is gone
sendQueueThread->quit(); sendQueueThread->quit();
sendQueueThread->wait(); sendQueueThread->wait();
@ -111,7 +112,7 @@ void Connection::queueInactive() {
qCDebug(networking) << "Connection SendQueue to" << _destination << "stopped and no data is being received - stopping connection."; qCDebug(networking) << "Connection SendQueue to" << _destination << "stopped and no data is being received - stopping connection.";
#endif #endif
emit connectionInactive(_destination); deactivate();
} }
} }
@ -170,7 +171,9 @@ void Connection::sync() {
qCDebug(networking) << "Connection to" << _destination << "no longer receiving any data and there is currently no send queue - stopping connection."; qCDebug(networking) << "Connection to" << _destination << "no longer receiving any data and there is currently no send queue - stopping connection.";
#endif #endif
emit connectionInactive(_destination); deactivate();
return;
} }
} }
@ -207,7 +210,9 @@ void Connection::sync() {
<< CONNECTION_NOT_USED_EXPIRY_SECONDS << "seconds - stopping connection."; << CONNECTION_NOT_USED_EXPIRY_SECONDS << "seconds - stopping connection.";
#endif #endif
emit connectionInactive(_destination); deactivate();
return;
} }
} }
} }
@ -728,11 +733,14 @@ void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket)
} }
void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket) {
// hand off this handshake ACK to the send queue so it knows it can start sending // if we've decided to clean up the send queue then this handshake ACK should be ignored, it's useless
getSendQueue().handshakeACK(); if (_sendQueue) {
// hand off this handshake ACK to the send queue so it knows it can start sending
getSendQueue().handshakeACK();
// indicate that handshake ACK was received // indicate that handshake ACK was received
_hasReceivedHandshakeACK = true; _hasReceivedHandshakeACK = true;
}
} }
void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) { void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) {

View file

@ -72,6 +72,8 @@ public:
ConnectionStats::Stats sampleStats() { return _stats.sample(); } ConnectionStats::Stats sampleStats() { return _stats.sample(); }
bool isActive() const { return _isActive; }
signals: signals:
void packetSent(); void packetSent();
void connectionInactive(const HifiSockAddr& sockAddr); void connectionInactive(const HifiSockAddr& sockAddr);
@ -100,6 +102,8 @@ private:
void resetReceiveState(); void resetReceiveState();
void resetRTT(); void resetRTT();
void deactivate() { _isActive = false; emit connectionInactive(_destination); }
SendQueue& getSendQueue(); SendQueue& getSendQueue();
SequenceNumber nextACK() const; SequenceNumber nextACK() const;
void updateRTT(int rtt); void updateRTT(int rtt);
@ -123,6 +127,7 @@ private:
p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender
bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection
bool _isActive { true }; // flag used for inactivity of connection
LossList _lossList; // List of all missing packets LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer

View file

@ -65,6 +65,7 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destin
// Move queue to private thread and start it // Move queue to private thread and start it
queue->moveToThread(thread); queue->moveToThread(thread);
thread->start(); thread->start();
return std::move(queue); return std::move(queue);
@ -89,7 +90,8 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets // call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one(); _emptyCondition.notify_one();
} }
if (!this->thread()->isRunning()) {
if (!this->thread()->isRunning() && _state == State::NotStarted) {
this->thread()->start(); this->thread()->start();
} }
} }
@ -136,13 +138,14 @@ void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
_emptyCondition.notify_one(); _emptyCondition.notify_one();
} }
if (!this->thread()->isRunning()) { if (!this->thread()->isRunning() && _state == State::NotStarted) {
this->thread()->start(); this->thread()->start();
} }
} }
void SendQueue::stop() { void SendQueue::stop() {
_isRunning = false;
_state = State::Stopped;
// in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner // in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner
_handshakeACKCondition.notify_one(); _handshakeACKCondition.notify_one();
@ -268,9 +271,23 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
} }
void SendQueue::run() { void SendQueue::run() {
_isRunning = true; if (_state == State::Stopped) {
// we've already been asked to stop before we even got a chance to start
// don't start now
#ifdef UDT_CONNECTION_DEBUG
qDebug() << "SendQueue asked to run after being told to stop. Will not run.";
#endif
return;
} else if (_state == State::Running) {
#ifdef UDT_CONNECTION_DEBUG
qDebug() << "SendQueue asked to run but is already running (according to state). Will not re-run.";
#endif
return;
}
while (_isRunning) { _state = State::Running;
while (_state == State::Running) {
// Record how long the loop takes to execute // Record how long the loop takes to execute
auto loopStartTimestamp = p_high_resolution_clock::now(); auto loopStartTimestamp = p_high_resolution_clock::now();
@ -314,11 +331,11 @@ void SendQueue::run() {
} }
// since we're a while loop, give the thread a chance to process events // since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents(); QCoreApplication::sendPostedEvents(this, 0);
// we just processed events so check now if we were just told to stop // we just processed events so check now if we were just told to stop
if (!_isRunning) { if (_state != State::Running) {
break; return;
} }
if (_hasReceivedHandshakeACK && !sentAPacket) { if (_hasReceivedHandshakeACK && !sentAPacket) {
@ -525,5 +542,5 @@ void SendQueue::deactivate() {
// this queue is inactive - emit that signal and stop the while // this queue is inactive - emit that signal and stop the while
emit queueInactive(); emit queueInactive();
_isRunning = false; _state = State::Stopped;
} }

View file

@ -45,6 +45,12 @@ class SendQueue : public QObject {
Q_OBJECT Q_OBJECT
public: public:
enum class State {
NotStarted,
Running,
Stopped
};
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination); static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet); void queuePacket(std::unique_ptr<Packet> packet);
@ -106,7 +112,7 @@ private:
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::atomic<bool> _isRunning { false }; std::atomic<State> _state { State::NotStarted };
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
std::atomic<int> _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client std::atomic<int> _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client

View file

@ -183,8 +183,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create())); auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create()));
// we queue the connection to cleanup connection in case it asks for it during its own rate control sync // we queue the connection to cleanup connection in case it asks for it during its own rate control sync
QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection, QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection);
Qt::QueuedConnection);
#ifdef UDT_CONNECTION_DEBUG #ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Creating new connection to" << sockAddr; qCDebug(networking) << "Creating new connection to" << sockAddr;
@ -208,11 +207,13 @@ void Socket::clearConnections() {
} }
void Socket::cleanupConnection(HifiSockAddr sockAddr) { void Socket::cleanupConnection(HifiSockAddr sockAddr) {
#ifdef UDT_CONNECTION_DEBUG auto numErased = _connectionsHash.erase(sockAddr);
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
#endif
_connectionsHash.erase(sockAddr); if (numErased > 0) {
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
#endif
}
} }
void Socket::messageReceived(std::unique_ptr<PacketList> packetList) { void Socket::messageReceived(std::unique_ptr<PacketList> packetList) {
@ -297,8 +298,31 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r
void Socket::rateControlSync() { void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
// the way we do this is a little funny looking - we need to avoid the case where we call sync and
// (because of our Qt direct connection to the Connection's signal that it has been deactivated)
// an iterator on _connectionsHash would be invalidated by our own call to cleanupConnection
// collect the sockets for all connections in a vector
std::vector<HifiSockAddr> sockAddrVector;
sockAddrVector.reserve(_connectionsHash.size());
for (auto& connection : _connectionsHash) { for (auto& connection : _connectionsHash) {
connection.second->sync(); sockAddrVector.emplace_back(connection.first);
}
// enumerate that vector of HifiSockAddr objects
for (auto& sockAddr : sockAddrVector) {
// pull out the respective connection via a quick find on the unordered_map
auto it = _connectionsHash.find(sockAddr);
if (it != _connectionsHash.end()) {
// if the connection is erased while calling sync since we are re-using the iterator that was invalidated
// we're good to go
auto& connection = _connectionsHash[sockAddr];
connection->sync();
}
} }
if (_synTimer->interval() != _synInterval) { if (_synTimer->interval() != _synInterval) {