diff --git a/assignment-client/src/octree/OctreeQueryNode.cpp b/assignment-client/src/octree/OctreeQueryNode.cpp index 58fd39f73f..044b44a165 100644 --- a/assignment-client/src/octree/OctreeQueryNode.cpp +++ b/assignment-client/src/octree/OctreeQueryNode.cpp @@ -20,52 +20,8 @@ #include "OctreeSendThread.h" -OctreeQueryNode::~OctreeQueryNode() { - _isShuttingDown = true; - if (_octreeSendThread) { - forceNodeShutdown(); - } -} - void OctreeQueryNode::nodeKilled() { _isShuttingDown = true; - if (_octreeSendThread) { - // just tell our thread we want to shutdown, this is asynchronous, and fast, we don't need or want it to block - // while the thread actually shuts down - _octreeSendThread->setIsShuttingDown(); - } -} - -void OctreeQueryNode::forceNodeShutdown() { - _isShuttingDown = true; - if (_octreeSendThread) { - // we really need to force our thread to shutdown, this is synchronous, we will block while the thread actually - // shuts down because we really need it to shutdown, and it's ok if we wait for it to complete - OctreeSendThread* sendThread = _octreeSendThread; - _octreeSendThread = NULL; - sendThread->setIsShuttingDown(); - sendThread->terminate(); - delete sendThread; - } -} - -void OctreeQueryNode::sendThreadFinished() { - // We've been notified by our thread that it is shutting down. So we can clean up our reference to it, and - // delete the actual thread object. Cleaning up our thread will correctly unroll all refereces to shared - // pointers to our node as well as the octree server assignment - if (_octreeSendThread) { - OctreeSendThread* sendThread = _octreeSendThread; - _octreeSendThread = NULL; - delete sendThread; - } -} - -void OctreeQueryNode::initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) { - _octreeSendThread = new OctreeSendThread(myServer, node); - - // we want to be notified when the thread finishes - connect(_octreeSendThread, &GenericThread::finished, this, &OctreeQueryNode::sendThreadFinished); - _octreeSendThread->initialize(true); } bool OctreeQueryNode::packetIsDuplicate() const { diff --git a/assignment-client/src/octree/OctreeQueryNode.h b/assignment-client/src/octree/OctreeQueryNode.h index 130e7031d1..4313d101fd 100644 --- a/assignment-client/src/octree/OctreeQueryNode.h +++ b/assignment-client/src/octree/OctreeQueryNode.h @@ -29,8 +29,8 @@ class OctreeServer; class OctreeQueryNode : public OctreeQuery { Q_OBJECT public: - OctreeQueryNode() {} - virtual ~OctreeQueryNode(); + OctreeQueryNode() = default; + virtual ~OctreeQueryNode() = default; void init(); // called after creation to set up some virtual items virtual PacketType getMyPacketType() const = 0; @@ -79,9 +79,6 @@ public: OctreeSceneStats stats; - void initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node); - bool isOctreeSendThreadInitalized() { return _octreeSendThread; } - void dumpOutOfView(); quint64 getLastRootTimestamp() const { return _lastRootTimestamp; } @@ -92,7 +89,6 @@ public: void sceneStart(quint64 sceneSendStartTime) { _sceneSendStartTime = sceneSendStartTime; } void nodeKilled(); - void forceNodeShutdown(); bool isShuttingDown() const { return _isShuttingDown; } void octreePacketSent() { packetSent(*_octreePacket); } @@ -104,9 +100,6 @@ public: bool hasNextNackedPacket() const; const NLPacket* getNextNackedPacket(); -private slots: - void sendThreadFinished(); - private: OctreeQueryNode(const OctreeQueryNode &); OctreeQueryNode& operator= (const OctreeQueryNode&); diff --git a/assignment-client/src/octree/OctreeSendThread.h b/assignment-client/src/octree/OctreeSendThread.h index c77a327e70..38f5de722f 100644 --- a/assignment-client/src/octree/OctreeSendThread.h +++ b/assignment-client/src/octree/OctreeSendThread.h @@ -31,6 +31,8 @@ public: virtual ~OctreeSendThread(); void setIsShuttingDown(); + bool isShuttingDown() { return _isShuttingDown; } + QUuid getNodeUuid() const { return _nodeUuid; } static AtomicUIntStat _totalBytes; diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index 4f572760b1..f42431589e 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -868,16 +868,28 @@ void OctreeServer::parsePayload() { } } +void OctreeServer::removeSendThread() { + auto sendThread = static_cast(sender()); + + // This deletes the unique_ptr, so sendThread is destructed after that line + _sendThreads.erase(sendThread->getNodeUuid()); +} + void OctreeServer::handleOctreeQueryPacket(QSharedPointer message, SharedNodePointer senderNode) { - if (!_isFinished) { + if (!_isFinished && !_isShuttingDown) { // If we got a query packet, then we're talking to an agent, and we // need to make sure we have it in our nodeList. auto nodeList = DependencyManager::get(); nodeList->updateNodeWithDataFromPacket(message, senderNode); - OctreeQueryNode* nodeData = dynamic_cast(senderNode->getLinkedData()); - if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { - nodeData->initializeOctreeSendThread(this, senderNode); + auto it = _sendThreads.find(senderNode->getUUID()); + if (it == _sendThreads.end() || it->second->isShuttingDown()) { + auto sendThread = std::unique_ptr(new OctreeSendThread(this, senderNode)); + + // we want to be notified when the thread finishes + connect(sendThread.get(), &GenericThread::finished, this, &OctreeServer::removeSendThread); + sendThread->initialize(true); + _sendThreads.emplace(senderNode->getUUID(), std::move(sendThread)); } } } @@ -1157,6 +1169,12 @@ void OctreeServer::nodeAdded(SharedNodePointer node) { void OctreeServer::nodeKilled(SharedNodePointer node) { quint64 start = usecTimestampNow(); + + // Shutdown send thread + auto it = _sendThreads.find(node->getUUID()); + if (it != _sendThreads.end()) { + it->second->setIsShuttingDown(); + } // calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!! _octreeInboundPacketProcessor->nodeKilled(node); @@ -1178,24 +1196,6 @@ void OctreeServer::nodeKilled(SharedNodePointer node) { trackViewerGone(node->getUUID()); } -void OctreeServer::forceNodeShutdown(SharedNodePointer node) { - quint64 start = usecTimestampNow(); - - qDebug() << qPrintable(_safeServerName) << "server killed node:" << *node; - OctreeQueryNode* nodeData = dynamic_cast(node->getLinkedData()); - if (nodeData) { - nodeData->forceNodeShutdown(); // tell our node data and sending threads that we'd like to shut down - } else { - qDebug() << qPrintable(_safeServerName) << "server node missing linked data node:" << *node; - } - - quint64 end = usecTimestampNow(); - quint64 usecsElapsed = (end - start); - qDebug() << qPrintable(_safeServerName) << "server forceNodeShutdown() took: " - << usecsElapsed << " usecs for node:" << *node; -} - - void OctreeServer::aboutToFinish() { qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish..."; @@ -1204,9 +1204,8 @@ void OctreeServer::aboutToFinish() { qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down..."; // we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects. - // This ensures that when we forceNodeShutdown below for each node we don't get any more newly connecting nodes - auto nodeList = DependencyManager::get(); - nodeList->linkedDataCreateCallback = NULL; + // This ensures that we don't get any more newly connecting nodes + DependencyManager::get()->linkedDataCreateCallback = NULL; if (_octreeInboundPacketProcessor) { _octreeInboundPacketProcessor->terminating(); @@ -1216,27 +1215,20 @@ void OctreeServer::aboutToFinish() { _jurisdictionSender->terminating(); } - QSet nodesToShutdown; - - // Force a shutdown of all of our OctreeSendThreads. - // At this point it has to be impossible for a linkedDataCreateCallback to be called for a new node - nodeList->eachNode([&nodesToShutdown](const SharedNodePointer& node) { - nodesToShutdown << node; - }); - - // What follows is a hack to force OctreeSendThreads to cleanup before the OctreeServer is gone. - // I would prefer to allow the SharedNodePointer ref count drop to zero to do this automatically - // but that isn't possible as long as the OctreeSendThread has an OctreeServer* that it uses. - for (auto& node : nodesToShutdown) { - qDebug() << qPrintable(_safeServerName) << "server about to finish while node still connected node:" << *node; - forceNodeShutdown(node); + // Shut down all the send threads + for (auto it = _sendThreads.begin(); it != _sendThreads.end(); ++it) { + it->second->disconnect(this); // Disconnect so that removeSendThread doesn't get called later + it->second->setIsShuttingDown(); } + + // Wait on all send threads to be done before continuing + _sendThreads.clear(); if (_persistThread) { _persistThread->aboutToFinish(); _persistThread->terminating(); } - + qDebug() << qPrintable(_safeServerName) << "server ENDING about to finish..."; } diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 51fe6d46c3..98f6c79893 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -124,7 +124,6 @@ public: bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler); virtual void aboutToFinish(); - void forceNodeShutdown(SharedNodePointer node); public slots: /// runs the octree server assignment @@ -138,6 +137,7 @@ private slots: void handleOctreeQueryPacket(QSharedPointer message, SharedNodePointer senderNode); void handleOctreeDataNackPacket(QSharedPointer message, SharedNodePointer senderNode); void handleJurisdictionRequestPacket(QSharedPointer message, SharedNodePointer senderNode); + void removeSendThread(); protected: virtual OctreePointer createTree() = 0; @@ -190,6 +190,9 @@ protected: time_t _started; quint64 _startedUSecs; QString _safeServerName; + + using SendThreads = std::unordered_map>; + SendThreads _sendThreads; static int _clientCount; static SimpleMovingAverage _averageLoopTime; diff --git a/libraries/shared/src/GenericThread.cpp b/libraries/shared/src/GenericThread.cpp index 4fdd2eb1d2..c1c31ab50a 100644 --- a/libraries/shared/src/GenericThread.cpp +++ b/libraries/shared/src/GenericThread.cpp @@ -15,7 +15,6 @@ GenericThread::GenericThread() : - QObject(), _stopThread(false), _isThreaded(false) // assume non-threaded, must call initialize() {