Merge branch 'assignment'

This commit is contained in:
Stephen Birarda 2013-09-06 16:44:07 -07:00
commit ebd97227af
6 changed files with 106 additions and 105 deletions

View file

@ -16,9 +16,9 @@
#include <PacketHeaders.h> #include <PacketHeaders.h>
#include <SharedUtil.h> #include <SharedUtil.h>
const int ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000; const long long ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000;
int main(int argc, char* const argv[]) { int main(int argc, const char* argv[]) {
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
@ -28,61 +28,56 @@ int main(int argc, char* const argv[]) {
// change the timeout on the nodelist socket to be as often as we want to re-request // change the timeout on the nodelist socket to be as often as we want to re-request
nodeList->getNodeSocket()->setBlockingReceiveTimeoutInUsecs(ASSIGNMENT_REQUEST_INTERVAL_USECS); nodeList->getNodeSocket()->setBlockingReceiveTimeoutInUsecs(ASSIGNMENT_REQUEST_INTERVAL_USECS);
timeval lastRequest = {};
unsigned char packetData[MAX_PACKET_SIZE]; unsigned char packetData[MAX_PACKET_SIZE];
ssize_t receivedBytes = 0; ssize_t receivedBytes = 0;
// loop the parameters to see if we were passed a pool // grab the assignment pool from argv, if it was passed
int parameter = -1; const char* assignmentPool = getCmdOption(argc, argv, "-p");
const char ALLOWED_PARAMETERS[] = "p::";
const char POOL_PARAMETER_CHAR = 'p';
char* assignmentPool = NULL;
while ((parameter = getopt(argc, argv, ALLOWED_PARAMETERS)) != -1) {
if (parameter == POOL_PARAMETER_CHAR) {
// copy the passed assignment pool
int poolLength = strlen(optarg);
assignmentPool = new char[poolLength + sizeof(char)];
strcpy(assignmentPool, optarg);
}
}
// set the overriden assignment-server hostname from argv, if it exists
nodeList->setAssignmentServerHostname(getCmdOption(argc, argv, "-a"));
// create a request assignment, accept all assignments, pass the desired pool (if it exists) // create a request assignment, accept all assignments, pass the desired pool (if it exists)
Assignment requestAssignment(Assignment::Request, Assignment::All, assignmentPool); Assignment requestAssignment(Assignment::Request, Assignment::All, assignmentPool);
while (true) { while (true) {
// if we're here we have no assignment, so send a request if (usecTimestampNow() - usecTimestamp(&lastRequest) >= ASSIGNMENT_REQUEST_INTERVAL_USECS) {
qDebug() << "Sending an assignment request -" << requestAssignment; gettimeofday(&lastRequest, NULL);
nodeList->sendAssignment(requestAssignment); // if we're here we have no assignment, so send a request
qDebug() << "Sending an assignment request -" << requestAssignment;
nodeList->sendAssignment(requestAssignment);
}
while (nodeList->getNodeSocket()->receive(packetData, &receivedBytes)) { if (nodeList->getNodeSocket()->receive(packetData, &receivedBytes) &&
if (packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) { packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) {
// construct the deployed assignment from the packet data
Assignment deployedAssignment(packetData, receivedBytes);
qDebug() << "Received an assignment - " << deployedAssignment << "\n";
// switch our nodelist DOMAIN_IP to the ip receieved in the assignment
if (deployedAssignment.getDomainSocket()->sa_family == AF_INET) {
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getDomainSocket())->sin_addr;
nodeList->setDomainIP(inet_ntoa(domainSocketAddr));
// construct the deployed assignment from the packet data qDebug() << "Changed domain IP to " << inet_ntoa(domainSocketAddr);
Assignment deployedAssignment(packetData, receivedBytes);
qDebug() << "Received an assignment - " << deployedAssignment << "\n";
// switch our nodelist DOMAIN_IP to the ip receieved in the assignment
if (deployedAssignment.getDomainSocket()->sa_family == AF_INET) {
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getDomainSocket())->sin_addr;
nodeList->setDomainIP(inet_ntoa(domainSocketAddr));
qDebug() << "Changed domain IP to " << inet_ntoa(domainSocketAddr);
}
if (deployedAssignment.getType() == Assignment::AudioMixer) {
AudioMixer::run();
} else {
AvatarMixer::run();
}
qDebug() << "Assignment finished or never started - waiting for new assignment";
// reset our NodeList by switching back to unassigned and clearing the list
nodeList->setOwnerType(NODE_TYPE_UNASSIGNED);
nodeList->clear();
} }
if (deployedAssignment.getType() == Assignment::AudioMixer) {
AudioMixer::run();
} else {
qDebug() << "Running as an avatar mixer!";
AvatarMixer::run();
}
qDebug() << "Assignment finished or never started - waiting for new assignment";
// reset our NodeList by switching back to unassigned and clearing the list
nodeList->setOwnerType(NODE_TYPE_UNASSIGNED);
nodeList->clear();
} }
} }
} }

