diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 9517332dcb..179014e838 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -74,20 +74,22 @@ void Connection::resetRTT() { SendQueue& Connection::getSendQueue() { if (!_sendQueue) { - // Lasily create send queue - _sendQueue = SendQueue::create(_parentSocket, _destination); - - qDebug() << "Created SendQueue for connection to" << _destination; - - QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); - QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); - QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); - QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); - - // set defaults on the send queue from our congestion control object and estimatedTimeout() - _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); - _sendQueue->setEstimatedTimeout(estimatedTimeout()); - _sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); + std::call_once(_sendQueueCreateFlag, [this](){ + // Lasily create send queue + _sendQueue = SendQueue::create(_parentSocket, _destination); + + qDebug() << "Created SendQueue for connection to" << _destination; + + QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); + QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); + QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); + QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); + + // set defaults on the send queue from our congestion control object and estimatedTimeout() + _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); + _sendQueue->setEstimatedTimeout(estimatedTimeout()); + _sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); + }); } return *_sendQueue; diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 7914fa0676..97e7d36025 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -145,9 +145,10 @@ private: PacketTimeWindow _receiveWindow { 16, 64 }; // Window of interval between packets (16) and probes (64) for bandwidth and receive speed std::unique_ptr _congestionControl; - + std::unique_ptr _sendQueue; - + std::once_flag _sendQueueCreateFlag; // Guards the creation of SendQueue so it only happens once + std::map _pendingReceivedMessages; int _packetsSinceACK { 0 }; // The number of packets that have been received during the current ACK interval diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index f3ce095b50..94b6100dee 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -143,7 +143,7 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc } Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { - QMutexLocker locker(&_connectionsMutex); + QWriteLocker locker(&_connectionsMutex); auto it = _connectionsHash.find(sockAddr); @@ -157,12 +157,7 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { } void Socket::clearConnections() { - if (thread() != QThread::currentThread()) { - QMetaObject::invokeMethod(this, "clearConnections", Qt::BlockingQueuedConnection); - return; - } - - QMutexLocker locker(&_connectionsMutex); + QWriteLocker locker(&_connectionsMutex); // clear all of the current connections in the socket qDebug() << "Clearing all remaining connections in Socket."; @@ -170,7 +165,7 @@ void Socket::clearConnections() { } void Socket::cleanupConnection(HifiSockAddr sockAddr) { - QMutexLocker locker(&_connectionsMutex); + QWriteLocker locker(&_connectionsMutex); qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; _connectionsHash.erase(sockAddr); @@ -249,6 +244,8 @@ void Socket::readPendingDatagrams() { } void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) { + QReadLocker readLocker(&_connectionsMutex); + auto it = _connectionsHash.find(destinationAddr); if (it != _connectionsHash.end()) { connect(it->second.get(), SIGNAL(packetSent()), receiver, slot); @@ -257,11 +254,15 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r void Socket::rateControlSync() { + QReadLocker readLocker(&_connectionsMutex); + // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control for (auto& connection : _connectionsHash) { connection.second->sync(); } + readLocker.unlock(); + if (_synTimer.interval() != _synInterval) { // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) // then restart it now with the right interval @@ -278,9 +279,7 @@ void Socket::setCongestionControlFactory(std::unique_ptr Socket::getConnectionSockAddrs() { + QReadLocker readLocker(&_connectionsMutex); + std::vector addr; addr.reserve(_connectionsHash.size()); + for (const auto& connectionPair : _connectionsHash) { addr.push_back(connectionPair.first); } diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 23dd313462..fa6bf874a8 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -94,7 +94,7 @@ private: std::unordered_map _unreliableSequenceNumbers; std::unordered_map> _connectionsHash; - QMutex _connectionsMutex; // guards concurrent access to connections hashs + QReadWriteLock _connectionsMutex; // guards concurrent access to connections hashs int _synInterval = 10; // 10ms QTimer _synTimer;