some refactoring for DS, add create assignments to file

This commit is contained in:
Stephen Birarda 2013-09-26 15:56:41 -07:00
parent 70c5d3e349
commit db1dacbbf4
4 changed files with 152 additions and 154 deletions

View file

@ -74,7 +74,7 @@ void DomainServer::civetwebUploadHandler(struct mg_connection *connection, const
}
void DomainServer::nodeAdded(Node* node) {
NodeList::getInstance()->increaseNodeID();
}
void DomainServer::nodeKilled(Node* node) {
@ -98,7 +98,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
_staticAssignmentFile(QString("%1/config.ds").arg(QCoreApplication::applicationDirPath())),
_staticAssignmentFileData(NULL),
_voxelServerConfig(NULL)
{
{
DomainServer::setDomainServerInstance(this);
const char CUSTOM_PORT_OPTION[] = "-p";
@ -117,6 +117,83 @@ DomainServer::DomainServer(int argc, char* argv[]) :
const char VOXEL_CONFIG_OPTION[] = "--voxelServerConfig";
_voxelServerConfig = getCmdOption(argc, (const char**) argv, VOXEL_CONFIG_OPTION);
// setup the mongoose web server
struct mg_callbacks callbacks = {};
QString documentRoot = QString("%1/resources/web").arg(QCoreApplication::applicationDirPath());
// list of options. Last element must be NULL.
const char* options[] = {"listening_ports", "8080",
"document_root", documentRoot.toLocal8Bit().constData(), NULL};
callbacks.begin_request = civetwebRequestHandler;
callbacks.upload = civetwebUploadHandler;
// Start the web server.
mg_start(&callbacks, NULL, options);
}
void DomainServer::prepopulateStaticAssignmentFile() {
const uint NUM_FRESH_STATIC_ASSIGNMENTS = 3;
// write a fresh static assignment array to file
std::array<Assignment, MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS> freshStaticAssignments;
// pre-populate the first static assignment list with assignments for root AuM, AvM, VS
freshStaticAssignments[0] = Assignment(Assignment::CreateCommand,
Assignment::AudioMixerType,
Assignment::LocalLocation);
freshStaticAssignments[1] = Assignment(Assignment::CreateCommand,
Assignment::AvatarMixerType,
Assignment::LocalLocation);
Assignment voxelServerAssignment(Assignment::CreateCommand, Assignment::VoxelServerType, Assignment::LocalLocation);
// Handle Domain/Voxel Server configuration command line arguments
if (_voxelServerConfig) {
qDebug("Reading Voxel Server Configuration.\n");
qDebug() << " config: " << _voxelServerConfig << "\n";
int payloadLength = strlen(_voxelServerConfig) + sizeof(char);
voxelServerAssignment.setPayload((const uchar*)_voxelServerConfig, payloadLength);
}
freshStaticAssignments[2] = voxelServerAssignment;
_staticAssignmentFile.open(QIODevice::WriteOnly);
_staticAssignmentFile.write((char*) &NUM_FRESH_STATIC_ASSIGNMENTS,
sizeof(uint16_t));
_staticAssignmentFile.write((char*) &freshStaticAssignments, sizeof(freshStaticAssignments));
_staticAssignmentFile.resize(MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS * sizeof(Assignment));
_staticAssignmentFile.close();
}
int DomainServer::checkInMatchesStaticAssignment(NODE_TYPE nodeType, const uchar* checkInData) {
// pull the UUID passed with the check in
QUuid checkInUUID = QUuid::fromRfc4122(QByteArray((const char*) checkInData + numBytesForPacketHeader(checkInData) +
sizeof(NODE_TYPE),
NUM_BYTES_RFC4122_UUID));
int staticAssignmentIndex = 0;
while (staticAssignmentIndex < _staticFileAssignments->size() - 1
&& !(*_staticFileAssignments)[staticAssignmentIndex].getUUID().isNull()) {
Assignment* staticAssignment = &(*_staticFileAssignments)[staticAssignmentIndex];
if (staticAssignment->getType() == Assignment::typeForNodeType(nodeType)
&& staticAssignment->getUUID() == checkInUUID) {
// return index of match
return staticAssignmentIndex;
} else {
// no match, keep looking
staticAssignmentIndex++;
}
}
// return -1 for no match
return -1;
}
Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssignment) {
@ -172,42 +249,6 @@ void DomainServer::cleanup() {
_staticAssignmentFile.close();
}
void DomainServer::prepopulateStaticAssignmentFile() {
const uint NUM_FRESH_STATIC_ASSIGNMENTS = 3;
// write a fresh static assignment array to file
std::array<Assignment, MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS> freshStaticAssignments;
// pre-populate the first static assignment list with assignments for root AuM, AvM, VS
freshStaticAssignments[0] = Assignment(Assignment::CreateCommand,
Assignment::AudioMixerType,
Assignment::LocalLocation);
freshStaticAssignments[1] = Assignment(Assignment::CreateCommand,
Assignment::AvatarMixerType,
Assignment::LocalLocation);
Assignment voxelServerAssignment(Assignment::CreateCommand, Assignment::VoxelServerType, Assignment::LocalLocation);
// Handle Domain/Voxel Server configuration command line arguments
if (_voxelServerConfig) {
qDebug("Reading Voxel Server Configuration.\n");
qDebug() << " config: " << _voxelServerConfig << "\n";
int payloadLength = strlen(_voxelServerConfig) + sizeof(char);
voxelServerAssignment.setPayload((const uchar*)_voxelServerConfig, payloadLength);
}
freshStaticAssignments[2] = voxelServerAssignment;
_staticAssignmentFile.open(QIODevice::WriteOnly);
_staticAssignmentFile.write((char*) &NUM_FRESH_STATIC_ASSIGNMENTS,
sizeof(uint16_t));
_staticAssignmentFile.write((char*) &freshStaticAssignments, sizeof(freshStaticAssignments));
_staticAssignmentFile.resize(MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS * sizeof(Assignment));
_staticAssignmentFile.close();
}
int DomainServer::run() {
NodeList* nodeList = NodeList::getInstance();
@ -241,34 +282,11 @@ int DomainServer::run() {
_staticFileAssignments = (std::array<Assignment, MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS>*)
(_staticAssignmentFileData + sizeof(*_numAssignmentsInStaticFile));
// construct a local socket to send with our created assignments to the global AS
sockaddr_in localSocket = {};
localSocket.sin_family = AF_INET;
localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort());
localSocket.sin_addr.s_addr = serverLocalAddress;
// setup the mongoose web server
struct mg_context* ctx;
struct mg_callbacks callbacks = {};
QString documentRoot = QString("%1/resources/web").arg(QCoreApplication::applicationDirPath());
// list of options. Last element must be NULL.
const char* options[] = {"listening_ports", "8080",
"document_root", documentRoot.toLocal8Bit().constData(), NULL};
callbacks.begin_request = civetwebRequestHandler;
callbacks.upload = civetwebUploadHandler;
// Start the web server.
ctx = mg_start(&callbacks, NULL, options);
while (true) {
while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
// this is an RFD or domain list request packet, and there is a version match
std::map<char, Node *> newestSoloNodes;
int numBytesSenderHeader = numBytesForPacketHeader(packetData);
@ -287,104 +305,61 @@ int DomainServer::run() {
nodePublicAddress.sin_addr.s_addr = 0;
}
bool matchedUUID = true;
const char STATICALLY_ASSIGNED_NODES[3] = {
NODE_TYPE_AUDIO_MIXER,
NODE_TYPE_AVATAR_MIXER,
NODE_TYPE_VOXEL_SERVER
};
if ((nodeType == NODE_TYPE_AVATAR_MIXER || nodeType == NODE_TYPE_AUDIO_MIXER) &&
!nodeList->soloNodeOfType(nodeType)) {
// if this is an audio-mixer or an avatar-mixer and we don't have one yet
// we need to check the GUID of the assignment in the queue
// (if it exists) to make sure there is a match
// reset matchedUUID to false so there is no match by default
matchedUUID = false;
// pull the UUID passed with the check in
QUuid checkInUUID = QUuid::fromRfc4122(QByteArray((const char*) packetData + numBytesSenderHeader +
sizeof(NODE_TYPE),
NUM_BYTES_RFC4122_UUID));
// lock the assignment queue
_assignmentQueueMutex.lock();
std::deque<Assignment*>::iterator assignment = _assignmentQueue.begin();
Assignment::Type matchType = nodeType == NODE_TYPE_AUDIO_MIXER
? Assignment::AudioMixerType : Assignment::AvatarMixerType;
// enumerate the assignments and see if there is a type and UUID match
while (assignment != _assignmentQueue.end()) {
if ((*assignment)->getType() == matchType
&& (*assignment)->getUUID() == checkInUUID) {
// type and UUID match
matchedUUID = true;
// remove this assignment from the queue
_assignmentQueue.erase(assignment);
break;
} else {
// no match, keep looking
assignment++;
}
}
// unlock the assignment queue
_assignmentQueueMutex.unlock();
}
int matchingStaticAssignmentIndex = -1;
if (matchedUUID) {
Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
if (memchr(STATICALLY_ASSIGNED_NODES, nodeType, sizeof(STATICALLY_ASSIGNED_NODES)) == NULL ||
(matchingStaticAssignmentIndex = checkInMatchesStaticAssignment(nodeType, packetData)) != -1) {
Node* checkInNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
(sockaddr*) &nodeLocalAddress,
nodeType,
nodeList->getLastNodeID());
// if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it
if (newNode) {
if (newNode->getNodeID() == nodeList->getLastNodeID()) {
nodeList->increaseNodeID();
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos;
int numBytesUUID = (nodeType == NODE_TYPE_AUDIO_MIXER || nodeType == NODE_TYPE_AVATAR_MIXER)
? NUM_BYTES_RFC4122_UUID
: 0;
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + numBytesUUID +
sizeof(NODE_TYPE) + numBytesSocket + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) &&
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
// don't send avatar nodes to other avatars, that will come from avatar mixer
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos;
int numBytesUUID = (nodeType == NODE_TYPE_AUDIO_MIXER || nodeType == NODE_TYPE_AVATAR_MIXER)
? NUM_BYTES_RFC4122_UUID
: 0;
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + numBytesUUID +
sizeof(NODE_TYPE) + numBytesSocket + sizeof(unsigned char);
int numInterestTypes = *(nodeTypesOfInterest - 1);
if (numInterestTypes > 0) {
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) &&
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
// don't send avatar nodes to other avatars, that will come from avatar mixer
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
}
}
}
// update last receive to now
uint64_t timeNow = usecTimestampNow();
newNode->setLastHeardMicrostamp(timeNow);
// add the node ID to the end of the pointer
currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID());
// send the constructed list back to this node
nodeList->getNodeSocket()->send((sockaddr*)&replyDestinationSocket,
broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes);
}
// update last receive to now
uint64_t timeNow = usecTimestampNow();
checkInNode->setLastHeardMicrostamp(timeNow);
// add the node ID to the end of the pointer
currentBufferPos += packNodeId(currentBufferPos, checkInNode->getNodeID());
// send the constructed list back to this node
nodeList->getNodeSocket()->send((sockaddr*)&replyDestinationSocket,
broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes);
}
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
@ -421,8 +396,12 @@ int DomainServer::run() {
_assignmentQueue.push_back(createAssignment);
_assignmentQueueMutex.unlock();
// also add this assignment to the static map of assignments so it exists next time the DS starts up
(*_staticFileAssignments)[_staticFileAssignments->size()] = *createAssignment;
// find the first available spot in the static assignments and put this assignment there
for (int i = 0; i < _staticFileAssignments->size() - 1; i++) {
if ((*_staticFileAssignments)[i].getUUID().isNull()) {
(*_staticFileAssignments)[i] = *createAssignment;
}
}
}
}
}