View file

@ -48,7 +48,7 @@ unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* no
return currentPosition; return currentPosition;
} }
int main(int argc, char* const argv[]) { int main(int argc, const char* argv[]) {
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT); NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT);
// If user asks to run in "local" mode then we do NOT replace the IP // If user asks to run in "local" mode then we do NOT replace the IP
// with the EC2 IP. Otherwise, we will replace the IP like we used to // with the EC2 IP. Otherwise, we will replace the IP like we used to
@ -85,56 +85,49 @@ int main(int argc, char* const argv[]) {
timeval lastStatSendTime = {}; timeval lastStatSendTime = {};
// loop the parameters to see if we were passed a pool for assignment // set our assignment pool from argv, if it exists
int parameter = -1; const char* assignmentPool = getCmdOption(argc, argv, "-p");
const char ALLOWED_PARAMETERS[] = "p::-local::";
const char POOL_PARAMETER_CHAR = 'p';
char* assignmentPool = NULL; // grab the overriden assignment-server hostname from argv, if it exists
nodeList->setAssignmentServerHostname(getCmdOption(argc, argv, "-a"));
while ((parameter = getopt(argc, argv, ALLOWED_PARAMETERS)) != -1) {
if (parameter == POOL_PARAMETER_CHAR) {
// copy the passed assignment pool
int poolLength = strlen(optarg);
assignmentPool = new char[poolLength + sizeof(char)];
strcpy(assignmentPool, optarg);
}
}
// use a map to keep track of iterations of silence for assignment creation requests // use a map to keep track of iterations of silence for assignment creation requests
const int ASSIGNMENT_SILENCE_MAX_ITERATIONS = 5; const long long ASSIGNMENT_SILENCE_MAX_USECS = 5 * 1000 * 1000;
std::map<Assignment*, int> assignmentSilenceCount;
// as a domain-server we will always want an audio mixer and avatar mixer // as a domain-server we will always want an audio mixer and avatar mixer
// setup the create assignments for those // setup the create assignment pointers for those
Assignment audioAssignment(Assignment::Create, Assignment::AudioMixer, assignmentPool); Assignment *audioAssignment = NULL;
Assignment avatarAssignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool); Assignment *avatarAssignment = NULL;
while (true) { while (true) {
if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) { if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) {
if (assignmentSilenceCount[&audioAssignment] == ASSIGNMENT_SILENCE_MAX_ITERATIONS) { if (!audioAssignment
nodeList->sendAssignment(audioAssignment); || usecTimestampNow() - usecTimestamp(&audioAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
assignmentSilenceCount[&audioAssignment] = 0;
} else { if (!audioAssignment) {
assignmentSilenceCount[&audioAssignment]++; audioAssignment = new Assignment(Assignment::Create, Assignment::AudioMixer, assignmentPool);
}
nodeList->sendAssignment(*audioAssignment);
audioAssignment->setCreateTimeToNow();
} }
} else {
assignmentSilenceCount[&audioAssignment] = 0;
} }
if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER)) { if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER)) {
if (assignmentSilenceCount[&avatarAssignment] == ASSIGNMENT_SILENCE_MAX_ITERATIONS) { if (!avatarAssignment
nodeList->sendAssignment(avatarAssignment); || usecTimestampNow() - usecTimestamp(&avatarAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
assignmentSilenceCount[&avatarAssignment] = 0; if (!avatarAssignment) {
} else { avatarAssignment = new Assignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool);
assignmentSilenceCount[&avatarAssignment]++; }
nodeList->sendAssignment(*avatarAssignment);
// reset the create time on the assignment so re-request is in ASSIGNMENT_SILENCE_MAX_USECS
avatarAssignment->setCreateTimeToNow();
} }
} else {
assignmentSilenceCount[&avatarAssignment] = 0;
} }
if (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) && if (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
(packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) && (packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) &&
packetVersionMatch(packetData)) { packetVersionMatch(packetData)) {
@ -245,6 +238,9 @@ int main(int argc, char* const argv[]) {
} }
} }
} }
delete audioAssignment;
delete avatarAssignment;
return 0; return 0;
} }

