diff --git a/domain-server/CMakeLists.txt b/domain-server/CMakeLists.txt index 55558fa7de..4e1f0de298 100644 --- a/domain-server/CMakeLists.txt +++ b/domain-server/CMakeLists.txt @@ -8,15 +8,6 @@ set(TARGET_NAME domain-server) include(${MACRO_DIR}/SetupHifiProject.cmake) setup_hifi_project(${TARGET_NAME} TRUE) -# remove and then copy the files for the webserver -add_custom_command(TARGET ${TARGET_NAME} POST_BUILD - COMMAND ${CMAKE_COMMAND} -E remove_directory - $/resources/web) -add_custom_command(TARGET ${TARGET_NAME} POST_BUILD - COMMAND ${CMAKE_COMMAND} -E copy_directory - "${PROJECT_SOURCE_DIR}/resources/web" - $/resources/web) - # link the shared hifi library include(${MACRO_DIR}/LinkHifiLibrary.cmake) link_hifi_library(shared ${TARGET_NAME} ${ROOT_DIR}) \ No newline at end of file diff --git a/domain-server/src/main.cpp b/domain-server/src/main.cpp index 959130833f..7b44648736 100644 --- a/domain-server/src/main.cpp +++ b/domain-server/src/main.cpp @@ -19,15 +19,12 @@ #include #include -#include #include #include #include #include #include -#include - #include "Assignment.h" #include "NodeList.h" #include "NodeTypes.h" @@ -35,16 +32,11 @@ #include "PacketHeaders.h" #include "SharedUtil.h" -#include "mongoose.h" - const int DOMAIN_LISTEN_PORT = 40102; unsigned char packetData[MAX_PACKET_SIZE]; const int NODE_COUNT_STAT_INTERVAL_MSECS = 5000; -QMutex assignmentQueueMutex; -std::deque assignmentQueue; - unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd) { *currentPosition++ = nodeToAdd->getType(); @@ -56,51 +48,22 @@ unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* no return currentPosition; } -static int mongooseRequestHandler(struct mg_connection *conn) { - const struct mg_request_info *ri = mg_get_request_info(conn); - - if (strcmp(ri->uri, "/assignment") == 0 && strcmp(ri->request_method, "POST") == 0) { - // return a 200 - mg_printf(conn, "%s", "HTTP/1.0 200 OK\r\n\r\n"); - // upload the file - mg_upload(conn, "/tmp"); - - return 1; - } else { - // have mongoose process this request from the document_root - return 0; - } -} - -const char ASSIGNMENT_SCRIPT_HOST_LOCATION[] = "resources/web/assignment"; - -static void mongooseUploadHandler(struct mg_connection *conn, const char *path) { - // create an assignment for this saved script, for now make it local only - Assignment *scriptAssignment = new Assignment(Assignment::CreateCommand, Assignment::AgentType, Assignment::LocalLocation); - - QString newPath(ASSIGNMENT_SCRIPT_HOST_LOCATION); - newPath += "/"; - // append the UUID for this script as the new filename, remove the curly braces - newPath += scriptAssignment->getUUIDStringWithoutCurlyBraces(); - - // rename the saved script to the GUID of the assignment and move it to the script host locaiton - rename(path, newPath.toStdString().c_str()); - - qDebug("Saved a script for assignment at %s\n", newPath.toStdString().c_str()); - - // add the script assigment to the assignment queue - // lock the assignment queue mutex since we're operating on a different thread than DS main - ::assignmentQueueMutex.lock(); - ::assignmentQueue.push_back(scriptAssignment); - ::assignmentQueueMutex.unlock(); - -} - int main(int argc, const char* argv[]) { - - qInstallMessageHandler(Logging::verboseMessageHandler); - NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT); + // If user asks to run in "local" mode then we do NOT replace the IP + // with the EC2 IP. Otherwise, we will replace the IP like we used to + // this allows developers to run a local domain without recompiling the + // domain server + bool isLocalMode = cmdOptionExists(argc, (const char**) argv, "--local"); + if (isLocalMode) { + printf("NOTE: Running in local mode!\n"); + } else { + printf("--------------------------------------------------\n"); + printf("NOTE: Not running in local mode. \n"); + printf("If you're a developer testing a local system, you\n"); + printf("probably want to include --local on command line.\n"); + printf("--------------------------------------------------\n"); + } setvbuf(stdout, NULL, _IOLBF, 0); @@ -108,11 +71,12 @@ int main(int argc, const char* argv[]) { char nodeType = '\0'; unsigned char broadcastPacket[MAX_PACKET_SIZE]; + int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN); unsigned char* currentBufferPos; unsigned char* startPointer; - sockaddr_in nodePublicAddress, nodeLocalAddress, replyDestinationSocket; + sockaddr_in nodePublicAddress, nodeLocalAddress; nodeLocalAddress.sin_family = AF_INET; in_addr_t serverLocalAddress = getLocalAddress(); @@ -120,8 +84,13 @@ int main(int argc, const char* argv[]) { nodeList->startSilentNodeRemovalThread(); timeval lastStatSendTime = {}; + + const char ASSIGNMENT_POOL_OPTION[] = "-p"; const char ASSIGNMENT_SERVER_OPTION[] = "-a"; + // set our assignment pool from argv, if it exists + const char* assignmentPool = getCmdOption(argc, argv, ASSIGNMENT_POOL_OPTION); + // grab the overriden assignment-server hostname from argv, if it exists const char* customAssignmentServer = getCmdOption(argc, argv, ASSIGNMENT_SERVER_OPTION); if (customAssignmentServer) { @@ -130,224 +99,149 @@ int main(int argc, const char* argv[]) { } // use a map to keep track of iterations of silence for assignment creation requests - const long long GLOBAL_ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000; - timeval lastGlobalAssignmentRequest = {}; + const long long ASSIGNMENT_SILENCE_MAX_USECS = 5 * 1000 * 1000; // as a domain-server we will always want an audio mixer and avatar mixer - // setup the create assignments for those - Assignment audioMixerAssignment(Assignment::CreateCommand, - Assignment::AudioMixerType, - Assignment::LocalLocation); + // setup the create assignment pointers for those + Assignment* audioAssignment = NULL; + Assignment* avatarAssignment = NULL; - Assignment avatarMixerAssignment(Assignment::CreateCommand, - Assignment::AvatarMixerType, - Assignment::LocalLocation); - - // construct a local socket to send with our created assignments to the global AS + // construct a local socket to send with our created assignments sockaddr_in localSocket = {}; localSocket.sin_family = AF_INET; localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort()); localSocket.sin_addr.s_addr = serverLocalAddress; - // setup the mongoose web server - struct mg_context *ctx; - struct mg_callbacks callbacks = {}; - - // list of options. Last element must be NULL. - const char *options[] = {"listening_ports", "8080", - "document_root", "./resources/web", NULL}; - - callbacks.begin_request = mongooseRequestHandler; - callbacks.upload = mongooseUploadHandler; - - // Start the web server. - ctx = mg_start(&callbacks, NULL, options); - while (true) { - - ::assignmentQueueMutex.lock(); - // check if our audio-mixer or avatar-mixer are dead and we don't have existing assignments in the queue - // so we can add those assignments back to the front of the queue since they are high-priority - if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER) && - std::find(::assignmentQueue.begin(), assignmentQueue.end(), &avatarMixerAssignment) == ::assignmentQueue.end()) { - qDebug("Missing an avatar mixer and assignment not in queue. Adding.\n"); - ::assignmentQueue.push_front(&avatarMixerAssignment); - } - - if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER) && - std::find(::assignmentQueue.begin(), ::assignmentQueue.end(), &audioMixerAssignment) == ::assignmentQueue.end()) { - qDebug("Missing an audio mixer and assignment not in queue. Adding.\n"); - ::assignmentQueue.push_front(&audioMixerAssignment); - } - ::assignmentQueueMutex.unlock(); - - while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) && - packetVersionMatch(packetData)) { - if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) { - // this is an RFD or domain list request packet, and there is a version match - std::map newestSoloNodes; + if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) { + if (!audioAssignment + || usecTimestampNow() - usecTimestamp(&audioAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) { - int numBytesSenderHeader = numBytesForPacketHeader(packetData); - - nodeType = *(packetData + numBytesSenderHeader); - int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE), - (sockaddr*) &nodeLocalAddress); - - replyDestinationSocket = nodePublicAddress; - - // check the node public address - // if it matches our local address - // or if it's the loopback address we're on the same box - if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress || - nodePublicAddress.sin_addr.s_addr == htonl(INADDR_LOOPBACK)) { - - nodePublicAddress.sin_addr.s_addr = 0; + if (!audioAssignment) { + audioAssignment = new Assignment(Assignment::Create, Assignment::AudioMixer, assignmentPool); + audioAssignment->setAttachedLocalSocket((sockaddr*) &localSocket); } - Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress, - (sockaddr*) &nodeLocalAddress, - nodeType, - nodeList->getLastNodeID()); + nodeList->sendAssignment(*audioAssignment); + audioAssignment->setCreateTimeToNow(); + } + } + + if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER)) { + if (!avatarAssignment + || usecTimestampNow() - usecTimestamp(&avatarAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) { + if (!avatarAssignment) { + avatarAssignment = new Assignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool); + avatarAssignment->setAttachedLocalSocket((sockaddr*) &localSocket); + } - // if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it - if (newNode) { - if (newNode->getNodeID() == nodeList->getLastNodeID()) { - nodeList->increaseNodeID(); - } - - int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN); - - currentBufferPos = broadcastPacket + numHeaderBytes; - startPointer = currentBufferPos; - - unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + sizeof(NODE_TYPE) - + numBytesSocket + sizeof(unsigned char); - int numInterestTypes = *(nodeTypesOfInterest - 1); - - if (numInterestTypes > 0) { - // if the node has sent no types of interest, assume they want nothing but their own ID back - for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { - if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) && - memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) { - // this is not the node themselves - // and this is an node of a type in the passed node types of interest - // or the node did not pass us any specific types they are interested in + nodeList->sendAssignment(*avatarAssignment); + + // reset the create time on the assignment so re-request is in ASSIGNMENT_SILENCE_MAX_USECS + avatarAssignment->setCreateTimeToNow(); + } + + } + + if (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) && + (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) && + packetVersionMatch(packetData)) { + // this is an RFD or domain list request packet, and there is a version match + std::map newestSoloNodes; + + int numBytesSenderHeader = numBytesForPacketHeader(packetData); + + nodeType = *(packetData + numBytesSenderHeader); + int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE), + (sockaddr*) &nodeLocalAddress); + + sockaddr* destinationSocket = (sockaddr*) &nodePublicAddress; + + // check the node public address + // if it matches our local address we're on the same box + // so hardcode the EC2 public address for now + if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress) { + // If we're not running "local" then we do replace the IP + // with 0. This designates to clients that the server is reachable + // at the same IP address + if (!isLocalMode) { + nodePublicAddress.sin_addr.s_addr = 0; + destinationSocket = (sockaddr*) &nodeLocalAddress; + } + } + + Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress, + (sockaddr*) &nodeLocalAddress, + nodeType, + nodeList->getLastNodeID()); + + // if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it + if (newNode) { + if (newNode->getNodeID() == nodeList->getLastNodeID()) { + nodeList->increaseNodeID(); + } + + currentBufferPos = broadcastPacket + numHeaderBytes; + startPointer = currentBufferPos; + + unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + sizeof(NODE_TYPE) + + numBytesSocket + sizeof(unsigned char); + int numInterestTypes = *(nodeTypesOfInterest - 1); + + if (numInterestTypes > 0) { + // if the node has sent no types of interest, assume they want nothing but their own ID back + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) && + memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) { + // this is not the node themselves + // and this is an node of a type in the passed node types of interest + // or the node did not pass us any specific types they are interested in + + if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) { + // this is an node of which there can be multiple, just add them to the packet + // don't send avatar nodes to other avatars, that will come from avatar mixer + if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) { + currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node)); + } - if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) { - // this is an node of which there can be multiple, just add them to the packet - // don't send avatar nodes to other avatars, that will come from avatar mixer - if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) { - currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node)); - } - - } else { - // solo node, we need to only send newest - if (newestSoloNodes[node->getType()] == NULL || - newestSoloNodes[node->getType()]->getWakeMicrostamp() < node->getWakeMicrostamp()) { - // we have to set the newer solo node to add it to the broadcast later - newestSoloNodes[node->getType()] = &(*node); - } + } else { + // solo node, we need to only send newest + if (newestSoloNodes[node->getType()] == NULL || + newestSoloNodes[node->getType()]->getWakeMicrostamp() < node->getWakeMicrostamp()) { + // we have to set the newer solo node to add it to the broadcast later + newestSoloNodes[node->getType()] = &(*node); } } } - - for (std::map::iterator soloNode = newestSoloNodes.begin(); - soloNode != newestSoloNodes.end(); - soloNode++) { - // this is the newest alive solo node, add them to the packet - currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, soloNode->second); - } } - // update last receive to now - uint64_t timeNow = usecTimestampNow(); - newNode->setLastHeardMicrostamp(timeNow); - - if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY - && memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) { - newNode->setWakeMicrostamp(timeNow); + for (std::map::iterator soloNode = newestSoloNodes.begin(); + soloNode != newestSoloNodes.end(); + soloNode++) { + // this is the newest alive solo node, add them to the packet + currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, soloNode->second); } - - // add the node ID to the end of the pointer - currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID()); - - // send the constructed list back to this node - nodeList->getNodeSocket()->send((sockaddr*)&replyDestinationSocket, - broadcastPacket, - (currentBufferPos - startPointer) + numHeaderBytes); - } - } else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) { - - qDebug("Received a request for assignment.\n"); - - ::assignmentQueueMutex.lock(); - - // this is an unassigned client talking to us directly for an assignment - // go through our queue and see if there are any assignments to give out - std::deque::iterator assignment = ::assignmentQueue.begin(); - - while (assignment != ::assignmentQueue.end()) { - - // give this assignment out, no conditions stop us from giving it to the local assignment client - int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT); - int numAssignmentBytes = (*assignment)->packToBuffer(broadcastPacket + numHeaderBytes); - - nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress, - broadcastPacket, - numHeaderBytes + numAssignmentBytes); - - // remove the assignment from the queue - ::assignmentQueue.erase(assignment); - - if ((*assignment)->getType() == Assignment::AgentType) { - // if this is a script assignment we need to delete it to avoid a memory leak - delete *assignment; - } - - // stop looping, we've handed out an assignment - break; } - ::assignmentQueueMutex.unlock(); + // update last receive to now + uint64_t timeNow = usecTimestampNow(); + newNode->setLastHeardMicrostamp(timeNow); + + if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY + && memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) { + newNode->setWakeMicrostamp(timeNow); + } + + // add the node ID to the end of the pointer + currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID()); + + // send the constructed list back to this node + nodeList->getNodeSocket()->send(destinationSocket, + broadcastPacket, + (currentBufferPos - startPointer) + numHeaderBytes); } } - // if ASSIGNMENT_REQUEST_INTERVAL_USECS have passed since last global assignment request then fire off another - if (usecTimestampNow() - usecTimestamp(&lastGlobalAssignmentRequest) >= GLOBAL_ASSIGNMENT_REQUEST_INTERVAL_USECS) { - gettimeofday(&lastGlobalAssignmentRequest, NULL); - - ::assignmentQueueMutex.lock(); - - // go through our queue and see if there are any assignments to send to the global assignment server - std::deque::iterator assignment = ::assignmentQueue.begin(); - - while (assignment != assignmentQueue.end()) { - - if ((*assignment)->getLocation() != Assignment::LocalLocation) { - // attach our local socket to the assignment so the assignment-server can optionally hand it out - (*assignment)->setAttachedLocalSocket((sockaddr*) &localSocket); - - nodeList->sendAssignment(*(*assignment)); - - // remove the assignment from the queue - ::assignmentQueue.erase(assignment); - - if ((*assignment)->getType() == Assignment::AgentType) { - // if this is a script assignment we need to delete it to avoid a memory leak - delete *assignment; - } - - // stop looping, we've handed out an assignment - break; - } else { - // push forward the iterator to check the next assignment - assignment++; - } - } - - ::assignmentQueueMutex.unlock(); - } - if (Logging::shouldSendStats()) { if (usecTimestampNow() - usecTimestamp(&lastStatSendTime) >= (NODE_COUNT_STAT_INTERVAL_MSECS * 1000)) { // time to send our count of nodes and servers to logstash @@ -359,6 +253,9 @@ int main(int argc, const char* argv[]) { } } } + + delete audioAssignment; + delete avatarAssignment; return 0; }