Move OctreePersistThread off GenericThread and add octree negotiation

This commit is contained in:
Ryan Huffman 2018-05-10 10:47:49 -07:00
parent 2143bae100
commit d476146a56
4 changed files with 208 additions and 204 deletions

View file

@ -231,18 +231,19 @@ void OctreeServer::trackProcessWaitTime(float time) {
OctreeServer::OctreeServer(ReceivedMessage& message) :
ThreadedAssignment(message),
_argc(0),
_argv(NULL),
_parsedArgV(NULL),
_argv(nullptr),
_parsedArgV(nullptr),
_httpManager(nullptr),
_statusPort(0),
_packetsPerClientPerInterval(10),
_packetsTotalPerInterval(DEFAULT_PACKETS_PER_INTERVAL),
_tree(NULL),
_tree(nullptr),
_wantPersist(true),
_debugSending(false),
_debugReceiving(false),
_verboseDebug(false),
_octreeInboundPacketProcessor(NULL),
_persistThread(NULL),
_octreeInboundPacketProcessor(nullptr),
_persistManager(nullptr),
_started(time(0)),
_startedUSecs(usecTimestampNow())
{
@ -265,11 +266,8 @@ OctreeServer::~OctreeServer() {
_octreeInboundPacketProcessor->deleteLater();
}
if (_persistThread) {
_persistThread->terminating();
_persistThread->terminate();
_persistThread->deleteLater();
}
qDebug() << "Waiting for persist thread to come down";
_persistThread.wait();
// cleanup our tree here...
qDebug() << qPrintable(_safeServerName) << "server START cleaning up octree... [" << this << "]";
@ -1117,111 +1115,14 @@ void OctreeServer::run() {
}
void OctreeServer::domainSettingsRequestComplete() {
if (_state != OctreeServerState::WaitingForDomainSettings) {
qCWarning(octree_server) << "Received domain settings after they have already been received";
return;
}
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(getMyQueryMessageType(), this, "handleOctreeQueryPacket");
packetReceiver.registerListener(PacketType::OctreeDataNack, this, "handleOctreeDataNackPacket");
packetReceiver.registerListener(PacketType::OctreeDataFileReply, this, "handleOctreeDataFileReply");
packetReceiver.registerListener(getMyQueryMessageType(), this, "handleOctreeQueryPacket");
qDebug(octree_server) << "Received domain settings";
readConfiguration();
_state = OctreeServerState::WaitingForOctreeDataNegotation;
auto nodeList = DependencyManager::get<NodeList>();
const DomainHandler& domainHandler = nodeList->getDomainHandler();
auto packet = NLPacket::create(PacketType::OctreeDataFileRequest, -1, true, false);
OctreeUtils::RawOctreeData data;
qCDebug(octree_server) << "Reading octree data from" << _persistAbsoluteFilePath;
if (data.readOctreeDataInfoFromFile(_persistAbsoluteFilePath)) {
qCDebug(octree_server) << "Current octree data: ID(" << data.id << ") DataVersion(" << data.version << ")";
packet->writePrimitive(true);
auto id = data.id.toRfc4122();
packet->write(id);
packet->writePrimitive(data.version);
} else {
qCWarning(octree_server) << "No octree data found";
packet->writePrimitive(false);
}
qCDebug(octree_server) << "Sending request for octree data to DS";
nodeList->sendPacket(std::move(packet), domainHandler.getSockAddr());
}
void OctreeServer::handleOctreeDataFileReply(QSharedPointer<ReceivedMessage> message) {
if (_state != OctreeServerState::WaitingForOctreeDataNegotation) {
qCWarning(octree_server) << "Server received ocree data file reply but is not currently negotiating.";
return;
}
bool includesNewData;
message->readPrimitive(&includesNewData);
QByteArray replaceData;
if (includesNewData) {
replaceData = message->readAll();
qDebug() << "Got reply to octree data file request, new data sent";
} else {
qDebug() << "Got reply to octree data file request, current entity data is sufficient";
OctreeUtils::RawEntityData data;
qCDebug(octree_server) << "Reading octree data from" << _persistAbsoluteFilePath;
if (data.readOctreeDataInfoFromFile(_persistAbsoluteFilePath)) {
if (data.id.isNull()) {
qCDebug(octree_server) << "Current octree data has a null id, updating";
data.resetIdAndVersion();
QFile file(_persistAbsoluteFilePath);
if (file.open(QIODevice::WriteOnly)) {
auto entityData = data.toGzippedByteArray();
file.write(entityData);
file.close();
} else {
qCDebug(octree_server) << "Failed to update octree data";
}
}
}
}
_state = OctreeServerState::Running;
beginRunning(replaceData);
}
void OctreeServer::beginRunning(QByteArray replaceData) {
if (_state != OctreeServerState::Running) {
qCWarning(octree_server) << "Server is not running";
return;
}
auto nodeList = DependencyManager::get<NodeList>();
// we need to ask the DS about agents so we can ping/reply with them
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
beforeRun(); // after payload has been processed
connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer)));
connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer)));
#ifndef WIN32
setvbuf(stdout, NULL, _IOLBF, 0);
#endif
nodeList->linkedDataCreateCallback = [this](Node* node) {
auto queryNodeData = createOctreeQueryNode();
queryNodeData->init();
node->setLinkedData(std::move(queryNodeData));
};
srand((unsigned)time(0));
// if we want Persistence, set up the local file and persist thread
if (_wantPersist) {
static const QString ENTITY_PERSIST_EXTENSION = ".json.gz";
@ -1279,10 +1180,42 @@ void OctreeServer::beginRunning(QByteArray replaceData) {
auto persistFileDirectory = QFileInfo(_persistAbsoluteFilePath).absolutePath();
// now set up PersistThread
_persistThread = new OctreePersistThread(_tree, _persistAbsoluteFilePath, _persistInterval, _debugTimestampNow,
_persistAsFileType, replaceData);
_persistThread->initialize(true);
_persistManager = new OctreePersistThread(_tree, _persistAbsoluteFilePath, _persistInterval, _debugTimestampNow,
_persistAsFileType);
_persistManager->moveToThread(&_persistThread);
connect(&_persistThread, &QThread::finished, _persistManager, &QObject::deleteLater);
connect(&_persistThread, &QThread::started, _persistManager, &OctreePersistThread::start);
connect(_persistManager, &OctreePersistThread::loadCompleted, this, [this]() {
beginRunning();
});
_persistThread.start();
} else {
beginRunning();
}
}
void OctreeServer::beginRunning() {
auto nodeList = DependencyManager::get<NodeList>();
// we need to ask the DS about agents so we can ping/reply with them
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
beforeRun(); // after payload has been processed
connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer)));
connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer)));
#ifndef WIN32
setvbuf(stdout, nullptr, _IOLBF, 0);
#endif
nodeList->linkedDataCreateCallback = [this](Node* node) {
auto queryNodeData = createOctreeQueryNode();
queryNodeData->init();
node->setLinkedData(std::move(queryNodeData));
};
srand((unsigned)time(0));
// set up our OctreeServerPacketProcessor
_octreeInboundPacketProcessor = new OctreeInboundPacketProcessor(this);
@ -1345,7 +1278,7 @@ void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down...";
// we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects.
// we're going down - set the NodeList linkedDataCallback to nullptr so we do not create any more OctreeQueryNode objects.
// This ensures that we don't get any more newly connecting nodes
DependencyManager::get<NodeList>()->linkedDataCreateCallback = nullptr;
@ -1363,9 +1296,8 @@ void OctreeServer::aboutToFinish() {
// which waits on the thread to be done before returning
_sendThreads.clear(); // Cleans up all the send threads.
if (_persistThread) {
_persistThread->aboutToFinish();
_persistThread->terminating();
if (_persistManager) {
_persistThread.quit();
}
qDebug() << qPrintable(_safeServerName) << "server ENDING about to finish...";

View file

@ -33,12 +33,6 @@ Q_DECLARE_LOGGING_CATEGORY(octree_server)
const int DEFAULT_PACKETS_PER_INTERVAL = 2000; // some 120,000 packets per second total
enum class OctreeServerState {
WaitingForDomainSettings,
WaitingForOctreeDataNegotation,
Running
};
/// Handles assignments of type OctreeServer - sending octrees to various clients.
class OctreeServer : public ThreadedAssignment, public HTTPRequestHandler {
Q_OBJECT
@ -46,8 +40,6 @@ public:
OctreeServer(ReceivedMessage& message);
~OctreeServer();
OctreeServerState _state { OctreeServerState::WaitingForDomainSettings };
/// allows setting of run arguments
void setArguments(int argc, char** argv);
@ -68,12 +60,12 @@ public:
static void clientConnected() { _clientCount++; }
static void clientDisconnected() { _clientCount--; }
bool isInitialLoadComplete() const { return (_persistThread) ? _persistThread->isInitialLoadComplete() : true; }
bool isPersistEnabled() const { return (_persistThread) ? true : false; }
quint64 getLoadElapsedTime() const { return (_persistThread) ? _persistThread->getLoadElapsedTime() : 0; }
QString getPersistFilename() const { return (_persistThread) ? _persistThread->getPersistFilename() : ""; }
QString getPersistFileMimeType() const { return (_persistThread) ? _persistThread->getPersistFileMimeType() : "text/plain"; }
QByteArray getPersistFileContents() const { return (_persistThread) ? _persistThread->getPersistFileContents() : QByteArray(); }
bool isInitialLoadComplete() const { return (_persistManager) ? _persistManager->isInitialLoadComplete() : true; }
bool isPersistEnabled() const { return (_persistManager) ? true : false; }
quint64 getLoadElapsedTime() const { return (_persistManager) ? _persistManager->getLoadElapsedTime() : 0; }
QString getPersistFilename() const { return (_persistManager) ? _persistManager->getPersistFilename() : ""; }
QString getPersistFileMimeType() const { return (_persistManager) ? _persistManager->getPersistFileMimeType() : "text/plain"; }
QByteArray getPersistFileContents() const { return (_persistManager) ? _persistManager->getPersistFileContents() : QByteArray(); }
// Subclasses must implement these methods
virtual std::unique_ptr<OctreeQueryNode> createOctreeQueryNode() = 0;
@ -149,7 +141,6 @@ private slots:
void domainSettingsRequestComplete();
void handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleOctreeDataNackPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleOctreeDataFileReply(QSharedPointer<ReceivedMessage> message);
void removeSendThread();
protected:
@ -171,7 +162,7 @@ protected:
QString getConfiguration();
QString getStatusLink();
void beginRunning(QByteArray replaceData);
void beginRunning();
UniqueSendThread createSendThread(const SharedNodePointer& node);
virtual UniqueSendThread newSendThread(const SharedNodePointer& node) = 0;
@ -199,7 +190,8 @@ protected:
bool _debugTimestampNow;
bool _verboseDebug;
OctreeInboundPacketProcessor* _octreeInboundPacketProcessor;
OctreePersistThread* _persistThread;
OctreePersistThread* _persistManager;
QThread _persistThread;
int _persistInterval;
bool _persistFileDownload;

View file

@ -38,12 +38,11 @@
const int OctreePersistThread::DEFAULT_PERSIST_INTERVAL = 1000 * 30; // every 30 seconds
OctreePersistThread::OctreePersistThread(OctreePointer tree, const QString& filename, int persistInterval,
bool debugTimestampNow, QString persistAsFileType, const QByteArray& replacementData) :
bool debugTimestampNow, QString persistAsFileType) :
_tree(tree),
_filename(filename),
_persistInterval(persistInterval),
_initialLoadComplete(false),
_replacementData(replacementData),
_loadTimeUSecs(0),
_lastCheck(0),
_debugTimestampNow(debugTimestampNow),
@ -55,6 +54,140 @@ OctreePersistThread::OctreePersistThread(OctreePointer tree, const QString& file
_filename = sansExt + "." + _persistAsFileType;
}
void OctreePersistThread::start() {
cleanupOldReplacementBackups();
QFile tempFile { getTempFilename() };
if (tempFile.exists()) {
qWarning(octree) << "Found temporary octree file at" << tempFile.fileName();
qDebug(octree) << "Attempting to recover from temporary octree file";
QFile::remove(_filename);
if (tempFile.rename(_filename)) {
qDebug(octree) << "Successfully recovered from temporary octree file";
} else {
qWarning(octree) << "Failed to recover from temporary octree file";
tempFile.remove();
}
}
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::OctreeDataFileReply, this, "handleOctreeDataFileReply");
auto nodeList = DependencyManager::get<NodeList>();
const DomainHandler& domainHandler = nodeList->getDomainHandler();
auto packet = NLPacket::create(PacketType::OctreeDataFileRequest, -1, true, false);
OctreeUtils::RawOctreeData data;
qCDebug(octree) << "Reading octree data from" << _filename;
if (data.readOctreeDataInfoFromFile(_filename)) {
qCDebug(octree) << "Current octree data: ID(" << data.id << ") DataVersion(" << data.version << ")";
packet->writePrimitive(true);
auto id = data.id.toRfc4122();
packet->write(id);
packet->writePrimitive(data.version);
} else {
qCWarning(octree) << "No octree data found";
packet->writePrimitive(false);
}
qCDebug(octree) << "Sending request for octree data to DS";
nodeList->sendPacket(std::move(packet), domainHandler.getSockAddr());
}
void OctreePersistThread::handleOctreeDataFileReply(QSharedPointer<ReceivedMessage> message) {
if (_initialLoadComplete) {
qCWarning(octree) << "Received OctreeDataFileReply after initial load had completed";
return;
}
bool includesNewData;
message->readPrimitive(&includesNewData);
QByteArray replacementData;
if (includesNewData) {
replacementData = message->readAll();
replaceData(replacementData);
qDebug() << "Got reply to octree data file request, new data sent";
} else {
qDebug() << "Got reply to octree data file request, current entity data is sufficient";
OctreeUtils::RawEntityData data;
qCDebug(octree) << "Reading octree data from" << _filename;
if (data.readOctreeDataInfoFromFile(_filename)) {
if (data.id.isNull()) {
qCDebug(octree) << "Current octree data has a null id, updating";
data.resetIdAndVersion();
QFile file(_filename);
if (file.open(QIODevice::WriteOnly)) {
auto entityData = data.toGzippedByteArray();
file.write(entityData);
file.close();
} else {
qCDebug(octree) << "Failed to update octree data";
}
}
}
}
quint64 loadStarted = usecTimestampNow();
qCDebug(octree) << "loading Octrees from file: " << _filename << "...";
OctreeUtils::RawOctreeData data;
if (data.readOctreeDataInfoFromFile(_filename)) {
qDebug() << "Setting entity version info to: " << data.id << data.version;
_tree->setOctreeVersionInfo(data.id, data.version);
}
bool persistentFileRead;
_tree->withWriteLock([&] {
PerformanceWarning warn(true, "Loading Octree File", true);
persistentFileRead = _tree->readFromFile(_filename.toLocal8Bit().constData());
_tree->pruneTree();
});
quint64 loadDone = usecTimestampNow();
_loadTimeUSecs = loadDone - loadStarted;
_tree->clearDirtyBit(); // the tree is clean since we just loaded it
qCDebug(octree, "DONE loading Octrees from file... fileRead=%s", debug::valueOf(persistentFileRead));
unsigned long nodeCount = OctreeElement::getNodeCount();
unsigned long internalNodeCount = OctreeElement::getInternalNodeCount();
unsigned long leafNodeCount = OctreeElement::getLeafNodeCount();
qCDebug(octree, "Nodes after loading scene %lu nodes %lu internal %lu leaves", nodeCount, internalNodeCount, leafNodeCount);
bool wantDebug = false;
if (wantDebug) {
double usecPerGet = (double)OctreeElement::getGetChildAtIndexTime()
/ (double)OctreeElement::getGetChildAtIndexCalls();
qCDebug(octree) << "getChildAtIndexCalls=" << OctreeElement::getGetChildAtIndexCalls()
<< " getChildAtIndexTime=" << OctreeElement::getGetChildAtIndexTime() << " perGet=" << usecPerGet;
double usecPerSet = (double)OctreeElement::getSetChildAtIndexTime()
/ (double)OctreeElement::getSetChildAtIndexCalls();
qCDebug(octree) << "setChildAtIndexCalls=" << OctreeElement::getSetChildAtIndexCalls()
<< " setChildAtIndexTime=" << OctreeElement::getSetChildAtIndexTime() << " perSet=" << usecPerSet;
}
_initialLoadComplete = true;
// Since we just loaded the persistent file, we can consider ourselves as having "just checked" for persistance.
_lastCheck = usecTimestampNow(); // we just loaded, no need to save again
if (replacementData.isNull()) {
sendLatestEntityDataToDS();
}
qDebug() << "Starting timer";
QTimer::singleShot(_persistInterval, this, &OctreePersistThread::process);
emit loadCompleted();
}
QString OctreePersistThread::getPersistFileMimeType() const {
if (_persistAsFileType == "json") {
return "application/json";
@ -96,68 +229,9 @@ bool OctreePersistThread::backupCurrentFile() {
}
bool OctreePersistThread::process() {
qDebug() << "Processing...";
if (!_initialLoadComplete) {
quint64 loadStarted = usecTimestampNow();
qCDebug(octree) << "loading Octrees from file: " << _filename << "...";
if (!_replacementData.isNull()) {
replaceData(_replacementData);
}
OctreeUtils::RawOctreeData data;
if (data.readOctreeDataInfoFromFile(_filename)) {
qDebug() << "Setting entity version info to: " << data.id << data.dataVersion;
_tree->setOctreeVersionInfo(data.id, data.dataVersion);
}
bool persistentFileRead;
_tree->withWriteLock([&] {
PerformanceWarning warn(true, "Loading Octree File", true);
persistentFileRead = _tree->readFromFile(_filename.toLocal8Bit().constData());
_tree->pruneTree();
});
quint64 loadDone = usecTimestampNow();
_loadTimeUSecs = loadDone - loadStarted;
_tree->clearDirtyBit(); // the tree is clean since we just loaded it
qCDebug(octree, "DONE loading Octrees from file... fileRead=%s", debug::valueOf(persistentFileRead));
unsigned long nodeCount = OctreeElement::getNodeCount();
unsigned long internalNodeCount = OctreeElement::getInternalNodeCount();
unsigned long leafNodeCount = OctreeElement::getLeafNodeCount();
qCDebug(octree, "Nodes after loading scene %lu nodes %lu internal %lu leaves", nodeCount, internalNodeCount, leafNodeCount);
bool wantDebug = false;
if (wantDebug) {
double usecPerGet = (double)OctreeElement::getGetChildAtIndexTime()
/ (double)OctreeElement::getGetChildAtIndexCalls();
qCDebug(octree) << "getChildAtIndexCalls=" << OctreeElement::getGetChildAtIndexCalls()
<< " getChildAtIndexTime=" << OctreeElement::getGetChildAtIndexTime() << " perGet=" << usecPerGet;
double usecPerSet = (double)OctreeElement::getSetChildAtIndexTime()
/ (double)OctreeElement::getSetChildAtIndexCalls();
qCDebug(octree) << "setChildAtIndexCalls=" << OctreeElement::getSetChildAtIndexCalls()
<< " setChildAtIndexTime=" << OctreeElement::getSetChildAtIndexTime() << " perSet=" << usecPerSet;
}
_initialLoadComplete = true;
// Since we just loaded the persistent file, we can consider ourselves as having "just checked" for persistance.
_lastCheck = usecTimestampNow(); // we just loaded, no need to save again
if (_replacementData.isNull()) {
sendLatestEntityDataToDS();
}
_replacementData.clear();
emit loadCompleted();
}
if (isStillRunning()) {
if (true) { //isStillRunning()) {
quint64 MSECS_TO_USECS = 1000;
quint64 USECS_TO_SLEEP = 10 * MSECS_TO_USECS; // every 10ms
std::this_thread::sleep_for(std::chrono::microseconds(USECS_TO_SLEEP));
@ -186,14 +260,15 @@ bool OctreePersistThread::process() {
}
}
return isStillRunning(); // keep running till they terminate us
//return isStillRunning(); // keep running till they terminate us
QTimer::singleShot(1000, this, &OctreePersistThread::process);
return true;
}
void OctreePersistThread::aboutToFinish() {
qCDebug(octree) << "Persist thread about to finish...";
persist();
qCDebug(octree) << "Persist thread done with about to finish...";
_stopThread = true;
}
QByteArray OctreePersistThread::getPersistFileContents() const {

View file

@ -18,7 +18,7 @@
#include <GenericThread.h>
#include "Octree.h"
class OctreePersistThread : public GenericThread {
class OctreePersistThread : public QObject {
Q_OBJECT
public:
class BackupRule {
@ -36,8 +36,7 @@ public:
const QString& filename,
int persistInterval = DEFAULT_PERSIST_INTERVAL,
bool debugTimestampNow = false,
QString persistAsFileType = "json.gz",
const QByteArray& replacementData = QByteArray());
QString persistAsFileType = "json.gz");
bool isInitialLoadComplete() const { return _initialLoadComplete; }
quint64 getLoadElapsedTime() const { return _loadTimeUSecs; }
@ -48,25 +47,31 @@ public:
void aboutToFinish(); /// call this to inform the persist thread that the owner is about to finish to support final persist
public slots:
void start();
signals:
void loadCompleted();
protected:
/// Implements generic processing behavior for this thread.
virtual bool process() override;
protected slots:
bool process();
void handleOctreeDataFileReply(QSharedPointer<ReceivedMessage> message);
protected:
void persist();
bool backupCurrentFile();
void cleanupOldReplacementBackups();
void replaceData(QByteArray data);
void sendLatestEntityDataToDS();
QString getTempFilename() const { return _filename + ".temp"; }
private:
OctreePointer _tree;
QString _filename;
int _persistInterval;
bool _initialLoadComplete;
QByteArray _replacementData;
quint64 _loadTimeUSecs;