From fc80745c08b7836e594ffbb994700f37722bc620 Mon Sep 17 00:00:00 2001 From: Ryan Huffman Date: Sun, 30 Aug 2015 20:03:15 -0700 Subject: [PATCH 1/4] Update message handling to use insertion sort --- libraries/networking/src/udt/Connection.cpp | 25 ++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index b8dbc7c62e..e2cdbb8651 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -825,30 +825,35 @@ void PendingReceivedMessage::enqueuePacket(std::unique_ptr packet) { return; } + auto sequenceNumber = packet->getSequenceNumber(); + if (packet->getPacketPosition() == Packet::PacketPosition::FIRST) { _hasFirstSequenceNumber = true; - _firstSequenceNumber = packet->getSequenceNumber(); + _firstSequenceNumber = sequenceNumber; } else if (packet->getPacketPosition() == Packet::PacketPosition::LAST) { _hasLastSequenceNumber = true; - _lastSequenceNumber = packet->getSequenceNumber(); + _lastSequenceNumber = sequenceNumber; } else if (packet->getPacketPosition() == Packet::PacketPosition::ONLY) { _hasFirstSequenceNumber = true; _hasLastSequenceNumber = true; - _firstSequenceNumber = packet->getSequenceNumber(); - _lastSequenceNumber = packet->getSequenceNumber(); + _firstSequenceNumber = sequenceNumber; + _lastSequenceNumber = sequenceNumber; } - _packets.push_back(std::move(packet)); + // Insert into the packets list in sorted order. Because we generally expect to receive packets in order, begin + // searching from the end of the list. + auto it = _packets.rbegin(); + for (auto rend = _packets.rend(); it != rend; ++it) { + if (sequenceNumber > (*it)->getSequenceNumber()) { + break; + } + } + _packets.insert(it.base(), std::move(packet)); if (_hasFirstSequenceNumber && _hasLastSequenceNumber) { auto numPackets = udt::seqlen(_firstSequenceNumber, _lastSequenceNumber); if (uint64_t(numPackets) == _packets.size()) { _isComplete = true; - - // Sort packets by sequence number - _packets.sort([](std::unique_ptr& a, std::unique_ptr& b) { - return a->getSequenceNumber() < b->getSequenceNumber(); - }); } } } From de2bfd0d0d04226954c705b1caba8edf61b03388 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 31 Aug 2015 11:30:31 -0700 Subject: [PATCH 2/4] some QThread cleanup and fix in Socket --- interface/src/Application.cpp | 12 ++++++++---- libraries/networking/src/AssetClient.cpp | 5 +++++ libraries/networking/src/LimitedNodeList.cpp | 2 +- libraries/networking/src/udt/SendQueue.cpp | 2 +- libraries/networking/src/udt/Socket.cpp | 8 ++++---- libraries/networking/src/udt/Socket.h | 3 ++- libraries/shared/src/GenericThread.cpp | 3 ++- 7 files changed, 23 insertions(+), 12 deletions(-) diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 1daba493d9..2474668fe9 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -457,8 +457,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) : audioThread->start(); - - QThread* assetThread = new QThread(); + QThread* assetThread = new QThread; assetThread->setObjectName("Asset Thread"); auto assetClient = DependencyManager::get(); @@ -467,7 +466,6 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) : assetThread->start(); - const DomainHandler& domainHandler = nodeList->getDomainHandler(); connect(&domainHandler, SIGNAL(hostnameChanged(const QString&)), SLOT(domainChanged(const QString&))); @@ -878,6 +876,12 @@ Application::~Application() { DependencyManager::destroy(); DependencyManager::destroy(); DependencyManager::destroy(); + + // cleanup the AssetClient thread + QThread* assetThread = DependencyManager::get()->thread(); + DependencyManager::destroy(); + assetThread->quit(); + assetThread->wait(); QThread* nodeThread = DependencyManager::get()->thread(); @@ -887,7 +891,7 @@ Application::~Application() { // ask the node thread to quit and wait until it is done nodeThread->quit(); nodeThread->wait(); - + Leapmotion::destroy(); RealSense::destroy(); ConnexionClient::getInstance().destroy(); diff --git a/libraries/networking/src/AssetClient.cpp b/libraries/networking/src/AssetClient.cpp index 4d228fc6bd..2d89a7259f 100644 --- a/libraries/networking/src/AssetClient.cpp +++ b/libraries/networking/src/AssetClient.cpp @@ -24,6 +24,11 @@ MessageID AssetClient::_currentID = 0; AssetClient::AssetClient() { + + setCustomDeleter([](Dependency* dependency){ + static_cast(dependency)->deleteLater(); + }); + auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); packetReceiver.registerListener(PacketType::AssetGetInfoReply, this, "handleAssetGetInfoReply"); packetReceiver.registerMessageListener(PacketType::AssetGetReply, this, "handleAssetGetReply"); diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index e5923b059f..2d1c98287c 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -66,7 +66,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short firstCall = false; } - + qRegisterMetaType("ConnectionStep"); _nodeSocket.bind(QHostAddress::AnyIPv4, socketListenPort); diff --git a/libraries/networking/src/udt/SendQueue.cpp b/libraries/networking/src/udt/SendQueue.cpp index e088cb8d8b..8210848fec 100644 --- a/libraries/networking/src/udt/SendQueue.cpp +++ b/libraries/networking/src/udt/SendQueue.cpp @@ -53,7 +53,7 @@ std::unique_ptr SendQueue::create(Socket* socket, HifiSockAddr destin Q_ASSERT_X(socket, "SendQueue::create", "Must be called with a valid Socket*"); // Setup queue private thread - QThread* thread = new QThread(); + QThread* thread = new QThread; thread->setObjectName("Networking: SendQueue " + destination.objectName()); // Name thread for easier debug connect(thread, &QThread::started, queue.get(), &SendQueue::run); diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index f3ce095b50..1c3d6ca557 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -30,10 +30,10 @@ Socket::Socket(QObject* parent) : connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); // make sure our synchronization method is called every SYN interval - connect(&_synTimer, &QTimer::timeout, this, &Socket::rateControlSync); + connect(_synTimer, &QTimer::timeout, this, &Socket::rateControlSync); // start our timer for the synchronization time interval - _synTimer.start(_synInterval); + _synTimer->start(_synInterval); } void Socket::rebind() { @@ -262,10 +262,10 @@ void Socket::rateControlSync() { connection.second->sync(); } - if (_synTimer.interval() != _synInterval) { + if (_synTimer->interval() != _synInterval) { // if the _synTimer interval doesn't match the current _synInterval (changes when the CC factory is changed) // then restart it now with the right interval - _synTimer.start(_synInterval); + _synTimer->start(_synInterval); } } diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 23dd313462..564fba6539 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -43,6 +43,7 @@ class Socket : public QObject { Q_OBJECT public: Socket(QObject* object = 0); + ~Socket(); quint16 localPort() const { return _udpSocket.localPort(); } @@ -97,7 +98,7 @@ private: QMutex _connectionsMutex; // guards concurrent access to connections hashs int _synInterval = 10; // 10ms - QTimer _synTimer; + QTimer* _synTimer; std::unique_ptr _ccFactory { new CongestionControlFactory() }; }; diff --git a/libraries/shared/src/GenericThread.cpp b/libraries/shared/src/GenericThread.cpp index be984d5899..18f5224229 100644 --- a/libraries/shared/src/GenericThread.cpp +++ b/libraries/shared/src/GenericThread.cpp @@ -19,6 +19,7 @@ GenericThread::GenericThread(QObject* parent) : _stopThread(false), _isThreaded(false) // assume non-threaded, must call initialize() { + } GenericThread::~GenericThread() { @@ -32,7 +33,7 @@ void GenericThread::initialize(bool isThreaded, QThread::Priority priority) { _isThreaded = isThreaded; if (_isThreaded) { _thread = new QThread(this); - + // match the thread name to our object name _thread->setObjectName(objectName()); From 46d90b4f3196d042747c1a362b6cb3b1fff381d2 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 31 Aug 2015 11:40:06 -0700 Subject: [PATCH 3/4] =?UTF-8?q?make=20sure=20syn=20timer=20is=C2=A0moved?= =?UTF-8?q?=20to=20Socket=20thread?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libraries/networking/src/udt/Socket.cpp | 3 ++- libraries/networking/src/udt/Socket.h | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/udt/Socket.cpp b/libraries/networking/src/udt/Socket.cpp index 1c3d6ca557..c9ae878cee 100644 --- a/libraries/networking/src/udt/Socket.cpp +++ b/libraries/networking/src/udt/Socket.cpp @@ -25,7 +25,8 @@ using namespace udt; Socket::Socket(QObject* parent) : - QObject(parent) + QObject(parent), + _synTimer(new QTimer(this)) { connect(&_udpSocket, &QUdpSocket::readyRead, this, &Socket::readPendingDatagrams); diff --git a/libraries/networking/src/udt/Socket.h b/libraries/networking/src/udt/Socket.h index 564fba6539..faf8aeedf7 100644 --- a/libraries/networking/src/udt/Socket.h +++ b/libraries/networking/src/udt/Socket.h @@ -43,7 +43,6 @@ class Socket : public QObject { Q_OBJECT public: Socket(QObject* object = 0); - ~Socket(); quint16 localPort() const { return _udpSocket.localPort(); } From e8fba991fa97785639acd8b675105e894ee595e0 Mon Sep 17 00:00:00 2001 From: Ryan Huffman Date: Mon, 31 Aug 2015 12:35:41 -0700 Subject: [PATCH 4/4] Update raw loop to use find_if for message packet sorting --- libraries/networking/src/udt/Connection.cpp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/udt/Connection.cpp b/libraries/networking/src/udt/Connection.cpp index e2cdbb8651..368a6d459e 100644 --- a/libraries/networking/src/udt/Connection.cpp +++ b/libraries/networking/src/udt/Connection.cpp @@ -842,11 +842,8 @@ void PendingReceivedMessage::enqueuePacket(std::unique_ptr packet) { // Insert into the packets list in sorted order. Because we generally expect to receive packets in order, begin // searching from the end of the list. - auto it = _packets.rbegin(); - for (auto rend = _packets.rend(); it != rend; ++it) { - if (sequenceNumber > (*it)->getSequenceNumber()) { - break; - } + auto it = find_if(_packets.rbegin(), _packets.rend(), + [&](const std::unique_ptr& packet) { return sequenceNumber > packet->getSequenceNumber(); }); } _packets.insert(it.base(), std::move(packet));