Hopefully the final fix to all those merges

This commit is contained in:
atlante45 2013-09-14 14:48:28 -07:00
parent 494242f363
commit 2b64e4553f
2 changed files with 251 additions and 139 deletions

View file

@ -8,6 +8,15 @@ set(TARGET_NAME domain-server)
include(${MACRO_DIR}/SetupHifiProject.cmake) include(${MACRO_DIR}/SetupHifiProject.cmake)
setup_hifi_project(${TARGET_NAME} TRUE) setup_hifi_project(${TARGET_NAME} TRUE)
# remove and then copy the files for the webserver
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND ${CMAKE_COMMAND} -E remove_directory
$<TARGET_FILE_DIR:${TARGET_NAME}>/resources/web)
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND ${CMAKE_COMMAND} -E copy_directory
"${PROJECT_SOURCE_DIR}/resources/web"
$<TARGET_FILE_DIR:${TARGET_NAME}>/resources/web)
# link the shared hifi library # link the shared hifi library
include(${MACRO_DIR}/LinkHifiLibrary.cmake) include(${MACRO_DIR}/LinkHifiLibrary.cmake)
link_hifi_library(shared ${TARGET_NAME} ${ROOT_DIR}) link_hifi_library(shared ${TARGET_NAME} ${ROOT_DIR})

View file

