Move OctreeSendThread to the OctreeServer

This commit is contained in:
Atlante45 2015-12-16 10:04:20 -08:00
parent 51ec7ae2ed
commit a541fdd2df
6 changed files with 40 additions and 95 deletions

View file

@ -20,52 +20,8 @@
#include "OctreeSendThread.h"
OctreeQueryNode::~OctreeQueryNode() {
_isShuttingDown = true;
if (_octreeSendThread) {
forceNodeShutdown();
}
}
void OctreeQueryNode::nodeKilled() {
_isShuttingDown = true;
if (_octreeSendThread) {
// just tell our thread we want to shutdown, this is asynchronous, and fast, we don't need or want it to block
// while the thread actually shuts down
_octreeSendThread->setIsShuttingDown();
}
}
void OctreeQueryNode::forceNodeShutdown() {
_isShuttingDown = true;
if (_octreeSendThread) {
// we really need to force our thread to shutdown, this is synchronous, we will block while the thread actually
// shuts down because we really need it to shutdown, and it's ok if we wait for it to complete
OctreeSendThread* sendThread = _octreeSendThread;
_octreeSendThread = NULL;
sendThread->setIsShuttingDown();
sendThread->terminate();
delete sendThread;
}
}
void OctreeQueryNode::sendThreadFinished() {
// We've been notified by our thread that it is shutting down. So we can clean up our reference to it, and
// delete the actual thread object. Cleaning up our thread will correctly unroll all refereces to shared
// pointers to our node as well as the octree server assignment
if (_octreeSendThread) {
OctreeSendThread* sendThread = _octreeSendThread;
_octreeSendThread = NULL;
delete sendThread;
}
}
void OctreeQueryNode::initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) {
_octreeSendThread = new OctreeSendThread(myServer, node);
// we want to be notified when the thread finishes
connect(_octreeSendThread, &GenericThread::finished, this, &OctreeQueryNode::sendThreadFinished);
_octreeSendThread->initialize(true);
}
bool OctreeQueryNode::packetIsDuplicate() const {

View file

@ -29,8 +29,8 @@ class OctreeServer;
class OctreeQueryNode : public OctreeQuery {
Q_OBJECT
public:
OctreeQueryNode() {}
virtual ~OctreeQueryNode();
OctreeQueryNode() = default;
virtual ~OctreeQueryNode() = default;
void init(); // called after creation to set up some virtual items
virtual PacketType getMyPacketType() const = 0;
@ -79,9 +79,6 @@ public:
OctreeSceneStats stats;
void initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node);
bool isOctreeSendThreadInitalized() { return _octreeSendThread; }
void dumpOutOfView();
quint64 getLastRootTimestamp() const { return _lastRootTimestamp; }
@ -92,7 +89,6 @@ public:
void sceneStart(quint64 sceneSendStartTime) { _sceneSendStartTime = sceneSendStartTime; }
void nodeKilled();
void forceNodeShutdown();
bool isShuttingDown() const { return _isShuttingDown; }
void octreePacketSent() { packetSent(*_octreePacket); }
@ -104,9 +100,6 @@ public:
bool hasNextNackedPacket() const;
const NLPacket* getNextNackedPacket();
private slots:
void sendThreadFinished();
private:
OctreeQueryNode(const OctreeQueryNode &);
OctreeQueryNode& operator= (const OctreeQueryNode&);

View file

@ -31,6 +31,8 @@ public:
virtual ~OctreeSendThread();
void setIsShuttingDown();
bool isShuttingDown() { return _isShuttingDown; }
QUuid getNodeUuid() const { return _nodeUuid; }
static AtomicUIntStat _totalBytes;

View file

@ -868,16 +868,28 @@ void OctreeServer::parsePayload() {
}
}
void OctreeServer::removeSendThread() {
auto sendThread = static_cast<OctreeSendThread*>(sender());
// This deletes the unique_ptr, so sendThread is destructed after that line
_sendThreads.erase(sendThread->getNodeUuid());
}
void OctreeServer::handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
if (!_isFinished) {
if (!_isFinished && !_isShuttingDown) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
auto nodeList = DependencyManager::get<NodeList>();
nodeList->updateNodeWithDataFromPacket(message, senderNode);
OctreeQueryNode* nodeData = dynamic_cast<OctreeQueryNode*>(senderNode->getLinkedData());
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, senderNode);
auto it = _sendThreads.find(senderNode->getUUID());
if (it == _sendThreads.end() || it->second->isShuttingDown()) {
auto sendThread = std::unique_ptr<OctreeSendThread>(new OctreeSendThread(this, senderNode));
// we want to be notified when the thread finishes
connect(sendThread.get(), &GenericThread::finished, this, &OctreeServer::removeSendThread);
sendThread->initialize(true);
_sendThreads.emplace(senderNode->getUUID(), std::move(sendThread));
}
}
}
@ -1157,6 +1169,12 @@ void OctreeServer::nodeAdded(SharedNodePointer node) {
void OctreeServer::nodeKilled(SharedNodePointer node) {
quint64 start = usecTimestampNow();
// Shutdown send thread
auto it = _sendThreads.find(node->getUUID());
if (it != _sendThreads.end()) {
it->second->setIsShuttingDown();
}
// calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!!
_octreeInboundPacketProcessor->nodeKilled(node);
@ -1178,24 +1196,6 @@ void OctreeServer::nodeKilled(SharedNodePointer node) {
trackViewerGone(node->getUUID());
}
void OctreeServer::forceNodeShutdown(SharedNodePointer node) {
quint64 start = usecTimestampNow();
qDebug() << qPrintable(_safeServerName) << "server killed node:" << *node;
OctreeQueryNode* nodeData = dynamic_cast<OctreeQueryNode*>(node->getLinkedData());
if (nodeData) {
nodeData->forceNodeShutdown(); // tell our node data and sending threads that we'd like to shut down
} else {
qDebug() << qPrintable(_safeServerName) << "server node missing linked data node:" << *node;
}
quint64 end = usecTimestampNow();
quint64 usecsElapsed = (end - start);
qDebug() << qPrintable(_safeServerName) << "server forceNodeShutdown() took: "
<< usecsElapsed << " usecs for node:" << *node;
}
void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish...";
@ -1204,9 +1204,8 @@ void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down...";
// we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects.
// This ensures that when we forceNodeShutdown below for each node we don't get any more newly connecting nodes
auto nodeList = DependencyManager::get<NodeList>();
nodeList->linkedDataCreateCallback = NULL;
// This ensures that we don't get any more newly connecting nodes
DependencyManager::get<NodeList>()->linkedDataCreateCallback = NULL;
if (_octreeInboundPacketProcessor) {
_octreeInboundPacketProcessor->terminating();
@ -1216,27 +1215,20 @@ void OctreeServer::aboutToFinish() {
_jurisdictionSender->terminating();
}
QSet<SharedNodePointer> nodesToShutdown;
// Force a shutdown of all of our OctreeSendThreads.
// At this point it has to be impossible for a linkedDataCreateCallback to be called for a new node
nodeList->eachNode([&nodesToShutdown](const SharedNodePointer& node) {
nodesToShutdown << node;
});
// What follows is a hack to force OctreeSendThreads to cleanup before the OctreeServer is gone.
// I would prefer to allow the SharedNodePointer ref count drop to zero to do this automatically
// but that isn't possible as long as the OctreeSendThread has an OctreeServer* that it uses.
for (auto& node : nodesToShutdown) {
qDebug() << qPrintable(_safeServerName) << "server about to finish while node still connected node:" << *node;
forceNodeShutdown(node);
// Shut down all the send threads
for (auto it = _sendThreads.begin(); it != _sendThreads.end(); ++it) {
it->second->disconnect(this); // Disconnect so that removeSendThread doesn't get called later
it->second->setIsShuttingDown();
}
// Wait on all send threads to be done before continuing
_sendThreads.clear();
if (_persistThread) {
_persistThread->aboutToFinish();
_persistThread->terminating();
}
qDebug() << qPrintable(_safeServerName) << "server ENDING about to finish...";
}

View file

@ -124,7 +124,6 @@ public:
bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler);
virtual void aboutToFinish();
void forceNodeShutdown(SharedNodePointer node);
public slots:
/// runs the octree server assignment
@ -138,6 +137,7 @@ private slots:
void handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleOctreeDataNackPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleJurisdictionRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void removeSendThread();
protected:
virtual OctreePointer createTree() = 0;
@ -190,6 +190,9 @@ protected:
time_t _started;
quint64 _startedUSecs;
QString _safeServerName;
using SendThreads = std::unordered_map<QUuid, std::unique_ptr<OctreeSendThread>>;
SendThreads _sendThreads;
static int _clientCount;
static SimpleMovingAverage _averageLoopTime;

View file

@ -15,7 +15,6 @@
GenericThread::GenericThread() :
QObject(),
_stopThread(false),
_isThreaded(false) // assume non-threaded, must call initialize()
{