cherrypick traverseTreeAndBuildNextPacketPayload()

This commit is contained in:
Andrew Meadows 2017-08-23 14:56:18 -07:00
parent a32cc7f555
commit 1562fb153e
4 changed files with 34 additions and 35 deletions

View file

@ -27,8 +27,8 @@ quint64 startSceneSleepTime = 0;
quint64 endSceneSleepTime = 0; quint64 endSceneSleepTime = 0;
OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) : OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) :
_myServer(myServer),
_node(node), _node(node),
_myServer(myServer),
_nodeUuid(node->getUUID()) _nodeUuid(node->getUUID())
{ {
QString safeServerName("Octree"); QString safeServerName("Octree");

View file

@ -57,18 +57,16 @@ protected:
bool viewFrustumChanged, bool isFullScene); bool viewFrustumChanged, bool isFullScene);
virtual bool traverseTreeAndBuildNextPacketPayload(EncodeBitstreamParams& params); virtual bool traverseTreeAndBuildNextPacketPayload(EncodeBitstreamParams& params);
OctreeServer* _myServer { nullptr }; OctreePacketData _packetData;
QWeakPointer<Node> _node; QWeakPointer<Node> _node;
OctreeServer* _myServer { nullptr };
private: private:
int handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, bool dontSuppressDuplicate = false); int handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, bool dontSuppressDuplicate = false);
int packetDistributor(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged); int packetDistributor(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged);
QUuid _nodeUuid; QUuid _nodeUuid;
OctreePacketData _packetData;
int _truePacketsSent { 0 }; // available for debug stats int _truePacketsSent { 0 }; // available for debug stats
int _trueBytesSent { 0 }; // available for debug stats int _trueBytesSent { 0 }; // available for debug stats
int _packetsSentThisInterval { 0 }; // used for bandwidth throttle condition int _packetsSentThisInterval { 0 }; // used for bandwidth throttle condition

View file

