mirror of
https://github.com/lubosz/overte.git
synced 2025-04-08 03:42:09 +02:00
have DS act as AS and hand assignments directly
This commit is contained in:
parent
972dd795dd
commit
b7aa49cf97
5 changed files with 220 additions and 184 deletions
|
@ -26,7 +26,6 @@ const char CHILD_TARGET_NAME[] = "assignment-client";
|
|||
|
||||
pid_t* childForks = NULL;
|
||||
sockaddr_in customAssignmentSocket = {};
|
||||
const char* assignmentPool = NULL;
|
||||
int numForks = 0;
|
||||
|
||||
void childClient() {
|
||||
|
@ -51,8 +50,10 @@ void childClient() {
|
|||
unsigned char packetData[MAX_PACKET_SIZE];
|
||||
ssize_t receivedBytes = 0;
|
||||
|
||||
sockaddr_in senderSocket = {};
|
||||
|
||||
// create a request assignment, accept all assignments, pass the desired pool (if it exists)
|
||||
Assignment requestAssignment(Assignment::Request, Assignment::All, assignmentPool);
|
||||
Assignment requestAssignment(Assignment::RequestDirection, Assignment::AllTypes);
|
||||
|
||||
while (true) {
|
||||
if (usecTimestampNow() - usecTimestamp(&lastRequest) >= ASSIGNMENT_REQUEST_INTERVAL_USECS) {
|
||||
|
@ -61,22 +62,34 @@ void childClient() {
|
|||
nodeList->sendAssignment(requestAssignment);
|
||||
}
|
||||
|
||||
if (nodeList->getNodeSocket()->receive(packetData, &receivedBytes) &&
|
||||
packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) {
|
||||
if (nodeList->getNodeSocket()->receive((sockaddr*) &senderSocket, packetData, &receivedBytes) &&
|
||||
(packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT || packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT)
|
||||
&& packetVersionMatch(packetData)) {
|
||||
|
||||
// construct the deployed assignment from the packet data
|
||||
Assignment deployedAssignment(packetData, receivedBytes);
|
||||
|
||||
qDebug() << "Received an assignment -" << deployedAssignment << "\n";
|
||||
|
||||
// switch our nodelist DOMAIN_IP to the ip receieved in the assignment
|
||||
if (deployedAssignment.getAttachedPublicSocket()->sa_family == AF_INET) {
|
||||
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getAttachedPublicSocket())->sin_addr;
|
||||
// switch our nodelist DOMAIN_IP
|
||||
if (packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT ||
|
||||
deployedAssignment.getAttachedPublicSocket()->sa_family == AF_INET) {
|
||||
|
||||
in_addr domainSocketAddr = {};
|
||||
|
||||
if (packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT) {
|
||||
// the domain server IP address is the address we got this packet from
|
||||
domainSocketAddr = senderSocket.sin_addr;
|
||||
} else {
|
||||
// grab the domain server IP address from the packet from the AS
|
||||
domainSocketAddr = ((sockaddr_in*) deployedAssignment.getAttachedPublicSocket())->sin_addr;
|
||||
}
|
||||
|
||||
nodeList->setDomainIP(inet_ntoa(domainSocketAddr));
|
||||
|
||||
qDebug("Destination IP for assignment is %s\n", inet_ntoa(domainSocketAddr));
|
||||
|
||||
if (deployedAssignment.getType() == Assignment::AudioMixer) {
|
||||
if (deployedAssignment.getType() == Assignment::AudioMixerType) {
|
||||
AudioMixer::run();
|
||||
} else {
|
||||
AvatarMixer::run();
|
||||
|
@ -165,19 +178,23 @@ int main(int argc, const char* argv[]) {
|
|||
// start the Logging class with the parent's target name
|
||||
Logging::setTargetName(PARENT_TARGET_NAME);
|
||||
|
||||
const char CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION[] = "-a";
|
||||
const char CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION[] = "-p";
|
||||
|
||||
// grab the overriden assignment-server hostname from argv, if it exists
|
||||
const char* customAssignmentServer = getCmdOption(argc, argv, "-a");
|
||||
if (customAssignmentServer) {
|
||||
::customAssignmentSocket = socketForHostnameAndHostOrderPort(customAssignmentServer, ASSIGNMENT_SERVER_PORT);
|
||||
const char* customAssignmentServerHostname = getCmdOption(argc, argv, CUSTOM_ASSIGNMENT_SERVER_HOSTNAME_OPTION);
|
||||
|
||||
if (customAssignmentServerHostname) {
|
||||
const char* customAssignmentServerPortString = getCmdOption(argc, argv, CUSTOM_ASSIGNMENT_SERVER_PORT_OPTION);
|
||||
unsigned short assignmentServerPort = customAssignmentServerPortString
|
||||
? atoi(customAssignmentServerPortString) : ASSIGNMENT_SERVER_PORT;
|
||||
|
||||
::customAssignmentSocket = socketForHostnameAndHostOrderPort(customAssignmentServerHostname, assignmentServerPort);
|
||||
}
|
||||
|
||||
const char* NUM_FORKS_PARAMETER = "-n";
|
||||
const char* numForksString = getCmdOption(argc, argv, NUM_FORKS_PARAMETER);
|
||||
|
||||
// grab the assignment pool from argv, if it was passed
|
||||
const char* ASSIGNMENT_POOL_PARAMETER = "-p";
|
||||
::assignmentPool = getCmdOption(argc, argv, ASSIGNMENT_POOL_PARAMETER);
|
||||
|
||||
int processID = 0;
|
||||
|
||||
if (numForksString) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
#include <arpa/inet.h>
|
||||
#include <fcntl.h>
|
||||
#include <deque.h>
|
||||
#include <map>
|
||||
#include <math.h>
|
||||
#include <string.h>
|
||||
|
@ -50,6 +51,7 @@ unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* no
|
|||
|
||||
int main(int argc, const char* argv[]) {
|
||||
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT);
|
||||
|
||||
// If user asks to run in "local" mode then we do NOT replace the IP
|
||||
// with the EC2 IP. Otherwise, we will replace the IP like we used to
|
||||
// this allows developers to run a local domain without recompiling the
|
||||
|
@ -71,7 +73,6 @@ int main(int argc, const char* argv[]) {
|
|||
char nodeType = '\0';
|
||||
|
||||
unsigned char broadcastPacket[MAX_PACKET_SIZE];
|
||||
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
|
||||
|
||||
unsigned char* currentBufferPos;
|
||||
unsigned char* startPointer;
|
||||
|
@ -84,13 +85,8 @@ int main(int argc, const char* argv[]) {
|
|||
nodeList->startSilentNodeRemovalThread();
|
||||
|
||||
timeval lastStatSendTime = {};
|
||||
|
||||
const char ASSIGNMENT_POOL_OPTION[] = "-p";
|
||||
const char ASSIGNMENT_SERVER_OPTION[] = "-a";
|
||||
|
||||
// set our assignment pool from argv, if it exists
|
||||
const char* assignmentPool = getCmdOption(argc, argv, ASSIGNMENT_POOL_OPTION);
|
||||
|
||||
// grab the overriden assignment-server hostname from argv, if it exists
|
||||
const char* customAssignmentServer = getCmdOption(argc, argv, ASSIGNMENT_SERVER_OPTION);
|
||||
if (customAssignmentServer) {
|
||||
|
@ -99,149 +95,197 @@ int main(int argc, const char* argv[]) {
|
|||
}
|
||||
|
||||
// use a map to keep track of iterations of silence for assignment creation requests
|
||||
const long long ASSIGNMENT_SILENCE_MAX_USECS = 5 * 1000 * 1000;
|
||||
const long long GLOBAL_ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000;
|
||||
timeval lastGlobalAssignmentRequest = {};
|
||||
|
||||
// setup the assignment queue
|
||||
std::deque<Assignment*> assignmentQueue;
|
||||
|
||||
// as a domain-server we will always want an audio mixer and avatar mixer
|
||||
// setup the create assignment pointers for those
|
||||
Assignment* audioAssignment = NULL;
|
||||
Assignment* avatarAssignment = NULL;
|
||||
// setup the create assignments for those
|
||||
Assignment audioMixerAssignment(Assignment::CreateDirection,
|
||||
Assignment::AudioMixerType);
|
||||
|
||||
// construct a local socket to send with our created assignments
|
||||
Assignment avatarMixerAssignment(Assignment::CreateDirection,
|
||||
Assignment::AvatarMixerType,
|
||||
Assignment::LocalLocation);
|
||||
|
||||
// construct a local socket to send with our created assignments to the global AS
|
||||
sockaddr_in localSocket = {};
|
||||
localSocket.sin_family = AF_INET;
|
||||
localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort());
|
||||
localSocket.sin_addr.s_addr = serverLocalAddress;
|
||||
|
||||
while (true) {
|
||||
if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) {
|
||||
if (!audioAssignment
|
||||
|| usecTimestampNow() - usecTimestamp(&audioAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
|
||||
|
||||
if (!audioAssignment) {
|
||||
audioAssignment = new Assignment(Assignment::Create, Assignment::AudioMixer, assignmentPool);
|
||||
audioAssignment->setAttachedLocalSocket((sockaddr*) &localSocket);
|
||||
}
|
||||
|
||||
nodeList->sendAssignment(*audioAssignment);
|
||||
audioAssignment->setCreateTimeToNow();
|
||||
}
|
||||
|
||||
// check if our audio-mixer or avatar-mixer are dead and we don't have existing assignments in the queue
|
||||
// so we can add those assignments back to the front of the queue since they are high-priority
|
||||
if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER) &&
|
||||
std::find(assignmentQueue.begin(), assignmentQueue.end(), &avatarMixerAssignment) == assignmentQueue.end()) {
|
||||
qDebug("Missing an avatar mixer and assignment not in queue. Adding.\n");
|
||||
assignmentQueue.push_front(&avatarMixerAssignment);
|
||||
}
|
||||
|
||||
if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER)) {
|
||||
if (!avatarAssignment
|
||||
|| usecTimestampNow() - usecTimestamp(&avatarAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
|
||||
if (!avatarAssignment) {
|
||||
avatarAssignment = new Assignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool);
|
||||
avatarAssignment->setAttachedLocalSocket((sockaddr*) &localSocket);
|
||||
}
|
||||
|
||||
nodeList->sendAssignment(*avatarAssignment);
|
||||
|
||||
// reset the create time on the assignment so re-request is in ASSIGNMENT_SILENCE_MAX_USECS
|
||||
avatarAssignment->setCreateTimeToNow();
|
||||
}
|
||||
|
||||
if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER) &&
|
||||
std::find(assignmentQueue.begin(), assignmentQueue.end(), &audioMixerAssignment) == assignmentQueue.end()) {
|
||||
qDebug("Missing an audio mixer and assignment not in queue. Adding.\n");
|
||||
assignmentQueue.push_front(&audioMixerAssignment);
|
||||
}
|
||||
|
||||
if (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
|
||||
(packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) &&
|
||||
packetVersionMatch(packetData)) {
|
||||
// this is an RFD or domain list request packet, and there is a version match
|
||||
std::map<char, Node *> newestSoloNodes;
|
||||
|
||||
int numBytesSenderHeader = numBytesForPacketHeader(packetData);
|
||||
|
||||
nodeType = *(packetData + numBytesSenderHeader);
|
||||
int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE),
|
||||
(sockaddr*) &nodeLocalAddress);
|
||||
|
||||
sockaddr* destinationSocket = (sockaddr*) &nodePublicAddress;
|
||||
|
||||
// check the node public address
|
||||
// if it matches our local address we're on the same box
|
||||
// so hardcode the EC2 public address for now
|
||||
if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress) {
|
||||
// If we're not running "local" then we do replace the IP
|
||||
// with 0. This designates to clients that the server is reachable
|
||||
// at the same IP address
|
||||
if (!isLocalMode) {
|
||||
nodePublicAddress.sin_addr.s_addr = 0;
|
||||
destinationSocket = (sockaddr*) &nodeLocalAddress;
|
||||
}
|
||||
}
|
||||
|
||||
Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
|
||||
(sockaddr*) &nodeLocalAddress,
|
||||
nodeType,
|
||||
nodeList->getLastNodeID());
|
||||
|
||||
// if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it
|
||||
if (newNode) {
|
||||
if (newNode->getNodeID() == nodeList->getLastNodeID()) {
|
||||
nodeList->increaseNodeID();
|
||||
while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
|
||||
packetVersionMatch(packetData)) {
|
||||
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) {
|
||||
// this is an RFD or domain list request packet, and there is a version match
|
||||
std::map<char, Node *> newestSoloNodes;
|
||||
|
||||
int numBytesSenderHeader = numBytesForPacketHeader(packetData);
|
||||
|
||||
nodeType = *(packetData + numBytesSenderHeader);
|
||||
int numBytesSocket = unpackSocket(packetData + numBytesSenderHeader + sizeof(NODE_TYPE),
|
||||
(sockaddr*) &nodeLocalAddress);
|
||||
|
||||
sockaddr* destinationSocket = (sockaddr*) &nodePublicAddress;
|
||||
|
||||
// check the node public address
|
||||
// if it matches our local address we're on the same box
|
||||
// so hardcode the EC2 public address for now
|
||||
if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress) {
|
||||
// If we're not running "local" then we do replace the IP
|
||||
// with 0. This designates to clients that the server is reachable
|
||||
// at the same IP address
|
||||
if (!isLocalMode) {
|
||||
nodePublicAddress.sin_addr.s_addr = 0;
|
||||
destinationSocket = (sockaddr*) &nodeLocalAddress;
|
||||
}
|
||||
}
|
||||
|
||||
currentBufferPos = broadcastPacket + numHeaderBytes;
|
||||
startPointer = currentBufferPos;
|
||||
Node* newNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
|
||||
(sockaddr*) &nodeLocalAddress,
|
||||
nodeType,
|
||||
nodeList->getLastNodeID());
|
||||
|
||||
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + sizeof(NODE_TYPE)
|
||||
+ numBytesSocket + sizeof(unsigned char);
|
||||
int numInterestTypes = *(nodeTypesOfInterest - 1);
|
||||
|
||||
if (numInterestTypes > 0) {
|
||||
// if the node has sent no types of interest, assume they want nothing but their own ID back
|
||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||
if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) &&
|
||||
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
|
||||
// this is not the node themselves
|
||||
// and this is an node of a type in the passed node types of interest
|
||||
// or the node did not pass us any specific types they are interested in
|
||||
|
||||
if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) {
|
||||
// this is an node of which there can be multiple, just add them to the packet
|
||||
// don't send avatar nodes to other avatars, that will come from avatar mixer
|
||||
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
|
||||
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
|
||||
}
|
||||
// if addOrUpdateNode returns NULL this was a solo node we already have, don't talk back to it
|
||||
if (newNode) {
|
||||
if (newNode->getNodeID() == nodeList->getLastNodeID()) {
|
||||
nodeList->increaseNodeID();
|
||||
}
|
||||
|
||||
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
|
||||
|
||||
currentBufferPos = broadcastPacket + numHeaderBytes;
|
||||
startPointer = currentBufferPos;
|
||||
|
||||
unsigned char* nodeTypesOfInterest = packetData + numBytesSenderHeader + sizeof(NODE_TYPE)
|
||||
+ numBytesSocket + sizeof(unsigned char);
|
||||
int numInterestTypes = *(nodeTypesOfInterest - 1);
|
||||
|
||||
if (numInterestTypes > 0) {
|
||||
// if the node has sent no types of interest, assume they want nothing but their own ID back
|
||||
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
|
||||
if (!node->matches((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType) &&
|
||||
memchr(nodeTypesOfInterest, node->getType(), numInterestTypes)) {
|
||||
// this is not the node themselves
|
||||
// and this is an node of a type in the passed node types of interest
|
||||
// or the node did not pass us any specific types they are interested in
|
||||
|
||||
} else {
|
||||
// solo node, we need to only send newest
|
||||
if (newestSoloNodes[node->getType()] == NULL ||
|
||||
newestSoloNodes[node->getType()]->getWakeMicrostamp() < node->getWakeMicrostamp()) {
|
||||
// we have to set the newer solo node to add it to the broadcast later
|
||||
newestSoloNodes[node->getType()] = &(*node);
|
||||
if (memchr(SOLO_NODE_TYPES, node->getType(), sizeof(SOLO_NODE_TYPES)) == NULL) {
|
||||
// this is an node of which there can be multiple, just add them to the packet
|
||||
// don't send avatar nodes to other avatars, that will come from avatar mixer
|
||||
if (nodeType != NODE_TYPE_AGENT || node->getType() != NODE_TYPE_AGENT) {
|
||||
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, &(*node));
|
||||
}
|
||||
|
||||
} else {
|
||||
// solo node, we need to only send newest
|
||||
if (newestSoloNodes[node->getType()] == NULL ||
|
||||
newestSoloNodes[node->getType()]->getWakeMicrostamp() < node->getWakeMicrostamp()) {
|
||||
// we have to set the newer solo node to add it to the broadcast later
|
||||
newestSoloNodes[node->getType()] = &(*node);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (std::map<char, Node *>::iterator soloNode = newestSoloNodes.begin();
|
||||
soloNode != newestSoloNodes.end();
|
||||
soloNode++) {
|
||||
// this is the newest alive solo node, add them to the packet
|
||||
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, soloNode->second);
|
||||
}
|
||||
}
|
||||
|
||||
for (std::map<char, Node *>::iterator soloNode = newestSoloNodes.begin();
|
||||
soloNode != newestSoloNodes.end();
|
||||
soloNode++) {
|
||||
// this is the newest alive solo node, add them to the packet
|
||||
currentBufferPos = addNodeToBroadcastPacket(currentBufferPos, soloNode->second);
|
||||
// update last receive to now
|
||||
uint64_t timeNow = usecTimestampNow();
|
||||
newNode->setLastHeardMicrostamp(timeNow);
|
||||
|
||||
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY
|
||||
&& memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) {
|
||||
newNode->setWakeMicrostamp(timeNow);
|
||||
}
|
||||
|
||||
// add the node ID to the end of the pointer
|
||||
currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID());
|
||||
|
||||
// send the constructed list back to this node
|
||||
nodeList->getNodeSocket()->send(destinationSocket,
|
||||
broadcastPacket,
|
||||
(currentBufferPos - startPointer) + numHeaderBytes);
|
||||
}
|
||||
} else if (packetData[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
|
||||
|
||||
// update last receive to now
|
||||
uint64_t timeNow = usecTimestampNow();
|
||||
newNode->setLastHeardMicrostamp(timeNow);
|
||||
qDebug("Received a request for assignment.\n");
|
||||
|
||||
if (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY
|
||||
&& memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) {
|
||||
newNode->setWakeMicrostamp(timeNow);
|
||||
// this is an unassigned client talking to us directly for an assignment
|
||||
// go through our queue and see if there are any assignments to give out
|
||||
std::deque<Assignment*>::iterator assignment = assignmentQueue.begin();
|
||||
|
||||
while (assignment != assignmentQueue.end()) {
|
||||
|
||||
// give this assignment out, no conditions stop us from giving it to the local assignment client
|
||||
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_CREATE_ASSIGNMENT);
|
||||
int numAssignmentBytes = (*assignment)->packToBuffer(broadcastPacket + numHeaderBytes);
|
||||
|
||||
nodeList->getNodeSocket()->send((sockaddr*) &nodePublicAddress,
|
||||
broadcastPacket,
|
||||
numHeaderBytes + numAssignmentBytes);
|
||||
|
||||
// remove the assignment from the queue
|
||||
assignmentQueue.erase(assignment);
|
||||
|
||||
// stop looping, we've handed out an assignment
|
||||
break;
|
||||
}
|
||||
|
||||
// add the node ID to the end of the pointer
|
||||
currentBufferPos += packNodeId(currentBufferPos, newNode->getNodeID());
|
||||
|
||||
// send the constructed list back to this node
|
||||
nodeList->getNodeSocket()->send(destinationSocket,
|
||||
broadcastPacket,
|
||||
(currentBufferPos - startPointer) + numHeaderBytes);
|
||||
}
|
||||
}
|
||||
|
||||
// if ASSIGNMENT_REQUEST_INTERVAL_USECS have passed since last global assignment request then fire off another
|
||||
if (usecTimestampNow() - usecTimestamp(&lastGlobalAssignmentRequest) >= GLOBAL_ASSIGNMENT_REQUEST_INTERVAL_USECS) {
|
||||
gettimeofday(&lastGlobalAssignmentRequest, NULL);
|
||||
|
||||
// go through our queue and see if there are any assignments to send to the global assignment server
|
||||
std::deque<Assignment*>::iterator assignment = assignmentQueue.begin();
|
||||
|
||||
while (assignment != assignmentQueue.end()) {
|
||||
|
||||
if ((*assignment)->getLocation() != Assignment::LocalLocation) {
|
||||
// attach our local socket to the assignment so the assignment-server can optionally hand it out
|
||||
(*assignment)->setAttachedLocalSocket((sockaddr*) &localSocket);
|
||||
|
||||
nodeList->sendAssignment(*(*assignment));
|
||||
|
||||
// remove the assignment from the queue
|
||||
assignmentQueue.erase(assignment);
|
||||
|
||||
// stop looping, we've handed out an assignment
|
||||
break;
|
||||
} else {
|
||||
// push forward the iterator to check the next assignment
|
||||
assignment++;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (Logging::shouldSendStats()) {
|
||||
if (usecTimestampNow() - usecTimestamp(&lastStatSendTime) >= (NODE_COUNT_STAT_INTERVAL_MSECS * 1000)) {
|
||||
// time to send our count of nodes and servers to logstash
|
||||
|
@ -253,9 +297,6 @@ int main(int argc, const char* argv[]) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
delete audioAssignment;
|
||||
delete avatarAssignment;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -13,28 +13,19 @@
|
|||
const char IPv4_ADDRESS_DESIGNATOR = 4;
|
||||
const char IPv6_ADDRESS_DESIGNATOR = 6;
|
||||
|
||||
Assignment::Assignment(Assignment::Direction direction, Assignment::Type type, const char* pool) :
|
||||
Assignment::Assignment(Assignment::Direction direction, Assignment::Type type, Assignment::Location location) :
|
||||
_direction(direction),
|
||||
_type(type),
|
||||
_pool(NULL),
|
||||
_location(location),
|
||||
_attachedPublicSocket(NULL),
|
||||
_attachedLocalSocket(NULL)
|
||||
{
|
||||
// set the create time on this assignment
|
||||
gettimeofday(&_time, NULL);
|
||||
|
||||
// copy the pool, if we got one
|
||||
if (pool) {
|
||||
int poolLength = strlen(pool);
|
||||
|
||||
// create the char array and make it large enough for string and null termination
|
||||
_pool = new char[poolLength + sizeof(char)];
|
||||
strcpy(_pool, pool);
|
||||
}
|
||||
}
|
||||
|
||||
Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
|
||||
_pool(NULL),
|
||||
_location(GlobalLocation),
|
||||
_attachedPublicSocket(NULL),
|
||||
_attachedLocalSocket(NULL)
|
||||
{
|
||||
|
@ -44,11 +35,11 @@ Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
|
|||
int numBytesRead = 0;
|
||||
|
||||
if (dataBuffer[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) {
|
||||
_direction = Assignment::Request;
|
||||
_direction = Assignment::RequestDirection;
|
||||
} else if (dataBuffer[0] == PACKET_TYPE_CREATE_ASSIGNMENT) {
|
||||
_direction = Assignment::Create;
|
||||
_direction = Assignment::CreateDirection;
|
||||
} else if (dataBuffer[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT) {
|
||||
_direction = Assignment::Deploy;
|
||||
_direction = Assignment::DeployDirection;
|
||||
}
|
||||
|
||||
numBytesRead += numBytesForPacketHeader(dataBuffer);
|
||||
|
@ -56,15 +47,6 @@ Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
|
|||
memcpy(&_type, dataBuffer + numBytesRead, sizeof(Assignment::Type));
|
||||
numBytesRead += sizeof(Assignment::Type);
|
||||
|
||||
if (dataBuffer[numBytesRead] != 0) {
|
||||
int poolLength = strlen((const char*) dataBuffer + numBytesRead);
|
||||
_pool = new char[poolLength + sizeof(char)];
|
||||
strcpy(_pool, (char*) dataBuffer + numBytesRead);
|
||||
numBytesRead += poolLength + sizeof(char);
|
||||
} else {
|
||||
numBytesRead += sizeof(char);
|
||||
}
|
||||
|
||||
if (numBytes > numBytesRead) {
|
||||
|
||||
sockaddr* newSocket = NULL;
|
||||
|
@ -78,7 +60,7 @@ Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
|
|||
qDebug("Received a socket that cannot be unpacked!\n");
|
||||
}
|
||||
|
||||
if (_direction == Assignment::Create) {
|
||||
if (_direction == Assignment::CreateDirection) {
|
||||
delete _attachedLocalSocket;
|
||||
_attachedLocalSocket = newSocket;
|
||||
} else {
|
||||
|
@ -91,7 +73,6 @@ Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
|
|||
Assignment::~Assignment() {
|
||||
delete _attachedPublicSocket;
|
||||
delete _attachedLocalSocket;
|
||||
delete _pool;
|
||||
}
|
||||
|
||||
void Assignment::setAttachedPublicSocket(const sockaddr* attachedPublicSocket) {
|
||||
|
@ -124,16 +105,6 @@ int Assignment::packToBuffer(unsigned char* buffer) {
|
|||
memcpy(buffer + numPackedBytes, &_type, sizeof(_type));
|
||||
numPackedBytes += sizeof(_type);
|
||||
|
||||
if (_pool) {
|
||||
int poolLength = strlen(_pool);
|
||||
strcpy((char*) buffer + numPackedBytes, _pool);
|
||||
|
||||
numPackedBytes += poolLength + sizeof(char);
|
||||
} else {
|
||||
buffer[numPackedBytes] = '\0';
|
||||
numPackedBytes += sizeof(char);
|
||||
}
|
||||
|
||||
if (_attachedPublicSocket || _attachedLocalSocket) {
|
||||
sockaddr* socketToPack = (_attachedPublicSocket) ? _attachedPublicSocket : _attachedLocalSocket;
|
||||
|
||||
|
@ -148,6 +119,6 @@ int Assignment::packToBuffer(unsigned char* buffer) {
|
|||
}
|
||||
|
||||
QDebug operator<<(QDebug debug, const Assignment &assignment) {
|
||||
debug << "T:" << assignment.getType() << "P:" << assignment.getPool();
|
||||
debug << "T:" << assignment.getType();
|
||||
return debug.nospace();
|
||||
}
|
|
@ -18,18 +18,25 @@ class Assignment {
|
|||
public:
|
||||
|
||||
enum Type {
|
||||
AudioMixer,
|
||||
AvatarMixer,
|
||||
All
|
||||
AudioMixerType,
|
||||
AvatarMixerType,
|
||||
AllTypes
|
||||
};
|
||||
|
||||
enum Direction {
|
||||
Create,
|
||||
Deploy,
|
||||
Request
|
||||
CreateDirection,
|
||||
DeployDirection,
|
||||
RequestDirection
|
||||
};
|
||||
|
||||
Assignment(Assignment::Direction direction, Assignment::Type type, const char* pool = NULL);
|
||||
enum Location {
|
||||
GlobalLocation,
|
||||
LocalLocation
|
||||
};
|
||||
|
||||
Assignment(Assignment::Direction direction,
|
||||
Assignment::Type type,
|
||||
Assignment::Location location = Assignment::GlobalLocation);
|
||||
|
||||
/// Constructs an Assignment from the data in the buffer
|
||||
/// \param dataBuffer the source buffer to un-pack the assignment from
|
||||
|
@ -40,7 +47,7 @@ public:
|
|||
|
||||
Assignment::Direction getDirection() const { return _direction; }
|
||||
Assignment::Type getType() const { return _type; }
|
||||
const char* getPool() const { return _pool; }
|
||||
Assignment::Location getLocation() const { return _location; }
|
||||
const timeval& getTime() const { return _time; }
|
||||
|
||||
const sockaddr* getAttachedPublicSocket() { return _attachedPublicSocket; }
|
||||
|
@ -60,7 +67,7 @@ public:
|
|||
private:
|
||||
Assignment::Direction _direction; /// the direction of the assignment (Create, Deploy, Request)
|
||||
Assignment::Type _type; /// the type of the assignment, defines what the assignee will do
|
||||
char* _pool; /// the pool this assignment is for/from
|
||||
Assignment::Location _location; /// the location of the assignment, allows a domain to preferentially use local ACs
|
||||
sockaddr* _attachedPublicSocket; /// pointer to a public socket that relates to assignment, depends on direction
|
||||
sockaddr* _attachedLocalSocket; /// pointer to a local socket that relates to assignment, depends on direction
|
||||
timeval _time; /// time the assignment was created (set in constructor)
|
||||
|
|
|
@ -382,7 +382,7 @@ const sockaddr_in GLOBAL_ASSIGNMENT_SOCKET = socketForHostnameAndHostOrderPort(G
|
|||
void NodeList::sendAssignment(Assignment& assignment) {
|
||||
unsigned char assignmentPacket[MAX_PACKET_SIZE];
|
||||
|
||||
PACKET_TYPE assignmentPacketType = assignment.getDirection() == Assignment::Create
|
||||
PACKET_TYPE assignmentPacketType = assignment.getDirection() == Assignment::CreateDirection
|
||||
? PACKET_TYPE_CREATE_ASSIGNMENT
|
||||
: PACKET_TYPE_REQUEST_ASSIGNMENT;
|
||||
|
||||
|
|
Loading…
Reference in a new issue