Merge pull request #4726 from sethalves/assignment-client-exits

assignment-clients exit cleanly when they receive sigterm
This commit is contained in:
Brad Hefta-Gaub 2015-04-29 21:46:11 -07:00
commit 74c569ec10
16 changed files with 119 additions and 27 deletions

View file

@ -114,7 +114,23 @@ void AssignmentClient::stopAssignmentClient() {
qDebug() << "Exiting.";
_requestTimer.stop();
_statsTimerACM.stop();
QCoreApplication::quit();
if (_currentAssignment) {
_currentAssignment->aboutToQuit();
QThread* currentAssignmentThread = _currentAssignment->thread();
currentAssignmentThread->quit();
currentAssignmentThread->wait();
}
}
void AssignmentClient::aboutToQuit() {
stopAssignmentClient();
// clear the log handler so that Qt doesn't call the destructor on LogHandler
qInstallMessageHandler(0);
// clear out pointer to the assignment so the destructor gets called. if we don't do this here,
// it will get destroyed along with all the other "static" stuff. various static member variables
// will be destroyed first and things go wrong.
_currentAssignment.clear();
}
@ -197,6 +213,7 @@ void AssignmentClient::readPendingDatagrams() {
// start the deployed assignment
AssignmentThread* workerThread = new AssignmentThread(_currentAssignment, this);
workerThread->setObjectName("worker");
connect(workerThread, &QThread::started, _currentAssignment.data(), &ThreadedAssignment::run);
connect(_currentAssignment.data(), &ThreadedAssignment::finished, workerThread, &QThread::quit);

View file

@ -34,6 +34,9 @@ private slots:
void sendStatsPacketToACM();
void stopAssignmentClient();
public slots:
void aboutToQuit();
private:
void setUpStatsToMonitor(int ppid);
Assignment _requestAssignment;

View file

@ -10,6 +10,7 @@
//
#include <QCommandLineParser>
#include <QThread>
#include <LogHandler.h>
#include <SharedUtil.h>
@ -180,14 +181,19 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
}
}
QThread::currentThread()->setObjectName("main thread");
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
if (numForks || minForks || maxForks) {
AssignmentClientMonitor monitor(numForks, minForks, maxForks, requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort);
connect(this, &QCoreApplication::aboutToQuit, &monitor, &AssignmentClientMonitor::aboutToQuit);
exec();
} else {
AssignmentClient client(ppid, requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort);
connect(this, &QCoreApplication::aboutToQuit, &client, &AssignmentClient::aboutToQuit);
exec();
}
}

View file

@ -69,6 +69,17 @@ AssignmentClientMonitor::~AssignmentClientMonitor() {
stopChildProcesses();
}
void AssignmentClientMonitor::waitOnChildren(int msecs) {
QMutableListIterator<QProcess*> i(_childProcesses);
while (i.hasNext()) {
QProcess* childProcess = i.next();
bool finished = childProcess->waitForFinished(msecs);
if (finished) {
i.remove();
}
}
}
void AssignmentClientMonitor::stopChildProcesses() {
auto nodeList = DependencyManager::get<NodeList>();
@ -78,11 +89,22 @@ void AssignmentClientMonitor::stopChildProcesses() {
QByteArray diePacket = byteArrayWithPopulatedHeader(PacketTypeStopNode);
nodeList->writeUnverifiedDatagram(diePacket, *node->getActiveSocket());
});
// try to give all the children time to shutdown
waitOnChildren(15000);
}
void AssignmentClientMonitor::aboutToQuit() {
stopChildProcesses();
// clear the log handler so that Qt doesn't call the destructor on LogHandler
qInstallMessageHandler(0);
}
void AssignmentClientMonitor::spawnChildClient() {
QProcess *assignmentClient = new QProcess(this);
_childProcesses.append(assignmentClient);
// unparse the parts of the command-line that the child cares about
QStringList _childArguments;
if (_assignmentPool != "") {
@ -119,8 +141,6 @@ void AssignmentClientMonitor::spawnChildClient() {
qDebug() << "Spawned a child client with PID" << assignmentClient->pid();
}
void AssignmentClientMonitor::checkSpares() {
auto nodeList = DependencyManager::get<NodeList>();
QUuid aSpareId = "";
@ -156,6 +176,8 @@ void AssignmentClientMonitor::checkSpares() {
nodeList->writeUnverifiedDatagram(diePacket, childNode);
}
}
waitOnChildren(0);
}

View file

@ -32,12 +32,16 @@ public:
QString assignmentPool, QUuid walletUUID, QString assignmentServerHostname,
quint16 assignmentServerPort);
~AssignmentClientMonitor();
void waitOnChildren(int msecs);
void stopChildProcesses();
private slots:
void readPendingDatagrams();
void checkSpares();
public slots:
void aboutToQuit();
private:
void spawnChildClient();
QTimer _checkSparesTimer; // every few seconds see if it need fewer or more spare children
@ -52,6 +56,7 @@ private:
QString _assignmentServerHostname;
quint16 _assignmentServerPort;
QList<QProcess*> _childProcesses;
};
#endif // hifi_AssignmentClientMonitor_h

