added back use of nodeWithUUID() instead of sharedpointer

This commit is contained in:
ZappoMan 2014-03-27 13:35:22 -07:00
parent 9dd2573311
commit bda96ef935
6 changed files with 73 additions and 25 deletions

View file

@ -21,8 +21,9 @@ quint64 endSceneSleepTime = 0;
OctreeSendThread::OctreeSendThread(OctreeServer* myServer, SharedNodePointer node) : OctreeSendThread::OctreeSendThread(OctreeServer* myServer, SharedNodePointer node) :
_myServer(myServer), _myServer(myServer),
_node(node), _nodeUUID(node->getUUID()),
_packetData(), _packetData(),
_nodeMissingCount(0),
_processLock(), _processLock(),
_isShuttingDown(false) _isShuttingDown(false)
{ {
@ -43,7 +44,6 @@ OctreeSendThread::~OctreeSendThread() {
} }
qDebug() << qPrintable(safeServerName) << "server [" << _myServer << "]: client disconnected " qDebug() << qPrintable(safeServerName) << "server [" << _myServer << "]: client disconnected "
"- ending sending thread [" << this << "]"; "- ending sending thread [" << this << "]";
_node.clear();
OctreeServer::clientDisconnected(); OctreeServer::clientDisconnected();
} }
@ -72,13 +72,28 @@ bool OctreeSendThread::process() {
// don't do any send processing until the initial load of the octree is complete... // don't do any send processing until the initial load of the octree is complete...
if (_myServer->isInitialLoadComplete()) { if (_myServer->isInitialLoadComplete()) {
if (!_node.isNull()) { SharedNodePointer node = NodeList::getInstance()->nodeWithUUID(_nodeUUID, false);
OctreeQueryNode* nodeData = static_cast<OctreeQueryNode*>(_node->getLinkedData()); if (node) {
_nodeMissingCount = 0;
OctreeQueryNode* nodeData = static_cast<OctreeQueryNode*>(node->getLinkedData());
// Sometimes the node data has not yet been linked, in which case we can't really do anything // Sometimes the node data has not yet been linked, in which case we can't really do anything
if (nodeData && !nodeData->isShuttingDown()) { if (nodeData && !nodeData->isShuttingDown()) {
bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); bool viewFrustumChanged = nodeData->updateCurrentViewFrustum();
packetDistributor(_node, nodeData, viewFrustumChanged); packetDistributor(node, nodeData, viewFrustumChanged);
}
} else {
_nodeMissingCount++;
const int MANY_FAILED_LOCKS = 1;
if (_nodeMissingCount >= MANY_FAILED_LOCKS) {
QString safeServerName("Octree");
if (_myServer) {
safeServerName = _myServer->getMyServerName();
}
qDebug() << qPrintable(safeServerName) << "server: sending thread [" << this << "]"
<< "failed to get nodeWithUUID() " << _nodeUUID <<". Failed:" << _nodeMissingCount << "times";
} }
} }
} }

View file

@ -38,14 +38,15 @@ protected:
virtual bool process(); virtual bool process();
private: private:
SharedNodePointer _node;
OctreeServer* _myServer; OctreeServer* _myServer;
QUuid _nodeUUID;
int handlePacketSend(const SharedNodePointer& node, OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent); int handlePacketSend(const SharedNodePointer& node, OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent);
int packetDistributor(const SharedNodePointer& node, OctreeQueryNode* nodeData, bool viewFrustumChanged); int packetDistributor(const SharedNodePointer& node, OctreeQueryNode* nodeData, bool viewFrustumChanged);
OctreePacketData _packetData; OctreePacketData _packetData;
int _nodeMissingCount;
QMutex _processLock; // don't allow us to have our nodeData, or our thread to be deleted while we're processing QMutex _processLock; // don't allow us to have our nodeData, or our thread to be deleted while we're processing
bool _isShuttingDown; bool _isShuttingDown;
}; };

View file

