Merge pull request #6670 from Atlante45/ac-race

Fix entity server race
This commit is contained in:
Brad Hefta-Gaub 2015-12-16 20:21:56 -08:00
commit 275b35ef60
13 changed files with 190 additions and 226 deletions

View file

@ -35,6 +35,7 @@
#include "AssignmentActionFactory.h" #include "AssignmentActionFactory.h"
#include "AssignmentClient.h" #include "AssignmentClient.h"
#include "AssignmentClientLogging.h"
#include "avatars/ScriptableAvatar.h" #include "avatars/ScriptableAvatar.h"
const QString ASSIGNMENT_CLIENT_TARGET_NAME = "assignment-client"; const QString ASSIGNMENT_CLIENT_TARGET_NAME = "assignment-client";
@ -84,7 +85,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
// check for a wallet UUID on the command line or in the config // check for a wallet UUID on the command line or in the config
// this would represent where the user running AC wants funds sent to // this would represent where the user running AC wants funds sent to
if (!walletUUID.isNull()) { if (!walletUUID.isNull()) {
qDebug() << "The destination wallet UUID for credits is" << uuidStringWithoutCurlyBraces(walletUUID); qCDebug(assigmnentclient) << "The destination wallet UUID for credits is" << uuidStringWithoutCurlyBraces(walletUUID);
_requestAssignment.setWalletUUID(walletUUID); _requestAssignment.setWalletUUID(walletUUID);
} }
@ -98,13 +99,13 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
_assignmentServerSocket.setObjectName("AssigmentServer"); _assignmentServerSocket.setObjectName("AssigmentServer");
nodeList->setAssignmentServerSocket(_assignmentServerSocket); nodeList->setAssignmentServerSocket(_assignmentServerSocket);
qDebug() << "Assignment server socket is" << _assignmentServerSocket; qCDebug(assigmnentclient) << "Assignment server socket is" << _assignmentServerSocket;
// call a timer function every ASSIGNMENT_REQUEST_INTERVAL_MSECS to ask for assignment, if required // call a timer function every ASSIGNMENT_REQUEST_INTERVAL_MSECS to ask for assignment, if required
qDebug() << "Waiting for assignment -" << _requestAssignment; qCDebug(assigmnentclient) << "Waiting for assignment -" << _requestAssignment;
if (_assignmentServerHostname != "localhost") { if (_assignmentServerHostname != "localhost") {
qDebug () << "- will attempt to connect to domain-server on" << _assignmentServerSocket.getPort(); qCDebug(assigmnentclient) << "- will attempt to connect to domain-server on" << _assignmentServerSocket.getPort();
} }
connect(&_requestTimer, SIGNAL(timeout()), SLOT(sendAssignmentRequest())); connect(&_requestTimer, SIGNAL(timeout()), SLOT(sendAssignmentRequest()));
@ -122,7 +123,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
_assignmentClientMonitorSocket = HifiSockAddr(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME, assignmentMonitorPort); _assignmentClientMonitorSocket = HifiSockAddr(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME, assignmentMonitorPort);
_assignmentClientMonitorSocket.setObjectName("AssignmentClientMonitor"); _assignmentClientMonitorSocket.setObjectName("AssignmentClientMonitor");
qDebug() << "Assignment-client monitor socket is" << _assignmentClientMonitorSocket; qCDebug(assigmnentclient) << "Assignment-client monitor socket is" << _assignmentClientMonitorSocket;
// Hook up a timer to send this child's status to the Monitor once per second // Hook up a timer to send this child's status to the Monitor once per second
setUpStatusToMonitor(); setUpStatusToMonitor();
@ -133,7 +134,7 @@ AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QStri
} }
void AssignmentClient::stopAssignmentClient() { void AssignmentClient::stopAssignmentClient() {
qDebug() << "Forced stop of assignment-client."; qCDebug(assigmnentclient) << "Forced stop of assignment-client.";
_requestTimer.stop(); _requestTimer.stop();
_statsTimerACM.stop(); _statsTimerACM.stop();
@ -209,14 +210,14 @@ void AssignmentClient::sendAssignmentRequest() {
quint16 localAssignmentServerPort; quint16 localAssignmentServerPort;
if (nodeList->getLocalServerPortFromSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, localAssignmentServerPort)) { if (nodeList->getLocalServerPortFromSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, localAssignmentServerPort)) {
if (localAssignmentServerPort != _assignmentServerSocket.getPort()) { if (localAssignmentServerPort != _assignmentServerSocket.getPort()) {
qDebug() << "Port for local assignment server read from shared memory is" qCDebug(assigmnentclient) << "Port for local assignment server read from shared memory is"
<< localAssignmentServerPort; << localAssignmentServerPort;
_assignmentServerSocket.setPort(localAssignmentServerPort); _assignmentServerSocket.setPort(localAssignmentServerPort);
nodeList->setAssignmentServerSocket(_assignmentServerSocket); nodeList->setAssignmentServerSocket(_assignmentServerSocket);
} }
} else { } else {
qDebug() << "Failed to read local assignment server port from shared memory" qCWarning(assigmnentclient) << "Failed to read local assignment server port from shared memory"
<< "- will send assignment request to previous assignment server socket."; << "- will send assignment request to previous assignment server socket.";
} }
} }
@ -226,13 +227,13 @@ void AssignmentClient::sendAssignmentRequest() {
} }
void AssignmentClient::handleCreateAssignmentPacket(QSharedPointer<ReceivedMessage> message) { void AssignmentClient::handleCreateAssignmentPacket(QSharedPointer<ReceivedMessage> message) {
qDebug() << "Received a PacketType::CreateAssignment - attempting to unpack."; qCDebug(assigmnentclient) << "Received a PacketType::CreateAssignment - attempting to unpack.";
// construct the deployed assignment from the packet data // construct the deployed assignment from the packet data
_currentAssignment = AssignmentFactory::unpackAssignment(*message); _currentAssignment = AssignmentFactory::unpackAssignment(*message);
if (_currentAssignment && !_isAssigned) { if (_currentAssignment && !_isAssigned) {
qDebug() << "Received an assignment -" << *_currentAssignment; qDebug(assigmnentclient) << "Received an assignment -" << *_currentAssignment;
_isAssigned = true; _isAssigned = true;
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
@ -242,7 +243,7 @@ void AssignmentClient::handleCreateAssignmentPacket(QSharedPointer<ReceivedMessa
nodeList->getDomainHandler().setSockAddr(message->getSenderSockAddr(), _assignmentServerHostname); nodeList->getDomainHandler().setSockAddr(message->getSenderSockAddr(), _assignmentServerHostname);
nodeList->getDomainHandler().setAssignmentUUID(_currentAssignment->getUUID()); nodeList->getDomainHandler().setAssignmentUUID(_currentAssignment->getUUID());
qDebug() << "Destination IP for assignment is" << nodeList->getDomainHandler().getIP().toString(); qCDebug(assigmnentclient) << "Destination IP for assignment is" << nodeList->getDomainHandler().getIP().toString();
// start the deployed assignment // start the deployed assignment
QThread* workerThread = new QThread; QThread* workerThread = new QThread;
@ -270,7 +271,7 @@ void AssignmentClient::handleCreateAssignmentPacket(QSharedPointer<ReceivedMessa
// Starts an event loop, and emits workerThread->started() // Starts an event loop, and emits workerThread->started()
workerThread->start(); workerThread->start();
} else { } else {
qDebug() << "Received an assignment that could not be unpacked. Re-requesting."; qCWarning(assigmnentclient) << "Received an assignment that could not be unpacked. Re-requesting.";
} }
} }
@ -278,12 +279,12 @@ void AssignmentClient::handleStopNodePacket(QSharedPointer<ReceivedMessage> mess
const HifiSockAddr& senderSockAddr = message->getSenderSockAddr(); const HifiSockAddr& senderSockAddr = message->getSenderSockAddr();
if (senderSockAddr.getAddress() == QHostAddress::LocalHost || if (senderSockAddr.getAddress() == QHostAddress::LocalHost ||
senderSockAddr.getAddress() == QHostAddress::LocalHostIPv6) { senderSockAddr.getAddress() == QHostAddress::LocalHostIPv6) {
qDebug() << "AssignmentClientMonitor at" << senderSockAddr << "requested stop via PacketType::StopNode.";
qCDebug(assigmnentclient) << "AssignmentClientMonitor at" << senderSockAddr << "requested stop via PacketType::StopNode.";
QCoreApplication::quit(); QCoreApplication::quit();
} else { } else {
qDebug() << "Got a stop packet from other than localhost."; qCWarning(assigmnentclient) << "Got a stop packet from other than localhost.";
} }
} }
@ -303,7 +304,7 @@ void AssignmentClient::handleAuthenticationRequest() {
// ask the account manager to log us in from the env variables // ask the account manager to log us in from the env variables
accountManager.requestAccessToken(username, password); accountManager.requestAccessToken(username, password);
} else { } else {
qDebug() << "Authentication was requested against" << qPrintable(accountManager.getAuthURL().toString()) qCWarning(assigmnentclient) << "Authentication was requested against" << qPrintable(accountManager.getAuthURL().toString())
<< "but both or one of" << qPrintable(DATA_SERVER_USERNAME_ENV) << "but both or one of" << qPrintable(DATA_SERVER_USERNAME_ENV)
<< "/" << qPrintable(DATA_SERVER_PASSWORD_ENV) << "are not set. Unable to authenticate."; << "/" << qPrintable(DATA_SERVER_PASSWORD_ENV) << "are not set. Unable to authenticate.";
@ -321,7 +322,7 @@ void AssignmentClient::assignmentCompleted() {
// reset the logging target to the the CHILD_TARGET_NAME // reset the logging target to the the CHILD_TARGET_NAME
LogHandler::getInstance().setTargetName(ASSIGNMENT_CLIENT_TARGET_NAME); LogHandler::getInstance().setTargetName(ASSIGNMENT_CLIENT_TARGET_NAME);
qDebug() << "Assignment finished or never started - waiting for new assignment."; qCDebug(assigmnentclient) << "Assignment finished or never started - waiting for new assignment.";
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();

View file

@ -0,0 +1,14 @@
//
// AssignmentClientLogging.cpp
// assignment-client/src
//
// Created by Clement on 12/14/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "AssignmentClientLogging.h"
Q_LOGGING_CATEGORY(assigmnentclient, "hifi.assignment-client")

View file

@ -0,0 +1,19 @@
//
// AssignmentClientLogging.h
// assignment-client/src
//
// Created by Clement on 12/14/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_AssignmentClientLogging_h
#define hifi_AssignmentClientLogging_h
#include <QLoggingCategory>
Q_DECLARE_LOGGING_CATEGORY(assigmnentclient)
#endif // hifi_AssignmentClientLogging_h

View file

@ -20,80 +20,8 @@
#include "OctreeSendThread.h" #include "OctreeSendThread.h"
OctreeQueryNode::OctreeQueryNode() :
_viewSent(false),
_octreePacket(),
_octreePacketWaiting(false),
_lastOctreePayload(new char[udt::MAX_PACKET_SIZE]),
_lastOctreePacketLength(0),
_duplicatePacketCount(0),
_firstSuppressedPacket(usecTimestampNow()),
_maxSearchLevel(1),
_maxLevelReachedInLastSearch(1),
_lastTimeBagEmpty(0),
_viewFrustumChanging(false),
_viewFrustumJustStoppedChanging(true),
_octreeSendThread(NULL),
_lastClientBoundaryLevelAdjust(0),
_lastClientOctreeSizeScale(DEFAULT_OCTREE_SIZE_SCALE),
_lodChanged(false),
_lodInitialized(false),
_sequenceNumber(0),
_lastRootTimestamp(0),
_myPacketType(PacketType::Unknown),
_isShuttingDown(false),
_sentPacketHistory()
{
}
OctreeQueryNode::~OctreeQueryNode() {
_isShuttingDown = true;
if (_octreeSendThread) {
forceNodeShutdown();
}
delete[] _lastOctreePayload;
}
void OctreeQueryNode::nodeKilled() { void OctreeQueryNode::nodeKilled() {
_isShuttingDown = true; _isShuttingDown = true;
if (_octreeSendThread) {
// just tell our thread we want to shutdown, this is asynchronous, and fast, we don't need or want it to block
// while the thread actually shuts down
_octreeSendThread->setIsShuttingDown();
}
}
void OctreeQueryNode::forceNodeShutdown() {
_isShuttingDown = true;
if (_octreeSendThread) {
// we really need to force our thread to shutdown, this is synchronous, we will block while the thread actually
// shuts down because we really need it to shutdown, and it's ok if we wait for it to complete
OctreeSendThread* sendThread = _octreeSendThread;
_octreeSendThread = NULL;
sendThread->setIsShuttingDown();
sendThread->terminate();
delete sendThread;
}
}
void OctreeQueryNode::sendThreadFinished() {
// We've been notified by our thread that it is shutting down. So we can clean up our reference to it, and
// delete the actual thread object. Cleaning up our thread will correctly unroll all refereces to shared
// pointers to our node as well as the octree server assignment
if (_octreeSendThread) {
OctreeSendThread* sendThread = _octreeSendThread;
_octreeSendThread = NULL;
delete sendThread;
}
}
void OctreeQueryNode::initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) {
_octreeSendThread = new OctreeSendThread(myServer, node);
// we want to be notified when the thread finishes
connect(_octreeSendThread, &GenericThread::finished, this, &OctreeQueryNode::sendThreadFinished);
_octreeSendThread->initialize(true);
} }
bool OctreeQueryNode::packetIsDuplicate() const { bool OctreeQueryNode::packetIsDuplicate() const {
@ -105,7 +33,7 @@ bool OctreeQueryNode::packetIsDuplicate() const {
// of the entire packet, we need to compare only the packet content... // of the entire packet, we need to compare only the packet content...
if (_lastOctreePacketLength == _octreePacket->getPayloadSize()) { if (_lastOctreePacketLength == _octreePacket->getPayloadSize()) {
if (memcmp(_lastOctreePayload + OCTREE_PACKET_EXTRA_HEADERS_SIZE, if (memcmp(&_lastOctreePayload + OCTREE_PACKET_EXTRA_HEADERS_SIZE,
_octreePacket->getPayload() + OCTREE_PACKET_EXTRA_HEADERS_SIZE, _octreePacket->getPayload() + OCTREE_PACKET_EXTRA_HEADERS_SIZE,
_octreePacket->getPayloadSize() - OCTREE_PACKET_EXTRA_HEADERS_SIZE) == 0) { _octreePacket->getPayloadSize() - OCTREE_PACKET_EXTRA_HEADERS_SIZE) == 0) {
return true; return true;
@ -173,7 +101,7 @@ void OctreeQueryNode::resetOctreePacket() {
// scene information, (e.g. the root node packet of a static scene), we can use this as a strategy for reducing // scene information, (e.g. the root node packet of a static scene), we can use this as a strategy for reducing
// packet send rate. // packet send rate.
_lastOctreePacketLength = _octreePacket->getPayloadSize(); _lastOctreePacketLength = _octreePacket->getPayloadSize();
memcpy(_lastOctreePayload, _octreePacket->getPayload(), _lastOctreePacketLength); memcpy(&_lastOctreePayload, _octreePacket->getPayload(), _lastOctreePacketLength);
// If we're moving, and the client asked for low res, then we force monochrome, otherwise, use // If we're moving, and the client asked for low res, then we force monochrome, otherwise, use
// the clients requested color state. // the clients requested color state.

View file

@ -29,8 +29,8 @@ class OctreeServer;
class OctreeQueryNode : public OctreeQuery { class OctreeQueryNode : public OctreeQuery {
Q_OBJECT Q_OBJECT
public: public:
OctreeQueryNode(); OctreeQueryNode() = default;
virtual ~OctreeQueryNode(); virtual ~OctreeQueryNode() = default;
void init(); // called after creation to set up some virtual items void init(); // called after creation to set up some virtual items
virtual PacketType getMyPacketType() const = 0; virtual PacketType getMyPacketType() const = 0;
@ -79,9 +79,6 @@ public:
OctreeSceneStats stats; OctreeSceneStats stats;
void initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node);
bool isOctreeSendThreadInitalized() { return _octreeSendThread; }
void dumpOutOfView(); void dumpOutOfView();
quint64 getLastRootTimestamp() const { return _lastRootTimestamp; } quint64 getLastRootTimestamp() const { return _lastRootTimestamp; }
@ -92,7 +89,6 @@ public:
void sceneStart(quint64 sceneSendStartTime) { _sceneSendStartTime = sceneSendStartTime; } void sceneStart(quint64 sceneSendStartTime) { _sceneSendStartTime = sceneSendStartTime; }
void nodeKilled(); void nodeKilled();
void forceNodeShutdown();
bool isShuttingDown() const { return _isShuttingDown; } bool isShuttingDown() const { return _isShuttingDown; }
void octreePacketSent() { packetSent(*_octreePacket); } void octreePacketSent() { packetSent(*_octreePacket); }
@ -104,49 +100,47 @@ public:
bool hasNextNackedPacket() const; bool hasNextNackedPacket() const;
const NLPacket* getNextNackedPacket(); const NLPacket* getNextNackedPacket();
private slots:
void sendThreadFinished();
private: private:
OctreeQueryNode(const OctreeQueryNode &); OctreeQueryNode(const OctreeQueryNode &);
OctreeQueryNode& operator= (const OctreeQueryNode&); OctreeQueryNode& operator= (const OctreeQueryNode&);
bool _viewSent; bool _viewSent { false };
std::unique_ptr<NLPacket> _octreePacket; std::unique_ptr<NLPacket> _octreePacket;
bool _octreePacketWaiting; bool _octreePacketWaiting;
char* _lastOctreePayload = nullptr; unsigned int _lastOctreePacketLength { 0 };
unsigned int _lastOctreePacketLength; int _duplicatePacketCount { 0 };
int _duplicatePacketCount; quint64 _firstSuppressedPacket { usecTimestampNow() };
quint64 _firstSuppressedPacket;
int _maxSearchLevel; int _maxSearchLevel { 1 };
int _maxLevelReachedInLastSearch; int _maxLevelReachedInLastSearch { 1 };
ViewFrustum _currentViewFrustum; ViewFrustum _currentViewFrustum;
ViewFrustum _lastKnownViewFrustum; ViewFrustum _lastKnownViewFrustum;
quint64 _lastTimeBagEmpty; quint64 _lastTimeBagEmpty { 0 };
bool _viewFrustumChanging; bool _viewFrustumChanging { false };
bool _viewFrustumJustStoppedChanging; bool _viewFrustumJustStoppedChanging { true };
OctreeSendThread* _octreeSendThread; OctreeSendThread* _octreeSendThread { nullptr };
// watch for LOD changes // watch for LOD changes
int _lastClientBoundaryLevelAdjust; int _lastClientBoundaryLevelAdjust { 0 };
float _lastClientOctreeSizeScale; float _lastClientOctreeSizeScale { DEFAULT_OCTREE_SIZE_SCALE };
bool _lodChanged; bool _lodChanged { false };
bool _lodInitialized; bool _lodInitialized { false };
OCTREE_PACKET_SEQUENCE _sequenceNumber; OCTREE_PACKET_SEQUENCE _sequenceNumber { 0 };
quint64 _lastRootTimestamp; quint64 _lastRootTimestamp { 0 };
PacketType _myPacketType; PacketType _myPacketType { PacketType::Unknown };
bool _isShuttingDown; bool _isShuttingDown { false };
SentPacketHistory _sentPacketHistory; SentPacketHistory _sentPacketHistory;
QQueue<OCTREE_PACKET_SEQUENCE> _nackedSequenceNumbers; QQueue<OCTREE_PACKET_SEQUENCE> _nackedSequenceNumbers;
quint64 _sceneSendStartTime = 0; quint64 _sceneSendStartTime = 0;
std::array<char, udt::MAX_PACKET_SIZE> _lastOctreePayload;
}; };
#endif // hifi_OctreeQueryNode_h #endif // hifi_OctreeQueryNode_h

View file

@ -14,6 +14,7 @@
#include <udt/PacketHeaders.h> #include <udt/PacketHeaders.h>
#include <PerfStat.h> #include <PerfStat.h>
#include "OctreeQueryNode.h"
#include "OctreeSendThread.h" #include "OctreeSendThread.h"
#include "OctreeServer.h" #include "OctreeServer.h"
#include "OctreeServerConsts.h" #include "OctreeServerConsts.h"
@ -25,15 +26,12 @@ quint64 endSceneSleepTime = 0;
OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) : OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) :
_myServer(myServer), _myServer(myServer),
_node(node), _node(node),
_nodeUUID(node->getUUID()), _nodeUuid(node->getUUID())
_packetData(),
_nodeMissingCount(0),
_isShuttingDown(false)
{ {
QString safeServerName("Octree"); QString safeServerName("Octree");
// set our QThread object name so we can identify this thread while debugging // set our QThread object name so we can identify this thread while debugging
setObjectName(QString("Octree Send Thread (%1)").arg(uuidStringWithoutCurlyBraces(node->getUUID()))); setObjectName(QString("Octree Send Thread (%1)").arg(uuidStringWithoutCurlyBraces(_nodeUuid)));
if (_myServer) { if (_myServer) {
safeServerName = _myServer->getMyServerName(); safeServerName = _myServer->getMyServerName();
@ -46,6 +44,8 @@ OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePoint
} }
OctreeSendThread::~OctreeSendThread() { OctreeSendThread::~OctreeSendThread() {
setIsShuttingDown();
QString safeServerName("Octree"); QString safeServerName("Octree");
if (_myServer) { if (_myServer) {
safeServerName = _myServer->getMyServerName(); safeServerName = _myServer->getMyServerName();
@ -56,8 +56,6 @@ OctreeSendThread::~OctreeSendThread() {
OctreeServer::clientDisconnected(); OctreeServer::clientDisconnected();
OctreeServer::stopTrackingThread(this); OctreeServer::stopTrackingThread(this);
_node.clear();
} }
void OctreeSendThread::setIsShuttingDown() { void OctreeSendThread::setIsShuttingDown() {
@ -79,15 +77,17 @@ bool OctreeSendThread::process() {
// don't do any send processing until the initial load of the octree is complete... // don't do any send processing until the initial load of the octree is complete...
if (_myServer->isInitialLoadComplete()) { if (_myServer->isInitialLoadComplete()) {
if (_node) { if (auto node = _node.lock()) {
_nodeMissingCount = 0; _nodeMissingCount = 0;
OctreeQueryNode* nodeData = static_cast<OctreeQueryNode*>(_node->getLinkedData()); OctreeQueryNode* nodeData = static_cast<OctreeQueryNode*>(node->getLinkedData());
// Sometimes the node data has not yet been linked, in which case we can't really do anything // Sometimes the node data has not yet been linked, in which case we can't really do anything
if (nodeData && !nodeData->isShuttingDown()) { if (nodeData && !nodeData->isShuttingDown()) {
bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); bool viewFrustumChanged = nodeData->updateCurrentViewFrustum();
packetDistributor(nodeData, viewFrustumChanged); packetDistributor(node, nodeData, viewFrustumChanged);
} }
} else {
return false; // exit early if we're shutting down
} }
} }
@ -123,7 +123,8 @@ AtomicUIntStat OctreeSendThread::_totalSpecialBytes { 0 };
AtomicUIntStat OctreeSendThread::_totalSpecialPackets { 0 }; AtomicUIntStat OctreeSendThread::_totalSpecialPackets { 0 };
int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent) { int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, int& trueBytesSent,
int& truePacketsSent) {
OctreeServer::didHandlePacketSend(this); OctreeServer::didHandlePacketSend(this);
// if we're shutting down, then exit early // if we're shutting down, then exit early
@ -183,12 +184,12 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
// actually send it // actually send it
OctreeServer::didCallWriteDatagram(this); OctreeServer::didCallWriteDatagram(this);
DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *_node); DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *node);
packetSent = true; packetSent = true;
} else { } else {
// not enough room in the packet, send two packets // not enough room in the packet, send two packets
OctreeServer::didCallWriteDatagram(this); OctreeServer::didCallWriteDatagram(this);
DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *_node); DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *node);
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since // since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
// there was nothing else to send. // there was nothing else to send.
@ -219,7 +220,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
packetsSent++; packetsSent++;
OctreeServer::didCallWriteDatagram(this); OctreeServer::didCallWriteDatagram(this);
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *_node); DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *node);
packetSent = true; packetSent = true;
int packetSizeWithHeader = nodeData->getPacket().getDataSize(); int packetSizeWithHeader = nodeData->getPacket().getDataSize();
@ -251,7 +252,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
if (nodeData->isPacketWaiting() && !nodeData->isShuttingDown()) { if (nodeData->isPacketWaiting() && !nodeData->isShuttingDown()) {
// just send the octree packet // just send the octree packet
OctreeServer::didCallWriteDatagram(this); OctreeServer::didCallWriteDatagram(this);
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *_node); DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *node);
packetSent = true; packetSent = true;
int packetSizeWithHeader = nodeData->getPacket().getDataSize(); int packetSizeWithHeader = nodeData->getPacket().getDataSize();
@ -293,7 +294,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
} }
/// Version of octree element distributor that sends the deepest LOD level at once /// Version of octree element distributor that sends the deepest LOD level at once
int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) { int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged) {
OctreeServer::didPacketDistributor(this); OctreeServer::didPacketDistributor(this);
@ -322,7 +323,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// If we have a packet waiting, and our desired want color, doesn't match the current waiting packets color // If we have a packet waiting, and our desired want color, doesn't match the current waiting packets color
// then let's just send that waiting packet. // then let's just send that waiting packet.
if (nodeData->isPacketWaiting()) { if (nodeData->isPacketWaiting()) {
packetsSentThisInterval += handlePacketSend(nodeData, trueBytesSent, truePacketsSent); packetsSentThisInterval += handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
} else { } else {
nodeData->resetOctreePacket(); nodeData->resetOctreePacket();
} }
@ -355,7 +356,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
//unsigned long encodeTime = nodeData->stats.getTotalEncodeTime(); //unsigned long encodeTime = nodeData->stats.getTotalEncodeTime();
//unsigned long elapsedTime = nodeData->stats.getElapsedTime(); //unsigned long elapsedTime = nodeData->stats.getElapsedTime();
int packetsJustSent = handlePacketSend(nodeData, trueBytesSent, truePacketsSent); int packetsJustSent = handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
packetsSentThisInterval += packetsJustSent; packetsSentThisInterval += packetsJustSent;
// If we're starting a full scene, then definitely we want to empty the elementBag // If we're starting a full scene, then definitely we want to empty the elementBag
@ -431,8 +432,8 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// Our trackSend() function is implemented by the server subclass, and will be called back // Our trackSend() function is implemented by the server subclass, and will be called back
// during the encodeTreeBitstream() as new entities/data elements are sent // during the encodeTreeBitstream() as new entities/data elements are sent
params.trackSend = [this](const QUuid& dataID, quint64 dataEdited) { params.trackSend = [this, node](const QUuid& dataID, quint64 dataEdited) {
_myServer->trackSend(dataID, dataEdited, _nodeUUID); _myServer->trackSend(dataID, dataEdited, node->getUUID());
}; };
// TODO: should this include the lock time or not? This stat is sent down to the client, // TODO: should this include the lock time or not? This stat is sent down to the client,
@ -481,7 +482,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
unsigned int writtenSize = _packetData.getFinalizedSize() + sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE); unsigned int writtenSize = _packetData.getFinalizedSize() + sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE);
if (writtenSize > nodeData->getAvailable()) { if (writtenSize > nodeData->getAvailable()) {
packetsSentThisInterval += handlePacketSend(nodeData, trueBytesSent, truePacketsSent); packetsSentThisInterval += handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
} }
nodeData->writeToPacket(_packetData.getFinalizedData(), _packetData.getFinalizedSize()); nodeData->writeToPacket(_packetData.getFinalizedData(), _packetData.getFinalizedSize());
@ -501,7 +502,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
int targetSize = MAX_OCTREE_PACKET_DATA_SIZE; int targetSize = MAX_OCTREE_PACKET_DATA_SIZE;
if (sendNow) { if (sendNow) {
quint64 packetSendingStart = usecTimestampNow(); quint64 packetSendingStart = usecTimestampNow();
packetsSentThisInterval += handlePacketSend(nodeData, trueBytesSent, truePacketsSent); packetsSentThisInterval += handlePacketSend(node, nodeData, trueBytesSent, truePacketsSent);
quint64 packetSendingEnd = usecTimestampNow(); quint64 packetSendingEnd = usecTimestampNow();
packetSendingElapsedUsec = (float)(packetSendingEnd - packetSendingStart); packetSendingElapsedUsec = (float)(packetSendingEnd - packetSendingStart);
@ -538,9 +539,9 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// Here's where we can/should allow the server to send other data... // Here's where we can/should allow the server to send other data...
// send the environment packet // send the environment packet
// TODO: should we turn this into a while loop to better handle sending multiple special packets // TODO: should we turn this into a while loop to better handle sending multiple special packets
if (_myServer->hasSpecialPacketsToSend(_node) && !nodeData->isShuttingDown()) { if (_myServer->hasSpecialPacketsToSend(node) && !nodeData->isShuttingDown()) {
int specialPacketsSent = 0; int specialPacketsSent = 0;
trueBytesSent += _myServer->sendSpecialPackets(_node, nodeData, specialPacketsSent); trueBytesSent += _myServer->sendSpecialPackets(node, nodeData, specialPacketsSent);
nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed
truePacketsSent += specialPacketsSent; truePacketsSent += specialPacketsSent;
packetsSentThisInterval += specialPacketsSent; packetsSentThisInterval += specialPacketsSent;
@ -556,7 +557,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
while (nodeData->hasNextNackedPacket() && packetsSentThisInterval < maxPacketsPerInterval) { while (nodeData->hasNextNackedPacket() && packetsSentThisInterval < maxPacketsPerInterval) {
const NLPacket* packet = nodeData->getNextNackedPacket(); const NLPacket* packet = nodeData->getNextNackedPacket();
if (packet) { if (packet) {
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packet, *_node); DependencyManager::get<NodeList>()->sendUnreliablePacket(*packet, *node);
truePacketsSent++; truePacketsSent++;
packetsSentThisInterval++; packetsSentThisInterval++;

View file

@ -18,8 +18,7 @@
#include <GenericThread.h> #include <GenericThread.h>
#include "OctreeQueryNode.h" class OctreeQueryNode;
class OctreeServer; class OctreeServer;
using AtomicUIntStat = std::atomic<uintmax_t>; using AtomicUIntStat = std::atomic<uintmax_t>;
@ -32,6 +31,9 @@ public:
virtual ~OctreeSendThread(); virtual ~OctreeSendThread();
void setIsShuttingDown(); void setIsShuttingDown();
bool isShuttingDown() { return _isShuttingDown; }
QUuid getNodeUuid() const { return _nodeUuid; }
static AtomicUIntStat _totalBytes; static AtomicUIntStat _totalBytes;
static AtomicUIntStat _totalWastedBytes; static AtomicUIntStat _totalWastedBytes;
@ -48,17 +50,18 @@ protected:
virtual bool process(); virtual bool process();
private: private:
OctreeServer* _myServer; int handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent);
SharedNodePointer _node; int packetDistributor(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged);
QUuid _nodeUUID;
int handlePacketSend(OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent); OctreeServer* _myServer { nullptr };
int packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged); QWeakPointer<Node> _node;
QUuid _nodeUuid;
OctreePacketData _packetData; OctreePacketData _packetData;
int _nodeMissingCount; int _nodeMissingCount { 0 };
bool _isShuttingDown; bool _isShuttingDown { false };
}; };
#endif // hifi_OctreeSendThread_h #endif // hifi_OctreeSendThread_h

View file

@ -25,9 +25,9 @@
#include "../AssignmentClient.h" #include "../AssignmentClient.h"
#include "OctreeQueryNode.h"
#include "OctreeServerConsts.h" #include "OctreeServerConsts.h"
OctreeServer* OctreeServer::_instance = NULL;
int OctreeServer::_clientCount = 0; int OctreeServer::_clientCount = 0;
const int MOVING_AVERAGE_SAMPLE_COUNTS = 1000000; const int MOVING_AVERAGE_SAMPLE_COUNTS = 1000000;
@ -231,13 +231,6 @@ OctreeServer::OctreeServer(ReceivedMessage& message) :
_started(time(0)), _started(time(0)),
_startedUSecs(usecTimestampNow()) _startedUSecs(usecTimestampNow())
{ {
if (_instance) {
qDebug() << "Octree Server starting... while old instance still running _instance=["<<_instance<<"] this=[" << this << "]";
}
qDebug() << "Octree Server starting... setting _instance to=[" << this << "]";
_instance = this;
_averageLoopTime.updateAverage(0); _averageLoopTime.updateAverage(0);
qDebug() << "Octree server starting... [" << this << "]"; qDebug() << "Octree server starting... [" << this << "]";
@ -281,9 +274,6 @@ OctreeServer::~OctreeServer() {
_tree.reset(); _tree.reset();
qDebug() << qPrintable(_safeServerName) << "server DONE cleaning up octree... [" << this << "]"; qDebug() << qPrintable(_safeServerName) << "server DONE cleaning up octree... [" << this << "]";
if (_instance == this) {
_instance = NULL; // we are gone
}
qDebug() << qPrintable(_safeServerName) << "server DONE shutting down... [" << this << "]"; qDebug() << qPrintable(_safeServerName) << "server DONE shutting down... [" << this << "]";
} }
@ -878,16 +868,38 @@ void OctreeServer::parsePayload() {
} }
} }
OctreeServer::UniqueSendThread OctreeServer::createSendThread(const SharedNodePointer& node) {
auto sendThread = std::unique_ptr<OctreeSendThread>(new OctreeSendThread(this, node));
// we want to be notified when the thread finishes
connect(sendThread.get(), &GenericThread::finished, this, &OctreeServer::removeSendThread);
sendThread->initialize(true);
return sendThread;
}
void OctreeServer::removeSendThread() {
// If the object has been deleted since the event was queued, sender() will return nullptr
if (auto sendThread = qobject_cast<OctreeSendThread*>(sender())) {
// This deletes the unique_ptr, so sendThread is destructed after that line
_sendThreads.erase(sendThread->getNodeUuid());
}
}
void OctreeServer::handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) { void OctreeServer::handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
if (!_isFinished) { if (!_isFinished && !_isShuttingDown) {
// If we got a query packet, then we're talking to an agent, and we // If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList. // need to make sure we have it in our nodeList.
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
nodeList->updateNodeWithDataFromPacket(message, senderNode); nodeList->updateNodeWithDataFromPacket(message, senderNode);
OctreeQueryNode* nodeData = dynamic_cast<OctreeQueryNode*>(senderNode->getLinkedData()); auto it = _sendThreads.find(senderNode->getUUID());
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) { if (it == _sendThreads.end()) {
nodeData->initializeOctreeSendThread(this, senderNode); _sendThreads.emplace(senderNode->getUUID(), createSendThread(senderNode));
} else if (it->second->isShuttingDown()) {
_sendThreads.erase(it); // Remove right away and wait on thread to be
_sendThreads.emplace(senderNode->getUUID(), createSendThread(senderNode));
} }
} }
} }
@ -1117,8 +1129,8 @@ void OctreeServer::domainSettingsRequestComplete() {
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
#endif #endif
nodeList->linkedDataCreateCallback = [] (Node* node) { nodeList->linkedDataCreateCallback = [this](Node* node) {
auto queryNodeData = _instance->createOctreeQueryNode(); auto queryNodeData = createOctreeQueryNode();
queryNodeData->init(); queryNodeData->init();
node->setLinkedData(std::move(queryNodeData)); node->setLinkedData(std::move(queryNodeData));
}; };
@ -1167,6 +1179,13 @@ void OctreeServer::nodeAdded(SharedNodePointer node) {
void OctreeServer::nodeKilled(SharedNodePointer node) { void OctreeServer::nodeKilled(SharedNodePointer node) {
quint64 start = usecTimestampNow(); quint64 start = usecTimestampNow();
// Shutdown send thread
auto it = _sendThreads.find(node->getUUID());
if (it != _sendThreads.end()) {
auto& sendThread = *it->second;
sendThread.setIsShuttingDown();
}
// calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!! // calling this here since nodeKilled slot in ReceivedPacketProcessor can't be triggered by signals yet!!
_octreeInboundPacketProcessor->nodeKilled(node); _octreeInboundPacketProcessor->nodeKilled(node);
@ -1188,24 +1207,6 @@ void OctreeServer::nodeKilled(SharedNodePointer node) {
trackViewerGone(node->getUUID()); trackViewerGone(node->getUUID());
} }
void OctreeServer::forceNodeShutdown(SharedNodePointer node) {
quint64 start = usecTimestampNow();
qDebug() << qPrintable(_safeServerName) << "server killed node:" << *node;
OctreeQueryNode* nodeData = dynamic_cast<OctreeQueryNode*>(node->getLinkedData());
if (nodeData) {
nodeData->forceNodeShutdown(); // tell our node data and sending threads that we'd like to shut down
} else {
qDebug() << qPrintable(_safeServerName) << "server node missing linked data node:" << *node;
}
quint64 end = usecTimestampNow();
quint64 usecsElapsed = (end - start);
qDebug() << qPrintable(_safeServerName) << "server forceNodeShutdown() took: "
<< usecsElapsed << " usecs for node:" << *node;
}
void OctreeServer::aboutToFinish() { void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish..."; qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish...";
@ -1214,9 +1215,8 @@ void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down..."; qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down...";
// we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects. // we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects.
// This ensures that when we forceNodeShutdown below for each node we don't get any more newly connecting nodes // This ensures that we don't get any more newly connecting nodes
auto nodeList = DependencyManager::get<NodeList>(); DependencyManager::get<NodeList>()->linkedDataCreateCallback = nullptr;
nodeList->linkedDataCreateCallback = NULL;
if (_octreeInboundPacketProcessor) { if (_octreeInboundPacketProcessor) {
_octreeInboundPacketProcessor->terminating(); _octreeInboundPacketProcessor->terminating();
@ -1226,21 +1226,15 @@ void OctreeServer::aboutToFinish() {
_jurisdictionSender->terminating(); _jurisdictionSender->terminating();
} }
QSet<SharedNodePointer> nodesToShutdown; // Shut down all the send threads
for (auto& it : _sendThreads) {
// Force a shutdown of all of our OctreeSendThreads. auto& sendThread = *it.second;
// At this point it has to be impossible for a linkedDataCreateCallback to be called for a new node sendThread.setIsShuttingDown();
nodeList->eachNode([&nodesToShutdown](const SharedNodePointer& node) {
nodesToShutdown << node;
});
// What follows is a hack to force OctreeSendThreads to cleanup before the OctreeServer is gone.
// I would prefer to allow the SharedNodePointer ref count drop to zero to do this automatically
// but that isn't possible as long as the OctreeSendThread has an OctreeServer* that it uses.
for (auto& node : nodesToShutdown) {
qDebug() << qPrintable(_safeServerName) << "server about to finish while node still connected node:" << *node;
forceNodeShutdown(node);
} }
// Clear will destruct all the unique_ptr to OctreeSendThreads which will call the GenericThread's dtor
// which waits on the thread to be done before returning
_sendThreads.clear(); // Cleans up all the send threads.
if (_persistThread) { if (_persistThread) {
_persistThread->aboutToFinish(); _persistThread->aboutToFinish();
@ -1459,15 +1453,22 @@ void OctreeServer::didCallWriteDatagram(OctreeSendThread* thread) {
void OctreeServer::stopTrackingThread(OctreeSendThread* thread) { void OctreeServer::stopTrackingThread(OctreeSendThread* thread) {
QMutexLocker lockerA(&_threadsDidProcessMutex); {
QMutexLocker lockerB(&_threadsDidPacketDistributorMutex); QMutexLocker locker(&_threadsDidProcessMutex);
QMutexLocker lockerC(&_threadsDidHandlePacketSendMutex); _threadsDidProcess.remove(thread);
QMutexLocker lockerD(&_threadsDidCallWriteDatagramMutex); }
{
_threadsDidProcess.remove(thread); QMutexLocker locker(&_threadsDidPacketDistributorMutex);
_threadsDidPacketDistributor.remove(thread); _threadsDidPacketDistributor.remove(thread);
_threadsDidHandlePacketSend.remove(thread); }
_threadsDidCallWriteDatagram.remove(thread); {
QMutexLocker locker(&_threadsDidHandlePacketSendMutex);
_threadsDidHandlePacketSend.remove(thread);
}
{
QMutexLocker locker(&_threadsDidCallWriteDatagramMutex);
_threadsDidCallWriteDatagram.remove(thread);
}
} }
int howManyThreadsDidSomething(QMutex& mutex, QMap<OctreeSendThread*, quint64>& something, quint64 since) { int howManyThreadsDidSomething(QMutex& mutex, QMap<OctreeSendThread*, quint64>& something, quint64 since) {

View file

@ -124,7 +124,6 @@ public:
bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler); bool handleHTTPRequest(HTTPConnection* connection, const QUrl& url, bool skipSubHandler);
virtual void aboutToFinish(); virtual void aboutToFinish();
void forceNodeShutdown(SharedNodePointer node);
public slots: public slots:
/// runs the octree server assignment /// runs the octree server assignment
@ -138,8 +137,12 @@ private slots:
void handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode); void handleOctreeQueryPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleOctreeDataNackPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode); void handleOctreeDataNackPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleJurisdictionRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode); void handleJurisdictionRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void removeSendThread();
protected: protected:
using UniqueSendThread = std::unique_ptr<OctreeSendThread>;
using SendThreads = std::unordered_map<QUuid, UniqueSendThread>;
virtual OctreePointer createTree() = 0; virtual OctreePointer createTree() = 0;
bool readOptionBool(const QString& optionName, const QJsonObject& settingsSectionObject, bool& result); bool readOptionBool(const QString& optionName, const QJsonObject& settingsSectionObject, bool& result);
bool readOptionInt(const QString& optionName, const QJsonObject& settingsSectionObject, int& result); bool readOptionInt(const QString& optionName, const QJsonObject& settingsSectionObject, int& result);
@ -153,6 +156,8 @@ protected:
QString getFileLoadTime(); QString getFileLoadTime();
QString getConfiguration(); QString getConfiguration();
QString getStatusLink(); QString getStatusLink();
UniqueSendThread createSendThread(const SharedNodePointer& node);
int _argc; int _argc;
const char** _argv; const char** _argv;
@ -187,11 +192,11 @@ protected:
int _backupInterval; int _backupInterval;
int _maxBackupVersions; int _maxBackupVersions;
static OctreeServer* _instance;
time_t _started; time_t _started;
quint64 _startedUSecs; quint64 _startedUSecs;
QString _safeServerName; QString _safeServerName;
SendThreads _sendThreads;
static int _clientCount; static int _clientCount;
static SimpleMovingAverage _averageLoopTime; static SimpleMovingAverage _averageLoopTime;

View file

@ -23,7 +23,7 @@ const int MAX_PAYLOAD_BYTES = 1024;
const QString emptyPool = QString(); const QString emptyPool = QString();
/// Holds information used for request, creation, and deployment of assignments /// Holds information used for request, creation, and deployment of assignments
class Assignment : public NodeData { class Assignment : public QObject {
Q_OBJECT Q_OBJECT
public: public:

View file

@ -40,7 +40,6 @@ const char SOLO_NODE_TYPES[2] = {
}; };
LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort) : LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort) :
linkedDataCreateCallback(NULL),
_sessionUUID(), _sessionUUID(),
_nodeHash(), _nodeHash(),
_nodeMutex(QReadWriteLock::Recursive), _nodeMutex(QReadWriteLock::Recursive),

View file

@ -129,7 +129,7 @@ public:
qint64 sendPacketList(std::unique_ptr<NLPacketList> packetList, const HifiSockAddr& sockAddr); qint64 sendPacketList(std::unique_ptr<NLPacketList> packetList, const HifiSockAddr& sockAddr);
qint64 sendPacketList(std::unique_ptr<NLPacketList> packetList, const Node& destinationNode); qint64 sendPacketList(std::unique_ptr<NLPacketList> packetList, const Node& destinationNode);
void (*linkedDataCreateCallback)(Node *); std::function<void(Node*)> linkedDataCreateCallback;
size_t size() const { return _nodeHash.size(); } size_t size() const { return _nodeHash.size(); }

View file

@ -15,7 +15,6 @@
GenericThread::GenericThread() : GenericThread::GenericThread() :
QObject(),
_stopThread(false), _stopThread(false),
_isThreaded(false) // assume non-threaded, must call initialize() _isThreaded(false) // assume non-threaded, must call initialize()
{ {
@ -38,9 +37,10 @@ void GenericThread::initialize(bool isThreaded, QThread::Priority priority) {
_thread->setObjectName(objectName()); _thread->setObjectName(objectName());
// when the worker thread is started, call our engine's run.. // when the worker thread is started, call our engine's run..
connect(_thread, SIGNAL(started()), this, SLOT(threadRoutine())); connect(_thread, &QThread::started, this, &GenericThread::threadRoutine);
connect(_thread, &QThread::finished, this, &GenericThread::finished);
this->moveToThread(_thread); moveToThread(_thread);
// Starts an event loop, and emits _thread->started() // Starts an event loop, and emits _thread->started()
_thread->start(); _thread->start();
@ -82,5 +82,4 @@ void GenericThread::threadRoutine() {
if (_isThreaded && _thread) { if (_isThreaded && _thread) {
_thread->quit(); _thread->quit();
} }
emit finished();
} }