decouple DS from main, move to C++11

This commit is contained in:
Stephen Birarda 2013-09-26 15:03:43 -07:00
parent 6fe47b47a3
commit 70c5d3e349
6 changed files with 527 additions and 434 deletions

View file

@ -12,6 +12,23 @@ set(CMAKE_INCLUDE_CURRENT_DIR ON)
# Instruct CMake to run moc automatically when needed.
set(CMAKE_AUTOMOC ON)
# Initialize CXXFLAGS.
set(CMAKE_CXX_FLAGS "-std=c++11")
# Compiler-specific C++11 activation.
if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "GNU")
execute_process(
COMMAND ${CMAKE_CXX_COMPILER} -dumpversion OUTPUT_VARIABLE GCC_VERSION)
if (NOT (GCC_VERSION VERSION_GREATER 4.7 OR GCC_VERSION VERSION_EQUAL 4.7))
message(FATAL_ERROR "${PROJECT_NAME} requires g++ 4.7 or greater.")
endif ()
elseif ("${CMAKE_CXX_COMPILER_ID}" MATCHES "Clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -stdlib=libc++")
else ()
message(FATAL_ERROR "Your C++ compiler does not support C++11.")
endif ()
add_subdirectory(animation-server)
add_subdirectory(assignment-client)
add_subdirectory(domain-server)
@ -20,4 +37,4 @@ add_subdirectory(interface)
add_subdirectory(injector)
add_subdirectory(pairing-server)
add_subdirectory(space-server)
add_subdirectory(voxel-edit)
add_subdirectory(voxel-edit)

View file

