correct locking races in SendQueue/Connection cleanup

This commit is contained in:
Stephen Birarda 2015-09-17 14:13:24 -07:00
parent fb38fd05ed
commit 4758dd2a53
5 changed files with 90 additions and 28 deletions

View file

@ -56,14 +56,15 @@ void Connection::stopSendQueue() {
// grab the send queue thread so we can wait on it
QThread* sendQueueThread = _sendQueue->thread();
// since we're stopping the send queue we should consider our handshake ACK not receieved
_hasReceivedHandshakeACK = false;
// tell the send queue to stop and be deleted
_sendQueue->stop();
_sendQueue->deleteLater();
_sendQueue.release();
// since we're stopping the send queue we should consider our handshake ACK not receieved
_hasReceivedHandshakeACK = false;
// wait on the send queue thread so we know the send queue is gone
sendQueueThread->quit();
sendQueueThread->wait();
@ -111,7 +112,7 @@ void Connection::queueInactive() {
qCDebug(networking) << "Connection SendQueue to" << _destination << "stopped and no data is being received - stopping connection.";
#endif
emit connectionInactive(_destination);
deactivate();
}
}
@ -170,7 +171,9 @@ void Connection::sync() {
qCDebug(networking) << "Connection to" << _destination << "no longer receiving any data and there is currently no send queue - stopping connection.";
#endif
emit connectionInactive(_destination);
deactivate();
return;
}
}
@ -207,7 +210,9 @@ void Connection::sync() {
<< CONNECTION_NOT_USED_EXPIRY_SECONDS << "seconds - stopping connection.";
#endif
emit connectionInactive(_destination);
deactivate();
return;
}
}
}
@ -728,11 +733,14 @@ void Connection::processHandshake(std::unique_ptr<ControlPacket> controlPacket)
}
void Connection::processHandshakeACK(std::unique_ptr<ControlPacket> controlPacket) {
// hand off this handshake ACK to the send queue so it knows it can start sending
getSendQueue().handshakeACK();
// indicate that handshake ACK was received
_hasReceivedHandshakeACK = true;
// if we've decided to clean up the send queue then this handshake ACK should be ignored, it's useless
if (_sendQueue) {
// hand off this handshake ACK to the send queue so it knows it can start sending
getSendQueue().handshakeACK();
// indicate that handshake ACK was received
_hasReceivedHandshakeACK = true;
}
}
void Connection::processTimeoutNAK(std::unique_ptr<ControlPacket> controlPacket) {

View file

@ -71,6 +71,8 @@ public:
void queueReceivedMessagePacket(std::unique_ptr<Packet> packet);
ConnectionStats::Stats sampleStats() { return _stats.sample(); }
bool isActive() const { return _isActive; }
signals:
void packetSent();
@ -100,6 +102,8 @@ private:
void resetReceiveState();
void resetRTT();
void deactivate() { _isActive = false; emit connectionInactive(_destination); }
SendQueue& getSendQueue();
SequenceNumber nextACK() const;
void updateRTT(int rtt);
@ -123,6 +127,7 @@ private:
p_high_resolution_clock::time_point _lastReceiveTime; // holds the last time we received anything from sender
bool _isReceivingData { false }; // flag used for expiry of receipt portion of connection
bool _isActive { true }; // flag used for inactivity of connection
LossList _lossList; // List of all missing packets
SequenceNumber _lastReceivedSequenceNumber; // The largest sequence number received from the peer

View file

@ -65,6 +65,7 @@ std::unique_ptr<SendQueue> SendQueue::create(Socket* socket, HifiSockAddr destin
// Move queue to private thread and start it
queue->moveToThread(thread);
thread->start();
return std::move(queue);
@ -89,7 +90,8 @@ void SendQueue::queuePacket(std::unique_ptr<Packet> packet) {
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
}
if (!this->thread()->isRunning()) {
if (!this->thread()->isRunning() && _state == State::NotStarted) {
this->thread()->start();
}
}
@ -135,14 +137,15 @@ void SendQueue::queuePacketList(std::unique_ptr<PacketList> packetList) {
// call notify_one on the condition_variable_any in case the send thread is sleeping waiting for packets
_emptyCondition.notify_one();
}
if (!this->thread()->isRunning()) {
this->thread()->start();
}
}
void SendQueue::stop() {
_isRunning = false;
_state = State::Stopped;
// in case we're waiting to send another handshake, release the condition_variable now so we cleanup sooner
_handshakeACKCondition.notify_one();
@ -268,9 +271,23 @@ void SendQueue::sendNewPacketAndAddToSentList(std::unique_ptr<Packet> newPacket,
}
void SendQueue::run() {
_isRunning = true;
if (_state == State::Stopped) {
// we've already been asked to stop before we even got a chance to start
// don't start now
#ifdef UDT_CONNECTION_DEBUG
qDebug() << "SendQueue asked to run after being told to stop. Will not run.";
#endif
return;
} else if (_state == State::Running) {
#ifdef UDT_CONNECTION_DEBUG
qDebug() << "SendQueue asked to run but is already running (according to state). Will not re-run.";
#endif
return;
}
while (_isRunning) {
_state = State::Running;
while (_state == State::Running) {
// Record how long the loop takes to execute
auto loopStartTimestamp = p_high_resolution_clock::now();
@ -314,11 +331,11 @@ void SendQueue::run() {
}
// since we're a while loop, give the thread a chance to process events
QCoreApplication::processEvents();
QCoreApplication::sendPostedEvents(this, 0);
// we just processed events so check now if we were just told to stop
if (!_isRunning) {
break;
if (_state != State::Running) {
return;
}
if (_hasReceivedHandshakeACK && !sentAPacket) {
@ -525,5 +542,5 @@ void SendQueue::deactivate() {
// this queue is inactive - emit that signal and stop the while
emit queueInactive();
_isRunning = false;
_state = State::Stopped;
}

View file

@ -45,6 +45,12 @@ class SendQueue : public QObject {
Q_OBJECT
public:
enum class State {
NotStarted,
Running,
Stopped
};
static std::unique_ptr<SendQueue> create(Socket* socket, HifiSockAddr destination);
void queuePacket(std::unique_ptr<Packet> packet);
@ -106,7 +112,7 @@ private:
std::atomic<uint32_t> _atomicCurrentSequenceNumber { 0 };// Atomic for last sequence number sent out
std::atomic<int> _packetSendPeriod { 0 }; // Interval between two packet send event in microseconds, set from CC
std::atomic<bool> _isRunning { false };
std::atomic<State> _state { State::NotStarted };
std::atomic<int> _estimatedTimeout { 0 }; // Estimated timeout, set from CC
std::atomic<int> _timeoutExpiryCount { 0 }; // The number of times the timeout has expired without response from client

View file

@ -183,8 +183,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) {
auto connection = std::unique_ptr<Connection>(new Connection(this, sockAddr, _ccFactory->create()));
// we queue the connection to cleanup connection in case it asks for it during its own rate control sync
QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection,
Qt::QueuedConnection);
QObject::connect(connection.get(), &Connection::connectionInactive, this, &Socket::cleanupConnection);
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Creating new connection to" << sockAddr;
@ -208,11 +207,15 @@ void Socket::clearConnections() {
}
void Socket::cleanupConnection(HifiSockAddr sockAddr) {
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
#endif
auto it = _connectionsHash.find(sockAddr);
_connectionsHash.erase(sockAddr);
if (it != _connectionsHash.end()) {
#ifdef UDT_CONNECTION_DEBUG
qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr;
#endif
_connectionsHash.erase(sockAddr);
}
}
void Socket::messageReceived(std::unique_ptr<PacketList> packetList) {
@ -297,8 +300,31 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r
void Socket::rateControlSync() {
// enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control
// this way we do this is a little funny looking - we need to avoid the case where we call sync and
// (because of our Qt direct connection to the Connection's signal that it has been deactivated)
// an iterator on _connectionsHash would be invalidated by our own call to cleanupConnection
// collect the sockets for all connections in a vector
std::vector<HifiSockAddr> sockAddrVector;
sockAddrVector.reserve(_connectionsHash.size());
for (auto& connection : _connectionsHash) {
connection.second->sync();
sockAddrVector.emplace_back(connection.first);
}
// enumerate that vector of HifiSockAddr objects
for (auto& sockAddr : sockAddrVector) {
// pull out the respective connection via a quick find on the unordered_map
auto it = _connectionsHash.find(sockAddr);
if (it != _connectionsHash.end()) {
// if the connection is erased while calling sync since we are not holding an iterator that was invalidated
// we're good to go
auto& connection = _connectionsHash[sockAddr];
connection->sync();
}
}
if (_synTimer->interval() != _synInterval) {