make DomainServer a proper QCoreApplication instance, closes

This commit is contained in:
Stephen Birarda 2013-12-05 10:00:20 -08:00
parent 46804f4002
commit a376ebbca2
3 changed files with 258 additions and 272 deletions

View file

@ -12,6 +12,7 @@
#include <QtCore/QJsonDocument>
#include <QtCore/QJsonObject>
#include <QtCore/QStringList>
#include <QtCore/QTimer>
#include <PacketHeaders.h>
#include <SharedUtil.h>
@ -19,11 +20,224 @@
#include "DomainServer.h"
const int RESTART_HOLD_TIME_MSECS = 5 * 1000;
DomainServer* DomainServer::domainServerInstance = NULL;
void DomainServer::signalHandler(int signal) {
domainServerInstance->cleanup();
exit(1);
DomainServer::DomainServer(int argc, char* argv[]) :
QCoreApplication(argc, argv),
_assignmentQueueMutex(),
_assignmentQueue(),
_staticAssignmentFile(QString("%1/config.ds").arg(QCoreApplication::applicationDirPath())),
_staticAssignmentFileData(NULL),
_voxelServerConfig(NULL),
_hasCompletedRestartHold(false)
{
DomainServer::setDomainServerInstance(this);
const char CUSTOM_PORT_OPTION[] = "-p";
const char* customPortString = getCmdOption(argc, (const char**) argv, CUSTOM_PORT_OPTION);
unsigned short domainServerPort = customPortString ? atoi(customPortString) : DEFAULT_DOMAIN_SERVER_PORT;
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, domainServerPort);
const char VOXEL_CONFIG_OPTION[] = "--voxelServerConfig";
_voxelServerConfig = getCmdOption(argc, (const char**) argv, VOXEL_CONFIG_OPTION);
// setup the mongoose web server
struct mg_callbacks callbacks = {};
QString documentRootString = QString("%1/resources/web").arg(QCoreApplication::applicationDirPath());
char documentRoot[documentRootString.size() + 1];
strcpy(documentRoot, documentRootString.toLocal8Bit().constData());
// list of options. Last element must be NULL.
const char* options[] = {"listening_ports", "8080",
"document_root", documentRoot, NULL};
callbacks.begin_request = civetwebRequestHandler;
callbacks.upload = civetwebUploadHandler;
// Start the web server.
mg_start(&callbacks, NULL, options);
nodeList->addHook(this);
if (!_staticAssignmentFile.exists() || _voxelServerConfig) {
if (_voxelServerConfig) {
// we have a new VS config, clear the existing file to start fresh
_staticAssignmentFile.remove();
}
prepopulateStaticAssignmentFile();
}
_staticAssignmentFile.open(QIODevice::ReadWrite);
_staticAssignmentFileData = _staticAssignmentFile.map(0, _staticAssignmentFile.size());
_staticAssignments = (Assignment*) _staticAssignmentFileData;
QTimer* silentNodeTimer = new QTimer(this);
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), SLOT(readAvailableDatagrams()));
// fire a single shot timer to add static assignments back into the queue after a restart
QTimer::singleShot(RESTART_HOLD_TIME_MSECS, this, SLOT(addStaticAssignmentsBackToQueueAfterRestart()));
}
void DomainServer::exit(int retCode) {
cleanup();
}
void DomainServer::readAvailableDatagrams() {
NodeList* nodeList = NodeList::getInstance();
HifiSockAddr senderSockAddr, nodePublicAddress, nodeLocalAddress;
static unsigned char packetData[MAX_PACKET_SIZE];
static unsigned char broadcastPacket[MAX_PACKET_SIZE];
static unsigned char* currentBufferPos;
static unsigned char* startPointer;
int receivedBytes = 0;
while (nodeList->getNodeSocket().hasPendingDatagrams()) {
if ((receivedBytes = nodeList->getNodeSocket().readDatagram((char*) packetData, MAX_PACKET_SIZE,
senderSockAddr.getAddressPointer(),
senderSockAddr.getPortPointer()))
&& packetVersionMatch((unsigned char*) packetData)) {
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
// this is an RFD or domain list request packet, and there is a version match
int numBytesSenderHeader = numBytesForPacketHeader((unsigned char*) packetData);
NODE_TYPE nodeType = *(packetData + numBytesSenderHeader);
int packetIndex = numBytesSenderHeader + sizeof(NODE_TYPE);
QUuid nodeUUID = QUuid::fromRfc4122(QByteArray(((char*) packetData + packetIndex), NUM_BYTES_RFC4122_UUID));
packetIndex += NUM_BYTES_RFC4122_UUID;
int numBytesPrivateSocket = HifiSockAddr::unpackSockAddr(packetData + packetIndex, nodePublicAddress);
packetIndex += numBytesPrivateSocket;
if (nodePublicAddress.getAddress().isNull()) {
// this node wants to use us its STUN server
// so set the node public address to whatever we perceive the public address to be
// if the sender is on our box then leave its public address to 0 so that
// other users attempt to reach it on the same address they have for the domain-server
if (senderSockAddr.getAddress().isLoopback()) {
nodePublicAddress.setAddress(QHostAddress());
} else {
nodePublicAddress.setAddress(senderSockAddr.getAddress());
}
}
int numBytesPublicSocket = HifiSockAddr::unpackSockAddr(packetData + packetIndex, nodeLocalAddress);
packetIndex += numBytesPublicSocket;
const char STATICALLY_ASSIGNED_NODES[3] = {
NODE_TYPE_AUDIO_MIXER,
NODE_TYPE_AVATAR_MIXER,
NODE_TYPE_VOXEL_SERVER
};
Assignment* matchingStaticAssignment = NULL;
if (memchr(STATICALLY_ASSIGNED_NODES, nodeType, sizeof(STATICALLY_ASSIGNED_NODES)) == NULL
|| ((matchingStaticAssignment = matchingStaticAssignmentForCheckIn(nodeUUID, nodeType))
|| checkInWithUUIDMatchesExistingNode(nodePublicAddress,
nodeLocalAddress,
nodeUUID)))
{
Node* checkInNode = nodeList->addOrUpdateNode(nodeUUID,
nodeType,
nodePublicAddress,
nodeLocalAddress);
if (matchingStaticAssignment) {
// this was a newly added node with a matching static assignment
if (_hasCompletedRestartHold) {
// remove the matching assignment from the assignment queue so we don't take the next check in
removeAssignmentFromQueue(matchingStaticAssignment);
}
// set the linked data for this node to a copy of the matching assignment
// so we can re-queue it should the node die
Assignment* nodeCopyOfMatchingAssignment = new Assignment(*matchingStaticAssignment);
checkInNode->setLinkedData(nodeCopyOfMatchingAssignment);
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos;
unsigned char* nodeTypesOfInterest = packetData + packetIndex + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getUUID() != nodeUUID &&
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
// don't send avatar nodes to other avatars, that will come from avatar mixer
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
}
}
}
// update last receive to now
uint64_t timeNow = usecTimestampNow();
checkInNode->setLastHeardMicrostamp(timeNow);
// send the constructed list back to this node
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes,
senderSockAddr.getAddress(), senderSockAddr.getPort());
}
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
qDebug("Received a request for assignment.\n");
if (_assignmentQueue.size() > 0) {
// construct the requested assignment from the packet data
Assignment requestAssignment(packetData, receivedBytes);
Assignment* assignmentToDeploy = deployableAssignmentForRequest(requestAssignment);
if (assignmentToDeploy) {
// give this assignment out, either the type matches or the requestor said they will take any
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = assignmentToDeploy->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket, numHeaderBytes + numAssignmentBytes,
senderSockAddr.getAddress(), senderSockAddr.getPort());
if (assignmentToDeploy->getNumberOfInstances() == 0) {
// there are no more instances of this script to send out, delete it
delete assignmentToDeploy;
}
}
}
}
}
}
}
void DomainServer::setDomainServerInstance(DomainServer* domainServer) {
@ -292,52 +506,6 @@ unsigned char* DomainServer::addNodeToBroadcastPacket(unsigned char* currentPosi
return currentPosition;
}
DomainServer::DomainServer(int argc, char* argv[]) :
_assignmentQueueMutex(),
_assignmentQueue(),
_staticAssignmentFile(QString("%1/config.ds").arg(QCoreApplication::applicationDirPath())),
_staticAssignmentFileData(NULL),
_voxelServerConfig(NULL),
_hasCompletedRestartHold(false)
{
DomainServer::setDomainServerInstance(this);
const char CUSTOM_PORT_OPTION[] = "-p";
const char* customPortString = getCmdOption(argc, (const char**) argv, CUSTOM_PORT_OPTION);
unsigned short domainServerPort = customPortString ? atoi(customPortString) : DEFAULT_DOMAIN_SERVER_PORT;
NodeList::createInstance(NODE_TYPE_DOMAIN, domainServerPort);
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = DomainServer::signalHandler;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
const char VOXEL_CONFIG_OPTION[] = "--voxelServerConfig";
_voxelServerConfig = getCmdOption(argc, (const char**) argv, VOXEL_CONFIG_OPTION);
// setup the mongoose web server
struct mg_callbacks callbacks = {};
QString documentRootString = QString("%1/resources/web").arg(QCoreApplication::applicationDirPath());
char documentRoot[documentRootString.size() + 1];
strcpy(documentRoot, documentRootString.toLocal8Bit().constData());
// list of options. Last element must be NULL.
const char* options[] = {"listening_ports", "8080",
"document_root", documentRoot, NULL};
callbacks.begin_request = civetwebRequestHandler;
callbacks.upload = civetwebUploadHandler;
// Start the web server.
mg_start(&callbacks, NULL, options);
}
void DomainServer::prepopulateStaticAssignmentFile() {
int numFreshStaticAssignments = 0;
@ -521,234 +689,51 @@ bool DomainServer::checkInWithUUIDMatchesExistingNode(const HifiSockAddr& nodePu
return false;
}
void DomainServer::possiblyAddStaticAssignmentsBackToQueueAfterRestart(timeval* startTime) {
void DomainServer::addStaticAssignmentsBackToQueueAfterRestart() {
_hasCompletedRestartHold = true;
// if the domain-server has just restarted,
// check if there are static assignments in the file that we need to
// throw into the assignment queue
const uint64_t RESTART_HOLD_TIME_USECS = 5 * 1000 * 1000;
if (!_hasCompletedRestartHold && usecTimestampNow() - usecTimestamp(startTime) > RESTART_HOLD_TIME_USECS) {
_hasCompletedRestartHold = true;
// pull anything in the static assignment file that isn't spoken for and add to the assignment queue
for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) {
if (_staticAssignments[i].getUUID().isNull()) {
// reached the end of static assignments, bail
break;
}
// pull anything in the static assignment file that isn't spoken for and add to the assignment queue
for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) {
if (_staticAssignments[i].getUUID().isNull()) {
// reached the end of static assignments, bail
break;
}
bool foundMatchingAssignment = false;
NodeList* nodeList = NodeList::getInstance();
// enumerate the nodes and check if there is one with an attached assignment with matching UUID
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
Assignment* linkedAssignment = (Assignment*) node->getLinkedData();
if (linkedAssignment->getUUID() == _staticAssignments[i].getUUID()) {
foundMatchingAssignment = true;
break;
}
bool foundMatchingAssignment = false;
NodeList* nodeList = NodeList::getInstance();
// enumerate the nodes and check if there is one with an attached assignment with matching UUID
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
Assignment* linkedAssignment = (Assignment*) node->getLinkedData();
if (linkedAssignment->getUUID() == _staticAssignments[i].getUUID()) {
foundMatchingAssignment = true;
break;
}
}
}
if (!foundMatchingAssignment) {
// this assignment has not been fulfilled - reset the UUID and add it to the assignment queue
_staticAssignments[i].resetUUID();
if (!foundMatchingAssignment) {
// this assignment has not been fulfilled - reset the UUID and add it to the assignment queue
_staticAssignments[i].resetUUID();
qDebug() << "Adding static assignment to queue -" << _staticAssignments[i] << "\n";
_assignmentQueueMutex.lock();
_assignmentQueue.push_back(&_staticAssignments[i]);
_assignmentQueueMutex.unlock();
}
qDebug() << "Adding static assignment to queue -" << _staticAssignments[i] << "\n";
_assignmentQueueMutex.lock();
_assignmentQueue.push_back(&_staticAssignments[i]);
_assignmentQueueMutex.unlock();
}
}
}
void DomainServer::cleanup() {
qDebug() << "cleanup called!\n";
_staticAssignmentFile.unmap(_staticAssignmentFileData);
_staticAssignmentFile.close();
}
int DomainServer::run() {
NodeList* nodeList = NodeList::getInstance();
nodeList->addHook(this);
ssize_t receivedBytes = 0;
char nodeType = '\0';
unsigned char broadcastPacket[MAX_PACKET_SIZE];
unsigned char packetData[MAX_PACKET_SIZE];
unsigned char* currentBufferPos;
unsigned char* startPointer;
QHostAddress senderAddress;
quint16 senderPort;
HifiSockAddr nodePublicAddress, nodeLocalAddress;
nodeList->startSilentNodeRemovalThread();
if (!_staticAssignmentFile.exists() || _voxelServerConfig) {
if (_voxelServerConfig) {
// we have a new VS config, clear the existing file to start fresh
_staticAssignmentFile.remove();
}
prepopulateStaticAssignmentFile();
}
_staticAssignmentFile.open(QIODevice::ReadWrite);
_staticAssignmentFileData = _staticAssignmentFile.map(0, _staticAssignmentFile.size());
_staticAssignments = (Assignment*) _staticAssignmentFileData;
timeval startTime;
gettimeofday(&startTime, NULL);
while (true) {
while (nodeList->getNodeSocket().hasPendingDatagrams()
&& nodeList->getNodeSocket().readDatagram((char*) packetData, MAX_PACKET_SIZE, &senderAddress, &senderPort) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
// this is an RFD or domain list request packet, and there is a version match
int numBytesSenderHeader = numBytesForPacketHeader(packetData);
nodeType = *(packetData + numBytesSenderHeader);
int packetIndex = numBytesSenderHeader + sizeof(NODE_TYPE);
QUuid nodeUUID = QUuid::fromRfc4122(QByteArray(((char*) packetData + packetIndex), NUM_BYTES_RFC4122_UUID));
packetIndex += NUM_BYTES_RFC4122_UUID;
int numBytesPrivateSocket = HifiSockAddr::unpackSockAddr(packetData + packetIndex, nodePublicAddress);
packetIndex += numBytesPrivateSocket;
if (nodePublicAddress.getAddress().isNull()) {
// this node wants to use us its STUN server
// so set the node public address to whatever we perceive the public address to be
// if the sender is on our box then leave its public address to 0 so that
// other users attempt to reach it on the same address they have for the domain-server
if (senderAddress.isLoopback()) {
nodePublicAddress.setAddress(QHostAddress());
} else {
nodePublicAddress.setAddress(senderAddress);
}
}
int numBytesPublicSocket = HifiSockAddr::unpackSockAddr(packetData + packetIndex, nodeLocalAddress);
packetIndex += numBytesPublicSocket;
const char STATICALLY_ASSIGNED_NODES[3] = {
NODE_TYPE_AUDIO_MIXER,
NODE_TYPE_AVATAR_MIXER,
NODE_TYPE_VOXEL_SERVER
};
Assignment* matchingStaticAssignment = NULL;
if (memchr(STATICALLY_ASSIGNED_NODES, nodeType, sizeof(STATICALLY_ASSIGNED_NODES)) == NULL
|| ((matchingStaticAssignment = matchingStaticAssignmentForCheckIn(nodeUUID, nodeType))
|| checkInWithUUIDMatchesExistingNode(nodePublicAddress,
nodeLocalAddress,
nodeUUID)))
{
Node* checkInNode = nodeList->addOrUpdateNode(nodeUUID,
nodeType,
nodePublicAddress,
nodeLocalAddress);
if (matchingStaticAssignment) {
// this was a newly added node with a matching static assignment
if (_hasCompletedRestartHold) {
// remove the matching assignment from the assignment queue so we don't take the next check in
removeAssignmentFromQueue(matchingStaticAssignment);
}
// set the linked data for this node to a copy of the matching assignment
// so we can re-queue it should the node die
Assignment* nodeCopyOfMatchingAssignment = new Assignment(*matchingStaticAssignment);
checkInNode->setLinkedData(nodeCopyOfMatchingAssignment);
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos;
unsigned char* nodeTypesOfInterest = packetData + packetIndex + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getUUID() != nodeUUID &&
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
// don't send avatar nodes to other avatars, that will come from avatar mixer
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
}
}
}
// update last receive to now
uint64_t timeNow = usecTimestampNow();
checkInNode->setLastHeardMicrostamp(timeNow);
// send the constructed list back to this node
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes,
senderAddress, senderPort);
}
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
qDebug("Received a request for assignment.\n");
if (!_hasCompletedRestartHold) {
possiblyAddStaticAssignmentsBackToQueueAfterRestart(&startTime);
}
if (_assignmentQueue.size() > 0) {
// construct the requested assignment from the packet data
Assignment requestAssignment(packetData, receivedBytes);
Assignment* assignmentToDeploy = deployableAssignmentForRequest(requestAssignment);
if (assignmentToDeploy) {
// give this assignment out, either the type matches or the requestor said they will take any
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = assignmentToDeploy->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket().writeDatagram((char*) broadcastPacket, numHeaderBytes + numAssignmentBytes,
senderAddress, senderPort);
if (assignmentToDeploy->getNumberOfInstances() == 0) {
// there are no more instances of this script to send out, delete it
delete assignmentToDeploy;
}
}
}
}
}
if (!_hasCompletedRestartHold) {
possiblyAddStaticAssignmentsBackToQueueAfterRestart(&startTime);
}
}
this->cleanup();
return 0;
}

View file

@ -22,20 +22,20 @@
const int MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS = 1000;
class DomainServer : public NodeListHook {
class DomainServer : public QCoreApplication, public NodeListHook {
Q_OBJECT
public:
DomainServer(int argc, char* argv[]);
int run();
static void signalHandler(int signal);
void exit(int retCode = 0);
static void setDomainServerInstance(DomainServer* domainServer);
/// Called by NodeList to inform us that a node has been added.
void nodeAdded(Node* node);
/// Called by NodeList to inform us that a node has been killed.
void nodeKilled(Node* node);
private:
private:
static int civetwebRequestHandler(struct mg_connection *connection);
static void civetwebUploadHandler(struct mg_connection *connection, const char *path);
@ -48,7 +48,6 @@ private:
bool checkInWithUUIDMatchesExistingNode(const HifiSockAddr& nodePublicSocket,
const HifiSockAddr& nodeLocalSocket,
const QUuid& checkInUUI);
void possiblyAddStaticAssignmentsBackToQueueAfterRestart(timeval* startTime);
void addReleasedAssignmentBackToQueue(Assignment* releasedAssignment);
void cleanup();
@ -66,6 +65,9 @@ private:
const char* _voxelServerConfig;
bool _hasCompletedRestartHold;
private slots:
void readAvailableDatagrams();
void addStaticAssignmentsBackToQueueAfterRestart();
};
#endif /* defined(__hifi__DomainServer__) */

View file

@ -24,9 +24,8 @@ int main(int argc, char* argv[]) {
qInstallMessageHandler(Logging::verboseMessageHandler);
QCoreApplication application(argc, argv);
DomainServer domainServer(argc, argv);
return domainServer.run();
return domainServer.exec();
}