@ -0,0 +1,433 @@
//
// DomainServer.cpp
// hifi
//
// Created by Stephen Birarda on 9/26/13.
// Copyright (c) 2013 HighFidelity, Inc. All rights reserved.
//
#include <signal.h>
#include <PacketHeaders.h>
#include <SharedUtil.h>
#include "DomainServer.h"
DomainServer* DomainServer::domainServerInstance = NULL;
void DomainServer::signalHandler(int signal) {
domainServerInstance->cleanup();
}
void DomainServer::setDomainServerInstance(DomainServer* domainServer) {
domainServerInstance = domainServer;
}
int DomainServer::civetwebRequestHandler(struct mg_connection *connection) {
const struct mg_request_info* ri = mg_get_request_info(connection);
if (strcmp(ri->uri, "/assignment") == 0 && strcmp(ri->request_method, "POST") == 0) {
// return a 200
mg_printf(connection, "%s", "HTTP/1.0 200 OK\r\n\r\n");
// upload the file
mg_upload(connection, "/tmp");
return 1;
} else {
// have mongoose process this request from the document_root
return 0;
}
}
const char ASSIGNMENT_SCRIPT_HOST_LOCATION[] = "resources/web/assignment";
void DomainServer::civetwebUploadHandler(struct mg_connection *connection, const char *path) {
// create an assignment for this saved script, for now make it local only
Assignment *scriptAssignment = new Assignment(Assignment::CreateCommand, Assignment::AgentType, Assignment::LocalLocation);
// check how many instances of this assignment the user wants by checking the ASSIGNMENT-INSTANCES header
const char ASSIGNMENT_INSTANCES_HTTP_HEADER[] = "ASSIGNMENT-INSTANCES";
const char *requestInstancesHeader = mg_get_header(connection, ASSIGNMENT_INSTANCES_HTTP_HEADER);
if (requestInstancesHeader) {
// the user has requested a number of instances greater than 1
// so set that on the created assignment
scriptAssignment->setNumberOfInstances(atoi(requestInstancesHeader));
}
QString newPath(ASSIGNMENT_SCRIPT_HOST_LOCATION);
newPath += "/";
// append the UUID for this script as the new filename, remove the curly braces
newPath += scriptAssignment->getUUIDStringWithoutCurlyBraces();
// rename the saved script to the GUID of the assignment and move it to the script host locaiton
rename(path, newPath.toLocal8Bit().constData());
qDebug("Saved a script for assignment at %s\n", newPath.toLocal8Bit().constData());
// add the script assigment to the assignment queue
// lock the assignment queue mutex since we're operating on a different thread than DS main
domainServerInstance->_assignmentQueueMutex.lock();
domainServerInstance->_assignmentQueue.push_back(scriptAssignment);
domainServerInstance->_assignmentQueueMutex.unlock();
}
void DomainServer::nodeAdded(Node* node) {
}
void DomainServer::nodeKilled(Node* node) {
}
unsigned char* DomainServer::addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd) {
*currentPosition++ = nodeToAdd->getType();
currentPosition += packNodeId(currentPosition, nodeToAdd->getNodeID());
currentPosition += packSocket(currentPosition, nodeToAdd->getPublicSocket());
currentPosition += packSocket(currentPosition, nodeToAdd->getLocalSocket());
// return the new unsigned char * for broadcast packet
return currentPosition;
}
DomainServer::DomainServer(int argc, char* argv[]) :
_assignmentQueueMutex(),
_assignmentQueue(),
_staticAssignmentFile(QString("%1/config.ds").arg(QCoreApplication::applicationDirPath())),
_staticAssignmentFileData(NULL),
_voxelServerConfig(NULL)
{
DomainServer::setDomainServerInstance(this);
const char CUSTOM_PORT_OPTION[] = "-p";
const char* customPortString = getCmdOption(argc, (const char**) argv, CUSTOM_PORT_OPTION);
unsigned short domainServerPort = customPortString ? atoi(customPortString) : DEFAULT_DOMAIN_SERVER_PORT;
NodeList::createInstance(NODE_TYPE_DOMAIN, domainServerPort);
struct sigaction sigIntHandler;
sigIntHandler.sa_handler = DomainServer::signalHandler;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
const char VOXEL_CONFIG_OPTION[] = "--voxelServerConfig";
_voxelServerConfig = getCmdOption(argc, (const char**) argv, VOXEL_CONFIG_OPTION);
}
Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssignment) {
_assignmentQueueMutex.lock();
// this is an unassigned client talking to us directly for an assignment
// go through our queue and see if there are any assignments to give out
std::deque<Assignment*>::iterator assignment = _assignmentQueue.begin();
while (assignment != _assignmentQueue.end()) {
if (requestAssignment.getType() == Assignment::AllTypes ||
(*assignment)->getType() == requestAssignment.getType()) {
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
// or if there is more than one instance to send out, simpy decrease the number of instances
if ((*assignment)->getNumberOfInstances() > 1) {
(*assignment)->decrementNumberOfInstances();
} else {
_assignmentQueue.erase(assignment);
delete *assignment;
}
} else {
Assignment *sentAssignment = *assignment;
// remove the assignment from the queue
_assignmentQueue.erase(assignment);
if (sentAssignment->getType() != Assignment::VoxelServerType) {
// keep audio-mixer and avatar-mixer assignments in the queue
// until we get a check-in from that GUID
// but stick it at the back so the others have a chance to go out
_assignmentQueue.push_back(sentAssignment);
}
}
// stop looping, we've handed out an assignment
_assignmentQueueMutex.unlock();
return *assignment;
} else {
// push forward the iterator to check the next assignment
assignment++;
}
}
_assignmentQueueMutex.unlock();
return NULL;
}
void DomainServer::cleanup() {
_staticAssignmentFile.unmap(_staticAssignmentFileData);
_staticAssignmentFile.close();
}
void DomainServer::prepopulateStaticAssignmentFile() {
const uint NUM_FRESH_STATIC_ASSIGNMENTS = 3;
// write a fresh static assignment array to file
std::array<Assignment, MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS> freshStaticAssignments;
// pre-populate the first static assignment list with assignments for root AuM, AvM, VS
freshStaticAssignments[0] = Assignment(Assignment::CreateCommand,
Assignment::AudioMixerType,
Assignment::LocalLocation);
freshStaticAssignments[1] = Assignment(Assignment::CreateCommand,
Assignment::AvatarMixerType,
Assignment::LocalLocation);
Assignment voxelServerAssignment(Assignment::CreateCommand, Assignment::VoxelServerType, Assignment::LocalLocation);
// Handle Domain/Voxel Server configuration command line arguments
if (_voxelServerConfig) {
qDebug("Reading Voxel Server Configuration.\n");
qDebug() << " config: " << _voxelServerConfig << "\n";
int payloadLength = strlen(_voxelServerConfig) + sizeof(char);
voxelServerAssignment.setPayload((const uchar*)_voxelServerConfig, payloadLength);
}
freshStaticAssignments[2] = voxelServerAssignment;
_staticAssignmentFile.open(QIODevice::WriteOnly);
_staticAssignmentFile.write((char*) &NUM_FRESH_STATIC_ASSIGNMENTS,
sizeof(uint16_t));
_staticAssignmentFile.write((char*) &freshStaticAssignments, sizeof(freshStaticAssignments));
_staticAssignmentFile.resize(MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS * sizeof(Assignment));
_staticAssignmentFile.close();
}
int DomainServer::run() {
NodeList* nodeList = NodeList::getInstance();
nodeList->addHook(this);
ssize_t receivedBytes = 0;
char nodeType = '\0';
unsigned char broadcastPacket[MAX_PACKET_SIZE];
unsigned char packetData[MAX_PACKET_SIZE];
unsigned char* currentBufferPos;
unsigned char* startPointer;
sockaddr_in nodePublicAddress, nodeLocalAddress, replyDestinationSocket;
nodeLocalAddress.sin_family = AF_INET;
in_addr_t serverLocalAddress = getLocalAddress();
nodeList->startSilentNodeRemovalThread();
if (!_staticAssignmentFile.exists()) {
prepopulateStaticAssignmentFile();
}
_staticAssignmentFile.open(QIODevice::ReadWrite);
_staticAssignmentFileData = _staticAssignmentFile.map(0, _staticAssignmentFile.size());
_numAssignmentsInStaticFile = (uint16_t*) _staticAssignmentFileData;
_staticFileAssignments = (std::array<Assignment, MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS>*)
(_staticAssignmentFileData + sizeof(*_numAssignmentsInStaticFile));
// construct a local socket to send with our created assignments to the global AS
sockaddr_in localSocket = {};
localSocket.sin_family = AF_INET;
localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort());
localSocket.sin_addr.s_addr = serverLocalAddress;
// setup the mongoose web server
struct mg_context* ctx;
struct mg_callbacks callbacks = {};
QString documentRoot = QString("%1/resources/web").arg(QCoreApplication::applicationDirPath());
// list of options. Last element must be NULL.
const char* options[] = {"listening_ports", "8080",
"document_root", documentRoot.toLocal8Bit().constData(), NULL};
callbacks.begin_request = civetwebRequestHandler;
callbacks.upload = civetwebUploadHandler;
// Start the web server.
ctx = mg_start(&callbacks, NULL, options);
while (true) {
while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
// this is an RFD or domain list request packet, and there is a version match
std::map<char, Node *> newestSoloNodes;
int numBytesSenderHeader = numBytesForPacketHeader(packetData);
nodeType = *(packetData + numBytesSenderHeader);
int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE),
(sockaddr*) &nodeLocalAddress);
replyDestinationSocket = nodePublicAddress;
// check the node public address
// if it matches our local address
// or if it's the loopback address we're on the same box
if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress ||
nodePublicAddress.sin_addr.s_addr == htonl(INADDR_LOOPBACK)) {
nodePublicAddress.sin_addr.s_addr = 0;
}
bool matchedUUID = true;
if ((nodeType == NODE_TYPE_AVATAR_MIXER || nodeType == NODE_TYPE_AUDIO_MIXER) &&
!nodeList->soloNodeOfType(nodeType)) {
// if this is an audio-mixer or an avatar-mixer and we don't have one yet
// we need to check the GUID of the assignment in the queue
// (if it exists) to make sure there is a match
// reset matchedUUID to false so there is no match by default
matchedUUID = false;
// pull the UUID passed with the check in
QUuid checkInUUID = QUuid::fromRfc4122(QByteArray((const char*) packetData + numBytesSenderHeader +
sizeof(NODE_TYPE),
NUM_BYTES_RFC4122_UUID));
// lock the assignment queue
_assignmentQueueMutex.lock();
std::deque<Assignment*>::iterator assignment = _assignmentQueue.begin();
Assignment::Type matchType = nodeType == NODE_TYPE_AUDIO_MIXER
? Assignment::AudioMixerType : Assignment::AvatarMixerType;
// enumerate the assignments and see if there is a type and UUID match
while (assignment != _assignmentQueue.end()) {
if ((*assignment)->getType() == matchType
&& (*assignment)->getUUID() == checkInUUID) {
// type and UUID match
matchedUUID = true;
// remove this assignment from the queue
_assignmentQueue.erase(assignment);
break;
} else {
// no match, keep looking
assignment++;
}
}
// unlock the assignment queue
_assignmentQueueMutex.unlock();
}
if (matchedUUID) {
Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
(sockaddr*) &nodeLocalAddress,
nodeType,
nodeList->getLastNodeID());
// if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it
if (newNode) {
if (newNode->getNodeID() == nodeList->getLastNodeID()) {
nodeList->increaseNodeID();
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos;
int numBytesUUID = (nodeType == NODE_TYPE_AUDIO_MIXER || nodeType == NODE_TYPE_AVATAR_MIXER)
? NUM_BYTES_RFC4122_UUID
: 0;
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + numBytesUUID +
sizeof(NODE_TYPE) + numBytesSocket + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) &&
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
// don't send avatar nodes to other avatars, that will come from avatar mixer
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
}
}
}
// update last receive to now
uint64_t timeNow = usecTimestampNow();
newNode->setLastHeardMicrostamp(timeNow);
// add the node ID to the end of the pointer
currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID());
// send the constructed list back to this node
nodeList->getNodeSocket()->send((sockaddr*)&replyDestinationSocket,
broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes);
}
}
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
qDebug("Received a request for assignment.\n");
if (_assignmentQueue.size() > 0) {
// construct the requested assignment from the packet data
Assignment requestAssignment(packetData, receivedBytes);
Assignment* assignmentToDeploy = deployableAssignmentForRequest(requestAssignment);
if (assignmentToDeploy) {
// give this assignment out, either the type matches or the requestor said they will take any
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = assignmentToDeploy->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress,
broadcastPacket,
numHeaderBytes + numAssignmentBytes);
}
}
} else if (packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT) {
// this is a create assignment likely recieved from a server needed more clients to help with load
// unpack it
Assignment* createAssignment = new Assignment(packetData, receivedBytes);
qDebug() << "Received a create assignment -" << *createAssignment << "\n";
// add the assignment at the back of the queue
_assignmentQueueMutex.lock();
_assignmentQueue.push_back(createAssignment);
_assignmentQueueMutex.unlock();
// also add this assignment to the static map of assignments so it exists next time the DS starts up
(*_staticFileAssignments)[_staticFileAssignments->size()] = *createAssignment;
}
}
}
this->cleanup();
return 0;
}