@ -1160,7 +1160,6 @@ QString OctreeServer::getStatusLink() {
} }
void OctreeServer::sendStatsPacket() { void OctreeServer::sendStatsPacket() {
// TODO: we have too many stats to fit in a single MTU... so for now, we break it into multiple JSON objects and // TODO: we have too many stats to fit in a single MTU... so for now, we break it into multiple JSON objects and
// send them separately. What we really should do is change the NodeList::sendStatsToDomainServer() to handle the // send them separately. What we really should do is change the NodeList::sendStatsToDomainServer() to handle the
// the following features: // the following features:
@ -1241,59 +1240,78 @@ QMap<OctreeSendThread*, quint64> OctreeServer::_threadsDidPacketDistributor;
QMap<OctreeSendThread*, quint64> OctreeServer::_threadsDidHandlePacketSend; QMap<OctreeSendThread*, quint64> OctreeServer::_threadsDidHandlePacketSend;
QMap<OctreeSendThread*, quint64> OctreeServer::_threadsDidCallWriteDatagram; QMap<OctreeSendThread*, quint64> OctreeServer::_threadsDidCallWriteDatagram;
QMutex OctreeServer::_threadsDidProcessMutex;
QMutex OctreeServer::_threadsDidPacketDistributorMutex;
QMutex OctreeServer::_threadsDidHandlePacketSendMutex;
QMutex OctreeServer::_threadsDidCallWriteDatagramMutex;
void OctreeServer::didProcess(OctreeSendThread* thread) { void OctreeServer::didProcess(OctreeSendThread* thread) {
QMutexLocker locker(&_threadsDidProcessMutex);
_threadsDidProcess[thread] = usecTimestampNow(); _threadsDidProcess[thread] = usecTimestampNow();
} }
void OctreeServer::didPacketDistributor(OctreeSendThread* thread) { void OctreeServer::didPacketDistributor(OctreeSendThread* thread) {
QMutexLocker locker(&_threadsDidPacketDistributorMutex);
_threadsDidPacketDistributor[thread] = usecTimestampNow(); _threadsDidPacketDistributor[thread] = usecTimestampNow();
} }
void OctreeServer::didHandlePacketSend(OctreeSendThread* thread) { void OctreeServer::didHandlePacketSend(OctreeSendThread* thread) {
QMutexLocker locker(&_threadsDidHandlePacketSendMutex);
_threadsDidHandlePacketSend[thread] = usecTimestampNow(); _threadsDidHandlePacketSend[thread] = usecTimestampNow();
} }
void OctreeServer::didCallWriteDatagram(OctreeSendThread* thread) { void OctreeServer::didCallWriteDatagram(OctreeSendThread* thread) {
QMutexLocker locker(&_threadsDidCallWriteDatagramMutex);
_threadsDidCallWriteDatagram[thread] = usecTimestampNow(); _threadsDidCallWriteDatagram[thread] = usecTimestampNow();
} }
void OctreeServer::stopTrackingThread(OctreeSendThread* thread) { void OctreeServer::stopTrackingThread(OctreeSendThread* thread) {
QMutexLocker lockerA(&_threadsDidProcessMutex);
QMutexLocker lockerB(&_threadsDidPacketDistributorMutex);
QMutexLocker lockerC(&_threadsDidHandlePacketSendMutex);
QMutexLocker lockerD(&_threadsDidCallWriteDatagramMutex);
_threadsDidProcess.remove(thread); _threadsDidProcess.remove(thread);
_threadsDidPacketDistributor.remove(thread); _threadsDidPacketDistributor.remove(thread);
_threadsDidHandlePacketSend.remove(thread); _threadsDidHandlePacketSend.remove(thread);
_threadsDidCallWriteDatagram.remove(thread);
} }
int howManyThreadsDidSomething(QMap<OctreeSendThread*, quint64>& something, quint64 since) { int howManyThreadsDidSomething(QMutex& mutex, QMap<OctreeSendThread*, quint64>& something, quint64 since) {
if (since == 0) {
return something.size();
}
int count = 0; int count = 0;
QMap<OctreeSendThread*, quint64>::const_iterator i = something.constBegin(); if (mutex.tryLock()) {
while (i != something.constEnd()) { if (since == 0) {
if (i.value() > since) { count = something.size();
count++; } else {
QMap<OctreeSendThread*, quint64>::const_iterator i = something.constBegin();
while (i != something.constEnd()) {
if (i.value() > since) {
count++;
}
++i;
}
} }
++i; mutex.unlock();
} }
return count; return count;
} }
int OctreeServer::howManyThreadsDidProcess(quint64 since) { int OctreeServer::howManyThreadsDidProcess(quint64 since) {
return howManyThreadsDidSomething(_threadsDidProcess, since); return howManyThreadsDidSomething(_threadsDidProcessMutex, _threadsDidProcess, since);
} }
int OctreeServer::howManyThreadsDidPacketDistributor(quint64 since) { int OctreeServer::howManyThreadsDidPacketDistributor(quint64 since) {
return howManyThreadsDidSomething(_threadsDidPacketDistributor, since); return howManyThreadsDidSomething(_threadsDidPacketDistributorMutex, _threadsDidPacketDistributor, since);
} }
int OctreeServer::howManyThreadsDidHandlePacketSend(quint64 since) { int OctreeServer::howManyThreadsDidHandlePacketSend(quint64 since) {
return howManyThreadsDidSomething(_threadsDidHandlePacketSend, since); return howManyThreadsDidSomething(_threadsDidHandlePacketSendMutex, _threadsDidHandlePacketSend, since);
} }
int OctreeServer::howManyThreadsDidCallWriteDatagram(quint64 since) { int OctreeServer::howManyThreadsDidCallWriteDatagram(quint64 since) {
return howManyThreadsDidSomething(_threadsDidCallWriteDatagram, since); return howManyThreadsDidSomething(_threadsDidCallWriteDatagramMutex, _threadsDidCallWriteDatagram, since);
} }

View file

@ -211,6 +211,10 @@ protected:
static QMap<OctreeSendThread*, quint64> _threadsDidHandlePacketSend; static QMap<OctreeSendThread*, quint64> _threadsDidHandlePacketSend;
static QMap<OctreeSendThread*, quint64> _threadsDidCallWriteDatagram; static QMap<OctreeSendThread*, quint64> _threadsDidCallWriteDatagram;
static QMutex _threadsDidProcessMutex;
static QMutex _threadsDidPacketDistributorMutex;
static QMutex _threadsDidHandlePacketSendMutex;
static QMutex _threadsDidCallWriteDatagramMutex;
}; };
#endif // __octree_server__OctreeServer__ #endif // __octree_server__OctreeServer__