@ -883,7 +883,7 @@ OctreeServer::UniqueSendThread OctreeServer::newSendThread(const SharedNodePoint
OctreeServer::UniqueSendThread OctreeServer::createSendThread(const SharedNodePointer& node) { OctreeServer::UniqueSendThread OctreeServer::createSendThread(const SharedNodePointer& node) {
auto sendThread = newSendThread(node); auto sendThread = newSendThread(node);
// we want to be notified when the thread finishes // we want to be notified when the thread finishes
connect(sendThread.get(), &GenericThread::finished, this, &OctreeServer::removeSendThread); connect(sendThread.get(), &GenericThread::finished, this, &OctreeServer::removeSendThread);
sendThread->initialize(true); sendThread->initialize(true);
@ -905,13 +905,13 @@ void OctreeServer::handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> messa
// need to make sure we have it in our nodeList. // need to make sure we have it in our nodeList.
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
nodeList->updateNodeWithDataFromPacket(message, senderNode); nodeList->updateNodeWithDataFromPacket(message, senderNode);
auto it = _sendThreads.find(senderNode->getUUID()); auto it = _sendThreads.find(senderNode->getUUID());
if (it == _sendThreads.end()) { if (it == _sendThreads.end()) {
_sendThreads.emplace(senderNode->getUUID(), createSendThread(senderNode)); _sendThreads.emplace(senderNode->getUUID(), createSendThread(senderNode));
} else if (it->second->isShuttingDown()) { } else if (it->second->isShuttingDown()) {
_sendThreads.erase(it); // Remove right away and wait on thread to be _sendThreads.erase(it); // Remove right away and wait on thread to be
_sendThreads.emplace(senderNode->getUUID(), createSendThread(senderNode)); _sendThreads.emplace(senderNode->getUUID(), createSendThread(senderNode));
} }
} }
@ -1085,7 +1085,7 @@ void OctreeServer::readConfiguration() {
if (getPayload().size() > 0) { if (getPayload().size() > 0) {
parsePayload(); parsePayload();
} }
const QJsonObject& settingsObject = DependencyManager::get<NodeList>()->getDomainHandler().getSettingsObject(); const QJsonObject& settingsObject = DependencyManager::get<NodeList>()->getDomainHandler().getSettingsObject();
QString settingsKey = getMyDomainSettingsKey(); QString settingsKey = getMyDomainSettingsKey();
@ -1212,9 +1212,9 @@ void OctreeServer::run() {
OctreeElement::resetPopulationStatistics(); OctreeElement::resetPopulationStatistics();
_tree = createTree(); _tree = createTree();
_tree->setIsServer(true); _tree->setIsServer(true);
qDebug() << "Waiting for connection to domain to request settings from domain-server."; qDebug() << "Waiting for connection to domain to request settings from domain-server.";
// wait until we have the domain-server settings, otherwise we bail // wait until we have the domain-server settings, otherwise we bail
DomainHandler& domainHandler = DependencyManager::get<NodeList>()->getDomainHandler(); DomainHandler& domainHandler = DependencyManager::get<NodeList>()->getDomainHandler();
connect(&domainHandler, &DomainHandler::settingsReceived, this, &OctreeServer::domainSettingsRequestComplete); connect(&domainHandler, &DomainHandler::settingsReceived, this, &OctreeServer::domainSettingsRequestComplete);
@ -1225,9 +1225,9 @@ void OctreeServer::run() {
} }
void OctreeServer::domainSettingsRequestComplete() { void OctreeServer::domainSettingsRequestComplete() {
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
// we need to ask the DS about agents so we can ping/reply with them // we need to ask the DS about agents so we can ping/reply with them
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
@ -1237,26 +1237,26 @@ void OctreeServer::domainSettingsRequestComplete() {
packetReceiver.registerListener(PacketType::JurisdictionRequest, this, "handleJurisdictionRequestPacket"); packetReceiver.registerListener(PacketType::JurisdictionRequest, this, "handleJurisdictionRequestPacket");
packetReceiver.registerListener(PacketType::OctreeFileReplacement, this, "handleOctreeFileReplacement"); packetReceiver.registerListener(PacketType::OctreeFileReplacement, this, "handleOctreeFileReplacement");
packetReceiver.registerListener(PacketType::OctreeFileReplacementFromUrl, this, "handleOctreeFileReplacementFromURL"); packetReceiver.registerListener(PacketType::OctreeFileReplacementFromUrl, this, "handleOctreeFileReplacementFromURL");
readConfiguration(); readConfiguration();
beforeRun(); // after payload has been processed beforeRun(); // after payload has been processed
connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer))); connect(nodeList.data(), SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer)));
connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer))); connect(nodeList.data(), SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer)));
#ifndef WIN32 #ifndef WIN32
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
#endif #endif
nodeList->linkedDataCreateCallback = [this](Node* node) { nodeList->linkedDataCreateCallback = [this](Node* node) {
auto queryNodeData = createOctreeQueryNode(); auto queryNodeData = createOctreeQueryNode();
queryNodeData->init(); queryNodeData->init();
node->setLinkedData(std::move(queryNodeData)); node->setLinkedData(std::move(queryNodeData));
}; };
srand((unsigned)time(0)); srand((unsigned)time(0));
// if we want Persistence, set up the local file and persist thread // if we want Persistence, set up the local file and persist thread
if (_wantPersist) { if (_wantPersist) {
// If persist filename does not exist, let's see if there is one beside the application binary // If persist filename does not exist, let's see if there is one beside the application binary
@ -1351,24 +1351,24 @@ void OctreeServer::domainSettingsRequestComplete() {
} }
} }
qDebug() << "Backups will be stored in: " << _backupDirectoryPath; qDebug() << "Backups will be stored in: " << _backupDirectoryPath;
// now set up PersistThread // now set up PersistThread
_persistThread = new OctreePersistThread(_tree, _persistAbsoluteFilePath, _backupDirectoryPath, _persistInterval, _persistThread = new OctreePersistThread(_tree, _persistAbsoluteFilePath, _backupDirectoryPath, _persistInterval,
_wantBackup, _settings, _debugTimestampNow, _persistAsFileType); _wantBackup, _settings, _debugTimestampNow, _persistAsFileType);
_persistThread->initialize(true); _persistThread->initialize(true);
} }
// set up our jurisdiction broadcaster... // set up our jurisdiction broadcaster...
if (_jurisdiction) { if (_jurisdiction) {
_jurisdiction->setNodeType(getMyNodeType()); _jurisdiction->setNodeType(getMyNodeType());
} }
_jurisdictionSender = new JurisdictionSender(_jurisdiction, getMyNodeType()); _jurisdictionSender = new JurisdictionSender(_jurisdiction, getMyNodeType());
_jurisdictionSender->initialize(true); _jurisdictionSender->initialize(true);
// set up our OctreeServerPacketProcessor // set up our OctreeServerPacketProcessor
_octreeInboundPacketProcessor = new OctreeInboundPacketProcessor(this); _octreeInboundPacketProcessor = new OctreeInboundPacketProcessor(this);
_octreeInboundPacketProcessor->initialize(true); _octreeInboundPacketProcessor->initialize(true);
// Convert now to tm struct for local timezone // Convert now to tm struct for local timezone
tm* localtm = localtime(&_started); tm* localtm = localtime(&_started);
const int MAX_TIME_LENGTH = 128; const int MAX_TIME_LENGTH = 128;
@ -1380,7 +1380,7 @@ void OctreeServer::domainSettingsRequestComplete() {
if (gmtm) { if (gmtm) {
strftime(utcBuffer, MAX_TIME_LENGTH, " [%m/%d/%Y %X UTC]", gmtm); strftime(utcBuffer, MAX_TIME_LENGTH, " [%m/%d/%Y %X UTC]", gmtm);
} }
qDebug() << "Now running... started at: " << localBuffer << utcBuffer; qDebug() << "Now running... started at: " << localBuffer << utcBuffer;
} }
@ -1391,7 +1391,7 @@ void OctreeServer::nodeAdded(SharedNodePointer node) {
void OctreeServer::nodeKilled(SharedNodePointer node) { void OctreeServer::nodeKilled(SharedNodePointer node) {
quint64 start = usecTimestampNow(); quint64 start = usecTimestampNow();
// Shutdown send thread // Shutdown send thread
auto it = _sendThreads.find(node->getUUID()); auto it = _sendThreads.find(node->getUUID());
if (it != _sendThreads.end()) { if (it != _sendThreads.end()) {
@ -1437,13 +1437,13 @@ void OctreeServer::aboutToFinish() {
if (_jurisdictionSender) { if (_jurisdictionSender) {
_jurisdictionSender->terminating(); _jurisdictionSender->terminating();
} }
// Shut down all the send threads // Shut down all the send threads
for (auto& it : _sendThreads) { for (auto& it : _sendThreads) {
auto& sendThread = *it.second; auto& sendThread = *it.second;
sendThread.setIsShuttingDown(); sendThread.setIsShuttingDown();
} }
// Clear will destruct all the unique_ptr to OctreeSendThreads which will call the GenericThread's dtor // Clear will destruct all the unique_ptr to OctreeSendThreads which will call the GenericThread's dtor
// which waits on the thread to be done before returning // which waits on the thread to be done before returning
_sendThreads.clear(); // Cleans up all the send threads. _sendThreads.clear(); // Cleans up all the send threads.
@ -1563,7 +1563,7 @@ void OctreeServer::sendStatsPacket() {
threadsStats["2. packetDistributor"] = (double)howManyThreadsDidPacketDistributor(oneSecondAgo); threadsStats["2. packetDistributor"] = (double)howManyThreadsDidPacketDistributor(oneSecondAgo);
threadsStats["3. handlePacektSend"] = (double)howManyThreadsDidHandlePacketSend(oneSecondAgo); threadsStats["3. handlePacektSend"] = (double)howManyThreadsDidHandlePacketSend(oneSecondAgo);
threadsStats["4. writeDatagram"] = (double)howManyThreadsDidCallWriteDatagram(oneSecondAgo); threadsStats["4. writeDatagram"] = (double)howManyThreadsDidCallWriteDatagram(oneSecondAgo);
QJsonObject statsArray1; QJsonObject statsArray1;
statsArray1["1. configuration"] = getConfiguration(); statsArray1["1. configuration"] = getConfiguration();
statsArray1["2. detailed_stats_url"] = getStatusLink(); statsArray1["2. detailed_stats_url"] = getStatusLink();
@ -1571,13 +1571,13 @@ void OctreeServer::sendStatsPacket() {
statsArray1["4. persistFileLoadTime"] = getFileLoadTime(); statsArray1["4. persistFileLoadTime"] = getFileLoadTime();
statsArray1["5. clients"] = getCurrentClientCount(); statsArray1["5. clients"] = getCurrentClientCount();
statsArray1["6. threads"] = threadsStats; statsArray1["6. threads"] = threadsStats;
// Octree Stats // Octree Stats
QJsonObject octreeStats; QJsonObject octreeStats;
octreeStats["1. elementCount"] = (double)OctreeElement::getNodeCount(); octreeStats["1. elementCount"] = (double)OctreeElement::getNodeCount();
octreeStats["2. internalElementCount"] = (double)OctreeElement::getInternalNodeCount(); octreeStats["2. internalElementCount"] = (double)OctreeElement::getInternalNodeCount();
octreeStats["3. leafElementCount"] = (double)OctreeElement::getLeafNodeCount(); octreeStats["3. leafElementCount"] = (double)OctreeElement::getLeafNodeCount();
// Stats Object 2 // Stats Object 2
QJsonObject dataObject1; QJsonObject dataObject1;
dataObject1["1. totalPackets"] = (double)OctreeSendThread::_totalPackets; dataObject1["1. totalPackets"] = (double)OctreeSendThread::_totalPackets;
@ -1595,7 +1595,7 @@ void OctreeServer::sendStatsPacket() {
timingArray1["5. avgCompressAndWriteTime"] = getAverageCompressAndWriteTime(); timingArray1["5. avgCompressAndWriteTime"] = getAverageCompressAndWriteTime();
timingArray1["6. avgSendTime"] = getAveragePacketSendingTime(); timingArray1["6. avgSendTime"] = getAveragePacketSendingTime();
timingArray1["7. nodeWaitTime"] = getAverageNodeWaitTime(); timingArray1["7. nodeWaitTime"] = getAverageNodeWaitTime();
QJsonObject statsObject2; QJsonObject statsObject2;
statsObject2["data"] = dataObject1; statsObject2["data"] = dataObject1;
statsObject2["timing"] = timingArray1; statsObject2["timing"] = timingArray1;
@ -1615,18 +1615,18 @@ void OctreeServer::sendStatsPacket() {
timingArray2["4. avgProcessTimePerElement"] = (double)_octreeInboundPacketProcessor->getAverageProcessTimePerElement(); timingArray2["4. avgProcessTimePerElement"] = (double)_octreeInboundPacketProcessor->getAverageProcessTimePerElement();
timingArray2["5. avgLockWaitTimePerElement"] = (double)_octreeInboundPacketProcessor->getAverageLockWaitTimePerElement(); timingArray2["5. avgLockWaitTimePerElement"] = (double)_octreeInboundPacketProcessor->getAverageLockWaitTimePerElement();
} }
QJsonObject statsObject3; QJsonObject statsObject3;
statsObject3["data"] = dataArray2; statsObject3["data"] = dataArray2;
statsObject3["timing"] = timingArray2; statsObject3["timing"] = timingArray2;
// Merge everything // Merge everything
QJsonObject jsonArray; QJsonObject jsonArray;
jsonArray["1. misc"] = statsArray1; jsonArray["1. misc"] = statsArray1;
jsonArray["2. octree"] = octreeStats; jsonArray["2. octree"] = octreeStats;
jsonArray["3. outbound"] = statsObject2; jsonArray["3. outbound"] = statsObject2;
jsonArray["4. inbound"] = statsObject3; jsonArray["4. inbound"] = statsObject3;
QJsonObject statsObject; QJsonObject statsObject;
statsObject[QString(getMyServerName()) + "Server"] = jsonArray; statsObject[QString(getMyServerName()) + "Server"] = jsonArray;
addPacketStatsAndSendStatsPacket(statsObject); addPacketStatsAndSendStatsPacket(statsObject);

View file

@ -92,7 +92,8 @@ public:
OUT_OF_VIEW, OUT_OF_VIEW,
WAS_IN_VIEW, WAS_IN_VIEW,
NO_CHANGE, NO_CHANGE,
OCCLUDED OCCLUDED,
FINISHED
} reason; } reason;
reason stopReason; reason stopReason;