View file

@ -6,8 +6,6 @@
// Copyright (c) 2013 HighFidelity, Inc. All rights reserved. // Copyright (c) 2013 HighFidelity, Inc. All rights reserved.
// //
#include <sys/time.h>
#include "PacketHeaders.h" #include "PacketHeaders.h"
#include "Assignment.h" #include "Assignment.h"
@ -86,6 +84,24 @@ Assignment::~Assignment() {
delete _pool; delete _pool;
} }
void Assignment::setDomainSocket(const sockaddr* domainSocket) {
if (_domainSocket) {
// delete the old _domainSocket if it exists
delete _domainSocket;
_domainSocket = NULL;
}
// create a new sockaddr or sockaddr_in depending on what type of address this is
if (domainSocket->sa_family == AF_INET) {
_domainSocket = (sockaddr*) new sockaddr_in;
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in));
} else {
_domainSocket = (sockaddr*) new sockaddr_in6;
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in6));
}
}
int Assignment::packToBuffer(unsigned char* buffer) { int Assignment::packToBuffer(unsigned char* buffer) {
int numPackedBytes = 0; int numPackedBytes = 0;
@ -114,24 +130,6 @@ int Assignment::packToBuffer(unsigned char* buffer) {
return numPackedBytes; return numPackedBytes;
} }
void Assignment::setDomainSocket(const sockaddr* domainSocket) {
if (_domainSocket) {
// delete the old _domainSocket if it exists
delete _domainSocket;
_domainSocket = NULL;
}
// create a new sockaddr or sockaddr_in depending on what type of address this is
if (domainSocket->sa_family == AF_INET) {
_domainSocket = (sockaddr*) new sockaddr_in;
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in));
} else {
_domainSocket = (sockaddr*) new sockaddr_in6;
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in6));
}
}
QDebug operator<<(QDebug debug, const Assignment &assignment) { QDebug operator<<(QDebug debug, const Assignment &assignment) {
debug << "T:" << assignment.getType() << "P:" << assignment.getPool(); debug << "T:" << assignment.getType() << "P:" << assignment.getPool();
return debug.nospace(); return debug.nospace();

View file

@ -9,6 +9,8 @@
#ifndef __hifi__Assignment__ #ifndef __hifi__Assignment__
#define __hifi__Assignment__ #define __hifi__Assignment__
#include <sys/time.h>
#include "NodeList.h" #include "NodeList.h"
/// Holds information used for request, creation, and deployment of assignments /// Holds information used for request, creation, and deployment of assignments
@ -49,6 +51,9 @@ public:
/// \return number of bytes packed into buffer /// \return number of bytes packed into buffer
int packToBuffer(unsigned char* buffer); int packToBuffer(unsigned char* buffer);
/// Sets _time to the current time given by gettimeofday
void setCreateTimeToNow() { gettimeofday(&_time, NULL); }
private: private:
Assignment::Direction _direction; /// the direction of the assignment (Create, Deploy, Request) Assignment::Direction _direction; /// the direction of the assignment (Create, Deploy, Request)
Assignment::Type _type; /// the type of the assignment, defines what the assignee will do Assignment::Type _type; /// the type of the assignment, defines what the assignee will do

View file

@ -255,6 +255,7 @@ void NodeList::clear() {
} }
_numNodes = 0; _numNodes = 0;
_numNoReplyDomainCheckIns = 0;
} }
void NodeList::setNodeTypesOfInterest(const char* nodeTypesOfInterest, int numNodeTypesOfInterest) { void NodeList::setNodeTypesOfInterest(const char* nodeTypesOfInterest, int numNodeTypesOfInterest) {
@ -374,9 +375,7 @@ int NodeList::processDomainServerList(unsigned char* packetData, size_t dataByte
return readNodes; return readNodes;
} }
const char ASSIGNMENT_SERVER_HOSTNAME[] = "assignment.highfidelity.io"; const char GLOBAL_ASSIGNMENT_SERVER_HOSTNAME[] = "assignment.highfidelity.io";
const sockaddr_in assignmentServerSocket = socketForHostnameAndHostOrderPort(ASSIGNMENT_SERVER_HOSTNAME,
ASSIGNMENT_SERVER_PORT);
void NodeList::sendAssignment(Assignment& assignment) { void NodeList::sendAssignment(Assignment& assignment) {
unsigned char assignmentPacket[MAX_PACKET_SIZE]; unsigned char assignmentPacket[MAX_PACKET_SIZE];
@ -387,6 +386,12 @@ void NodeList::sendAssignment(Assignment& assignment) {
int numHeaderBytes = populateTypeAndVersion(assignmentPacket, assignmentPacketType); int numHeaderBytes = populateTypeAndVersion(assignmentPacket, assignmentPacketType);
int numAssignmentBytes = assignment.packToBuffer(assignmentPacket + numHeaderBytes); int numAssignmentBytes = assignment.packToBuffer(assignmentPacket + numHeaderBytes);
// setup the assignmentServerSocket once, use a custom assignmentServerHostname if it is present
static sockaddr_in assignmentServerSocket = socketForHostnameAndHostOrderPort((_assignmentServerHostname != NULL
? (const char*) _assignmentServerHostname
: GLOBAL_ASSIGNMENT_SERVER_HOSTNAME),
ASSIGNMENT_SERVER_PORT);
_nodeSocket.send((sockaddr*) &assignmentServerSocket, assignmentPacket, numHeaderBytes + numAssignmentBytes); _nodeSocket.send((sockaddr*) &assignmentServerSocket, assignmentPacket, numHeaderBytes + numAssignmentBytes);
} }

View file

@ -99,6 +99,7 @@ public:
void sendDomainServerCheckIn(); void sendDomainServerCheckIn();
int processDomainServerList(unsigned char *packetData, size_t dataBytes); int processDomainServerList(unsigned char *packetData, size_t dataBytes);
void setAssignmentServerHostname(const char* serverHostname) { _assignmentServerHostname = serverHostname; }
void sendAssignment(Assignment& assignment); void sendAssignment(Assignment& assignment);
Node* nodeWithAddress(sockaddr *senderAddress); Node* nodeWithAddress(sockaddr *senderAddress);
@ -151,6 +152,7 @@ private:
pthread_t removeSilentNodesThread; pthread_t removeSilentNodesThread;
pthread_t checkInWithDomainServerThread; pthread_t checkInWithDomainServerThread;
int _numNoReplyDomainCheckIns; int _numNoReplyDomainCheckIns;
const char* _assignmentServerHostname;
void handlePingReply(sockaddr *nodeAddress); void handlePingReply(sockaddr *nodeAddress);
void timePingReply(sockaddr *nodeAddress, unsigned char *packetData); void timePingReply(sockaddr *nodeAddress, unsigned char *packetData);