View file

@ -42,13 +42,15 @@ private:
static DomainServer* domainServerInstance;
void cleanup();
void prepopulateStaticAssignmentFile();
int checkInMatchesStaticAssignment(NODE_TYPE nodeType, const uchar* checkInUUID);
Assignment* deployableAssignmentForRequest(Assignment& requestAssignment);
void cleanup();
unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd);
Assignment* deployableAssignmentForRequest(Assignment& requestAssignment);
QMutex _assignmentQueueMutex;
std::deque<Assignment*> _assignmentQueue;

View file

@ -16,6 +16,21 @@
const char IPv4_ADDRESS_DESIGNATOR = 4;
const char IPv6_ADDRESS_DESIGNATOR = 6;
Assignment::Type Assignment::typeForNodeType(NODE_TYPE nodeType) {
switch (nodeType) {
case NODE_TYPE_AUDIO_MIXER:
return Assignment::AudioMixerType;
case NODE_TYPE_AVATAR_MIXER:
return Assignment::AvatarMixerType;
case NODE_TYPE_AGENT:
return Assignment::AgentType;
case NODE_TYPE_VOXEL_SERVER:
return Assignment::VoxelServerType;
default:
return Assignment::AllTypes;
}
}
Assignment::Assignment() :
_uuid(),
_command(Assignment::RequestCommand),

View file

@ -42,6 +42,8 @@ public:
LocalLocation
};
static Assignment::Type typeForNodeType(NODE_TYPE nodeType);
Assignment();
Assignment(Assignment::Command command,
Assignment::Type type,