Merge pull request #4823 from birarda/master

repairs to AC stop and clean shutdown
This commit is contained in:
Seth Alves 2015-05-11 18:11:52 -07:00
commit 866a6fba9b
30 changed files with 369 additions and 281 deletions

View file

@ -224,5 +224,7 @@ void Agent::run() {
void Agent::aboutToFinish() {
_scriptEngine.stop();
NetworkAccessManager::getInstance().clearAccessCache();
// our entity tree is going to go away so tell that to the EntityScriptingInterface
DependencyManager::get<EntityScriptingInterface>()->setEntityTree(NULL);
}

View file

@ -9,6 +9,8 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include <assert.h>
#include <QProcess>
#include <QSettings>
#include <QSharedMemory>
@ -30,22 +32,19 @@
#include <SoundCache.h>
#include "AssignmentFactory.h"
#include "AssignmentThread.h"
#include "AssignmentClient.h"
const QString ASSIGNMENT_CLIENT_TARGET_NAME = "assignment-client";
const long long ASSIGNMENT_REQUEST_INTERVAL_MSECS = 1 * 1000;
SharedAssignmentPointer AssignmentClient::_currentAssignment;
int hifiSockAddrMeta = qRegisterMetaType<HifiSockAddr>("HifiSockAddr");
AssignmentClient::AssignmentClient(int ppid, Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort) :
AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort,
quint16 assignmentMonitorPort) :
_assignmentServerHostname(DEFAULT_ASSIGNMENT_SERVER_HOSTNAME),
_localASPortSharedMem(NULL),
_localACMPortSharedMem(NULL)
_localASPortSharedMem(NULL)
{
LogUtils::init();
@ -108,19 +107,39 @@ AssignmentClient::AssignmentClient(int ppid, Assignment::Type requestAssignmentT
// Create Singleton objects on main thread
NetworkAccessManager::getInstance();
// Hook up a timer to send this child's status to the Monitor once per second
setUpStatsToMonitor(ppid);
// did we get an assignment-client monitor port?
if (assignmentMonitorPort > 0) {
_assignmentClientMonitorSocket = HifiSockAddr(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME, assignmentMonitorPort);
qDebug() << "Assignment-client monitor socket is" << _assignmentClientMonitorSocket;
// Hook up a timer to send this child's status to the Monitor once per second
setUpStatsToMonitor();
}
}
void AssignmentClient::stopAssignmentClient() {
qDebug() << "Exiting.";
qDebug() << "Forced stop of assignment-client.";
_requestTimer.stop();
_statsTimerACM.stop();
if (_currentAssignment) {
_currentAssignment->aboutToQuit();
// grab the thread for the current assignment
QThread* currentAssignmentThread = _currentAssignment->thread();
// ask the current assignment to stop
QMetaObject::invokeMethod(_currentAssignment, "stop", Qt::BlockingQueuedConnection);
// ask the current assignment to delete itself on its thread
_currentAssignment->deleteLater();
// when this thread is destroyed we don't need to run our assignment complete method
disconnect(currentAssignmentThread, &QThread::destroyed, this, &AssignmentClient::assignmentCompleted);
// wait on the thread from that assignment - it will be gone once the current assignment deletes
currentAssignmentThread->quit();
currentAssignmentThread->wait();
}
@ -129,24 +148,13 @@ void AssignmentClient::stopAssignmentClient() {
void AssignmentClient::aboutToQuit() {
stopAssignmentClient();
// clear the log handler so that Qt doesn't call the destructor on LogHandler
qInstallMessageHandler(0);
// clear out pointer to the assignment so the destructor gets called. if we don't do this here,
// it will get destroyed along with all the other "static" stuff. various static member variables
// will be destroyed first and things go wrong.
_currentAssignment.clear();
qInstallMessageHandler(0);
}
void AssignmentClient::setUpStatsToMonitor(int ppid) {
// Figure out the address to send out stats to
quint16 localMonitorServerPort = DEFAULT_ASSIGNMENT_CLIENT_MONITOR_PORT;
auto nodeList = DependencyManager::get<NodeList>();
nodeList->getLocalServerPortFromSharedMemory(QString(ASSIGNMENT_CLIENT_MONITOR_LOCAL_PORT_SMEM_KEY) + "-" +
QString::number(ppid), _localACMPortSharedMem, localMonitorServerPort);
_assignmentClientMonitorSocket = HifiSockAddr(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME, localMonitorServerPort, true);
void AssignmentClient::setUpStatsToMonitor() {
// send a stats packet every 1 seconds
connect(&_statsTimerACM, &QTimer::timeout, this, &AssignmentClient::sendStatsPacketToACM);
_statsTimerACM.start(1000);
@ -202,8 +210,11 @@ void AssignmentClient::readPendingDatagrams() {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
if (packetTypeForPacket(receivedPacket) == PacketTypeCreateAssignment) {
qDebug() << "Received a PacketTypeCreateAssignment - attempting to unpack.";
// construct the deployed assignment from the packet data
_currentAssignment = SharedAssignmentPointer(AssignmentFactory::unpackAssignment(receivedPacket));
_currentAssignment = AssignmentFactory::unpackAssignment(receivedPacket);
if (_currentAssignment) {
qDebug() << "Received an assignment -" << *_currentAssignment;
@ -216,15 +227,24 @@ void AssignmentClient::readPendingDatagrams() {
qDebug() << "Destination IP for assignment is" << nodeList->getDomainHandler().getIP().toString();
// start the deployed assignment
AssignmentThread* workerThread = new AssignmentThread(_currentAssignment, this);
workerThread->setObjectName("worker");
QThread* workerThread = new QThread;
workerThread->setObjectName("ThreadedAssignment Worker");
connect(workerThread, &QThread::started, _currentAssignment.data(), &ThreadedAssignment::run);
connect(_currentAssignment.data(), &ThreadedAssignment::finished, workerThread, &QThread::quit);
connect(_currentAssignment.data(), &ThreadedAssignment::finished,
this, &AssignmentClient::assignmentCompleted);
// once the ThreadedAssignment says it is finished - we ask it to deleteLater
connect(_currentAssignment.data(), &ThreadedAssignment::finished, _currentAssignment.data(),
&ThreadedAssignment::deleteLater);
// once it is deleted, we quit the worker thread
connect(_currentAssignment.data(), &ThreadedAssignment::destroyed, workerThread, &QThread::quit);
// have the worker thread remove itself once it is done
connect(workerThread, &QThread::finished, workerThread, &QThread::deleteLater);
// once the worker thread says it is done, we consider the assignment completed
connect(workerThread, &QThread::destroyed, this, &AssignmentClient::assignmentCompleted);
_currentAssignment->moveToThread(workerThread);
// move the NodeList to the thread used for the _current assignment
@ -243,8 +263,9 @@ void AssignmentClient::readPendingDatagrams() {
} else if (packetTypeForPacket(receivedPacket) == PacketTypeStopNode) {
if (senderSockAddr.getAddress() == QHostAddress::LocalHost ||
senderSockAddr.getAddress() == QHostAddress::LocalHostIPv6) {
qDebug() << "Network told me to exit.";
emit stopAssignmentClient();
qDebug() << "AssignmentClientMonitor at" << senderSockAddr << "requested stop via PacketTypeStopNode.";
QCoreApplication::quit();
} else {
qDebug() << "Got a stop packet from other than localhost.";
}
@ -281,20 +302,22 @@ void AssignmentClient::handleAuthenticationRequest() {
}
void AssignmentClient::assignmentCompleted() {
// we expect that to be here the previous assignment has completely cleaned up
assert(_currentAssignment.isNull());
// reset our current assignment pointer to NULL now that it has been deleted
_currentAssignment = NULL;
// reset the logging target to the the CHILD_TARGET_NAME
LogHandler::getInstance().setTargetName(ASSIGNMENT_CLIENT_TARGET_NAME);
qDebug("Assignment finished or never started - waiting for new assignment.");
qDebug() << "Assignment finished or never started - waiting for new assignment.";
auto nodeList = DependencyManager::get<NodeList>();
// have us handle incoming NodeList datagrams again
disconnect(&nodeList->getNodeSocket(), 0, _currentAssignment.data(), 0);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
// clear our current assignment shared pointer now that we're done with it
// if the assignment thread is still around it has its own shared pointer to the assignment
_currentAssignment.clear();
// have us handle incoming NodeList datagrams again, and make sure our ThreadedAssignment isn't handling them
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
// reset our NodeList by switching back to unassigned and clearing the list
nodeList->setOwnerType(NodeType::Unassigned);

View file

@ -13,6 +13,7 @@
#define hifi_AssignmentClient_h
#include <QtCore/QCoreApplication>
#include <QtCore/QPointer>
#include "ThreadedAssignment.h"
@ -22,10 +23,9 @@ class AssignmentClient : public QObject {
Q_OBJECT
public:
AssignmentClient(int ppid, Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort);
static const SharedAssignmentPointer& getCurrentAssignment() { return _currentAssignment; }
AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort,
quint16 assignmentMonitorPort);
private slots:
void sendAssignmentRequest();
void readPendingDatagrams();
@ -38,13 +38,13 @@ public slots:
void aboutToQuit();
private:
void setUpStatsToMonitor(int ppid);
void setUpStatsToMonitor();
Assignment _requestAssignment;
static SharedAssignmentPointer _currentAssignment;
QPointer<ThreadedAssignment> _currentAssignment;
QString _assignmentServerHostname;
HifiSockAddr _assignmentServerSocket;
QSharedMemory* _localASPortSharedMem; // memory shared with domain server
QSharedMemory* _localACMPortSharedMem; // memory shared with assignment client monitor
QTimer _requestTimer; // timer for requesting and assignment
QTimer _statsTimerACM; // timer for sending stats to assignment client monitor

View file

@ -79,9 +79,8 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
const QCommandLineOption maxChildsOption(ASSIGNMENT_MAX_FORKS_OPTION, "maximum number of children", "child-count");
parser.addOption(maxChildsOption);
const QCommandLineOption ppidOption(PARENT_PID_OPTION, "parent's process id", "pid");
parser.addOption(ppidOption);
const QCommandLineOption monitorPortOption(ASSIGNMENT_CLIENT_MONITOR_PORT_OPTION, "assignment-client monitor port", "port");
parser.addOption(monitorPortOption);
if (!parser.parse(QCoreApplication::arguments())) {
qCritical() << parser.errorText() << endl;
@ -113,9 +112,9 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
maxForks = parser.value(maxChildsOption).toInt();
}
int ppid = 0;
if (parser.isSet(ppidOption)) {
ppid = parser.value(ppidOption).toInt();
unsigned short monitorPort = 0;
if (parser.isSet(monitorPortOption)) {
monitorPort = parser.value(monitorPortOption).toUShort();
}
if (!numForks && minForks) {
@ -162,12 +161,11 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) {
assignmentServerPort = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toString().toUInt();
}
if (parser.isSet(assignmentServerPortOption)) {
assignmentServerPort = parser.value(assignmentServerPortOption).toInt();
}
if (parser.isSet(numChildsOption)) {
if (minForks && minForks > numForks) {
qCritical() << "--min can't be more than -n";
@ -186,14 +184,17 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
if (numForks || minForks || maxForks) {
AssignmentClientMonitor monitor(numForks, minForks, maxForks, requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort);
connect(this, &QCoreApplication::aboutToQuit, &monitor, &AssignmentClientMonitor::aboutToQuit);
exec();
AssignmentClientMonitor* monitor = new AssignmentClientMonitor(numForks, minForks, maxForks,
requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname,
assignmentServerPort);
monitor->setParent(this);
connect(this, &QCoreApplication::aboutToQuit, monitor, &AssignmentClientMonitor::aboutToQuit);
} else {
AssignmentClient client(ppid, requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort);
connect(this, &QCoreApplication::aboutToQuit, &client, &AssignmentClient::aboutToQuit);
exec();
AssignmentClient* client = new AssignmentClient(requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname,
assignmentServerPort, monitorPort);
client->setParent(this);
connect(this, &QCoreApplication::aboutToQuit, client, &AssignmentClient::aboutToQuit);
}
}

View file

@ -23,7 +23,7 @@ const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "p";
const QString ASSIGNMENT_NUM_FORKS_OPTION = "n";
const QString ASSIGNMENT_MIN_FORKS_OPTION = "min";
const QString ASSIGNMENT_MAX_FORKS_OPTION = "max";
const QString PARENT_PID_OPTION = "ppid";
const QString ASSIGNMENT_CLIENT_MONITOR_PORT_OPTION = "monitor-port";
class AssignmentClientApp : public QCoreApplication {

View file

@ -23,7 +23,7 @@
const QString ASSIGNMENT_CLIENT_MONITOR_TARGET_NAME = "assignment-client-monitor";
const int WAIT_FOR_CHILD_MSECS = 500;
const int WAIT_FOR_CHILD_MSECS = 1000;
AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmentClientForks,
const unsigned int minAssignmentClientForks,
@ -41,22 +41,20 @@ AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmen
_assignmentServerPort(assignmentServerPort)
{
qDebug() << "_requestAssignmentType =" << _requestAssignmentType;
// start the Logging class with the parent's target name
LogHandler::getInstance().setTargetName(ASSIGNMENT_CLIENT_MONITOR_TARGET_NAME);
// make sure we output process IDs for a monitor otherwise it's insane to parse
LogHandler::getInstance().setShouldOutputPID(true);
// create a NodeList so we can receive stats from children
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
auto addressManager = DependencyManager::set<AddressManager>();
auto nodeList = DependencyManager::set<LimitedNodeList>(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_PORT);
auto nodeList = DependencyManager::set<LimitedNodeList>();
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClientMonitor::readPendingDatagrams);
qint64 pid = QCoreApplication::applicationPid ();
nodeList->putLocalPortIntoSharedMemory(QString(ASSIGNMENT_CLIENT_MONITOR_LOCAL_PORT_SMEM_KEY) + "-" + QString::number(pid),
this, nodeList->getNodeSocket().localPort());
// use QProcess to fork off a process for each of the child assignment clients
for (unsigned int i = 0; i < _numAssignmentClientForks; i++) {
spawnChildClient();
@ -71,61 +69,60 @@ AssignmentClientMonitor::~AssignmentClientMonitor() {
stopChildProcesses();
}
void AssignmentClientMonitor::waitOnChildren(int msecs) {
QMutableListIterator<QProcess*> i(_childProcesses);
while (i.hasNext()) {
QProcess* childProcess = i.next();
bool finished = childProcess->waitForFinished(msecs);
if (finished) {
i.remove();
}
void AssignmentClientMonitor::simultaneousWaitOnChildren(int waitMsecs) {
QElapsedTimer waitTimer;
waitTimer.start();
// loop as long as we still have processes around and we're inside the wait window
while(_childProcesses.size() > 0 && !waitTimer.hasExpired(waitMsecs)) {
// continue processing events so we can handle a process finishing up
QCoreApplication::processEvents();
}
}
void AssignmentClientMonitor::childProcessFinished() {
QProcess* childProcess = qobject_cast<QProcess*>(sender());
qint64 processID = _childProcesses.key(childProcess);
if (processID > 0) {
qDebug() << "Child process" << processID << "has finished. Removing from internal map.";
_childProcesses.remove(processID);
}
}
void AssignmentClientMonitor::stopChildProcesses() {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->eachNode([&](const SharedNodePointer& node) {
qDebug() << "asking child" << node->getUUID() << "to exit.";
node->activateLocalSocket();
QByteArray diePacket = nodeList->byteArrayWithPopulatedHeader(PacketTypeStopNode);
nodeList->writeUnverifiedDatagram(diePacket, *node->getActiveSocket());
});
// try to give all the children time to shutdown
waitOnChildren(WAIT_FOR_CHILD_MSECS);
// ask more firmly
QMutableListIterator<QProcess*> i(_childProcesses);
while (i.hasNext()) {
QProcess* childProcess = i.next();
// ask child processes to terminate
foreach(QProcess* childProcess, _childProcesses) {
qDebug() << "Attempting to terminate child process" << childProcess->processId();
childProcess->terminate();
}
// try to give all the children time to shutdown
waitOnChildren(WAIT_FOR_CHILD_MSECS);
// ask even more firmly
QMutableListIterator<QProcess*> j(_childProcesses);
while (j.hasNext()) {
QProcess* childProcess = j.next();
childProcess->kill();
simultaneousWaitOnChildren(WAIT_FOR_CHILD_MSECS);
if (_childProcesses.size() > 0) {
// ask even more firmly
foreach(QProcess* childProcess, _childProcesses) {
qDebug() << "Attempting to kill child process" << childProcess->processId();
childProcess->kill();
}
simultaneousWaitOnChildren(WAIT_FOR_CHILD_MSECS);
}
waitOnChildren(WAIT_FOR_CHILD_MSECS);
}
void AssignmentClientMonitor::aboutToQuit() {
stopChildProcesses();
// clear the log handler so that Qt doesn't call the destructor on LogHandler
qInstallMessageHandler(0);
}
void AssignmentClientMonitor::spawnChildClient() {
QProcess *assignmentClient = new QProcess(this);
_childProcesses.append(assignmentClient);
QProcess* assignmentClient = new QProcess(this);
// unparse the parts of the command-line that the child cares about
QStringList _childArguments;
if (_assignmentPool != "") {
@ -149,17 +146,21 @@ void AssignmentClientMonitor::spawnChildClient() {
_childArguments.append(QString::number(_requestAssignmentType));
}
// tell children which shared memory key to use
qint64 pid = QCoreApplication::applicationPid ();
_childArguments.append("--" + PARENT_PID_OPTION);
_childArguments.append(QString::number(pid));
// tell children which assignment monitor port to use
// for now they simply talk to us on localhost
_childArguments.append("--" + ASSIGNMENT_CLIENT_MONITOR_PORT_OPTION);
_childArguments.append(QString::number(DependencyManager::get<NodeList>()->getLocalSockAddr().getPort()));
// make sure that the output from the child process appears in our output
assignmentClient->setProcessChannelMode(QProcess::ForwardedChannels);
assignmentClient->start(QCoreApplication::applicationFilePath(), _childArguments);
// make sure we hear that this process has finished when it does
connect(assignmentClient, SIGNAL(finished(int, QProcess::ExitStatus)), this, SLOT(childProcessFinished()));
qDebug() << "Spawned a child client with PID" << assignmentClient->pid();
_childProcesses.insert(assignmentClient->processId(), assignmentClient);
}
void AssignmentClientMonitor::checkSpares() {
@ -193,12 +194,11 @@ void AssignmentClientMonitor::checkSpares() {
qDebug() << "asking child" << aSpareId << "to exit.";
SharedNodePointer childNode = nodeList->nodeWithUUID(aSpareId);
childNode->activateLocalSocket();
QByteArray diePacket = nodeList->byteArrayWithPopulatedHeader(PacketTypeStopNode);
nodeList->writeUnverifiedDatagram(diePacket, childNode);
}
}
waitOnChildren(0);
}

View file

@ -33,17 +33,19 @@ public:
quint16 assignmentServerPort);
~AssignmentClientMonitor();
void waitOnChildren(int msecs);
void stopChildProcesses();
private slots:
void readPendingDatagrams();
void checkSpares();
void childProcessFinished();
public slots:
void aboutToQuit();
private:
void spawnChildClient();
void simultaneousWaitOnChildren(int waitMsecs);
QTimer _checkSparesTimer; // every few seconds see if it need fewer or more spare children
const unsigned int _numAssignmentClientForks;
@ -56,7 +58,7 @@ private:
QString _assignmentServerHostname;
quint16 _assignmentServerPort;
QList<QProcess*> _childProcesses;
QMap<qint64, QProcess*> _childProcesses;
};
#endif // hifi_AssignmentClientMonitor_h

View file

@ -1,19 +0,0 @@
//
// AssignmentThread.cpp
// assignment-client/src
//
// Created by Stephen Birarda on 2014.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "AssignmentThread.h"
AssignmentThread::AssignmentThread(const SharedAssignmentPointer& assignment, QObject* parent) :
QThread(parent),
_assignment(assignment)
{
}

View file

@ -1,26 +0,0 @@
//
// AssignmentThread.h
// assignment-client/src
//
// Created by Stephen Birarda on 2014.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_AssignmentThread_h
#define hifi_AssignmentThread_h
#include <QtCore/QThread>
#include <ThreadedAssignment.h>
class AssignmentThread : public QThread {
public:
AssignmentThread(const SharedAssignmentPointer& assignment, QObject* parent);
private:
SharedAssignmentPointer _assignment;
};
#endif // hifi_AssignmentThread_h

View file

@ -851,7 +851,7 @@ void AudioMixer::run() {
++_numStatFrames;
QCoreApplication::processEvents();
if (_isFinished) {
break;
}

View file

@ -52,6 +52,7 @@ AvatarMixer::~AvatarMixer() {
if (_broadcastTimer) {
_broadcastTimer->deleteLater();
}
_broadcastThread.quit();
_broadcastThread.wait();
}
@ -517,7 +518,7 @@ void AvatarMixer::run() {
};
// setup the timer that will be fired on the broadcast thread
_broadcastTimer = new QTimer();
_broadcastTimer = new QTimer;
_broadcastTimer->setInterval(AVATAR_DATA_SEND_INTERVAL_MSECS);
_broadcastTimer->moveToThread(&_broadcastThread);

View file

@ -9,10 +9,15 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "AssignmentClientApp.h"
#include <QtCore/QDebug>
int main(int argc, char* argv[]) {
AssignmentClientApp app(argc, argv);
return 0;
int acReturn = app.exec();
qDebug() << "assignment-client process" << app.applicationPid() << "exiting with status code" << acReturn;
return acReturn;
}

View file

@ -9,11 +9,15 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "PacketHeaders.h"
#include "SharedUtil.h"
#include "OctreeQueryNode.h"
#include <cstring>
#include <cstdio>
#include <PacketHeaders.h>
#include <SharedUtil.h>
#include <UUID.h>
#include "OctreeSendThread.h"
OctreeQueryNode::OctreeQueryNode() :
@ -91,8 +95,8 @@ void OctreeQueryNode::sendThreadFinished() {
}
}
void OctreeQueryNode::initializeOctreeSendThread(const SharedAssignmentPointer& myAssignment, const SharedNodePointer& node) {
_octreeSendThread = new OctreeSendThread(myAssignment, node);
void OctreeQueryNode::initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) {
_octreeSendThread = new OctreeSendThread(myServer, node);
// we want to be notified when the thread finishes
connect(_octreeSendThread, &GenericThread::finished, this, &OctreeQueryNode::sendThreadFinished);

View file

@ -22,11 +22,11 @@
#include <OctreePacketData.h>
#include <OctreeQuery.h>
#include <OctreeSceneStats.h>
#include <ThreadedAssignment.h> // for SharedAssignmentPointer
#include "SentPacketHistory.h"
#include <qqueue.h>
class OctreeSendThread;
class OctreeServer;
class OctreeQueryNode : public OctreeQuery {
Q_OBJECT
@ -89,7 +89,7 @@ public:
OctreeSceneStats stats;
void initializeOctreeSendThread(const SharedAssignmentPointer& myAssignment, const SharedNodePointer& node);
void initializeOctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node);
bool isOctreeSendThreadInitalized() { return _octreeSendThread; }
void dumpOutOfView();

View file

@ -21,9 +21,8 @@
quint64 startSceneSleepTime = 0;
quint64 endSceneSleepTime = 0;
OctreeSendThread::OctreeSendThread(const SharedAssignmentPointer& myAssignment, const SharedNodePointer& node) :
_myAssignment(myAssignment),
_myServer(static_cast<OctreeServer*>(myAssignment.data())),
OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node) :
_myServer(myServer),
_node(node),
_nodeUUID(node->getUUID()),
_packetData(),
@ -31,9 +30,14 @@ OctreeSendThread::OctreeSendThread(const SharedAssignmentPointer& myAssignment,
_isShuttingDown(false)
{
QString safeServerName("Octree");
// set our QThread object name so we can identify this thread while debugging
setObjectName(QString("Octree Send Thread (%1)").arg(uuidStringWithoutCurlyBraces(node->getUUID())));
if (_myServer) {
safeServerName = _myServer->getMyServerName();
}
qDebug() << qPrintable(safeServerName) << "server [" << _myServer << "]: client connected "
"- starting sending thread [" << this << "]";
@ -53,7 +57,6 @@ OctreeSendThread::~OctreeSendThread() {
OctreeServer::stopTrackingThread(this);
_node.clear();
_myAssignment.clear();
}
void OctreeSendThread::setIsShuttingDown() {
@ -66,15 +69,13 @@ bool OctreeSendThread::process() {
return false; // exit early if we're shutting down
}
// check that our server and assignment is still valid
if (!_myServer || !_myAssignment) {
return false; // exit early if it's not, it means the server is shutting down
}
OctreeServer::didProcess(this);
quint64 start = usecTimestampNow();
// we'd better have a server at this point, or we're in trouble
assert(_myServer);
// don't do any send processing until the initial load of the octree is complete...
if (_myServer->isInitialLoadComplete()) {
if (_node) {

View file

@ -26,7 +26,7 @@ class OctreeServer;
class OctreeSendThread : public GenericThread {
Q_OBJECT
public:
OctreeSendThread(const SharedAssignmentPointer& myAssignment, const SharedNodePointer& node);
OctreeSendThread(OctreeServer* myServer, const SharedNodePointer& node);
virtual ~OctreeSendThread();
void setIsShuttingDown();
@ -43,7 +43,6 @@ protected:
virtual bool process();
private:
SharedAssignmentPointer _myAssignment;
OctreeServer* _myServer;
SharedNodePointer _node;
QUuid _nodeUUID;

View file

@ -828,42 +828,42 @@ void OctreeServer::parsePayload() {
void OctreeServer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
auto nodeList = DependencyManager::get<NodeList>();
// If we know we're shutting down we just drop these packets on the floor.
// This stops us from initializing send threads we just shut down.
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType packetType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
if (!_isShuttingDown) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
PacketType packetType = packetTypeForPacket(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (packetType == getMyQueryMessageType()) {
// If we got a query packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
nodeList->updateNodeWithDataFromPacket(matchingNode, receivedPacket);
// NOTE: this is an important aspect of the proper ref counting. The send threads/node data need to
// know that the OctreeServer/Assignment will not get deleted on it while it's still active. The
// solution is to get the shared pointer for the current assignment. We need to make sure this is the
// same SharedAssignmentPointer that was ref counted by the assignment client.
SharedAssignmentPointer sharedAssignment = AssignmentClient::getCurrentAssignment();
nodeData->initializeOctreeSendThread(sharedAssignment, matchingNode);
OctreeQueryNode* nodeData = (OctreeQueryNode*) matchingNode->getLinkedData();
if (nodeData && !nodeData->isOctreeSendThreadInitalized()) {
nodeData->initializeOctreeSendThread(this, matchingNode);
}
}
}
} else if (packetType == PacketTypeOctreeDataNack) {
// If we got a nack packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData) {
nodeData->parseNackPacket(receivedPacket);
} else if (packetType == PacketTypeOctreeDataNack) {
// If we got a nack packet, then we're talking to an agent, and we
// need to make sure we have it in our nodeList.
if (matchingNode) {
OctreeQueryNode* nodeData = (OctreeQueryNode*)matchingNode->getLinkedData();
if (nodeData) {
nodeData->parseNackPacket(receivedPacket);
}
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
} else {
// let processNodeData handle it.
DependencyManager::get<NodeList>()->processNodeData(senderSockAddr, receivedPacket);
}
} else if (packetType == PacketTypeJurisdictionRequest) {
_jurisdictionSender->queueReceivedPacket(matchingNode, receivedPacket);
} else if (_octreeInboundPacketProcessor && getOctree()->handlesEditPacketType(packetType)) {
_octreeInboundPacketProcessor->queueReceivedPacket(matchingNode, receivedPacket);
} else {
// let processNodeData handle it.
DependencyManager::get<NodeList>()->processNodeData(senderSockAddr, receivedPacket);
}
}
}
@ -1216,8 +1216,16 @@ void OctreeServer::forceNodeShutdown(SharedNodePointer node) {
void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish...";
_isShuttingDown = true;
qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down...";
// we're going down - set the NodeList linkedDataCallback to NULL so we do not create any more OctreeQueryNode objects.
// This ensures that when we forceNodeShutdown below for each node we don't get any more newly connecting nodes
auto nodeList = DependencyManager::get<NodeList>();
nodeList->linkedDataCreateCallback = NULL;
if (_octreeInboundPacketProcessor) {
_octreeInboundPacketProcessor->terminating();
}
@ -1226,7 +1234,9 @@ void OctreeServer::aboutToFinish() {
_jurisdictionSender->terminating();
}
DependencyManager::get<NodeList>()->eachNode([this](const SharedNodePointer& node) {
// force a shutdown of all of our OctreeSendThreads - at this point it has to be impossible for a
// linkedDataCreateCallback to be called for a new node
nodeList->eachNode([this](const SharedNodePointer& node) {
qDebug() << qPrintable(_safeServerName) << "server about to finish while node still connected node:" << *node;
forceNodeShutdown(node);
});

View file

@ -144,12 +144,14 @@ protected:
QString getStatusLink();
void setupDatagramProcessingThread();
int _argc;
const char** _argv;
char** _parsedArgV;
QJsonObject _settings;
bool _isShuttingDown = false;
HTTPManager* _httpManager;
int _statusPort;
QString _statusHost;

View file

@ -69,6 +69,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
_iceServerSocket(ICE_SERVER_DEFAULT_HOSTNAME, ICE_SERVER_DEFAULT_PORT)
{
LogUtils::init();
Setting::init();
setOrganizationName("High Fidelity");
setOrganizationDomain("highfidelity.io");

View file

@ -0,0 +1,68 @@
//
// loadTestServers.js
// examples/utilities/diagnostics
//
// Created by Stephen Birarda on 05/08/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
// This script is made for load testing HF servers. It connects to the HF servers and sends/receives data.
// Run this on an assignment-client.
//
var count = 0;
var yawDirection = -1;
var yaw = 45;
var yawMax = 70;
var yawMin = 20;
var vantagePoint = {x: 5000, y: 500, z: 5000};
var isLocal = false;
// set up our EntityViewer with a position and orientation
var orientation = Quat.fromPitchYawRollDegrees(0, yaw, 0);
EntityViewer.setPosition(vantagePoint);
EntityViewer.setOrientation(orientation);
EntityViewer.queryOctree();
Agent.isListeningToAudioStream = true;
Agent.isAvatar = true;
function getRandomInt(min, max) {
return Math.floor(Math.random() * (max - min + 1)) + min;
}
function keepLooking(deltaTime) {
count++;
if (count % getRandomInt(5, 15) == 0) {
yaw += yawDirection;
orientation = Quat.fromPitchYawRollDegrees(0, yaw, 0);
if (yaw > yawMax || yaw < yawMin) {
yawDirection = yawDirection * -1;
}
EntityViewer.setOrientation(orientation);
EntityViewer.queryOctree();
}
// approximately every second, consider stopping
if (count % 60 == 0) {
print("considering stop.... elementCount:" + EntityViewer.getOctreeElementsCount());
var stopProbability = 0.05; // 5% chance of stopping
if (Math.random() < stopProbability) {
print("stopping.... elementCount:" + EntityViewer.getOctreeElementsCount());
Script.stop();
}
}
}
// register the call back so it fires before each data send
Script.update.connect(keepLooking);

View file

@ -26,6 +26,7 @@
#include <QAbstractNativeEventFilter>
#include <QActionGroup>
#include <QColorDialog>
#include <QCoreApplication>
#include <QDesktopWidget>
#include <QCheckBox>
#include <QImage>
@ -250,6 +251,8 @@ bool setupEssentials(int& argc, char** argv) {
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
DependencyManager::registerInheritance<AvatarHashMap, AvatarManager>();
Setting::init();
// Set dependencies
auto addressManager = DependencyManager::set<AddressManager>();
@ -341,7 +344,6 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
#ifdef Q_OS_WIN
installNativeEventFilter(&MyNativeEventFilter::getInstance());
#endif
_logger = new FileLogger(this); // After setting organization name in order to get correct directory

View file

@ -37,7 +37,6 @@ static BOOL CALLBACK enumWindowsCallback(HWND hWnd, LPARAM lParam) {
}
#endif
int main(int argc, const char* argv[]) {
#ifdef Q_OS_WIN
// Run only one instance of Interface at a time.

View file

@ -52,10 +52,8 @@ const unsigned short STUN_SERVER_PORT = 3478;
const QString DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY = "domain-server.local-port";
const QString DOMAIN_SERVER_LOCAL_HTTP_PORT_SMEM_KEY = "domain-server.local-http-port";
const QString DOMAIN_SERVER_LOCAL_HTTPS_PORT_SMEM_KEY = "domain-server.local-https-port";
const QString ASSIGNMENT_CLIENT_MONITOR_LOCAL_PORT_SMEM_KEY = "assignment-client-monitor.local-port";
const char DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME[] = "localhost";
const unsigned short DEFAULT_ASSIGNMENT_CLIENT_MONITOR_PORT = 40104;
const QHostAddress DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME = QHostAddress::LocalHost;
const QString USERNAME_UUID_REPLACEMENT_STATS_KEY = "$username";

View file

@ -150,7 +150,11 @@ void NodeList::timePingReply(const QByteArray& packet, const SharedNodePointer&
void NodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet) {
switch (packetTypeForPacket(packet)) {
case PacketTypeDomainList: {
processDomainServerList(packet);
if (!_domainHandler.getSockAddr().isNull()) {
// only process a list from domain-server if we're talking to a domain
// TODO: how do we make sure this is actually the domain we want the list from (DTLS probably)
processDomainServerList(packet);
}
break;
}
case PacketTypeDomainServerRequireDTLS: {

View file

@ -27,40 +27,47 @@ ThreadedAssignment::ThreadedAssignment(const QByteArray& packet) :
}
void ThreadedAssignment::setFinished(bool isFinished) {
_isFinished = isFinished;
if (_isFinished != isFinished) {
_isFinished = isFinished;
if (_isFinished) {
if (_domainServerTimer) {
_domainServerTimer->stop();
delete _domainServerTimer;
_domainServerTimer = nullptr;
}
if (_statsTimer) {
_statsTimer->stop();
delete _statsTimer;
_statsTimer = nullptr;
}
if (_isFinished) {
aboutToFinish();
auto nodeList = DependencyManager::get<NodeList>();
// if we have a datagram processing thread, quit it and wait on it to make sure that
// the node socket is back on the same thread as the NodeList
if (_datagramProcessingThread) {
// tell the datagram processing thread to quit and wait until it is done, then return the node socket to the NodeList
_datagramProcessingThread->quit();
_datagramProcessingThread->wait();
qDebug() << "ThreadedAssignment::setFinished(true) called - finishing up.";
if (_domainServerTimer) {
_domainServerTimer->stop();
}
if (_statsTimer) {
_statsTimer->stop();
}
// set node socket parent back to NodeList
nodeList->getNodeSocket().setParent(nodeList.data());
// stop processing datagrams from the node socket
// this ensures we won't process a domain list while we are going down
auto nodeList = DependencyManager::get<NodeList>();
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
// call our virtual aboutToFinish method - this gives the ThreadedAssignment subclass a chance to cleanup
aboutToFinish();
// if we have a datagram processing thread, quit it and wait on it to make sure that
// the node socket is back on the same thread as the NodeList
if (_datagramProcessingThread) {
// tell the datagram processing thread to quit and wait until it is done,
// then return the node socket to the NodeList
_datagramProcessingThread->quit();
_datagramProcessingThread->wait();
// set node socket parent back to NodeList
nodeList->getNodeSocket().setParent(nodeList.data());
}
// move the NodeList back to the QCoreApplication instance's thread
nodeList->moveToThread(QCoreApplication::instance()->thread());
emit finished();
}
// move the NodeList back to the QCoreApplication instance's thread
nodeList->moveToThread(QCoreApplication::instance()->thread());
emit finished();
}
}

View file

@ -20,6 +20,7 @@ class ThreadedAssignment : public Assignment {
Q_OBJECT
public:
ThreadedAssignment(const QByteArray& packet);
~ThreadedAssignment() { stop(); }
void setFinished(bool isFinished);
virtual void aboutToFinish() { };
@ -32,11 +33,6 @@ public slots:
virtual void readPendingDatagrams() = 0;
virtual void sendStatsPacket();
public slots:
virtual void aboutToQuit() {
QMetaObject::invokeMethod(this, "stop");
}
signals:
void finished();

View file

@ -32,6 +32,9 @@ void GenericThread::initialize(bool isThreaded) {
if (_isThreaded) {
_thread = new QThread(this);
// match the thread name to our object name
_thread->setObjectName(objectName());
// when the worker thread is started, call our engine's run..
connect(_thread, SIGNAL(started()), this, SLOT(threadRoutine()));

View file

@ -36,7 +36,7 @@ namespace Setting {
}
// Sets up the settings private instance. Should only be run once at startup
void setupPrivateInstance() {
void init() {
// read the ApplicationInfo.ini file for Name/Version/Domain information
QSettings::setDefaultFormat(QSettings::IniFormat);
QSettings applicationInfo(PathUtils::resourcesPath() + "info/ApplicationInfo.ini", QSettings::IniFormat);
@ -59,14 +59,11 @@ namespace Setting {
QObject::connect(thread, SIGNAL(finished()), thread, SLOT(deleteLater()));
privateInstance->moveToThread(thread);
thread->start();
qCDebug(shared) << "Settings thread started.";
qCDebug(shared) << "Settings thread started.";
// Register cleanupPrivateInstance to run inside QCoreApplication's destructor.
qAddPostRoutine(cleanupPrivateInstance);
}
// Register setupPrivateInstance to run after QCoreApplication's constructor.
Q_COREAPP_STARTUP_FUNCTION(setupPrivateInstance)
}
Interface::~Interface() {
if (privateInstance) {
@ -76,16 +73,20 @@ namespace Setting {
void Interface::init() {
if (!privateInstance) {
qWarning() << "Setting::Interface::init(): Manager not yet created, bailing";
return;
// WARNING: As long as we are using QSettings this should always be triggered for each Setting::Handle
// in an assignment-client - the QSettings backing we use for this means persistence of these
// settings from an AC (when there can be multiple terminating at same time on one machine)
// is currently not supported
qWarning() << "Setting::Interface::init() for key" << _key << "- Manager not yet created." <<
"Settings persistence disabled.";
} else {
// Register Handle
privateInstance->registerHandle(this);
_isInitialized = true;
// Load value from disk
load();
}
// Register Handle
privateInstance->registerHandle(this);
_isInitialized = true;
// Load value from disk
load();
}
void Interface::maybeInit() {

View file

@ -16,11 +16,14 @@
#include <QVariant>
namespace Setting {
void init();
void cleanupSettings();
class Interface {
public:
QString getKey() const { return _key; }
bool isSet() const { return _isSet; }
bool isSet() const { return _isSet; }
virtual void setVariant(const QVariant& variant) = 0;
virtual QVariant getVariant() = 0;

View file

@ -18,11 +18,12 @@ namespace Setting {
Manager::~Manager() {
// Cleanup timer
stopTimer();
disconnect(_saveTimer, 0, 0, 0);
delete _saveTimer;
// Save all settings before exit
saveAll();
sync();
// sync will be called in the QSettings destructor
}
void Manager::registerHandle(Setting::Interface* handle) {