View file

@ -357,10 +357,20 @@ int NodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packet) {
return 0; return 0;
} }
SharedNodePointer NodeList::nodeWithUUID(const QUuid& nodeUUID) { SharedNodePointer NodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) {
QMutexLocker locker(&_nodeHashMutex); const int WAIT_TIME = 10; // wait up to 10ms in the try lock case
return _nodeHash.value(nodeUUID); SharedNodePointer node;
} // if caller wants us to block and guarantee the correct answer, then honor that request
if (blockingLock) {
// this will block till we can get access
QMutexLocker locker(&_nodeHashMutex);
node = _nodeHash.value(nodeUUID);
} else if (_nodeHashMutex.tryLock(WAIT_TIME)) { // some callers are willing to get wrong answers but not block
node = _nodeHash.value(nodeUUID);
_nodeHashMutex.unlock();
}
return node;
}
SharedNodePointer NodeList::sendingNodeForPacket(const QByteArray& packet) { SharedNodePointer NodeList::sendingNodeForPacket(const QByteArray& packet) {
QUuid nodeUUID = uuidFromPacketHeader(packet); QUuid nodeUUID = uuidFromPacketHeader(packet);

View file

@ -103,7 +103,7 @@ public:
QByteArray constructPingReplyPacket(const QByteArray& pingPacket); QByteArray constructPingReplyPacket(const QByteArray& pingPacket);
void pingPublicAndLocalSocketsForInactiveNode(const SharedNodePointer& node); void pingPublicAndLocalSocketsForInactiveNode(const SharedNodePointer& node);
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID); SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true);
SharedNodePointer sendingNodeForPacket(const QByteArray& packet); SharedNodePointer sendingNodeForPacket(const QByteArray& packet);
SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType, SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType,