Merge pull request #4335 from sethalves/assignment-client-keep-a-spare-x

Assignment client keep a spare
This commit is contained in:
Philip Rosedale 2015-02-27 07:39:06 -08:00
commit df553ac65e
18 changed files with 595 additions and 179 deletions

View file

@ -18,10 +18,9 @@
#include <AccountManager.h>
#include <AddressManager.h>
#include <Assignment.h>
#include <AvatarHashMap.h>
#include <HifiConfigVariantMap.h>
#include <LogHandler.h>
#include <LogUtils.h>
#include <LimitedNodeList.h>
#include <NodeList.h>
#include <PacketHeaders.h>
#include <SharedUtil.h>
@ -40,79 +39,43 @@ SharedAssignmentPointer AssignmentClient::_currentAssignment;
int hifiSockAddrMeta = qRegisterMetaType<HifiSockAddr>("HifiSockAddr");
AssignmentClient::AssignmentClient(int &argc, char **argv) :
QCoreApplication(argc, argv),
AssignmentClient::AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool, QUuid walletUUID,
QString assignmentServerHostname, quint16 assignmentServerPort) :
_assignmentServerHostname(DEFAULT_ASSIGNMENT_SERVER_HOSTNAME),
_localASPortSharedMem(NULL)
_localASPortSharedMem(NULL),
_localACMPortSharedMem(NULL)
{
LogUtils::init();
setOrganizationName("High Fidelity");
setOrganizationDomain("highfidelity.io");
setApplicationName("assignment-client");
QSettings::setDefaultFormat(QSettings::IniFormat);
// create a NodeList as an unassigned client
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
auto addressManager = DependencyManager::set<AddressManager>();
auto nodeList = DependencyManager::set<NodeList>(NodeType::Unassigned);
auto avatarHashMap = DependencyManager::set<AvatarHashMap>();
// setup a shutdown event listener to handle SIGTERM or WM_CLOSE for us
#ifdef _WIN32
installNativeEventFilter(&ShutdownEventListener::getInstance());
#else
ShutdownEventListener::getInstance();
#endif
// make up a uuid for this child so the parent can tell us apart. This id will be changed
// when the domain server hands over an assignment.
QUuid nodeUUID = QUuid::createUuid();
nodeList->setSessionUUID(nodeUUID);
// set the logging target to the the CHILD_TARGET_NAME
LogHandler::getInstance().setTargetName(ASSIGNMENT_CLIENT_TARGET_NAME);
const QVariantMap argumentVariantMap = HifiConfigVariantMap::mergeCLParametersWithJSONConfig(arguments());
const QString ASSIGNMENT_TYPE_OVERRIDE_OPTION = "t";
const QString ASSIGNMENT_POOL_OPTION = "pool";
const QString ASSIGNMENT_WALLET_DESTINATION_ID_OPTION = "wallet";
const QString CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION = "a";
const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "p";
Assignment::Type requestAssignmentType = Assignment::AllTypes;
// check for an assignment type passed on the command line or in the config
if (argumentVariantMap.contains(ASSIGNMENT_TYPE_OVERRIDE_OPTION)) {
requestAssignmentType = (Assignment::Type) argumentVariantMap.value(ASSIGNMENT_TYPE_OVERRIDE_OPTION).toInt();
}
QString assignmentPool;
// check for an assignment pool passed on the command line or in the config
if (argumentVariantMap.contains(ASSIGNMENT_POOL_OPTION)) {
assignmentPool = argumentVariantMap.value(ASSIGNMENT_POOL_OPTION).toString();
}
// setup our _requestAssignment member variable from the passed arguments
_requestAssignment = Assignment(Assignment::RequestCommand, requestAssignmentType, assignmentPool);
// check for a wallet UUID on the command line or in the config
// this would represent where the user running AC wants funds sent to
if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) {
QUuid walletUUID = argumentVariantMap.value(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION).toString();
if (!walletUUID.isNull()) {
qDebug() << "The destination wallet UUID for credits is" << uuidStringWithoutCurlyBraces(walletUUID);
_requestAssignment.setWalletUUID(walletUUID);
}
quint16 assignmentServerPort = DEFAULT_DOMAIN_SERVER_PORT;
// check for an overriden assignment server hostname
if (argumentVariantMap.contains(CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION)) {
if (assignmentServerHostname != "") {
// change the hostname for our assignment server
_assignmentServerHostname = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION).toString();
}
// check for an overriden assignment server port
if (argumentVariantMap.contains(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION)) {
assignmentServerPort =
argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toString().toUInt();
_assignmentServerHostname = assignmentServerHostname;
}
_assignmentServerSocket = HifiSockAddr(_assignmentServerHostname, assignmentServerPort, true);
@ -123,9 +86,12 @@ AssignmentClient::AssignmentClient(int &argc, char **argv) :
// call a timer function every ASSIGNMENT_REQUEST_INTERVAL_MSECS to ask for assignment, if required
qDebug() << "Waiting for assignment -" << _requestAssignment;
QTimer* timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), SLOT(sendAssignmentRequest()));
timer->start(ASSIGNMENT_REQUEST_INTERVAL_MSECS);
if (_assignmentServerHostname != "localhost") {
qDebug () << "- will attempt to connect to domain-server on" << _assignmentServerSocket.getPort();
}
connect(&_requestTimer, SIGNAL(timeout()), SLOT(sendAssignmentRequest()));
_requestTimer.start(ASSIGNMENT_REQUEST_INTERVAL_MSECS);
// connect our readPendingDatagrams method to the readyRead() signal of the socket
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClient::readPendingDatagrams);
@ -136,6 +102,45 @@ AssignmentClient::AssignmentClient(int &argc, char **argv) :
// Create Singleton objects on main thread
NetworkAccessManager::getInstance();
// Hook up a timer to send this child's status to the Monitor once per second
setUpStatsToMonitor();
}
void AssignmentClient::stopAssignmentClient() {
qDebug() << "Exiting.";
_requestTimer.stop();
_statsTimerACM.stop();
QCoreApplication::quit();
}
void AssignmentClient::setUpStatsToMonitor() {
// Figure out the address to send out stats to
quint16 localMonitorServerPort = DEFAULT_ASSIGNMENT_CLIENT_MONITOR_PORT;
auto nodeList = DependencyManager::get<NodeList>();
nodeList->getLocalServerPortFromSharedMemory(ASSIGNMENT_CLIENT_MONITOR_LOCAL_PORT_SMEM_KEY,
_localACMPortSharedMem, localMonitorServerPort);
_assignmentClientMonitorSocket = HifiSockAddr(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME, localMonitorServerPort, true);
// send a stats packet every 1 seconds
connect(&_statsTimerACM, &QTimer::timeout, this, &AssignmentClient::sendStatsPacketToACM);
_statsTimerACM.start(1000);
}
void AssignmentClient::sendStatsPacketToACM() {
// tell the assignment client monitor what this assignment client is doing (if anything)
QJsonObject statsObject;
auto nodeList = DependencyManager::get<NodeList>();
if (_currentAssignment) {
statsObject["assignment_type"] = _currentAssignment->getTypeName();
} else {
statsObject["assignment_type"] = "none";
}
nodeList->sendStats(statsObject, _assignmentClientMonitorSocket);
}
void AssignmentClient::sendAssignmentRequest() {
@ -145,23 +150,9 @@ void AssignmentClient::sendAssignmentRequest() {
if (_assignmentServerHostname == "localhost") {
// we want to check again for the local domain-server port in case the DS has restarted
if (!_localASPortSharedMem) {
_localASPortSharedMem = new QSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this);
if (!_localASPortSharedMem->attach(QSharedMemory::ReadOnly)) {
qWarning() << "Could not attach to shared memory at key" << DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY
<< "- will attempt to connect to domain-server on" << _assignmentServerSocket.getPort();
}
}
if (_localASPortSharedMem->isAttached()) {
_localASPortSharedMem->lock();
quint16 localAssignmentServerPort;
memcpy(&localAssignmentServerPort, _localASPortSharedMem->data(), sizeof(localAssignmentServerPort));
_localASPortSharedMem->unlock();
quint16 localAssignmentServerPort;
if (nodeList->getLocalServerPortFromSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, _localASPortSharedMem,
localAssignmentServerPort)) {
if (localAssignmentServerPort != _assignmentServerSocket.getPort()) {
qDebug() << "Port for local assignment server read from shared memory is"
<< localAssignmentServerPort;
@ -170,7 +161,6 @@ void AssignmentClient::sendAssignmentRequest() {
nodeList->setAssignmentServerSocket(_assignmentServerSocket);
}
}
}
nodeList->sendAssignment(_requestAssignment);
@ -227,6 +217,14 @@ void AssignmentClient::readPendingDatagrams() {
} else {
qDebug() << "Received an assignment that could not be unpacked. Re-requesting.";
}
} else if (packetTypeForPacket(receivedPacket) == PacketTypeStopNode) {
if (senderSockAddr.getAddress() == QHostAddress::LocalHost ||
senderSockAddr.getAddress() == QHostAddress::LocalHostIPv6) {
qDebug() << "Network told me to exit.";
emit stopAssignmentClient();
} else {
qDebug() << "Got a stop packet from other than localhost.";
}
} else {
// have the NodeList attempt to handle it
nodeList->processNodeData(senderSockAddr, receivedPacket);

View file

@ -18,10 +18,12 @@
class QSharedMemory;
class AssignmentClient : public QCoreApplication {
class AssignmentClient : public QObject {
Q_OBJECT
public:
AssignmentClient(int &argc, char **argv);
AssignmentClient(Assignment::Type requestAssignmentType, QString assignmentPool,
QUuid walletUUID, QString assignmentServerHostname, quint16 assignmentServerPort);
static const SharedAssignmentPointer& getCurrentAssignment() { return _currentAssignment; }
private slots:
@ -29,13 +31,22 @@ private slots:
void readPendingDatagrams();
void assignmentCompleted();
void handleAuthenticationRequest();
void sendStatsPacketToACM();
void stopAssignmentClient();
private:
void setUpStatsToMonitor();
Assignment _requestAssignment;
static SharedAssignmentPointer _currentAssignment;
QString _assignmentServerHostname;
HifiSockAddr _assignmentServerSocket;
QSharedMemory* _localASPortSharedMem;
QSharedMemory* _localASPortSharedMem; // memory shared with domain server
QSharedMemory* _localACMPortSharedMem; // memory shared with assignment client monitor
QTimer _requestTimer; // timer for requesting and assignment
QTimer _statsTimerACM; // timer for sending stats to assignment client monitor
protected:
HifiSockAddr _assignmentClientMonitorSocket;
};
#endif // hifi_AssignmentClient_h

View file

@ -0,0 +1,185 @@
//
// AssignmentClientapp.cpp
// assignment-client/src
//
// Created by Seth Alves on 2/19/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include <QCommandLineParser>
#include <LogHandler.h>
#include <SharedUtil.h>
#include <HifiConfigVariantMap.h>
#include <ShutdownEventListener.h>
#include "Assignment.h"
#include "AssignmentClient.h"
#include "AssignmentClientMonitor.h"
#include "AssignmentClientApp.h"
AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
QCoreApplication(argc, argv)
{
# ifndef WIN32
setvbuf(stdout, NULL, _IOLBF, 0);
# endif
// setup a shutdown event listener to handle SIGTERM or WM_CLOSE for us
# ifdef _WIN32
installNativeEventFilter(&ShutdownEventListener::getInstance());
# else
ShutdownEventListener::getInstance();
# endif
setOrganizationName("High Fidelity");
setOrganizationDomain("highfidelity.io");
setApplicationName("assignment-client");
// use the verbose message handler in Logging
qInstallMessageHandler(LogHandler::verboseMessageHandler);
// parse command-line
QCommandLineParser parser;
parser.setApplicationDescription("High Fidelity Assignment Client");
parser.addHelpOption();
const QCommandLineOption helpOption = parser.addHelpOption();
const QCommandLineOption clientTypeOption(ASSIGNMENT_TYPE_OVERRIDE_OPTION,
"run single assignment client of given type", "type");
parser.addOption(clientTypeOption);
const QCommandLineOption poolOption(ASSIGNMENT_POOL_OPTION, "set assignment pool", "pool-name");
parser.addOption(poolOption);
const QCommandLineOption walletDestinationOption(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION,
"set wallet destination", "wallet-uuid");
parser.addOption(walletDestinationOption);
const QCommandLineOption assignmentServerHostnameOption(CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION,
"set assignment-server hostname", "hostname");
parser.addOption(assignmentServerHostnameOption);
const QCommandLineOption assignmentServerPortOption(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION,
"set assignment-server port", "port");
parser.addOption(assignmentServerPortOption);
const QCommandLineOption numChildsOption(ASSIGNMENT_NUM_FORKS_OPTION, "number of children to fork", "child-count");
parser.addOption(numChildsOption);
const QCommandLineOption minChildsOption(ASSIGNMENT_MIN_FORKS_OPTION, "minimum number of children", "child-count");
parser.addOption(minChildsOption);
const QCommandLineOption maxChildsOption(ASSIGNMENT_MAX_FORKS_OPTION, "maximum number of children", "child-count");
parser.addOption(maxChildsOption);
if (!parser.parse(QCoreApplication::arguments())) {
qCritical() << parser.errorText() << endl;
parser.showHelp();
Q_UNREACHABLE();
}
if (parser.isSet(helpOption)) {
parser.showHelp();
Q_UNREACHABLE();
}
const QVariantMap argumentVariantMap = HifiConfigVariantMap::mergeCLParametersWithJSONConfig(arguments());
unsigned int numForks = 0;
if (parser.isSet(numChildsOption)) {
numForks = parser.value(numChildsOption).toInt();
}
unsigned int minForks = 0;
if (parser.isSet(minChildsOption)) {
minForks = parser.value(minChildsOption).toInt();
}
unsigned int maxForks = 0;
if (parser.isSet(maxChildsOption)) {
maxForks = parser.value(maxChildsOption).toInt();
}
if (!numForks && minForks) {
// if the user specified --min but not -n, set -n to --min
numForks = minForks;
}
Assignment::Type requestAssignmentType = Assignment::AllTypes;
if (argumentVariantMap.contains(ASSIGNMENT_TYPE_OVERRIDE_OPTION)) {
requestAssignmentType = (Assignment::Type) argumentVariantMap.value(ASSIGNMENT_TYPE_OVERRIDE_OPTION).toInt();
}
if (parser.isSet(clientTypeOption)) {
requestAssignmentType = (Assignment::Type) parser.value(clientTypeOption).toInt();
}
QString assignmentPool;
// check for an assignment pool passed on the command line or in the config
if (argumentVariantMap.contains(ASSIGNMENT_POOL_OPTION)) {
assignmentPool = argumentVariantMap.value(ASSIGNMENT_POOL_OPTION).toString();
}
if (parser.isSet(poolOption)) {
assignmentPool = parser.value(poolOption);
}
QUuid walletUUID;
if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) {
walletUUID = argumentVariantMap.value(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION).toString();
}
if (parser.isSet(walletDestinationOption)) {
walletUUID = parser.value(walletDestinationOption);
}
QString assignmentServerHostname;
if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) {
assignmentServerHostname = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION).toString();
}
if (parser.isSet(assignmentServerHostnameOption)) {
assignmentServerHostname = parser.value(assignmentServerHostnameOption);
}
// check for an overriden assignment server port
quint16 assignmentServerPort = DEFAULT_DOMAIN_SERVER_PORT;
if (argumentVariantMap.contains(ASSIGNMENT_WALLET_DESTINATION_ID_OPTION)) {
assignmentServerPort = argumentVariantMap.value(CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION).toString().toUInt();
}
if (parser.isSet(assignmentServerPortOption)) {
assignmentServerPort = parser.value(assignmentServerPortOption).toInt();
}
if (parser.isSet(numChildsOption)) {
if (minForks && minForks > numForks) {
qCritical() << "--min can't be more than -n";
parser.showHelp();
Q_UNREACHABLE();
}
if (maxForks && maxForks < numForks) {
qCritical() << "--max can't be less than -n";
parser.showHelp();
Q_UNREACHABLE();
}
}
if (numForks || minForks || maxForks) {
AssignmentClientMonitor monitor(numForks, minForks, maxForks, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort);
exec();
} else {
AssignmentClient client(requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort);
exec();
}
}