View file

@ -0,0 +1,64 @@
//
// DomainServer.h
// hifi
//
// Created by Stephen Birarda on 9/26/13.
// Copyright (c) 2013 HighFidelity, Inc. All rights reserved.
//
#ifndef __hifi__DomainServer__
#define __hifi__DomainServer__
#include <array>
#include <deque>
#include <QtCore/QCoreApplication>
#include <QtCore/QFile>
#include <QtCore/QMutex>
#include <Assignment.h>
#include <NodeList.h>
#include <civetweb.h>
const int MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS = 1000;
class DomainServer : public NodeListHook {
public:
DomainServer(int argc, char* argv[]);
int run();
static void signalHandler(int signal);
static void setDomainServerInstance(DomainServer* domainServer);
/// Called by NodeList to inform us that a node has been added.
void nodeAdded(Node* node);
/// Called by NodeList to inform us that a node has been killed.
void nodeKilled(Node* node);
private:
static int civetwebRequestHandler(struct mg_connection *connection);
static void civetwebUploadHandler(struct mg_connection *connection, const char *path);
static DomainServer* domainServerInstance;
void cleanup();
void prepopulateStaticAssignmentFile();
unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd);
Assignment* deployableAssignmentForRequest(Assignment& requestAssignment);
QMutex _assignmentQueueMutex;
std::deque<Assignment*> _assignmentQueue;
QFile _staticAssignmentFile;
uchar* _staticAssignmentFileData;
uint16_t* _numAssignmentsInStaticFile;
std::array<Assignment, MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS>* _staticFileAssignments;
const char* _voxelServerConfig;
};
#endif /* defined(__hifi__DomainServer__) */

