From d476146a56fab23afd4b01e33b86d35af690ca76 Mon Sep 17 00:00:00 2001 From: Ryan Huffman Date: Thu, 10 May 2018 10:47:49 -0700 Subject: [PATCH] Move OctreePersistThread off GenericThread and add octree negotiation --- assignment-client/src/octree/OctreeServer.cpp | 162 ++++---------- assignment-client/src/octree/OctreeServer.h | 26 +-- libraries/octree/src/OctreePersistThread.cpp | 205 ++++++++++++------ libraries/octree/src/OctreePersistThread.h | 19 +- 4 files changed, 208 insertions(+), 204 deletions(-) diff --git a/assignment-client/src/octree/OctreeServer.cpp b/assignment-client/src/octree/OctreeServer.cpp index 83fbf6f3d8..b8329b4d3e 100644 --- a/assignment-client/src/octree/OctreeServer.cpp +++ b/assignment-client/src/octree/OctreeServer.cpp @@ -231,18 +231,19 @@ void OctreeServer::trackProcessWaitTime(float time) { OctreeServer::OctreeServer(ReceivedMessage& message) : ThreadedAssignment(message), _argc(0), - _argv(NULL), - _parsedArgV(NULL), + _argv(nullptr), + _parsedArgV(nullptr), + _httpManager(nullptr), _statusPort(0), _packetsPerClientPerInterval(10), _packetsTotalPerInterval(DEFAULT_PACKETS_PER_INTERVAL), - _tree(NULL), + _tree(nullptr), _wantPersist(true), _debugSending(false), _debugReceiving(false), _verboseDebug(false), - _octreeInboundPacketProcessor(NULL), - _persistThread(NULL), + _octreeInboundPacketProcessor(nullptr), + _persistManager(nullptr), _started(time(0)), _startedUSecs(usecTimestampNow()) { @@ -265,11 +266,8 @@ OctreeServer::~OctreeServer() { _octreeInboundPacketProcessor->deleteLater(); } - if (_persistThread) { - _persistThread->terminating(); - _persistThread->terminate(); - _persistThread->deleteLater(); - } + qDebug() << "Waiting for persist thread to come down"; + _persistThread.wait(); // cleanup our tree here... qDebug() << qPrintable(_safeServerName) << "server START cleaning up octree... [" << this << "]"; @@ -1117,111 +1115,14 @@ void OctreeServer::run() { } void OctreeServer::domainSettingsRequestComplete() { - if (_state != OctreeServerState::WaitingForDomainSettings) { - qCWarning(octree_server) << "Received domain settings after they have already been received"; - return; - } - auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); - packetReceiver.registerListener(getMyQueryMessageType(), this, "handleOctreeQueryPacket"); packetReceiver.registerListener(PacketType::OctreeDataNack, this, "handleOctreeDataNackPacket"); - - packetReceiver.registerListener(PacketType::OctreeDataFileReply, this, "handleOctreeDataFileReply"); + packetReceiver.registerListener(getMyQueryMessageType(), this, "handleOctreeQueryPacket"); qDebug(octree_server) << "Received domain settings"; readConfiguration(); - _state = OctreeServerState::WaitingForOctreeDataNegotation; - - auto nodeList = DependencyManager::get(); - const DomainHandler& domainHandler = nodeList->getDomainHandler(); - - auto packet = NLPacket::create(PacketType::OctreeDataFileRequest, -1, true, false); - - OctreeUtils::RawOctreeData data; - qCDebug(octree_server) << "Reading octree data from" << _persistAbsoluteFilePath; - if (data.readOctreeDataInfoFromFile(_persistAbsoluteFilePath)) { - qCDebug(octree_server) << "Current octree data: ID(" << data.id << ") DataVersion(" << data.version << ")"; - packet->writePrimitive(true); - auto id = data.id.toRfc4122(); - packet->write(id); - packet->writePrimitive(data.version); - } else { - qCWarning(octree_server) << "No octree data found"; - packet->writePrimitive(false); - } - - qCDebug(octree_server) << "Sending request for octree data to DS"; - nodeList->sendPacket(std::move(packet), domainHandler.getSockAddr()); -} - -void OctreeServer::handleOctreeDataFileReply(QSharedPointer message) { - if (_state != OctreeServerState::WaitingForOctreeDataNegotation) { - qCWarning(octree_server) << "Server received ocree data file reply but is not currently negotiating."; - return; - } - - bool includesNewData; - message->readPrimitive(&includesNewData); - QByteArray replaceData; - if (includesNewData) { - replaceData = message->readAll(); - qDebug() << "Got reply to octree data file request, new data sent"; - } else { - qDebug() << "Got reply to octree data file request, current entity data is sufficient"; - - OctreeUtils::RawEntityData data; - qCDebug(octree_server) << "Reading octree data from" << _persistAbsoluteFilePath; - if (data.readOctreeDataInfoFromFile(_persistAbsoluteFilePath)) { - if (data.id.isNull()) { - qCDebug(octree_server) << "Current octree data has a null id, updating"; - data.resetIdAndVersion(); - - QFile file(_persistAbsoluteFilePath); - if (file.open(QIODevice::WriteOnly)) { - auto entityData = data.toGzippedByteArray(); - file.write(entityData); - file.close(); - } else { - qCDebug(octree_server) << "Failed to update octree data"; - } - } - } - } - - _state = OctreeServerState::Running; - beginRunning(replaceData); -} - -void OctreeServer::beginRunning(QByteArray replaceData) { - if (_state != OctreeServerState::Running) { - qCWarning(octree_server) << "Server is not running"; - return; - } - - auto nodeList = DependencyManager::get(); - - // we need to ask the DS about agents so we can ping/reply with them - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); - - beforeRun(); // after payload has been processed - - connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer))); - connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer))); - -#ifndef WIN32 - setvbuf(stdout, NULL, _IOLBF, 0); -#endif - - nodeList->linkedDataCreateCallback = [this](Node* node) { - auto queryNodeData = createOctreeQueryNode(); - queryNodeData->init(); - node->setLinkedData(std::move(queryNodeData)); - }; - - srand((unsigned)time(0)); - // if we want Persistence, set up the local file and persist thread if (_wantPersist) { static const QString ENTITY_PERSIST_EXTENSION = ".json.gz"; @@ -1279,10 +1180,42 @@ void OctreeServer::beginRunning(QByteArray replaceData) { auto persistFileDirectory = QFileInfo(_persistAbsoluteFilePath).absolutePath(); // now set up PersistThread - _persistThread = new OctreePersistThread(_tree, _persistAbsoluteFilePath, _persistInterval, _debugTimestampNow, - _persistAsFileType, replaceData); - _persistThread->initialize(true); + _persistManager = new OctreePersistThread(_tree, _persistAbsoluteFilePath, _persistInterval, _debugTimestampNow, + _persistAsFileType); + _persistManager->moveToThread(&_persistThread); + connect(&_persistThread, &QThread::finished, _persistManager, &QObject::deleteLater); + connect(&_persistThread, &QThread::started, _persistManager, &OctreePersistThread::start); + connect(_persistManager, &OctreePersistThread::loadCompleted, this, [this]() { + beginRunning(); + }); + _persistThread.start(); + } else { + beginRunning(); } +} + +void OctreeServer::beginRunning() { + auto nodeList = DependencyManager::get(); + + // we need to ask the DS about agents so we can ping/reply with them + nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); + + beforeRun(); // after payload has been processed + + connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer))); + connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer))); + +#ifndef WIN32 + setvbuf(stdout, nullptr, _IOLBF, 0); +#endif + + nodeList->linkedDataCreateCallback = [this](Node* node) { + auto queryNodeData = createOctreeQueryNode(); + queryNodeData->init(); + node->setLinkedData(std::move(queryNodeData)); + }; + + srand((unsigned)time(0)); // set up our OctreeServerPacketProcessor _octreeInboundPacketProcessor = new OctreeInboundPacketProcessor(this); @@ -1345,7 +1278,7 @@ void OctreeServer::aboutToFinish() { qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down..."; - // we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects. + // we're going down - set the NodeList linkedDataCallback to nullptr so we do not create any more OctreeQueryNode objects. // This ensures that we don't get any more newly connecting nodes DependencyManager::get()->linkedDataCreateCallback = nullptr; @@ -1363,9 +1296,8 @@ void OctreeServer::aboutToFinish() { // which waits on the thread to be done before returning _sendThreads.clear(); // Cleans up all the send threads. - if (_persistThread) { - _persistThread->aboutToFinish(); - _persistThread->terminating(); + if (_persistManager) { + _persistThread.quit(); } qDebug() << qPrintable(_safeServerName) << "server ENDING about to finish..."; diff --git a/assignment-client/src/octree/OctreeServer.h b/assignment-client/src/octree/OctreeServer.h index 3595fe1129..5c157b46b2 100644 --- a/assignment-client/src/octree/OctreeServer.h +++ b/assignment-client/src/octree/OctreeServer.h @@ -33,12 +33,6 @@ Q_DECLARE_LOGGING_CATEGORY(octree_server) const int DEFAULT_PACKETS_PER_INTERVAL = 2000; // some 120,000 packets per second total -enum class OctreeServerState { - WaitingForDomainSettings, - WaitingForOctreeDataNegotation, - Running -}; - /// Handles assignments of type OctreeServer - sending octrees to various clients. class OctreeServer : public ThreadedAssignment, public HTTPRequestHandler { Q_OBJECT @@ -46,8 +40,6 @@ public: OctreeServer(ReceivedMessage& message); ~OctreeServer(); - OctreeServerState _state { OctreeServerState::WaitingForDomainSettings }; - /// allows setting of run arguments void setArguments(int argc, char** argv); @@ -68,12 +60,12 @@ public: static void clientConnected() { _clientCount++; } static void clientDisconnected() { _clientCount--; } - bool isInitialLoadComplete() const { return (_persistThread) ? _persistThread->isInitialLoadComplete() : true; } - bool isPersistEnabled() const { return (_persistThread) ? true : false; } - quint64 getLoadElapsedTime() const { return (_persistThread) ? _persistThread->getLoadElapsedTime() : 0; } - QString getPersistFilename() const { return (_persistThread) ? _persistThread->getPersistFilename() : ""; } - QString getPersistFileMimeType() const { return (_persistThread) ? _persistThread->getPersistFileMimeType() : "text/plain"; } - QByteArray getPersistFileContents() const { return (_persistThread) ? _persistThread->getPersistFileContents() : QByteArray(); } + bool isInitialLoadComplete() const { return (_persistManager) ? _persistManager->isInitialLoadComplete() : true; } + bool isPersistEnabled() const { return (_persistManager) ? true : false; } + quint64 getLoadElapsedTime() const { return (_persistManager) ? _persistManager->getLoadElapsedTime() : 0; } + QString getPersistFilename() const { return (_persistManager) ? _persistManager->getPersistFilename() : ""; } + QString getPersistFileMimeType() const { return (_persistManager) ? _persistManager->getPersistFileMimeType() : "text/plain"; } + QByteArray getPersistFileContents() const { return (_persistManager) ? _persistManager->getPersistFileContents() : QByteArray(); } // Subclasses must implement these methods virtual std::unique_ptr createOctreeQueryNode() = 0; @@ -149,7 +141,6 @@ private slots: void domainSettingsRequestComplete(); void handleOctreeQueryPacket(QSharedPointer message, SharedNodePointer senderNode); void handleOctreeDataNackPacket(QSharedPointer message, SharedNodePointer senderNode); - void handleOctreeDataFileReply(QSharedPointer message); void removeSendThread(); protected: @@ -171,7 +162,7 @@ protected: QString getConfiguration(); QString getStatusLink(); - void beginRunning(QByteArray replaceData); + void beginRunning(); UniqueSendThread createSendThread(const SharedNodePointer& node); virtual UniqueSendThread newSendThread(const SharedNodePointer& node) = 0; @@ -199,7 +190,8 @@ protected: bool _debugTimestampNow; bool _verboseDebug; OctreeInboundPacketProcessor* _octreeInboundPacketProcessor; - OctreePersistThread* _persistThread; + OctreePersistThread* _persistManager; + QThread _persistThread; int _persistInterval; bool _persistFileDownload; diff --git a/libraries/octree/src/OctreePersistThread.cpp b/libraries/octree/src/OctreePersistThread.cpp index 5f05c3cc85..94d76b26ec 100644 --- a/libraries/octree/src/OctreePersistThread.cpp +++ b/libraries/octree/src/OctreePersistThread.cpp @@ -38,12 +38,11 @@ const int OctreePersistThread::DEFAULT_PERSIST_INTERVAL = 1000 * 30; // every 30 seconds OctreePersistThread::OctreePersistThread(OctreePointer tree, const QString& filename, int persistInterval, - bool debugTimestampNow, QString persistAsFileType, const QByteArray& replacementData) : + bool debugTimestampNow, QString persistAsFileType) : _tree(tree), _filename(filename), _persistInterval(persistInterval), _initialLoadComplete(false), - _replacementData(replacementData), _loadTimeUSecs(0), _lastCheck(0), _debugTimestampNow(debugTimestampNow), @@ -55,6 +54,140 @@ OctreePersistThread::OctreePersistThread(OctreePointer tree, const QString& file _filename = sansExt + "." + _persistAsFileType; } +void OctreePersistThread::start() { + cleanupOldReplacementBackups(); + + QFile tempFile { getTempFilename() }; + if (tempFile.exists()) { + qWarning(octree) << "Found temporary octree file at" << tempFile.fileName(); + qDebug(octree) << "Attempting to recover from temporary octree file"; + QFile::remove(_filename); + if (tempFile.rename(_filename)) { + qDebug(octree) << "Successfully recovered from temporary octree file"; + } else { + qWarning(octree) << "Failed to recover from temporary octree file"; + tempFile.remove(); + } + } + + auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); + packetReceiver.registerListener(PacketType::OctreeDataFileReply, this, "handleOctreeDataFileReply"); + + auto nodeList = DependencyManager::get(); + const DomainHandler& domainHandler = nodeList->getDomainHandler(); + + auto packet = NLPacket::create(PacketType::OctreeDataFileRequest, -1, true, false); + + OctreeUtils::RawOctreeData data; + qCDebug(octree) << "Reading octree data from" << _filename; + if (data.readOctreeDataInfoFromFile(_filename)) { + qCDebug(octree) << "Current octree data: ID(" << data.id << ") DataVersion(" << data.version << ")"; + packet->writePrimitive(true); + auto id = data.id.toRfc4122(); + packet->write(id); + packet->writePrimitive(data.version); + } else { + qCWarning(octree) << "No octree data found"; + packet->writePrimitive(false); + } + + qCDebug(octree) << "Sending request for octree data to DS"; + nodeList->sendPacket(std::move(packet), domainHandler.getSockAddr()); +} + +void OctreePersistThread::handleOctreeDataFileReply(QSharedPointer message) { + if (_initialLoadComplete) { + qCWarning(octree) << "Received OctreeDataFileReply after initial load had completed"; + return; + } + + bool includesNewData; + message->readPrimitive(&includesNewData); + QByteArray replacementData; + if (includesNewData) { + replacementData = message->readAll(); + replaceData(replacementData); + qDebug() << "Got reply to octree data file request, new data sent"; + } else { + qDebug() << "Got reply to octree data file request, current entity data is sufficient"; + + OctreeUtils::RawEntityData data; + qCDebug(octree) << "Reading octree data from" << _filename; + if (data.readOctreeDataInfoFromFile(_filename)) { + if (data.id.isNull()) { + qCDebug(octree) << "Current octree data has a null id, updating"; + data.resetIdAndVersion(); + + QFile file(_filename); + if (file.open(QIODevice::WriteOnly)) { + auto entityData = data.toGzippedByteArray(); + file.write(entityData); + file.close(); + } else { + qCDebug(octree) << "Failed to update octree data"; + } + } + } + } + + quint64 loadStarted = usecTimestampNow(); + qCDebug(octree) << "loading Octrees from file: " << _filename << "..."; + + OctreeUtils::RawOctreeData data; + if (data.readOctreeDataInfoFromFile(_filename)) { + qDebug() << "Setting entity version info to: " << data.id << data.version; + _tree->setOctreeVersionInfo(data.id, data.version); + } + + bool persistentFileRead; + + _tree->withWriteLock([&] { + PerformanceWarning warn(true, "Loading Octree File", true); + + persistentFileRead = _tree->readFromFile(_filename.toLocal8Bit().constData()); + _tree->pruneTree(); + }); + + quint64 loadDone = usecTimestampNow(); + _loadTimeUSecs = loadDone - loadStarted; + + _tree->clearDirtyBit(); // the tree is clean since we just loaded it + qCDebug(octree, "DONE loading Octrees from file... fileRead=%s", debug::valueOf(persistentFileRead)); + + unsigned long nodeCount = OctreeElement::getNodeCount(); + unsigned long internalNodeCount = OctreeElement::getInternalNodeCount(); + unsigned long leafNodeCount = OctreeElement::getLeafNodeCount(); + qCDebug(octree, "Nodes after loading scene %lu nodes %lu internal %lu leaves", nodeCount, internalNodeCount, leafNodeCount); + + bool wantDebug = false; + if (wantDebug) { + double usecPerGet = (double)OctreeElement::getGetChildAtIndexTime() + / (double)OctreeElement::getGetChildAtIndexCalls(); + qCDebug(octree) << "getChildAtIndexCalls=" << OctreeElement::getGetChildAtIndexCalls() + << " getChildAtIndexTime=" << OctreeElement::getGetChildAtIndexTime() << " perGet=" << usecPerGet; + + double usecPerSet = (double)OctreeElement::getSetChildAtIndexTime() + / (double)OctreeElement::getSetChildAtIndexCalls(); + qCDebug(octree) << "setChildAtIndexCalls=" << OctreeElement::getSetChildAtIndexCalls() + << " setChildAtIndexTime=" << OctreeElement::getSetChildAtIndexTime() << " perSet=" << usecPerSet; + } + + _initialLoadComplete = true; + + // Since we just loaded the persistent file, we can consider ourselves as having "just checked" for persistance. + _lastCheck = usecTimestampNow(); // we just loaded, no need to save again + + if (replacementData.isNull()) { + sendLatestEntityDataToDS(); + } + + qDebug() << "Starting timer"; + QTimer::singleShot(_persistInterval, this, &OctreePersistThread::process); + + emit loadCompleted(); +} + + QString OctreePersistThread::getPersistFileMimeType() const { if (_persistAsFileType == "json") { return "application/json"; @@ -96,68 +229,9 @@ bool OctreePersistThread::backupCurrentFile() { } bool OctreePersistThread::process() { + qDebug() << "Processing..."; - if (!_initialLoadComplete) { - quint64 loadStarted = usecTimestampNow(); - qCDebug(octree) << "loading Octrees from file: " << _filename << "..."; - - if (!_replacementData.isNull()) { - replaceData(_replacementData); - } - - OctreeUtils::RawOctreeData data; - if (data.readOctreeDataInfoFromFile(_filename)) { - qDebug() << "Setting entity version info to: " << data.id << data.dataVersion; - _tree->setOctreeVersionInfo(data.id, data.dataVersion); - } - - bool persistentFileRead; - - _tree->withWriteLock([&] { - PerformanceWarning warn(true, "Loading Octree File", true); - - persistentFileRead = _tree->readFromFile(_filename.toLocal8Bit().constData()); - _tree->pruneTree(); - }); - - quint64 loadDone = usecTimestampNow(); - _loadTimeUSecs = loadDone - loadStarted; - - _tree->clearDirtyBit(); // the tree is clean since we just loaded it - qCDebug(octree, "DONE loading Octrees from file... fileRead=%s", debug::valueOf(persistentFileRead)); - - unsigned long nodeCount = OctreeElement::getNodeCount(); - unsigned long internalNodeCount = OctreeElement::getInternalNodeCount(); - unsigned long leafNodeCount = OctreeElement::getLeafNodeCount(); - qCDebug(octree, "Nodes after loading scene %lu nodes %lu internal %lu leaves", nodeCount, internalNodeCount, leafNodeCount); - - bool wantDebug = false; - if (wantDebug) { - double usecPerGet = (double)OctreeElement::getGetChildAtIndexTime() - / (double)OctreeElement::getGetChildAtIndexCalls(); - qCDebug(octree) << "getChildAtIndexCalls=" << OctreeElement::getGetChildAtIndexCalls() - << " getChildAtIndexTime=" << OctreeElement::getGetChildAtIndexTime() << " perGet=" << usecPerGet; - - double usecPerSet = (double)OctreeElement::getSetChildAtIndexTime() - / (double)OctreeElement::getSetChildAtIndexCalls(); - qCDebug(octree) << "setChildAtIndexCalls=" << OctreeElement::getSetChildAtIndexCalls() - << " setChildAtIndexTime=" << OctreeElement::getSetChildAtIndexTime() << " perSet=" << usecPerSet; - } - - _initialLoadComplete = true; - - // Since we just loaded the persistent file, we can consider ourselves as having "just checked" for persistance. - _lastCheck = usecTimestampNow(); // we just loaded, no need to save again - - if (_replacementData.isNull()) { - sendLatestEntityDataToDS(); - } - _replacementData.clear(); - - emit loadCompleted(); - } - - if (isStillRunning()) { + if (true) { //isStillRunning()) { quint64 MSECS_TO_USECS = 1000; quint64 USECS_TO_SLEEP = 10 * MSECS_TO_USECS; // every 10ms std::this_thread::sleep_for(std::chrono::microseconds(USECS_TO_SLEEP)); @@ -186,14 +260,15 @@ bool OctreePersistThread::process() { } } - return isStillRunning(); // keep running till they terminate us + //return isStillRunning(); // keep running till they terminate us + QTimer::singleShot(1000, this, &OctreePersistThread::process); + return true; } void OctreePersistThread::aboutToFinish() { qCDebug(octree) << "Persist thread about to finish..."; persist(); qCDebug(octree) << "Persist thread done with about to finish..."; - _stopThread = true; } QByteArray OctreePersistThread::getPersistFileContents() const { diff --git a/libraries/octree/src/OctreePersistThread.h b/libraries/octree/src/OctreePersistThread.h index 990a3b1fad..5f161c100f 100644 --- a/libraries/octree/src/OctreePersistThread.h +++ b/libraries/octree/src/OctreePersistThread.h @@ -18,7 +18,7 @@ #include #include "Octree.h" -class OctreePersistThread : public GenericThread { +class OctreePersistThread : public QObject { Q_OBJECT public: class BackupRule { @@ -36,8 +36,7 @@ public: const QString& filename, int persistInterval = DEFAULT_PERSIST_INTERVAL, bool debugTimestampNow = false, - QString persistAsFileType = "json.gz", - const QByteArray& replacementData = QByteArray()); + QString persistAsFileType = "json.gz"); bool isInitialLoadComplete() const { return _initialLoadComplete; } quint64 getLoadElapsedTime() const { return _loadTimeUSecs; } @@ -48,25 +47,31 @@ public: void aboutToFinish(); /// call this to inform the persist thread that the owner is about to finish to support final persist +public slots: + void start(); + signals: void loadCompleted(); -protected: - /// Implements generic processing behavior for this thread. - virtual bool process() override; +protected slots: + bool process(); + void handleOctreeDataFileReply(QSharedPointer message); +protected: void persist(); bool backupCurrentFile(); + void cleanupOldReplacementBackups(); void replaceData(QByteArray data); void sendLatestEntityDataToDS(); + QString getTempFilename() const { return _filename + ".temp"; } + private: OctreePointer _tree; QString _filename; int _persistInterval; bool _initialLoadComplete; - QByteArray _replacementData; quint64 _loadTimeUSecs;