View file

@ -0,0 +1,34 @@
//
// AssignmentClientapp.h
// assignment-client/src
//
// Created by Seth Alves on 2/19/15.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_AssignmentClientApp_h
#define hifi_AssignmentClientApp_h
#include <QApplication>
const QString ASSIGNMENT_TYPE_OVERRIDE_OPTION = "t";
const QString ASSIGNMENT_POOL_OPTION = "pool";
const QString ASSIGNMENT_WALLET_DESTINATION_ID_OPTION = "wallet";
const QString CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION = "a";
const QString CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION = "p";
const QString ASSIGNMENT_NUM_FORKS_OPTION = "n";
const QString ASSIGNMENT_MIN_FORKS_OPTION = "min";
const QString ASSIGNMENT_MAX_FORKS_OPTION = "max";
class AssignmentClientApp : public QCoreApplication {
Q_OBJECT
public:
AssignmentClientApp(int argc, char* argv[]);
};
#endif // hifi_AssignmentClientApp_h

View file

@ -0,0 +1,8 @@
#include "AssignmentClientChildData.h"
AssignmentClientChildData::AssignmentClientChildData(QString childType) :
_childType(childType)
{
}

View file

@ -0,0 +1,32 @@
//
// AssignmentClientChildData.h
// assignment-client/src
//
// Created by Seth Alves on 2/23/2015.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_AssignmentClientChildData_h
#define hifi_AssignmentClientChildData_h
#include <Assignment.h>
class AssignmentClientChildData : public NodeData {
public:
AssignmentClientChildData(QString childType);
QString getChildType() { return _childType; }
void setChildType(QString childType) { _childType = childType; }
// implement parseData to return 0 so we can be a subclass of NodeData
int parseData(const QByteArray& packet) { return 0; }
private:
QString _childType;
};
#endif // hifi_AssignmentClientChildData_h

View file

@ -12,40 +12,51 @@
#include <signal.h>
#include <LogHandler.h>
#include <ShutdownEventListener.h>
#include <AddressManager.h>
#include "AssignmentClientMonitor.h"
#include "AssignmentClientApp.h"
#include "AssignmentClientChildData.h"
#include "PacketHeaders.h"
#include "SharedUtil.h"
const char* NUM_FORKS_PARAMETER = "-n";
const QString ASSIGNMENT_CLIENT_MONITOR_TARGET_NAME = "assignment-client-monitor";
AssignmentClientMonitor::AssignmentClientMonitor(int &argc, char **argv, int numAssignmentClientForks) :
QCoreApplication(argc, argv)
AssignmentClientMonitor::AssignmentClientMonitor(const unsigned int numAssignmentClientForks,
const unsigned int minAssignmentClientForks,
const unsigned int maxAssignmentClientForks,
QString assignmentPool, QUuid walletUUID, QString assignmentServerHostname,
quint16 assignmentServerPort) :
_numAssignmentClientForks(numAssignmentClientForks),
_minAssignmentClientForks(minAssignmentClientForks),
_maxAssignmentClientForks(maxAssignmentClientForks),
_assignmentPool(assignmentPool),
_walletUUID(walletUUID),
_assignmentServerHostname(assignmentServerHostname),
_assignmentServerPort(assignmentServerPort)
{
// start the Logging class with the parent's target name
LogHandler::getInstance().setTargetName(ASSIGNMENT_CLIENT_MONITOR_TARGET_NAME);
// setup a shutdown event listener to handle SIGTERM or WM_CLOSE for us
#ifdef _WIN32
installNativeEventFilter(&ShutdownEventListener::getInstance());
#else
ShutdownEventListener::getInstance();
#endif
_childArguments = arguments();
// remove the parameter for the number of forks so it isn't passed to the child forked processes
int forksParameterIndex = _childArguments.indexOf(NUM_FORKS_PARAMETER);
// this removes both the "-n" parameter and the number of forks passed
_childArguments.removeAt(forksParameterIndex);
_childArguments.removeAt(forksParameterIndex);
// create a NodeList so we can receive stats from children
DependencyManager::registerInheritance<LimitedNodeList, NodeList>();
auto addressManager = DependencyManager::set<AddressManager>();
auto nodeList = DependencyManager::set<LimitedNodeList>(DEFAULT_ASSIGNMENT_CLIENT_MONITOR_PORT);
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, this, &AssignmentClientMonitor::readPendingDatagrams);
nodeList->putLocalPortIntoSharedMemory(ASSIGNMENT_CLIENT_MONITOR_LOCAL_PORT_SMEM_KEY, this);
// use QProcess to fork off a process for each of the child assignment clients
for (int i = 0; i < numAssignmentClientForks; i++) {
for (unsigned int i = 0; i < _numAssignmentClientForks; i++) {
spawnChildClient();
}
connect(&_checkSparesTimer, &QTimer::timeout, this, &AssignmentClientMonitor::checkSpares);
_checkSparesTimer.start(NODE_SILENCE_THRESHOLD_MSECS * 3);
}
AssignmentClientMonitor::~AssignmentClientMonitor() {
@ -53,46 +64,145 @@ AssignmentClientMonitor::~AssignmentClientMonitor() {
}
void AssignmentClientMonitor::stopChildProcesses() {
QList<QPointer<QProcess> >::Iterator it = _childProcesses.begin();
while (it != _childProcesses.end()) {
if (!it->isNull()) {
qDebug() << "Monitor is terminating child process" << it->data();
// don't re-spawn this child when it goes down
disconnect(it->data(), 0, this, 0);
it->data()->terminate();
it->data()->waitForFinished();
}
it = _childProcesses.erase(it);
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->eachNode([&](const SharedNodePointer& node) {
qDebug() << "asking child" << node->getUUID() << "to exit.";
node->activateLocalSocket();
QByteArray diePacket = byteArrayWithPopulatedHeader(PacketTypeStopNode);
nodeList->writeUnverifiedDatagram(diePacket, *node->getActiveSocket());
});
}
void AssignmentClientMonitor::spawnChildClient() {
QProcess *assignmentClient = new QProcess(this);
_childProcesses.append(QPointer<QProcess>(assignmentClient));
// unparse the parts of the command-line that the child cares about
QStringList _childArguments;
if (_assignmentPool != "") {
_childArguments.append("--" + ASSIGNMENT_POOL_OPTION);
_childArguments.append(_assignmentPool);
}
if (!_walletUUID.isNull()) {
_childArguments.append("--" + ASSIGNMENT_WALLET_DESTINATION_ID_OPTION);
_childArguments.append(_walletUUID.toString());
}
if (_assignmentServerHostname != "") {
_childArguments.append("--" + CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION);
_childArguments.append(_assignmentServerHostname);
}
if (_assignmentServerPort != DEFAULT_DOMAIN_SERVER_PORT) {
_childArguments.append("--" + CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION);
_childArguments.append(QString::number(_assignmentServerPort));
}
// make sure that the output from the child process appears in our output
assignmentClient->setProcessChannelMode(QProcess::ForwardedChannels);
assignmentClient->start(applicationFilePath(), _childArguments);
// link the child processes' finished slot to our childProcessFinished slot
connect(assignmentClient, SIGNAL(finished(int, QProcess::ExitStatus)), this,
SLOT(childProcessFinished(int, QProcess::ExitStatus)));
assignmentClient->start(QCoreApplication::applicationFilePath(), _childArguments);
qDebug() << "Spawned a child client with PID" << assignmentClient->pid();
}
void AssignmentClientMonitor::childProcessFinished(int exitCode, QProcess::ExitStatus exitStatus) {
qDebug("Replacing dead child assignment client with a new one");
// remove the old process from our list of child processes
qDebug() << "need to remove" << QPointer<QProcess>(qobject_cast<QProcess*>(sender()));
_childProcesses.removeOne(QPointer<QProcess>(qobject_cast<QProcess*>(sender())));
spawnChildClient();
void AssignmentClientMonitor::checkSpares() {
auto nodeList = DependencyManager::get<NodeList>();
QUuid aSpareId = "";
unsigned int spareCount = 0;
unsigned int totalCount = 0;
nodeList->removeSilentNodes();
nodeList->eachNode([&](const SharedNodePointer& node) {
AssignmentClientChildData *childData = static_cast<AssignmentClientChildData*>(node->getLinkedData());
totalCount ++;
if (childData->getChildType() == "none") {
spareCount ++;
aSpareId = node->getUUID();
}
});
// Spawn or kill children, as needed. If --min or --max weren't specified, allow the child count
// to drift up or down as far as needed.
if (spareCount < 1 || totalCount < _minAssignmentClientForks) {
if (!_maxAssignmentClientForks || totalCount < _maxAssignmentClientForks) {
spawnChildClient();
}
}
if (spareCount > 1) {
if (!_minAssignmentClientForks || totalCount > _minAssignmentClientForks) {
// kill aSpareId
qDebug() << "asking child" << aSpareId << "to exit.";
SharedNodePointer childNode = nodeList->nodeWithUUID(aSpareId);
childNode->activateLocalSocket();
QByteArray diePacket = byteArrayWithPopulatedHeader(PacketTypeStopNode);
nodeList->writeUnverifiedDatagram(diePacket, childNode);
}
}
}
void AssignmentClientMonitor::readPendingDatagrams() {
auto nodeList = DependencyManager::get<NodeList>();
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
while (nodeList->getNodeSocket().hasPendingDatagrams()) {
receivedPacket.resize(nodeList->getNodeSocket().pendingDatagramSize());
nodeList->getNodeSocket().readDatagram(receivedPacket.data(), receivedPacket.size(),
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
if (packetTypeForPacket(receivedPacket) == PacketTypeNodeJsonStats) {
QUuid packetUUID = uuidFromPacketHeader(receivedPacket);
SharedNodePointer matchingNode = nodeList->sendingNodeForPacket(receivedPacket);
if (!matchingNode) {
// The parent only expects to be talking with prorams running on this same machine.
if (senderSockAddr.getAddress() == QHostAddress::LocalHost ||
senderSockAddr.getAddress() == QHostAddress::LocalHostIPv6) {
if (!packetUUID.isNull()) {
matchingNode = DependencyManager::get<LimitedNodeList>()->addOrUpdateNode
(packetUUID, NodeType::Unassigned, senderSockAddr, senderSockAddr, false);
AssignmentClientChildData *childData = new AssignmentClientChildData("unknown");
matchingNode->setLinkedData(childData);
} else {
// tell unknown assignment-client child to exit.
qDebug() << "asking unknown child to exit.";
QByteArray diePacket = byteArrayWithPopulatedHeader(PacketTypeStopNode);
nodeList->writeUnverifiedDatagram(diePacket, senderSockAddr);
}
}
}
if (matchingNode) {
// update our records about how to reach this child
matchingNode->setLocalSocket(senderSockAddr);
// push past the packet header
QDataStream packetStream(receivedPacket);
packetStream.skipRawData(numBytesForPacketHeader(receivedPacket));
// decode json
QVariantMap unpackedVariantMap;
packetStream >> unpackedVariantMap;
QJsonObject unpackedStatsJSON = QJsonObject::fromVariantMap(unpackedVariantMap);
// get child's assignment type out of the decoded json
QString childType = unpackedStatsJSON["assignment_type"].toString();
AssignmentClientChildData *childData =
static_cast<AssignmentClientChildData*>(matchingNode->getLinkedData());
childData->setChildType(childType);
// note when this child talked
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
}
} else {
// have the NodeList attempt to handle it
nodeList->processNodeData(senderSockAddr, receivedPacket);
}
}
}
}

View file

@ -15,25 +15,40 @@
#include <QtCore/QCoreApplication>
#include <QtCore/qpointer.h>
#include <QtCore/QProcess>
#include <QtCore/QDateTime>
#include <Assignment.h>
#include "AssignmentClientChildData.h"
extern const char* NUM_FORKS_PARAMETER;
class AssignmentClientMonitor : public QCoreApplication {
class AssignmentClientMonitor : public QObject {
Q_OBJECT
public:
AssignmentClientMonitor(int &argc, char **argv, int numAssignmentClientForks);
AssignmentClientMonitor(const unsigned int numAssignmentClientForks, const unsigned int minAssignmentClientForks,
const unsigned int maxAssignmentClientForks, QString assignmentPool, QUuid walletUUID,
QString assignmentServerHostname, quint16 assignmentServerPort);
~AssignmentClientMonitor();
void stopChildProcesses();
private slots:
void childProcessFinished(int exitCode, QProcess::ExitStatus exitStatus);
void readPendingDatagrams();
void checkSpares();
private:
void spawnChildClient();
QList<QPointer<QProcess> > _childProcesses;
QStringList _childArguments;
QTimer _checkSparesTimer; // every few seconds see if it need fewer or more spare children
const unsigned int _numAssignmentClientForks;
const unsigned int _minAssignmentClientForks;
const unsigned int _maxAssignmentClientForks;
QString _assignmentPool;
QUuid _walletUUID;
QString _assignmentServerHostname;
quint16 _assignmentServerPort;
};
#endif // hifi_AssignmentClientMonitor_h

View file

@ -9,34 +9,10 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include <LogHandler.h>
#include <SharedUtil.h>
#include "Assignment.h"
#include "AssignmentClient.h"
#include "AssignmentClientMonitor.h"
#include "AssignmentClientApp.h"
int main(int argc, char* argv[]) {
#ifndef WIN32
setvbuf(stdout, NULL, _IOLBF, 0);
#endif
// use the verbose message handler in Logging
qInstallMessageHandler(LogHandler::verboseMessageHandler);
const char* numForksString = getCmdOption(argc, (const char**)argv, NUM_FORKS_PARAMETER);
int numForks = 0;
if (numForksString) {
numForks = atoi(numForksString);
}
if (numForks) {
AssignmentClientMonitor monitor(argc, argv, numForks);
return monitor.exec();
} else {
AssignmentClient client(argc, argv);
return client.exec();
}
AssignmentClientApp app(argc, argv);
return 0;
}

View file

@ -246,19 +246,7 @@ void DomainServer::setupNodeListAndAssignments(const QUuid& sessionUUID) {
auto nodeList = DependencyManager::set<LimitedNodeList>(domainServerPort, domainServerDTLSPort);
// no matter the local port, save it to shared mem so that local assignment clients can ask what it is
QSharedMemory* sharedPortMem = new QSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this);
quint16 localPort = nodeList->getNodeSocket().localPort();
// attempt to create the shared memory segment
if (sharedPortMem->create(sizeof(localPort)) || sharedPortMem->attach()) {
sharedPortMem->lock();
memcpy(sharedPortMem->data(), &localPort, sizeof(localPort));
sharedPortMem->unlock();
qDebug() << "Wrote local listening port" << localPort << "to shared memory at key" << DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY;
} else {
qWarning() << "Failed to create and attach to shared memory to share local port with assignment-client children.";
}
nodeList->putLocalPortIntoSharedMemory(DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY, this);
// set our LimitedNodeList UUID to match the UUID from our config
// nodes will currently use this to add resources to data-web that relate to our domain

View file

@ -76,7 +76,7 @@ const QString AddressManager::currentPath(bool withOrientation) const {
pathString += "/" + orientationString;
} else {
qDebug() << "Cannot add orientation to path without a getter for position."
<< "Call AdressManager::setOrientationGetter to pass a function that will return a glm::quat";
<< "Call AddressManager::setOrientationGetter to pass a function that will return a glm::quat";
}
}
@ -84,7 +84,7 @@ const QString AddressManager::currentPath(bool withOrientation) const {
return pathString;
} else {
qDebug() << "Cannot create address path without a getter for position."
<< "Call AdressManager::setPositionGetter to pass a function that will return a const glm::vec3&";
<< "Call AddressManager::setPositionGetter to pass a function that will return a const glm::vec3&";
return QString();
}
}

View file

@ -669,3 +669,41 @@ void LimitedNodeList::sendHeartbeatToIceServer(const HifiSockAddr& iceServerSock
writeUnverifiedDatagram(iceRequestByteArray, iceServerSockAddr);
}
void LimitedNodeList::putLocalPortIntoSharedMemory(const QString key, QObject* parent) {
// save our local port to shared memory so that assignment client children know how to talk to this parent
QSharedMemory* sharedPortMem = new QSharedMemory(key, parent);
quint16 localPort = getNodeSocket().localPort();
// attempt to create the shared memory segment
if (sharedPortMem->create(sizeof(localPort)) || sharedPortMem->attach()) {
sharedPortMem->lock();
memcpy(sharedPortMem->data(), &localPort, sizeof(localPort));
sharedPortMem->unlock();
qDebug() << "Wrote local listening port" << localPort << "to shared memory at key" << key;
} else {
qWarning() << "Failed to create and attach to shared memory to share local port with assignment-client children.";
}
}
bool LimitedNodeList::getLocalServerPortFromSharedMemory(const QString key, QSharedMemory*& sharedMem,
quint16& localPort) {
if (!sharedMem) {
sharedMem = new QSharedMemory(key, this);
if (!sharedMem->attach(QSharedMemory::ReadOnly)) {
qWarning() << "Could not attach to shared memory at key" << key;
}
}
if (sharedMem->isAttached()) {
sharedMem->lock();
memcpy(&localPort, sharedMem->data(), sizeof(localPort));
sharedMem->unlock();
return true;
}
return false;
}

View file

@ -26,6 +26,7 @@
#include <qsharedpointer.h>
#include <QtNetwork/qudpsocket.h>
#include <QtNetwork/qhostaddress.h>
#include <QSharedMemory>
#include <tbb/concurrent_unordered_map.h>
@ -49,6 +50,10 @@ const char STUN_SERVER_HOSTNAME[] = "stun.highfidelity.io";
const unsigned short STUN_SERVER_PORT = 3478;
const QString DOMAIN_SERVER_LOCAL_PORT_SMEM_KEY = "domain-server.local-port";
const QString ASSIGNMENT_CLIENT_MONITOR_LOCAL_PORT_SMEM_KEY = "assignment-client-monitor.local-port";
const char DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME[] = "localhost";
const unsigned short DEFAULT_ASSIGNMENT_CLIENT_MONITOR_PORT = 40104;
class HifiSockAddr;
@ -168,6 +173,9 @@ public:
return SharedNodePointer();
}
void putLocalPortIntoSharedMemory(const QString key, QObject* parent);
bool getLocalServerPortFromSharedMemory(const QString key, QSharedMemory*& sharedMem, quint16& localPort);
public slots:
void reset();

View file

@ -149,7 +149,12 @@ QDataStream& operator>>(QDataStream& in, Node& node) {
}
QDebug operator<<(QDebug debug, const Node &node) {
debug.nospace() << NodeType::getNodeTypeName(node.getType()) << " (" << node.getType() << ")";
debug.nospace() << NodeType::getNodeTypeName(node.getType());
if (node.getType() == NodeType::Unassigned) {
debug.nospace() << " (1)";
} else {
debug.nospace() << " (" << node.getType() << ")";
}
debug << " " << node.getUUID().toString().toLocal8Bit().constData() << " ";
debug.nospace() << node.getPublicSocket() << "/" << node.getLocalSocket();
return debug.nospace();

View file

@ -62,13 +62,17 @@ NodeList::NodeList(char newOwnerType, unsigned short socketListenPort, unsigned
connect(&AccountManager::getInstance(), &AccountManager::logoutComplete , this, &NodeList::reset);
}
qint64 NodeList::sendStatsToDomainServer(const QJsonObject& statsObject) {
qint64 NodeList::sendStats(const QJsonObject& statsObject, HifiSockAddr destination) {
QByteArray statsPacket = byteArrayWithPopulatedHeader(PacketTypeNodeJsonStats);
QDataStream statsPacketStream(&statsPacket, QIODevice::Append);
statsPacketStream << statsObject.toVariantMap();
return writeUnverifiedDatagram(statsPacket, _domainHandler.getSockAddr());
return writeUnverifiedDatagram(statsPacket, destination);
}
qint64 NodeList::sendStatsToDomainServer(const QJsonObject& statsObject) {
return sendStats(statsObject, _domainHandler.getSockAddr());
}
void NodeList::timePingReply(const QByteArray& packet, const SharedNodePointer& sendingNode) {

View file

@ -47,6 +47,7 @@ public:
NodeType_t getOwnerType() const { return _ownerType; }
void setOwnerType(NodeType_t ownerType) { _ownerType = ownerType; }
qint64 sendStats(const QJsonObject& statsObject, HifiSockAddr destination);
qint64 sendStatsToDomainServer(const QJsonObject& statsObject);
int getNumNoReplyDomainCheckIns() const { return _numNoReplyDomainCheckIns; }

View file

@ -70,6 +70,8 @@ PacketVersion versionForPacketType(PacketType type) {
return 2;
case PacketTypeOctreeStats:
return 1;
case PacketTypeStopNode:
return 1;
case PacketTypeEntityAddOrEdit:
case PacketTypeEntityData:
return VERSION_MODEL_ENTITIES_SUPPORT_SHAPE_TYPE;
@ -124,6 +126,7 @@ QString nameForPacketType(PacketType type) {
PACKET_TYPE_NAME_LOOKUP(PacketTypeEntityErase);
PACKET_TYPE_NAME_LOOKUP(PacketTypeEntityAddResponse);
PACKET_TYPE_NAME_LOOKUP(PacketTypeOctreeDataNack);
PACKET_TYPE_NAME_LOOKUP(PacketTypeStopNode);
PACKET_TYPE_NAME_LOOKUP(PacketTypeAudioEnvironment);
PACKET_TYPE_NAME_LOOKUP(PacketTypeEntityEditNack);
PACKET_TYPE_NAME_LOOKUP(PacketTypeSignedTransactionPayment);

View file

@ -67,7 +67,7 @@ enum PacketType {
PacketTypeEntityErase,
PacketTypeEntityAddResponse,
PacketTypeOctreeDataNack, // 45
UNUSED_10,
PacketTypeStopNode,
PacketTypeAudioEnvironment,
PacketTypeEntityEditNack,
PacketTypeSignedTransactionPayment,
@ -86,7 +86,7 @@ const QSet<PacketType> NON_VERIFIED_PACKETS = QSet<PacketType>()
<< PacketTypeNodeJsonStats << PacketTypeEntityQuery
<< PacketTypeOctreeDataNack << PacketTypeEntityEditNack
<< PacketTypeIceServerHeartbeat << PacketTypeIceServerHeartbeatResponse
<< PacketTypeUnverifiedPing << PacketTypeUnverifiedPingReply;
<< PacketTypeUnverifiedPing << PacketTypeUnverifiedPingReply << PacketTypeStopNode;
const int NUM_BYTES_MD5_HASH = 16;
const int NUM_STATIC_HEADER_BYTES = sizeof(PacketVersion) + NUM_BYTES_RFC4122_UUID;