View file

@ -11,443 +11,22 @@
// The connection is stateless... the domain server will set you inactive if it does not hear from
// you in LOGOFF_CHECK_INTERVAL milliseconds, meaning your info will not be sent to other users.
//
// Each packet from an node has as first character the type of server:
//
// I - Interactive Node
// M - Audio Mixer
//
#include <arpa/inet.h>
#include <fcntl.h>
#include <deque>
#include <map>
#include <math.h>
#include <signal.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <QtCore/QCoreApplication>
#include <QtCore/QFile>
#include <QtCore/QMap>
#include <QtCore/QMutex>
#include <civetweb.h>
#include <Logging.h>
#include "Assignment.h"
#include "NodeList.h"
#include "NodeTypes.h"
#include "Logging.h"
#include "PacketHeaders.h"
#include "SharedUtil.h"
#include "DomainServer.h"
const int DOMAIN_LISTEN_PORT = 40102;
unsigned char packetData[MAX_PACKET_SIZE];
const int NODE_COUNT_STAT_INTERVAL_MSECS = 5000;
QMutex assignmentQueueMutex;
std::deque<Assignment*> assignmentQueue;
uchar* staticAssignmentFileData;
QFile* staticAssignmentFile = NULL;
unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd) {
*currentPosition++ = nodeToAdd->getType();
int main(int argc, char* argv[]) {
currentPosition += packNodeId(currentPosition, nodeToAdd->getNodeID());
currentPosition += packSocket(currentPosition, nodeToAdd->getPublicSocket());
currentPosition += packSocket(currentPosition, nodeToAdd->getLocalSocket());
// return the new unsigned char * for broadcast packet
return currentPosition;
}
static int mongooseRequestHandler(struct mg_connection *conn) {
const struct mg_request_info* ri = mg_get_request_info(conn);
if (strcmp(ri->uri, "/assignment") == 0 && strcmp(ri->request_method, "POST") == 0) {
// return a 200
mg_printf(conn, "%s", "HTTP/1.0 200 OK\r\n\r\n");
// upload the file
mg_upload(conn, "/tmp");
return 1;
} else {
// have mongoose process this request from the document_root
return 0;
}
}
const char ASSIGNMENT_SCRIPT_HOST_LOCATION[] = "resources/web/assignment";
static void mongooseUploadHandler(struct mg_connection *conn, const char *path) {
// create an assignment for this saved script, for now make it local only
Assignment *scriptAssignment = new Assignment(Assignment::CreateCommand, Assignment::AgentType, Assignment::LocalLocation);
// check how many instances of this assignment the user wants by checking the ASSIGNMENT-INSTANCES header
const char ASSIGNMENT_INSTANCES_HTTP_HEADER[] = "ASSIGNMENT-INSTANCES";
const char *requestInstancesHeader = mg_get_header(conn, ASSIGNMENT_INSTANCES_HTTP_HEADER);
if (requestInstancesHeader) {
// the user has requested a number of instances greater than 1
// so set that on the created assignment
scriptAssignment->setNumberOfInstances(atoi(requestInstancesHeader));
}
QString newPath(ASSIGNMENT_SCRIPT_HOST_LOCATION);
newPath += "/";
// append the UUID for this script as the new filename, remove the curly braces
newPath += scriptAssignment->getUUIDStringWithoutCurlyBraces();
// rename the saved script to the GUID of the assignment and move it to the script host locaiton
rename(path, newPath.toStdString().c_str());
qDebug("Saved a script for assignment at %s\n", newPath.toStdString().c_str());
// add the script assigment to the assignment queue
// lock the assignment queue mutex since we're operating on a different thread than DS main
::assignmentQueueMutex.lock();
::assignmentQueue.push_back(scriptAssignment);
::assignmentQueueMutex.unlock();
}
void signalHandler(int signal) {
qDebug() << "Caught SIGTERM. Unmapping config file and exiting.\n";
::staticAssignmentFile->unmap(::staticAssignmentFileData);
::staticAssignmentFile->close();
delete ::staticAssignmentFile;
exit(1);
}
int main(int argc, const char* argv[]) {
QCoreApplication domainServer(argc, (char**) argv);
setvbuf(stdout, NULL, _IOLBF, 0);
qInstallMessageHandler(Logging::verboseMessageHandler);
struct sigaction sigIntHandler;
QCoreApplication application(argc, argv);
DomainServer domainServer(argc, argv);
sigIntHandler.sa_handler = signalHandler;
sigemptyset(&sigIntHandler.sa_mask);
sigIntHandler.sa_flags = 0;
sigaction(SIGINT, &sigIntHandler, NULL);
const char CUSTOM_PORT_OPTION[] = "-p";
const char* customPortString = getCmdOption(argc, argv, CUSTOM_PORT_OPTION);
unsigned short domainServerPort = customPortString ? atoi(customPortString) : DOMAIN_LISTEN_PORT;
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, domainServerPort);
setvbuf(stdout, NULL, _IOLBF, 0);
ssize_t receivedBytes = 0;
char nodeType = '\0';
unsigned char broadcastPacket[MAX_PACKET_SIZE];
unsigned char* currentBufferPos;
unsigned char* startPointer;
sockaddr_in nodePublicAddress, nodeLocalAddress, replyDestinationSocket;
nodeLocalAddress.sin_family = AF_INET;
in_addr_t serverLocalAddress = getLocalAddress();
nodeList->startSilentNodeRemovalThread();
timeval lastStatSendTime = {};
const int MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS = 1000;
const QString STATIC_ASSIGNMENT_FILENAME = QString("%1/config.ds").arg(QCoreApplication::applicationDirPath());
staticAssignmentFile = new QFile(STATIC_ASSIGNMENT_FILENAME);
if (!::staticAssignmentFile->exists()) {
const uint NUM_FRESH_STATIC_ASSIGNMENTS = 3;
// write a fresh static assignment list to file
Assignment freshStaticAssignmentList[NUM_FRESH_STATIC_ASSIGNMENTS];
// pre-populate the first static assignment list with assignments for root AuM, AvM, VS
freshStaticAssignmentList[0] = Assignment(Assignment::CreateCommand,
Assignment::AudioMixerType,
Assignment::LocalLocation);
freshStaticAssignmentList[1] = Assignment(Assignment::CreateCommand,
Assignment::AvatarMixerType,
Assignment::LocalLocation);
Assignment voxelServerAssignment(Assignment::CreateCommand, Assignment::VoxelServerType, Assignment::LocalLocation);
// Handle Domain/Voxel Server configuration command line arguments
const char VOXEL_CONFIG_OPTION[] = "--voxelServerConfig";
const char* voxelServerConfig = getCmdOption(argc, argv, VOXEL_CONFIG_OPTION);
if (voxelServerConfig) {
qDebug("Reading Voxel Server Configuration.\n");
qDebug() << " config: " << voxelServerConfig << "\n";
int payloadLength = strlen(voxelServerConfig) + sizeof(char);
voxelServerAssignment.setPayload((const uchar*)voxelServerConfig, payloadLength);
}
freshStaticAssignmentList[2] = voxelServerAssignment;
::staticAssignmentFile->open(QIODevice::WriteOnly);
::staticAssignmentFile->write((char*) &NUM_FRESH_STATIC_ASSIGNMENTS,
sizeof(uint16_t));
::staticAssignmentFile->write((char*) &freshStaticAssignmentList, sizeof(freshStaticAssignmentList));
::staticAssignmentFile->resize(MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS * sizeof(Assignment));
::staticAssignmentFile->close();
}
::staticAssignmentFile->open(QIODevice::ReadWrite);
::staticAssignmentFileData = ::staticAssignmentFile->map(0, ::staticAssignmentFile->size());
uint16_t* numAssignmentsInStaticFile = (uint16_t*) ::staticAssignmentFileData;
Assignment* staticFileAssignments = (Assignment*) (::staticAssignmentFileData + sizeof(*numAssignmentsInStaticFile));
// construct a local socket to send with our created assignments to the global AS
sockaddr_in localSocket = {};
localSocket.sin_family = AF_INET;
localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort());
localSocket.sin_addr.s_addr = serverLocalAddress;
// setup the mongoose web server
struct mg_context* ctx;
struct mg_callbacks callbacks = {};
QString documentRoot = QString("%1/resources/web").arg(QCoreApplication::applicationDirPath());
// list of options. Last element must be NULL.
const char* options[] = {"listening_ports", "8080",
"document_root", documentRoot.toStdString().c_str(), NULL};
callbacks.begin_request = mongooseRequestHandler;
callbacks.upload = mongooseUploadHandler;
// Start the web server.
ctx = mg_start(&callbacks, NULL, options);
while (true) {
while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
// this is an RFD or domain list request packet, and there is a version match
std::map<char, Node *> newestSoloNodes;
int numBytesSenderHeader = numBytesForPacketHeader(packetData);
nodeType = *(packetData + numBytesSenderHeader);
int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE),
(sockaddr*) &nodeLocalAddress);
replyDestinationSocket = nodePublicAddress;
// check the node public address
// if it matches our local address
// or if it's the loopback address we're on the same box
if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress ||
nodePublicAddress.sin_addr.s_addr == htonl(INADDR_LOOPBACK)) {
nodePublicAddress.sin_addr.s_addr = 0;
}
bool matchedUUID = true;
if ((nodeType == NODE_TYPE_AVATAR_MIXER || nodeType == NODE_TYPE_AUDIO_MIXER) &&
!nodeList->soloNodeOfType(nodeType)) {
// if this is an audio-mixer or an avatar-mixer and we don't have one yet
// we need to check the GUID of the assignment in the queue
// (if it exists) to make sure there is a match
// reset matchedUUID to false so there is no match by default
matchedUUID = false;
// pull the UUID passed with the check in
QUuid checkInUUID = QUuid::fromRfc4122(QByteArray((const char*) packetData + numBytesSenderHeader +
sizeof(NODE_TYPE),
NUM_BYTES_RFC4122_UUID));
// lock the assignment queue
::assignmentQueueMutex.lock();
std::deque<Assignment*>::iterator assignment = ::assignmentQueue.begin();
Assignment::Type matchType = nodeType == NODE_TYPE_AUDIO_MIXER
? Assignment::AudioMixerType : Assignment::AvatarMixerType;
// enumerate the assignments and see if there is a type and UUID match
while (assignment != ::assignmentQueue.end()) {
if ((*assignment)->getType() == matchType
&& (*assignment)->getUUID() == checkInUUID) {
// type and UUID match
matchedUUID = true;
// remove this assignment from the queue
::assignmentQueue.erase(assignment);
break;
} else {
// no match, keep looking
assignment++;
}
}
// unlock the assignment queue
::assignmentQueueMutex.unlock();
}
if (matchedUUID) {
Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
(sockaddr*) &nodeLocalAddress,
nodeType,
nodeList->getLastNodeID());
// if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it
if (newNode) {
if (newNode->getNodeID() == nodeList->getLastNodeID()) {
nodeList->increaseNodeID();
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos;
int numBytesUUID = (nodeType == NODE_TYPE_AUDIO_MIXER || nodeType == NODE_TYPE_AVATAR_MIXER)
? NUM_BYTES_RFC4122_UUID
: 0;
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + numBytesUUID +
sizeof(NODE_TYPE) + numBytesSocket + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) &&
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
// don't send avatar nodes to other avatars, that will come from avatar mixer
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
}
}
}
// update last receive to now
uint64_t timeNow = usecTimestampNow();
newNode->setLastHeardMicrostamp(timeNow);
// add the node ID to the end of the pointer
currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID());
// send the constructed list back to this node
nodeList->getNodeSocket()->send((sockaddr*)&replyDestinationSocket,
broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes);
}
}
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
qDebug("Received a request for assignment.\n");
::assignmentQueueMutex.lock();
// this is an unassigned client talking to us directly for an assignment
// go through our queue and see if there are any assignments to give out
std::deque<Assignment*>::iterator assignment = ::assignmentQueue.begin();
while (assignment != ::assignmentQueue.end()) {
// construct the requested assignment from the packet data
Assignment requestAssignment(packetData, receivedBytes);
if (requestAssignment.getType() == Assignment::AllTypes ||
(*assignment)->getType() == requestAssignment.getType()) {
// give this assignment out, either the type matches or the requestor said they will take any
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = (*assignment)->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress,
broadcastPacket,
numHeaderBytes + numAssignmentBytes);
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
// or if there is more than one instance to send out, simpy decrease the number of instances
if ((*assignment)->getNumberOfInstances() > 1) {
(*assignment)->decrementNumberOfInstances();
} else {
::assignmentQueue.erase(assignment);
delete *assignment;
}
} else {
Assignment *sentAssignment = *assignment;
// remove the assignment from the queue
::assignmentQueue.erase(assignment);
if (sentAssignment->getType() != Assignment::VoxelServerType) {
// keep audio-mixer and avatar-mixer assignments in the queue
// until we get a check-in from that GUID
// but stick it at the back so the others have a chance to go out
::assignmentQueue.push_back(sentAssignment);
}
}
// stop looping, we've handed out an assignment
break;
} else {
// push forward the iterator to check the next assignment
assignment++;
}
}
::assignmentQueueMutex.unlock();
} else if (packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT) {
// this is a create assignment likely recieved from a server needed more clients to help with load
// unpack it
Assignment* createAssignment = new Assignment(packetData, receivedBytes);
qDebug() << "Received a create assignment -" << *createAssignment << "\n";
// add the assignment at the back of the queue
::assignmentQueueMutex.lock();
::assignmentQueue.push_back(createAssignment);
::assignmentQueueMutex.unlock();
// also add this assignment to the static map of assignments so it exists next time the DS starts up
staticFileAssignments[(*numAssignmentsInStaticFile)++] = *createAssignment;
}
}
if (Logging::shouldSendStats()) {
if (usecTimestampNow() - usecTimestamp(&lastStatSendTime) >= (NODE_COUNT_STAT_INTERVAL_MSECS * 1000)) {
// time to send our count of nodes and servers to logstash
const char NODE_COUNT_LOGSTASH_KEY[] = "ds-node-count";
Logging::stashValue(STAT_TYPE_TIMER, NODE_COUNT_LOGSTASH_KEY, nodeList->getNumAliveNodes());
gettimeofday(&lastStatSendTime, NULL);
}
}
}
::staticAssignmentFile->unmap(::staticAssignmentFileData);
return 0;
return domainServer.run();
}