View file

@ -45,6 +45,9 @@ AvatarMixer::AvatarMixer(const QByteArray& packet) :
}
AvatarMixer::~AvatarMixer() {
if (_broadcastTimer) {
_broadcastTimer->deleteLater();
}
_broadcastThread.quit();
_broadcastThread.wait();
}
@ -343,13 +346,13 @@ void AvatarMixer::run() {
nodeList->linkedDataCreateCallback = attachAvatarDataToNode;
// setup the timer that will be fired on the broadcast thread
QTimer* broadcastTimer = new QTimer();
broadcastTimer->setInterval(AVATAR_DATA_SEND_INTERVAL_MSECS);
broadcastTimer->moveToThread(&_broadcastThread);
_broadcastTimer = new QTimer();
_broadcastTimer->setInterval(AVATAR_DATA_SEND_INTERVAL_MSECS);
_broadcastTimer->moveToThread(&_broadcastThread);
// connect appropriate signals and slots
connect(broadcastTimer, &QTimer::timeout, this, &AvatarMixer::broadcastAvatarData, Qt::DirectConnection);
connect(&_broadcastThread, SIGNAL(started()), broadcastTimer, SLOT(start()));
connect(_broadcastTimer, &QTimer::timeout, this, &AvatarMixer::broadcastAvatarData, Qt::DirectConnection);
connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start()));
// start the broadcastThread
_broadcastThread.start();

View file

@ -47,6 +47,8 @@ private:
int _numStatFrames;
int _sumBillboardPackets;
int _sumIdentityPackets;
QTimer* _broadcastTimer = nullptr;
};
#endif // hifi_AvatarMixer_h

View file

@ -27,6 +27,11 @@ EntityServer::EntityServer(const QByteArray& packet)
}
EntityServer::~EntityServer() {
if (_pruneDeletedEntitiesTimer) {
_pruneDeletedEntitiesTimer->stop();
_pruneDeletedEntitiesTimer->deleteLater();
}
EntityTree* tree = (EntityTree*)_tree;
tree->removeNewlyCreatedHook(this);
}
@ -48,10 +53,10 @@ Octree* EntityServer::createTree() {
}
void EntityServer::beforeRun() {
QTimer* pruneDeletedEntitiesTimer = new QTimer(this);
connect(pruneDeletedEntitiesTimer, SIGNAL(timeout()), this, SLOT(pruneDeletedEntities()));
_pruneDeletedEntitiesTimer = new QTimer();
connect(_pruneDeletedEntitiesTimer, SIGNAL(timeout()), this, SLOT(pruneDeletedEntities()));
const int PRUNE_DELETED_MODELS_INTERVAL_MSECS = 1 * 1000; // once every second
pruneDeletedEntitiesTimer->start(PRUNE_DELETED_MODELS_INTERVAL_MSECS);
_pruneDeletedEntitiesTimer->start(PRUNE_DELETED_MODELS_INTERVAL_MSECS);
}
void EntityServer::entityCreated(const EntityItem& newEntity, const SharedNodePointer& senderNode) {

View file

@ -51,6 +51,7 @@ protected:
private:
EntitySimulation* _entitySimulation;
QTimer* _pruneDeletedEntitiesTimer = nullptr;
};
#endif // hifi_EntityServer_h

View file

@ -74,7 +74,7 @@ public:
NodeToSenderStatsMap& getSingleSenderStats() { return _singleSenderStats; }
void shuttingDown() { _shuttingDown = true;}
virtual void terminating() { _shuttingDown = true; ReceivedPacketProcessor::terminating(); }
protected:

View file

@ -266,16 +266,19 @@ OctreeServer::~OctreeServer() {
}
if (_jurisdictionSender) {
_jurisdictionSender->terminating();
_jurisdictionSender->terminate();
_jurisdictionSender->deleteLater();
}
if (_octreeInboundPacketProcessor) {
_octreeInboundPacketProcessor->terminating();
_octreeInboundPacketProcessor->terminate();
_octreeInboundPacketProcessor->deleteLater();
}
if (_persistThread) {
_persistThread->terminating();
_persistThread->terminate();
_persistThread->deleteLater();
}
@ -1095,8 +1098,6 @@ void OctreeServer::readConfiguration() {
}
void OctreeServer::run() {
qInstallMessageHandler(LogHandler::verboseMessageHandler);
_safeServerName = getMyServerName();
// Before we do anything else, create our tree...
@ -1219,8 +1220,15 @@ void OctreeServer::forceNodeShutdown(SharedNodePointer node) {
void OctreeServer::aboutToFinish() {
qDebug() << qPrintable(_safeServerName) << "server STARTING about to finish...";
qDebug() << qPrintable(_safeServerName) << "inform Octree Inbound Packet Processor that we are shutting down...";
_octreeInboundPacketProcessor->shuttingDown();
if (_octreeInboundPacketProcessor) {
_octreeInboundPacketProcessor->terminating();
}
if (_jurisdictionSender) {
_jurisdictionSender->terminating();
}
DependencyManager::get<NodeList>()->eachNode([this](const SharedNodePointer& node) {
qDebug() << qPrintable(_safeServerName) << "server about to finish while node still connected node:" << *node;
forceNodeShutdown(node);
@ -1228,6 +1236,7 @@ void OctreeServer::aboutToFinish() {
if (_persistThread) {
_persistThread->aboutToFinish();
_persistThread->terminating();
}
qDebug() << qPrintable(_safeServerName) << "server ENDING about to finish...";

View file

@ -237,8 +237,6 @@ protected:
HifiSockAddr _localSockAddr;
HifiSockAddr _publicSockAddr;
HifiSockAddr _stunSockAddr;
QTimer* _silentNodeTimer;
// XXX can BandwidthRecorder be used for this?
int _numCollectedPackets;

View file

@ -47,6 +47,8 @@ public:
/// How many received packets waiting are to be processed
int packetsToProcessCount() const { return _packets.size(); }
virtual void terminating();
public slots:
void nodeKilled(SharedNodePointer node);
@ -71,8 +73,6 @@ protected:
/// Override to do work after the packets processing loop. Default does nothing.
virtual void postProcess() { }
virtual void terminating();
protected:
QVector<NetworkPacket> _packets;

View file

@ -30,6 +30,17 @@ void ThreadedAssignment::setFinished(bool isFinished) {
_isFinished = isFinished;
if (_isFinished) {
if (_domainServerTimer) {
_domainServerTimer->stop();
delete _domainServerTimer;
_domainServerTimer = nullptr;
}
if (_statsTimer) {
_statsTimer->stop();
delete _statsTimer;
_statsTimer = nullptr;
}
aboutToFinish();
auto nodeList = DependencyManager::get<NodeList>();
@ -63,15 +74,15 @@ void ThreadedAssignment::commonInit(const QString& targetName, NodeType_t nodeTy
// this is a temp fix for Qt 5.3 - rebinding the node socket gives us readyRead for the socket on this thread
nodeList->rebindNodeSocket();
QTimer* domainServerTimer = new QTimer(this);
connect(domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS);
_domainServerTimer = new QTimer();
connect(_domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
_domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS);
if (shouldSendStats) {
// send a stats packet every 1 second
QTimer* statsTimer = new QTimer(this);
connect(statsTimer, &QTimer::timeout, this, &ThreadedAssignment::sendStatsPacket);
statsTimer->start(1000);
_statsTimer = new QTimer();
connect(_statsTimer, &QTimer::timeout, this, &ThreadedAssignment::sendStatsPacket);
_statsTimer->start(1000);
}
}

View file

@ -28,8 +28,15 @@ public:
public slots:
/// threaded run of assignment
virtual void run() = 0;
Q_INVOKABLE virtual void stop() { setFinished(true); }
virtual void readPendingDatagrams() = 0;
virtual void sendStatsPacket();
public slots:
virtual void aboutToQuit() {
QMetaObject::invokeMethod(this, "stop");
}
signals:
void finished();
@ -38,6 +45,8 @@ protected:
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
bool _isFinished;
QThread* _datagramProcessingThread;
QTimer* _domainServerTimer = nullptr;
QTimer* _statsTimer = nullptr;
private slots:
void checkInWithDomainServerOrExit();

View file

@ -228,6 +228,7 @@ void OctreePersistThread::aboutToFinish() {
qCDebug(octree) << "Persist thread about to finish...";
persist();
qCDebug(octree) << "Persist thread done with about to finish...";
_stopThread = true;
}
void OctreePersistThread::persist() {