Merge pull request #2149 from ZappoMan/octree_server_scaling

Octree server scaling improvements
This commit is contained in:
Philip Rosedale 2014-02-28 17:03:04 -08:00
commit 8e30766c68
9 changed files with 105 additions and 34 deletions

View file

@ -20,7 +20,7 @@
class AABoundingVolume class AABoundingVolume
{ {
public: public:
AABoundingVolume() : _isSingleDirectionSet(false), _numPointsInSet(0) { memset(_bounds,0,sizeof(_bounds)); } AABoundingVolume() : _numPointsInSet(0), _isSingleDirectionSet(false) { memset(_bounds,0,sizeof(_bounds)); }
~AABoundingVolume(){} ~AABoundingVolume(){}
void AddToSet(const glm::vec3 newPt) void AddToSet(const glm::vec3 newPt)

View file

@ -28,7 +28,6 @@ GLubyte SvoViewer::SetupGlVBO(GLuint * id, int sizeInBytes, GLenum target, GLenu
bool SvoViewer::FindNumLeaves(OctreeElement* node, void* extraData) bool SvoViewer::FindNumLeaves(OctreeElement* node, void* extraData)
{ {
VoxelTreeElement* voxel = (VoxelTreeElement*)node;
FindNumLeavesData* args = (FindNumLeavesData*)extraData; FindNumLeavesData* args = (FindNumLeavesData*)extraData;
if (node->isLeaf()) args->numLeaves++; if (node->isLeaf()) args->numLeaves++;
return true; return true;

View file

@ -23,13 +23,13 @@ OctreeSendThread::OctreeSendThread(const QUuid& nodeUUID, OctreeServer* myServer
_packetData(), _packetData(),
_nodeMissingCount(0) _nodeMissingCount(0)
{ {
qDebug() << "client connected"; qDebug() << "client connected - starting sending thread";
_myServer->clientConnected(); OctreeServer::clientConnected();
} }
OctreeSendThread::~OctreeSendThread() { OctreeSendThread::~OctreeSendThread() {
qDebug() << "client disconnected"; qDebug() << "client disconnected - ending sending thread";
_myServer->clientDisconnected(); OctreeServer::clientDisconnected();
} }
@ -43,7 +43,6 @@ bool OctreeSendThread::process() {
} }
quint64 start = usecTimestampNow(); quint64 start = usecTimestampNow();
bool gotLock = false;
// don't do any send processing until the initial load of the octree is complete... // don't do any send processing until the initial load of the octree is complete...
if (_myServer->isInitialLoadComplete()) { if (_myServer->isInitialLoadComplete()) {
@ -51,25 +50,19 @@ bool OctreeSendThread::process() {
if (node) { if (node) {
_nodeMissingCount = 0; _nodeMissingCount = 0;
// make sure the node list doesn't kill our node while we're using it OctreeQueryNode* nodeData = NULL;
if (node->getMutex().tryLock()) {
gotLock = true;
OctreeQueryNode* nodeData = NULL;
nodeData = (OctreeQueryNode*) node->getLinkedData(); nodeData = (OctreeQueryNode*) node->getLinkedData();
int packetsSent = 0; int packetsSent = 0;
// Sometimes the node data has not yet been linked, in which case we can't really do anything // Sometimes the node data has not yet been linked, in which case we can't really do anything
if (nodeData) { if (nodeData) {
bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); bool viewFrustumChanged = nodeData->updateCurrentViewFrustum();
if (_myServer->wantsDebugSending() && _myServer->wantsVerboseDebug()) { if (_myServer->wantsDebugSending() && _myServer->wantsVerboseDebug()) {
printf("nodeData->updateCurrentViewFrustum() changed=%s\n", debug::valueOf(viewFrustumChanged)); printf("nodeData->updateCurrentViewFrustum() changed=%s\n", debug::valueOf(viewFrustumChanged));
}
packetsSent = packetDistributor(node, nodeData, viewFrustumChanged);
} }
packetsSent = packetDistributor(node, nodeData, viewFrustumChanged);
node->getMutex().unlock(); // we're done with this node for now.
} }
} else { } else {
_nodeMissingCount++; _nodeMissingCount++;
@ -81,7 +74,7 @@ bool OctreeSendThread::process() {
} }
// Only sleep if we're still running and we got the lock last time we tried, otherwise try to get the lock asap // Only sleep if we're still running and we got the lock last time we tried, otherwise try to get the lock asap
if (isStillRunning() && gotLock) { if (isStillRunning()) {
// dynamically sleep until we need to fire off the next set of octree elements // dynamically sleep until we need to fire off the next set of octree elements
int elapsed = (usecTimestampNow() - start); int elapsed = (usecTimestampNow() - start);
int usecToSleep = OCTREE_SEND_INTERVAL_USECS - elapsed; int usecToSleep = OCTREE_SEND_INTERVAL_USECS - elapsed;
@ -90,9 +83,12 @@ bool OctreeSendThread::process() {
PerformanceWarning warn(false,"OctreeSendThread... usleep()",false,&_usleepTime,&_usleepCalls); PerformanceWarning warn(false,"OctreeSendThread... usleep()",false,&_usleepTime,&_usleepCalls);
usleep(usecToSleep); usleep(usecToSleep);
} else { } else {
if (_myServer->wantsDebugSending() && _myServer->wantsVerboseDebug()) { if (true || (_myServer->wantsDebugSending() && _myServer->wantsVerboseDebug())) {
std::cout << "Last send took too much time, not sleeping!\n"; qDebug() << "Last send took too much time (" << (elapsed / USECS_PER_MSEC)
<<" msecs), barely sleeping 1 usec!\n";
} }
const int MIN_USEC_TO_SLEEP = 1;
usleep(MIN_USEC_TO_SLEEP);
} }
} }
@ -114,6 +110,13 @@ int OctreeSendThread::handlePacketSend(const SharedNodePointer& node, OctreeQuer
int packetsSent = 0; int packetsSent = 0;
// double check that the node has an active socket, otherwise, don't send... // double check that the node has an active socket, otherwise, don't send...
quint64 lockWaitStart = usecTimestampNow();
QMutexLocker locker(&node->getMutex());
quint64 lockWaitEnd = usecTimestampNow();
float lockWaitElapsedUsec = (float)(lockWaitEnd - lockWaitStart);
OctreeServer::trackNodeWaitTime(lockWaitElapsedUsec);
const HifiSockAddr* nodeAddress = node->getActiveSocket(); const HifiSockAddr* nodeAddress = node->getActiveSocket();
if (!nodeAddress) { if (!nodeAddress) {
return packetsSent; // without sending... return packetsSent; // without sending...
@ -440,9 +443,19 @@ int OctreeSendThread::packetDistributor(const SharedNodePointer& node, OctreeQue
isFullScene, &nodeData->stats, _myServer->getJurisdiction()); isFullScene, &nodeData->stats, _myServer->getJurisdiction());
quint64 lockWaitStart = usecTimestampNow();
_myServer->getOctree()->lockForRead(); _myServer->getOctree()->lockForRead();
quint64 lockWaitEnd = usecTimestampNow();
float lockWaitElapsedUsec = (float)(lockWaitEnd - lockWaitStart);
OctreeServer::trackTreeWaitTime(lockWaitElapsedUsec);
nodeData->stats.encodeStarted(); nodeData->stats.encodeStarted();
quint64 encodeStart = usecTimestampNow();
bytesWritten = _myServer->getOctree()->encodeTreeBitstream(subTree, &_packetData, nodeData->nodeBag, params); bytesWritten = _myServer->getOctree()->encodeTreeBitstream(subTree, &_packetData, nodeData->nodeBag, params);
quint64 encodeEnd = usecTimestampNow();
int encodeElapsedMsec = (encodeEnd - encodeStart)/USECS_PER_MSEC;
OctreeServer::trackEncodeTime(encodeElapsedMsec);
// If after calling encodeTreeBitstream() there are no nodes left to send, then we know we've // If after calling encodeTreeBitstream() there are no nodes left to send, then we know we've
// sent the entire scene. We want to know this below so we'll actually write this content into // sent the entire scene. We want to know this below so we'll actually write this content into
@ -555,7 +568,8 @@ int OctreeSendThread::packetDistributor(const SharedNodePointer& node, OctreeQue
quint64 end = usecTimestampNow(); quint64 end = usecTimestampNow();
int elapsedmsec = (end - start)/1000; int elapsedmsec = (end - start)/USECS_PER_MSEC;
OctreeServer::trackLoopTime(elapsedmsec);
quint64 endCompressCalls = OctreePacketData::getCompressContentCalls(); quint64 endCompressCalls = OctreePacketData::getCompressContentCalls();
int elapsedCompressCalls = endCompressCalls - startCompressCalls; int elapsedCompressCalls = endCompressCalls - startCompressCalls;
@ -563,7 +577,6 @@ int OctreeSendThread::packetDistributor(const SharedNodePointer& node, OctreeQue
quint64 endCompressTimeMsecs = OctreePacketData::getCompressContentTime() / 1000; quint64 endCompressTimeMsecs = OctreePacketData::getCompressContentTime() / 1000;
int elapsedCompressTimeMsecs = endCompressTimeMsecs - startCompressTimeMsecs; int elapsedCompressTimeMsecs = endCompressTimeMsecs - startCompressTimeMsecs;
if (elapsedmsec > 100) { if (elapsedmsec > 100) {
if (elapsedmsec > 1000) { if (elapsedmsec > 1000) {
int elapsedsec = (end - start)/1000000; int elapsedsec = (end - start)/1000000;

View file

@ -20,7 +20,10 @@
OctreeServer* OctreeServer::_instance = NULL; OctreeServer* OctreeServer::_instance = NULL;
int OctreeServer::_clientCount = 0; int OctreeServer::_clientCount = 0;
// Class-wide moving averages behind the trackXxxTime()/getAverageXxxTime() accessors.
// Loop and encode samples are supplied in msecs; tree and node lock wait samples in usecs.
// NOTE(review): the constructor argument is presumably the number of samples averaged over — confirm
// against SimpleMovingAverage.
SimpleMovingAverage OctreeServer::_averageLoopTime(10000);
SimpleMovingAverage OctreeServer::_averageEncodeTime(10000);
SimpleMovingAverage OctreeServer::_averageTreeWaitTime(10000);
SimpleMovingAverage OctreeServer::_averageNodeWaitTime(10000);
void OctreeServer::attachQueryNodeToNode(Node* newNode) { void OctreeServer::attachQueryNodeToNode(Node* newNode) {
if (newNode->getLinkedData() == NULL) { if (newNode->getLinkedData() == NULL) {
@ -51,6 +54,7 @@ OctreeServer::OctreeServer(const QByteArray& packet) :
_startedUSecs(usecTimestampNow()) _startedUSecs(usecTimestampNow())
{ {
_instance = this; _instance = this;
_averageLoopTime.updateAverage(0);
} }
OctreeServer::~OctreeServer() { OctreeServer::~OctreeServer() {
@ -266,7 +270,27 @@ bool OctreeServer::handleHTTPRequest(HTTPConnection* connection, const QString&
.arg(locale.toString((uint)getPacketsTotalPerSecond()).rightJustified(COLUMN_WIDTH, ' ')); .arg(locale.toString((uint)getPacketsTotalPerSecond()).rightJustified(COLUMN_WIDTH, ' '));
statsString += QString(" Total Clients Connected: %1 clients\r\n\r\n") statsString += QString(" Total Clients Connected: %1 clients\r\n\r\n")
.arg(locale.toString((uint)getCurrentClientCount()).rightJustified(COLUMN_WIDTH, ' ')); .arg(locale.toString((uint)getCurrentClientCount()).rightJustified(COLUMN_WIDTH, ' '));
float averageLoopTime = getAverageLoopTime();
statsString += QString().sprintf(" Average packetLoop() time: %5.2f msecs\r\n", averageLoopTime);
qDebug() << "averageLoopTime=" << averageLoopTime;
float averageEncodeTime = getAverageEncodeTime();
statsString += QString().sprintf(" Average encode time: %5.2f msecs\r\n", averageEncodeTime);
qDebug() << "averageEncodeTime=" << averageEncodeTime;
float averageTreeWaitTime = getAverageTreeWaitTime();
statsString += QString().sprintf(" Average tree lock wait time: %7.2f usecs\r\n", averageTreeWaitTime);
qDebug() << "averageTreeWaitTime=" << averageTreeWaitTime;
float averageNodeWaitTime = getAverageNodeWaitTime();
statsString += QString().sprintf(" Average node lock wait time: %7.2f usecs\r\n", averageNodeWaitTime);
qDebug() << "averageNodeWaitTime=" << averageNodeWaitTime;
statsString += QString("\r\n");
statsString += QString(" Total Outbound Packets: %1 packets\r\n") statsString += QString(" Total Outbound Packets: %1 packets\r\n")
.arg(locale.toString((uint)totalOutboundPackets).rightJustified(COLUMN_WIDTH, ' ')); .arg(locale.toString((uint)totalOutboundPackets).rightJustified(COLUMN_WIDTH, ' '));
statsString += QString(" Total Outbound Bytes: %1 bytes\r\n") statsString += QString(" Total Outbound Bytes: %1 bytes\r\n")
@ -584,6 +608,9 @@ void OctreeServer::run() {
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
nodeList->setOwnerType(getMyNodeType()); nodeList->setOwnerType(getMyNodeType());
connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), SLOT(nodeAdded(SharedNodePointer)));
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)),SLOT(nodeKilled(SharedNodePointer)));
// we need to ask the DS about agents so we can ping/reply with them // we need to ask the DS about agents so we can ping/reply with them
nodeList->addNodeTypeToInterestSet(NodeType::Agent); nodeList->addNodeTypeToInterestSet(NodeType::Agent);
@ -703,3 +730,19 @@ void OctreeServer::run() {
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000); silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
} }
/// Slot connected to the NodeList nodeAdded(SharedNodePointer) signal in run().
/// Intentionally a no-op for now; `node` is not used.
void OctreeServer::nodeAdded(SharedNodePointer node) {
// we might choose to use this notifier to track clients in a pending state
}
/// Slot connected to the NodeList nodeKilled(SharedNodePointer) signal in run().
/// If the killed node still has OctreeQueryNode data linked to it, that data is scheduled for
/// deletion and then detached from the node.
void OctreeServer::nodeKilled(SharedNodePointer node) {
    OctreeQueryNode* linkedQueryData = static_cast<OctreeQueryNode*>(node->getLinkedData());
    if (!linkedQueryData) {
        return; // nothing was ever linked to this node, so there is nothing to clean up
    }

    // The node is deliberately not locked here. The reasoning recorded when this was written is that
    // any other thread still working with the node holds its own SharedNodePointer reference.
    // NOTE(review): that reference protects the node, not the linked data scheduled for deletion
    // below — confirm no send thread can still be dereferencing it.
    linkedQueryData->deleteLater();

    // Clear the link so that no one else tries to reach the data through this node.
    node->setLinkedData(NULL);
}

View file

@ -74,11 +74,24 @@ public:
static void attachQueryNodeToNode(Node* newNode); static void attachQueryNodeToNode(Node* newNode);
// Class-wide timing statistics. Each trackXxxTime() feeds one sample into the matching moving
// average; each getAverageXxxTime() returns that average's current value.
// NOTE(review): the track functions are called from OctreeSendThread code; whether SimpleMovingAverage
// is safe for concurrent updates is not visible here — confirm.

// Loop time: samples are passed in msecs by OctreeSendThread::packetDistributor().
static void trackLoopTime(float time) { _averageLoopTime.updateAverage(time); }
static float getAverageLoopTime() { return _averageLoopTime.getAverage(); }
// Encode time: samples are passed in msecs (time spent in encodeTreeBitstream()).
static void trackEncodeTime(float time) { _averageEncodeTime.updateAverage(time); }
static float getAverageEncodeTime() { return _averageEncodeTime.getAverage(); }
// Tree lock wait: samples are passed in usecs (time spent waiting in lockForRead()).
static void trackTreeWaitTime(float time) { _averageTreeWaitTime.updateAverage(time); }
static float getAverageTreeWaitTime() { return _averageTreeWaitTime.getAverage(); }
// Node lock wait: samples are passed in usecs (time spent acquiring the node's mutex in handlePacketSend()).
static void trackNodeWaitTime(float time) { _averageNodeWaitTime.updateAverage(time); }
static float getAverageNodeWaitTime() { return _averageNodeWaitTime.getAverage(); }
bool handleHTTPRequest(HTTPConnection* connection, const QString& path); bool handleHTTPRequest(HTTPConnection* connection, const QString& path);
public slots: public slots:
/// runs the voxel server assignment /// runs the voxel server assignment
void run(); void run();
void readPendingDatagrams(); void readPendingDatagrams();
void nodeAdded(SharedNodePointer node);
void nodeKilled(SharedNodePointer node);
protected: protected:
void parsePayload(); void parsePayload();
@ -109,6 +122,10 @@ protected:
quint64 _startedUSecs; quint64 _startedUSecs;
static int _clientCount; static int _clientCount;
static SimpleMovingAverage _averageLoopTime;
static SimpleMovingAverage _averageEncodeTime;
static SimpleMovingAverage _averageTreeWaitTime;
static SimpleMovingAverage _averageNodeWaitTime;
}; };
#endif // __octree_server__OctreeServer__ #endif // __octree_server__OctreeServer__

View file

@ -68,9 +68,7 @@ bool OctreePersistThread::process() {
usleep(USECS_TO_SLEEP); usleep(USECS_TO_SLEEP);
// do our updates then check to save... // do our updates then check to save...
_tree->lockForWrite();
_tree->update(); _tree->update();
_tree->unlock();
quint64 now = usecTimestampNow(); quint64 now = usecTimestampNow();
quint64 sinceLastSave = now - _lastCheck; quint64 sinceLastSave = now - _lastCheck;

View file

@ -474,7 +474,9 @@ void ParticleTree::update() {
AABox treeBounds = getRoot()->getAABox(); AABox treeBounds = getRoot()->getAABox();
if (!shouldDie && treeBounds.contains(args._movingParticles[i].getPosition())) { if (!shouldDie && treeBounds.contains(args._movingParticles[i].getPosition())) {
lockForWrite();
storeParticle(args._movingParticles[i]); storeParticle(args._movingParticles[i]);
unlock();
} else { } else {
uint32_t particleID = args._movingParticles[i].getID(); uint32_t particleID = args._movingParticles[i].getID();
quint64 deletedAt = usecTimestampNow(); quint64 deletedAt = usecTimestampNow();

View file

@ -72,6 +72,5 @@ void GenericThread::threadRoutine() {
if (_isThreaded && _thread) { if (_isThreaded && _thread) {
_thread->quit(); _thread->quit();
} }
emit finished(); emit finished();
} }

View file

@ -79,8 +79,8 @@ Resource::Resource(const QUrl& url, bool delayLoad) :
_startedLoading(false), _startedLoading(false),
_failedToLoad(false), _failedToLoad(false),
_loaded(false), _loaded(false),
_attempts(0), _reply(NULL),
_reply(NULL) { _attempts(0) {
if (!url.isValid()) { if (!url.isValid()) {
_startedLoading = _failedToLoad = true; _startedLoading = _failedToLoad = true;