View file

@ -290,7 +290,7 @@ void NodeList::sendDomainServerCheckIn(const char* assignmentUUID) {
// Lookup the IP address of the domain server if we need to
if (_domainIP.isNull()) {
qDebug("Looking up DS hostname %s.\n", _domainHostname.toStdString().c_str());
qDebug("Looking up DS hostname %s.\n", _domainHostname.toLocal8Bit().constData());
QHostInfo domainServerHostInfo = QHostInfo::fromName(_domainHostname);
@ -298,7 +298,8 @@ void NodeList::sendDomainServerCheckIn(const char* assignmentUUID) {
if (domainServerHostInfo.addresses()[i].protocol() == QAbstractSocket::IPv4Protocol) {
_domainIP = domainServerHostInfo.addresses()[i];
qDebug("DS at %s is at %s\n", _domainHostname.toStdString().c_str(), _domainIP.toString().toStdString().c_str());
qDebug("DS at %s is at %s\n", _domainHostname.toLocal8Bit().constData(),
_domainIP.toString().toLocal8Bit().constData());
printedDomainServerIP = true;
@ -311,7 +312,7 @@ void NodeList::sendDomainServerCheckIn(const char* assignmentUUID) {
}
}
} else if (!printedDomainServerIP) {
qDebug("Domain Server IP: %s\n", _domainIP.toString().toStdString().c_str());
qDebug("Domain Server IP: %s\n", _domainIP.toString().toLocal8Bit().constData());
printedDomainServerIP = true;
}
@ -361,7 +362,7 @@ void NodeList::sendDomainServerCheckIn(const char* assignmentUUID) {
_numBytesCheckInPacket = packetPosition - _checkInPacket;
}
_nodeSocket.send(_domainIP.toString().toStdString().c_str(), _domainPort, _checkInPacket, _numBytesCheckInPacket);
_nodeSocket.send(_domainIP.toString().toLocal8Bit().constData(), _domainPort, _checkInPacket, _numBytesCheckInPacket);
// increment the count of un-replied check-ins
_numNoReplyDomainCheckIns++;

View file

@ -56,7 +56,6 @@ public:
virtual void nodeKilled(Node* node) = 0;
};
class NodeList {
public:
static NodeList* createInstance(char ownerType, unsigned short int socketListenPort = 0);