diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 1b0d93e2c5..47e5a00b48 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -513,6 +513,7 @@ void AvatarMixer::run() { qDebug() << "Waiting for domain settings from domain-server."; // block until we get the settingsRequestComplete signal + QEventLoop loop; connect(&domainHandler, &DomainHandler::settingsReceived, &loop, &QEventLoop::quit); connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit); diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index 59c0618a83..1d053633a7 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -77,24 +77,20 @@ void Connection::resetRTT() { SendQueue& Connection::getSendQueue() { if (!_sendQueue) { - std::unique_lock locker { _sendQueueMutex }; + // Lasily create send queue + _sendQueue = SendQueue::create(_parentSocket, _destination); - if (!_sendQueue) { - // Lasily create send queue - _sendQueue = SendQueue::create(_parentSocket, _destination); - - qDebug() << "Created SendQueue for connection to" << _destination; - - QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); - QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); - QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); - QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); - - // set defaults on the send queue from our congestion control object and estimatedTimeout() - _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); - _sendQueue->setEstimatedTimeout(estimatedTimeout()); - _sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); - } + qDebug() << "Created SendQueue for connection to" << _destination; + + QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::packetSent); + QObject::connect(_sendQueue.get(), &SendQueue::packetSent, this, &Connection::recordSentPackets); + QObject::connect(_sendQueue.get(), &SendQueue::packetRetransmitted, this, &Connection::recordRetransmission); + QObject::connect(_sendQueue.get(), &SendQueue::queueInactive, this, &Connection::queueInactive); + + // set defaults on the send queue from our congestion control object and estimatedTimeout() + _sendQueue->setPacketSendPeriod(_congestionControl->_packetSendPeriod); + _sendQueue->setEstimatedTimeout(estimatedTimeout()); + _sendQueue->setFlowWindowSize(std::min(_flowWindowSize, (int) _congestionControl->_congestionWindowSize)); } return *_sendQueue; diff --git a/libraries/networking/src/udt/Connection.h b/libraries/networking/src/udt/Connection.h index 39474ccc8e..5895aac167 100644 --- a/libraries/networking/src/udt/Connection.h +++ b/libraries/networking/src/udt/Connection.h @@ -149,7 +149,6 @@ private: std::unique_ptr _congestionControl; std::unique_ptr _sendQueue; - std::mutex _sendQueueMutex; // Guards the creation of SendQueue so it only happens once std::map _pendingReceivedMessages; diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index ee86560a9f..628804ed9f 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -24,9 +24,15 @@ using namespace udt; +Q_DECLARE_METATYPE(Packet*); +Q_DECLARE_METATYPE(PacketList*); + Socket::Socket(QObject* parent) : QObject(parent) { + qRegisterMetaType(); + qRegisterMetaType(); + connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); // make sure our synchronization method is called every SYN interval @@ -97,8 +103,20 @@ qint64 Socket::writePacket(const Packet& packet, const HifiSockAddr& sockAddr) { } qint64 Socket::writePacket(std::unique_ptr packet, const HifiSockAddr& sockAddr) { + if (packet->isReliable()) { - findOrCreateConnection(sockAddr).sendReliablePacket(move(packet)); + // hand this packet off to writeReliablePacket + // because Qt can't invoke with the unique_ptr we have to release it here and re-construct in writeReliablePacket + + if (QThread::currentThread() != thread()) { + qDebug() << "About to invoke with" << packet.get(); + QMetaObject::invokeMethod(this, "writeReliablePacket", Qt::QueuedConnection, + Q_ARG(Packet*, packet.release()), + Q_ARG(HifiSockAddr, sockAddr)); + } else { + writeReliablePacket(packet.release(), sockAddr); + } + return 0; } @@ -107,9 +125,17 @@ qint64 Socket::writePacket(std::unique_ptr packet, const HifiSockAddr& s qint64 Socket::writePacketList(std::unique_ptr packetList, const HifiSockAddr& sockAddr) { if (packetList->isReliable()) { - // Reliable and Ordered - // Reliable and Unordered - findOrCreateConnection(sockAddr).sendReliablePacketList(move(packetList)); + // hand this packetList off to writeReliablePacketList + // because Qt can't invoke with the unique_ptr we have to release it here and re-construct in writeReliablePacketList + + if (QThread::currentThread() != thread()) { + QMetaObject::invokeMethod(this, "writeReliablePacketList", Qt::QueuedConnection, + Q_ARG(PacketList*, packetList.release()), + Q_ARG(HifiSockAddr, sockAddr)); + } else { + writeReliablePacketList(packetList.release(), sockAddr); + } + return 0; } @@ -122,6 +148,14 @@ qint64 Socket::writePacketList(std::unique_ptr packetList, const Hif return totalBytesSent; } +void Socket::writeReliablePacket(Packet* packet, const HifiSockAddr& sockAddr) { + findOrCreateConnection(sockAddr).sendReliablePacket(std::unique_ptr(packet)); +} + +void Socket::writeReliablePacketList(PacketList* packetList, const HifiSockAddr& sockAddr) { + findOrCreateConnection(sockAddr).sendReliablePacketList(std::unique_ptr(packetList)); +} + qint64 Socket::writeDatagram(const char* data, qint64 size, const HifiSockAddr& sockAddr) { return writeDatagram(QByteArray::fromRawData(data, size), sockAddr); } @@ -143,8 +177,6 @@ qint64 Socket::writeDatagram(const QByteArray& datagram, const HifiSockAddr& soc } Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { - QWriteLocker locker(&_connectionsMutex); - auto it = _connectionsHash.find(sockAddr); if (it == _connectionsHash.end()) { @@ -157,7 +189,10 @@ Connection& Socket::findOrCreateConnection(const HifiSockAddr& sockAddr) { } void Socket::clearConnections() { - QWriteLocker locker(&_connectionsMutex); + if (QThread::currentThread() != thread()) { + QMetaObject::invokeMethod(this, "clearConnections", Qt::BlockingQueuedConnection); + return; + } // clear all of the current connections in the socket qDebug() << "Clearing all remaining connections in Socket."; @@ -165,8 +200,6 @@ void Socket::clearConnections() { } void Socket::cleanupConnection(HifiSockAddr sockAddr) { - QWriteLocker locker(&_connectionsMutex); - qCDebug(networking) << "Socket::cleanupConnection called for UDT connection to" << sockAddr; _connectionsHash.erase(sockAddr); } @@ -244,8 +277,6 @@ void Socket::readPendingDatagrams() { } void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot) { - QReadLocker readLocker(&_connectionsMutex); - auto it = _connectionsHash.find(destinationAddr); if (it != _connectionsHash.end()) { connect(it->second.get(), SIGNAL(packetSent()), receiver, slot); @@ -254,15 +285,11 @@ void Socket::connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* r void Socket::rateControlSync() { - QWriteLocker writeLocker(&_connectionsMutex); - // enumerate our list of connections and ask each of them to send off periodic ACK packet for rate control for (auto& connection : _connectionsHash) { connection.second->sync(); } - writeLocker.unlock(); - if (_synTimer.interval() != _synInterval) { // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) // then restart it now with the right interval @@ -279,8 +306,6 @@ void Socket::setCongestionControlFactory(std::unique_ptrsecond->sampleStats(); @@ -289,9 +314,7 @@ ConnectionStats::Stats Socket::sampleStatsForConnection(const HifiSockAddr& dest } } -std::vector Socket::getConnectionSockAddrs() { - QReadLocker readLocker(&_connectionsMutex); - +std::vector Socket::getConnectionSockAddrs() { std::vector addr; addr.reserve(_connectionsHash.size()); diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 6e639d842d..4bcee9efbf 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -32,6 +32,7 @@ class ControlSender; class Packet; class PacketList; class SequenceNumber; +class UDTTest; using PacketFilterOperator = std::function; @@ -65,13 +66,8 @@ public: { _unfilteredHandlers[senderSockAddr] = handler; } void setCongestionControlFactory(std::unique_ptr ccFactory); - - void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot); void messageReceived(std::unique_ptr packetList); - - ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination); - std::vector getConnectionSockAddrs(); public slots: void cleanupConnection(HifiSockAddr sockAddr); @@ -84,6 +80,14 @@ private slots: private: void setSystemBufferSizes(); Connection& findOrCreateConnection(const HifiSockAddr& sockAddr); + + // privatized methods used by UDTTest - they are private since they must be called on the Socket thread + ConnectionStats::Stats sampleStatsForConnection(const HifiSockAddr& destination); + std::vector getConnectionSockAddrs(); + void connectToSendSignal(const HifiSockAddr& destinationAddr, QObject* receiver, const char* slot); + + Q_INVOKABLE void writeReliablePacket(Packet* packet, const HifiSockAddr& sockAddr); + Q_INVOKABLE void writeReliablePacketList(PacketList* packetList, const HifiSockAddr& sockAddr); QUdpSocket _udpSocket { this }; PacketFilterOperator _packetFilterOperator; @@ -94,12 +98,12 @@ private: std::unordered_map _unreliableSequenceNumbers; std::unordered_map> _connectionsHash; - QReadWriteLock _connectionsMutex { QReadWriteLock::Recursive }; // guards concurrent access to connections hashs - int _synInterval = 10; // 10ms QTimer _synTimer; std::unique_ptr _ccFactory { new CongestionControlFactory() }; + + friend class UDTTest; }; } // namespace udt