@ -19,12 +19,15 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <fcntl.h> #include <fcntl.h>
#include <deque>
#include <map> #include <map>
#include <math.h> #include <math.h>
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <QtCore/QMutex>
#include "Assignment.h" #include "Assignment.h"
#include "NodeList.h" #include "NodeList.h"
#include "NodeTypes.h" #include "NodeTypes.h"
@ -32,11 +35,16 @@
#include "PacketHeaders.h" #include "PacketHeaders.h"
#include "SharedUtil.h" #include "SharedUtil.h"
#include "mongoose.h"
const int DOMAIN_LISTEN_PORT = 40102; const int DOMAIN_LISTEN_PORT = 40102;
unsigned char packetData[MAX_PACKET_SIZE]; unsigned char packetData[MAX_PACKET_SIZE];
const int NODE_COUNT_STAT_INTERVAL_MSECS = 5000; const int NODE_COUNT_STAT_INTERVAL_MSECS = 5000;
QMutex assignmentQueueMutex;
std::deque<Assignment*> assignmentQueue;
unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd) { unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* nodeToAdd) {
*currentPosition++ = nodeToAdd->getType(); *currentPosition++ = nodeToAdd->getType();
@ -48,22 +56,51 @@ unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* no
return currentPosition; return currentPosition;
} }
int main(int argc, const char* argv[]) { static int mongooseRequestHandler(struct mg_connection *conn) {
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT); const struct mg_request_info *ri = mg_get_request_info(conn);
// If user asks to run in "local" mode then we do NOT replace the IP
// with the EC2 IP. Otherwise, we will replace the IP like we used to if (strcmp(ri->uri, "/assignment") == 0 && strcmp(ri->request_method, "POST") == 0) {
// this allows developers to run a local domain without recompiling the // return a 200
// domain server mg_printf(conn, "%s", "HTTP/1.0 200 OK\r\n\r\n");
bool isLocalMode = cmdOptionExists(argc, (const char**) argv, "--local"); // upload the file
if (isLocalMode) { mg_upload(conn, "/tmp");
printf("NOTE: Running in local mode!\n");
return 1;
} else { } else {
printf("--------------------------------------------------\n"); // have mongoose process this request from the document_root
printf("NOTE: Not running in local mode. \n"); return 0;
printf("If you're a developer testing a local system, you\n");
printf("probably want to include --local on command line.\n");
printf("--------------------------------------------------\n");
} }
}
const char ASSIGNMENT_SCRIPT_HOST_LOCATION[] = "resources/web/assignment";
static void mongooseUploadHandler(struct mg_connection *conn, const char *path) {
// create an assignment for this saved script, for now make it local only
Assignment *scriptAssignment = new Assignment(Assignment::CreateCommand, Assignment::AgentType, Assignment::LocalLocation);
QString newPath(ASSIGNMENT_SCRIPT_HOST_LOCATION);
newPath += "/";
// append the UUID for this script as the new filename, remove the curly braces
newPath += scriptAssignment->getUUIDStringWithoutCurlyBraces();
// rename the saved script to the GUID of the assignment and move it to the script host locaiton
rename(path, newPath.toStdString().c_str());
qDebug("Saved a script for assignment at %s\n", newPath.toStdString().c_str());
// add the script assigment to the assignment queue
// lock the assignment queue mutex since we're operating on a different thread than DS main
::assignmentQueueMutex.lock();
::assignmentQueue.push_back(scriptAssignment);
::assignmentQueueMutex.unlock();
}
int main(int argc, const char* argv[]) {
qInstallMessageHandler(Logging::verboseMessageHandler);
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT);
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
@ -71,12 +108,11 @@ int main(int argc, const char* argv[]) {
char nodeType = '\0'; char nodeType = '\0';
unsigned char broadcastPacket[MAX_PACKET_SIZE]; unsigned char broadcastPacket[MAX_PACKET_SIZE];
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
unsigned char* currentBufferPos; unsigned char* currentBufferPos;
unsigned char* startPointer; unsigned char* startPointer;
sockaddr_in nodePublicAddress, nodeLocalAddress; sockaddr_in nodePublicAddress, nodeLocalAddress, replyDestinationSocket;
nodeLocalAddress.sin_family = AF_INET; nodeLocalAddress.sin_family = AF_INET;
in_addr_t serverLocalAddress = getLocalAddress(); in_addr_t serverLocalAddress = getLocalAddress();
@ -84,13 +120,8 @@ int main(int argc, const char* argv[]) {
nodeList->startSilentNodeRemovalThread(); nodeList->startSilentNodeRemovalThread();
timeval lastStatSendTime = {}; timeval lastStatSendTime = {};
const char ASSIGNMENT_POOL_OPTION[] = "-p";
const char ASSIGNMENT_SERVER_OPTION[] = "-a"; const char ASSIGNMENT_SERVER_OPTION[] = "-a";
// set our assignment pool from argv, if it exists
const char* assignmentPool = getCmdOption(argc, argv, ASSIGNMENT_POOL_OPTION);
// grab the overriden assignment-server hostname from argv, if it exists // grab the overriden assignment-server hostname from argv, if it exists
const char* customAssignmentServer = getCmdOption(argc, argv, ASSIGNMENT_SERVER_OPTION); const char* customAssignmentServer = getCmdOption(argc, argv, ASSIGNMENT_SERVER_OPTION);
if (customAssignmentServer) { if (customAssignmentServer) {
@ -99,53 +130,60 @@ int main(int argc, const char* argv[]) {
} }
// use a map to keep track of iterations of silence for assignment creation requests // use a map to keep track of iterations of silence for assignment creation requests
const long long ASSIGNMENT_SILENCE_MAX_USECS = 5 * 1000 * 1000; const long long GLOBAL_ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000;
timeval lastGlobalAssignmentRequest = {};
// as a domain-server we will always want an audio mixer and avatar mixer // as a domain-server we will always want an audio mixer and avatar mixer
// setup the create assignment pointers for those // setup the create assignments for those
Assignment* audioAssignment = NULL; Assignment audioMixerAssignment(Assignment::CreateCommand,
Assignment* avatarAssignment = NULL; Assignment::AudioMixerType,
Assignment::LocalLocation);
// construct a local socket to send with our created assignments Assignment avatarMixerAssignment(Assignment::CreateCommand,
Assignment::AvatarMixerType,
Assignment::LocalLocation);
// construct a local socket to send with our created assignments to the global AS
sockaddr_in localSocket = {}; sockaddr_in localSocket = {};
localSocket.sin_family = AF_INET; localSocket.sin_family = AF_INET;
localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort()); localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort());
localSocket.sin_addr.s_addr = serverLocalAddress; localSocket.sin_addr.s_addr = serverLocalAddress;
// setup the mongoose web server
struct mg_context *ctx;
struct mg_callbacks callbacks = {};
// list of options. Last element must be NULL.
const char *options[] = {"listening_ports", "8080",
"document_root", "./resources/web", NULL};
callbacks.begin_request = mongooseRequestHandler;
callbacks.upload = mongooseUploadHandler;
// Start the web server.
ctx = mg_start(&callbacks, NULL, options);
while (true) { while (true) {
if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) {
if (!audioAssignment
|| usecTimestampNow() - usecTimestamp(&audioAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
if (!audioAssignment) { ::assignmentQueueMutex.lock();
audioAssignment = new Assignment(Assignment::Create, Assignment::AudioMixer, assignmentPool); // check if our audio-mixer or avatar-mixer are dead and we don't have existing assignments in the queue
audioAssignment->setAttachedLocalSocket((sockaddr*) &localSocket); // so we can add those assignments back to the front of the queue since they are high-priority
if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER) &&
std::find(::assignmentQueue.begin(), assignmentQueue.end(), &avatarMixerAssignment) == ::assignmentQueue.end()) {
qDebug("Missing an avatar mixer and assignment not in queue. Adding.\n");
::assignmentQueue.push_front(&avatarMixerAssignment);
} }
nodeList->sendAssignment(*audioAssignment); if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER) &&
audioAssignment->setCreateTimeToNow(); std::find(::assignmentQueue.begin(), ::assignmentQueue.end(), &audioMixerAssignment) == ::assignmentQueue.end()) {
} qDebug("Missing an audio mixer and assignment not in queue. Adding.\n");
::assignmentQueue.push_front(&audioMixerAssignment);
} }
::assignmentQueueMutex.unlock();
if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER)) { while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
if (!avatarAssignment
|| usecTimestampNow() - usecTimestamp(&avatarAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
if (!avatarAssignment) {
avatarAssignment = new Assignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool);
avatarAssignment->setAttachedLocalSocket((sockaddr*) &localSocket);
}
nodeList->sendAssignment(*avatarAssignment);
// reset the create time on the assignment so re-request is in ASSIGNMENT_SILENCE_MAX_USECS
avatarAssignment->setCreateTimeToNow();
}
}
if (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
(packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) &&
packetVersionMatch(packetData)) { packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
// this is an RFD or domain list request packet, and there is a version match // this is an RFD or domain list request packet, and there is a version match
std::map<char, Node *> newestSoloNodes; std::map<char, Node *> newestSoloNodes;
@ -155,19 +193,15 @@ int main(int argc, const char* argv[]) {
int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE), int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE),
(sockaddr*) &nodeLocalAddress); (sockaddr*) &nodeLocalAddress);
sockaddr* destinationSocket = (sockaddr*) &nodePublicAddress; replyDestinationSocket = nodePublicAddress;
// check the node public address // check the node public address
// if it matches our local address we're on the same box // if it matches our local address
// so hardcode the EC2 public address for now // or if it's the loopback address we're on the same box
if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress) { if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress ||
// If we're not running "local" then we do replace the IP nodePublicAddress.sin_addr.s_addr == htonl(INADDR_LOOPBACK)) {
// with 0. This designates to clients that the server is reachable
// at the same IP address
if (!isLocalMode) {
nodePublicAddress.sin_addr.s_addr = 0; nodePublicAddress.sin_addr.s_addr = 0;
destinationSocket = (sockaddr*) &nodeLocalAddress;
}
} }
Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress, Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
@ -181,6 +215,8 @@ int main(int argc, const char* argv[]) {
nodeList->increaseNodeID(); nodeList->increaseNodeID();
} }
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
currentBufferPos = broadcastPacket + numHeaderBytes; currentBufferPos = broadcastPacket + numHeaderBytes;
startPointer = currentBufferPos; startPointer = currentBufferPos;
@ -236,10 +272,80 @@ int main(int argc, const char* argv[]) {
currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID()); currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID());
// send the constructed list back to this node // send the constructed list back to this node
nodeList->getNodeSocket()->send(destinationSocket, nodeList->getNodeSocket()->send((sockaddr*)&replyDestinationSocket,
broadcastPacket, broadcastPacket,
(currentBufferPos - startPointer) + numHeaderBytes); (currentBufferPos - startPointer) + numHeaderBytes);
} }
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
qDebug("Received a request for assignment.\n");
::assignmentQueueMutex.lock();
// this is an unassigned client talking to us directly for an assignment
// go through our queue and see if there are any assignments to give out
std::deque<Assignment*>::iterator assignment = ::assignmentQueue.begin();
while (assignment != ::assignmentQueue.end()) {
// give this assignment out, no conditions stop us from giving it to the local assignment client
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
int numAssignmentBytes = (*assignment)->packToBuffer(broadcastPacket + numHeaderBytes);
nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress,
broadcastPacket,
numHeaderBytes + numAssignmentBytes);
// remove the assignment from the queue
::assignmentQueue.erase(assignment);
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
delete *assignment;
}
// stop looping, we've handed out an assignment
break;
}
::assignmentQueueMutex.unlock();
}
}
// if ASSIGNMENT_REQUEST_INTERVAL_USECS have passed since last global assignment request then fire off another
if (usecTimestampNow() - usecTimestamp(&lastGlobalAssignmentRequest) >= GLOBAL_ASSIGNMENT_REQUEST_INTERVAL_USECS) {
gettimeofday(&lastGlobalAssignmentRequest, NULL);
::assignmentQueueMutex.lock();
// go through our queue and see if there are any assignments to send to the global assignment server
std::deque<Assignment*>::iterator assignment = ::assignmentQueue.begin();
while (assignment != assignmentQueue.end()) {
if ((*assignment)->getLocation() != Assignment::LocalLocation) {
// attach our local socket to the assignment so the assignment-server can optionally hand it out
(*assignment)->setAttachedLocalSocket((sockaddr*) &localSocket);
nodeList->sendAssignment(*(*assignment));
// remove the assignment from the queue
::assignmentQueue.erase(assignment);
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
delete *assignment;
}
// stop looping, we've handed out an assignment
break;
} else {
// push forward the iterator to check the next assignment
assignment++;
}
}
::assignmentQueueMutex.unlock();
} }
if (Logging::shouldSendStats()) { if (Logging::shouldSendStats()) {
@ -254,9 +360,6 @@ int main(int argc, const char* argv[]) {
} }
} }
delete audioAssignment;
delete avatarAssignment;
return 0; return 0;
} }