mirror of
https://github.com/overte-org/overte.git
synced 2025-04-08 08:14:48 +02:00
Merge pull request #910 from birarda/assignment
more feature additions to assignment
This commit is contained in:
commit
218ac4641b
6 changed files with 146 additions and 108 deletions
|
@ -16,73 +16,98 @@
|
|||
#include <PacketHeaders.h>
|
||||
#include <SharedUtil.h>
|
||||
|
||||
const int ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000;
|
||||
const long long ASSIGNMENT_REQUEST_INTERVAL_USECS = 1 * 1000 * 1000;
|
||||
|
||||
int main(int argc, char* const argv[]) {
|
||||
|
||||
int main(int argc, const char* argv[]) {
|
||||
|
||||
setvbuf(stdout, NULL, _IOLBF, 0);
|
||||
|
||||
sockaddr_in customAssignmentSocket = {};
|
||||
|
||||
// grab the overriden assignment-server hostname from argv, if it exists
|
||||
const char* customAssignmentServer = getCmdOption(argc, argv, "-a");
|
||||
if (customAssignmentServer) {
|
||||
customAssignmentSocket = socketForHostnameAndHostOrderPort(customAssignmentServer, ASSIGNMENT_SERVER_PORT);
|
||||
}
|
||||
|
||||
const char* NUM_FORKS_PARAMETER = "-n";
|
||||
const char* numForksIncludingParentString = getCmdOption(argc, argv, NUM_FORKS_PARAMETER);
|
||||
|
||||
if (numForksIncludingParentString) {
|
||||
int numForksIncludingParent = atoi(numForksIncludingParentString);
|
||||
qDebug() << "Starting" << numForksIncludingParent << "assignment clients.";
|
||||
|
||||
int processID = 0;
|
||||
|
||||
// fire off as many children as we need (this is one less than the parent since the parent will run as well)
|
||||
for (int i = 0; i < numForksIncludingParent - 1; i++) {
|
||||
processID = fork();
|
||||
|
||||
if (processID == 0) {
|
||||
// this is one of the children, break so we don't start a fork bomb
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create a NodeList as an unassigned client
|
||||
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_UNASSIGNED);
|
||||
|
||||
// set the custom assignment socket if we have it
|
||||
if (customAssignmentSocket.sin_addr.s_addr != 0) {
|
||||
nodeList->setAssignmentServerSocket((sockaddr*) &customAssignmentSocket);
|
||||
}
|
||||
|
||||
// change the timeout on the nodelist socket to be as often as we want to re-request
|
||||
nodeList->getNodeSocket()->setBlockingReceiveTimeoutInUsecs(ASSIGNMENT_REQUEST_INTERVAL_USECS);
|
||||
|
||||
timeval lastRequest = {};
|
||||
|
||||
unsigned char packetData[MAX_PACKET_SIZE];
|
||||
ssize_t receivedBytes = 0;
|
||||
|
||||
// loop the parameters to see if we were passed a pool
|
||||
int parameter = -1;
|
||||
const char ALLOWED_PARAMETERS[] = "p::";
|
||||
const char POOL_PARAMETER_CHAR = 'p';
|
||||
|
||||
char* assignmentPool = NULL;
|
||||
|
||||
while ((parameter = getopt(argc, argv, ALLOWED_PARAMETERS)) != -1) {
|
||||
if (parameter == POOL_PARAMETER_CHAR) {
|
||||
// copy the passed assignment pool
|
||||
int poolLength = strlen(optarg);
|
||||
assignmentPool = new char[poolLength + sizeof(char)];
|
||||
strcpy(assignmentPool, optarg);
|
||||
}
|
||||
}
|
||||
|
||||
// grab the assignment pool from argv, if it was passed
|
||||
const char* ASSIGNMENT_POOL_PARAMETER = "-p";
|
||||
const char* assignmentPool = getCmdOption(argc, argv, ASSIGNMENT_POOL_PARAMETER);
|
||||
|
||||
// create a request assignment, accept all assignments, pass the desired pool (if it exists)
|
||||
Assignment requestAssignment(Assignment::Request, Assignment::All, assignmentPool);
|
||||
|
||||
while (true) {
|
||||
// if we're here we have no assignment, so send a request
|
||||
qDebug() << "Sending an assignment request -" << requestAssignment;
|
||||
nodeList->sendAssignment(requestAssignment);
|
||||
if (usecTimestampNow() - usecTimestamp(&lastRequest) >= ASSIGNMENT_REQUEST_INTERVAL_USECS) {
|
||||
gettimeofday(&lastRequest, NULL);
|
||||
// if we're here we have no assignment, so send a request
|
||||
nodeList->sendAssignment(requestAssignment);
|
||||
}
|
||||
|
||||
while (nodeList->getNodeSocket()->receive(packetData, &receivedBytes)) {
|
||||
if (packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) {
|
||||
if (nodeList->getNodeSocket()->receive(packetData, &receivedBytes) &&
|
||||
packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) {
|
||||
|
||||
// construct the deployed assignment from the packet data
|
||||
Assignment deployedAssignment(packetData, receivedBytes);
|
||||
|
||||
qDebug() << "Received an assignment - " << deployedAssignment << "\n";
|
||||
|
||||
// switch our nodelist DOMAIN_IP to the ip receieved in the assignment
|
||||
if (deployedAssignment.getDomainSocket()->sa_family == AF_INET) {
|
||||
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getDomainSocket())->sin_addr;
|
||||
nodeList->setDomainIP(inet_ntoa(domainSocketAddr));
|
||||
|
||||
// construct the deployed assignment from the packet data
|
||||
Assignment deployedAssignment(packetData, receivedBytes);
|
||||
|
||||
qDebug() << "Received an assignment - " << deployedAssignment << "\n";
|
||||
|
||||
// switch our nodelist DOMAIN_IP to the ip receieved in the assignment
|
||||
if (deployedAssignment.getDomainSocket()->sa_family == AF_INET) {
|
||||
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getDomainSocket())->sin_addr;
|
||||
nodeList->setDomainIP(inet_ntoa(domainSocketAddr));
|
||||
|
||||
qDebug() << "Changed domain IP to " << inet_ntoa(domainSocketAddr);
|
||||
}
|
||||
|
||||
if (deployedAssignment.getType() == Assignment::AudioMixer) {
|
||||
AudioMixer::run();
|
||||
} else {
|
||||
AvatarMixer::run();
|
||||
}
|
||||
|
||||
qDebug() << "Assignment finished or never started - waiting for new assignment";
|
||||
|
||||
// reset our NodeList by switching back to unassigned and clearing the list
|
||||
nodeList->setOwnerType(NODE_TYPE_UNASSIGNED);
|
||||
nodeList->clear();
|
||||
qDebug() << "Changed domain IP to " << inet_ntoa(domainSocketAddr);
|
||||
}
|
||||
|
||||
if (deployedAssignment.getType() == Assignment::AudioMixer) {
|
||||
AudioMixer::run();
|
||||
} else {
|
||||
AvatarMixer::run();
|
||||
}
|
||||
|
||||
qDebug() << "Assignment finished or never started - waiting for new assignment";
|
||||
|
||||
// reset our NodeList by switching back to unassigned and clearing the list
|
||||
nodeList->setOwnerType(NODE_TYPE_UNASSIGNED);
|
||||
nodeList->clear();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -48,7 +48,7 @@ unsigned char* addNodeToBroadcastPacket(unsigned char* currentPosition, Node* no
|
|||
return currentPosition;
|
||||
}
|
||||
|
||||
int main(int argc, char* const argv[]) {
|
||||
int main(int argc, const char* argv[]) {
|
||||
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_DOMAIN, DOMAIN_LISTEN_PORT);
|
||||
// If user asks to run in "local" mode then we do NOT replace the IP
|
||||
// with the EC2 IP. Otherwise, we will replace the IP like we used to
|
||||
|
@ -85,56 +85,56 @@ int main(int argc, char* const argv[]) {
|
|||
|
||||
timeval lastStatSendTime = {};
|
||||
|
||||
// loop the parameters to see if we were passed a pool for assignment
|
||||
int parameter = -1;
|
||||
const char ALLOWED_PARAMETERS[] = "p::-local::";
|
||||
const char POOL_PARAMETER_CHAR = 'p';
|
||||
const char ASSIGNMENT_POOL_OPTION[] = "-p";
|
||||
const char ASSIGNMENT_SERVER_OPTION[] = "-a";
|
||||
|
||||
char* assignmentPool = NULL;
|
||||
// set our assignment pool from argv, if it exists
|
||||
const char* assignmentPool = getCmdOption(argc, argv, ASSIGNMENT_POOL_OPTION);
|
||||
|
||||
while ((parameter = getopt(argc, argv, ALLOWED_PARAMETERS)) != -1) {
|
||||
if (parameter == POOL_PARAMETER_CHAR) {
|
||||
// copy the passed assignment pool
|
||||
int poolLength = strlen(optarg);
|
||||
assignmentPool = new char[poolLength + sizeof(char)];
|
||||
strcpy(assignmentPool, optarg);
|
||||
}
|
||||
// grab the overriden assignment-server hostname from argv, if it exists
|
||||
const char* customAssignmentServer = getCmdOption(argc, argv, ASSIGNMENT_SERVER_OPTION);
|
||||
if (customAssignmentServer) {
|
||||
sockaddr_in customAssignmentSocket = socketForHostnameAndHostOrderPort(customAssignmentServer, ASSIGNMENT_SERVER_PORT);
|
||||
nodeList->setAssignmentServerSocket((sockaddr*) &customAssignmentSocket);
|
||||
}
|
||||
|
||||
// use a map to keep track of iterations of silence for assignment creation requests
|
||||
const int ASSIGNMENT_SILENCE_MAX_ITERATIONS = 5;
|
||||
std::map<Assignment*, int> assignmentSilenceCount;
|
||||
const long long ASSIGNMENT_SILENCE_MAX_USECS = 5 * 1000 * 1000;
|
||||
|
||||
// as a domain-server we will always want an audio mixer and avatar mixer
|
||||
// setup the create assignments for those
|
||||
Assignment audioAssignment(Assignment::Create, Assignment::AudioMixer, assignmentPool);
|
||||
Assignment avatarAssignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool);
|
||||
// setup the create assignment pointers for those
|
||||
Assignment* audioAssignment = NULL;
|
||||
Assignment* avatarAssignment = NULL;
|
||||
|
||||
while (true) {
|
||||
|
||||
if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) {
|
||||
if (assignmentSilenceCount[&audioAssignment] == ASSIGNMENT_SILENCE_MAX_ITERATIONS) {
|
||||
nodeList->sendAssignment(audioAssignment);
|
||||
assignmentSilenceCount[&audioAssignment] = 0;
|
||||
} else {
|
||||
assignmentSilenceCount[&audioAssignment]++;
|
||||
if (!audioAssignment
|
||||
|| usecTimestampNow() - usecTimestamp(&audioAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
|
||||
|
||||
if (!audioAssignment) {
|
||||
audioAssignment = new Assignment(Assignment::Create, Assignment::AudioMixer, assignmentPool);
|
||||
}
|
||||
|
||||
nodeList->sendAssignment(*audioAssignment);
|
||||
audioAssignment->setCreateTimeToNow();
|
||||
}
|
||||
} else {
|
||||
assignmentSilenceCount[&audioAssignment] = 0;
|
||||
}
|
||||
|
||||
if (!nodeList->soloNodeOfType(NODE_TYPE_AVATAR_MIXER)) {
|
||||
if (assignmentSilenceCount[&avatarAssignment] == ASSIGNMENT_SILENCE_MAX_ITERATIONS) {
|
||||
nodeList->sendAssignment(avatarAssignment);
|
||||
assignmentSilenceCount[&avatarAssignment] = 0;
|
||||
} else {
|
||||
assignmentSilenceCount[&avatarAssignment]++;
|
||||
if (!avatarAssignment
|
||||
|| usecTimestampNow() - usecTimestamp(&avatarAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
|
||||
if (!avatarAssignment) {
|
||||
avatarAssignment = new Assignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool);
|
||||
}
|
||||
|
||||
nodeList->sendAssignment(*avatarAssignment);
|
||||
|
||||
// reset the create time on the assignment so re-request is in ASSIGNMENT_SILENCE_MAX_USECS
|
||||
avatarAssignment->setCreateTimeToNow();
|
||||
}
|
||||
} else {
|
||||
assignmentSilenceCount[&avatarAssignment] = 0;
|
||||
|
||||
}
|
||||
|
||||
|
||||
if (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
|
||||
(packetData[0] == PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY || packetData[0] == PACKET_TYPE_DOMAIN_LIST_REQUEST) &&
|
||||
packetVersionMatch(packetData)) {
|
||||
|
@ -245,6 +245,9 @@ int main(int argc, char* const argv[]) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
delete audioAssignment;
|
||||
delete avatarAssignment;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -6,8 +6,6 @@
|
|||
// Copyright (c) 2013 HighFidelity, Inc. All rights reserved.
|
||||
//
|
||||
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "PacketHeaders.h"
|
||||
|
||||
#include "Assignment.h"
|
||||
|
@ -86,6 +84,24 @@ Assignment::~Assignment() {
|
|||
delete _pool;
|
||||
}
|
||||
|
||||
void Assignment::setDomainSocket(const sockaddr* domainSocket) {
|
||||
|
||||
if (_domainSocket) {
|
||||
// delete the old _domainSocket if it exists
|
||||
delete _domainSocket;
|
||||
_domainSocket = NULL;
|
||||
}
|
||||
|
||||
// create a new sockaddr or sockaddr_in depending on what type of address this is
|
||||
if (domainSocket->sa_family == AF_INET) {
|
||||
_domainSocket = (sockaddr*) new sockaddr_in;
|
||||
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in));
|
||||
} else {
|
||||
_domainSocket = (sockaddr*) new sockaddr_in6;
|
||||
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in6));
|
||||
}
|
||||
}
|
||||
|
||||
int Assignment::packToBuffer(unsigned char* buffer) {
|
||||
int numPackedBytes = 0;
|
||||
|
||||
|
@ -114,24 +130,6 @@ int Assignment::packToBuffer(unsigned char* buffer) {
|
|||
return numPackedBytes;
|
||||
}
|
||||
|
||||
void Assignment::setDomainSocket(const sockaddr* domainSocket) {
|
||||
|
||||
if (_domainSocket) {
|
||||
// delete the old _domainSocket if it exists
|
||||
delete _domainSocket;
|
||||
_domainSocket = NULL;
|
||||
}
|
||||
|
||||
// create a new sockaddr or sockaddr_in depending on what type of address this is
|
||||
if (domainSocket->sa_family == AF_INET) {
|
||||
_domainSocket = (sockaddr*) new sockaddr_in;
|
||||
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in));
|
||||
} else {
|
||||
_domainSocket = (sockaddr*) new sockaddr_in6;
|
||||
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in6));
|
||||
}
|
||||
}
|
||||
|
||||
QDebug operator<<(QDebug debug, const Assignment &assignment) {
|
||||
debug << "T:" << assignment.getType() << "P:" << assignment.getPool();
|
||||
return debug.nospace();
|
||||
|
|
|
@ -9,6 +9,8 @@
|
|||
#ifndef __hifi__Assignment__
|
||||
#define __hifi__Assignment__
|
||||
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "NodeList.h"
|
||||
|
||||
/// Holds information used for request, creation, and deployment of assignments
|
||||
|
@ -49,6 +51,9 @@ public:
|
|||
/// \return number of bytes packed into buffer
|
||||
int packToBuffer(unsigned char* buffer);
|
||||
|
||||
/// Sets _time to the current time given by gettimeofday
|
||||
void setCreateTimeToNow() { gettimeofday(&_time, NULL); }
|
||||
|
||||
private:
|
||||
Assignment::Direction _direction; /// the direction of the assignment (Create, Deploy, Request)
|
||||
Assignment::Type _type; /// the type of the assignment, defines what the assignee will do
|
||||
|
|
|
@ -65,7 +65,8 @@ NodeList::NodeList(char newOwnerType, unsigned short int newSocketListenPort) :
|
|||
_nodeTypesOfInterest(NULL),
|
||||
_ownerID(UNKNOWN_NODE_ID),
|
||||
_lastNodeID(UNKNOWN_NODE_ID + 1),
|
||||
_numNoReplyDomainCheckIns(0)
|
||||
_numNoReplyDomainCheckIns(0),
|
||||
_assignmentServerSocket(NULL)
|
||||
{
|
||||
memcpy(_domainHostname, DEFAULT_DOMAIN_HOSTNAME, sizeof(DEFAULT_DOMAIN_HOSTNAME));
|
||||
memcpy(_domainIP, DEFAULT_DOMAIN_IP, sizeof(DEFAULT_DOMAIN_IP));
|
||||
|
@ -255,6 +256,7 @@ void NodeList::clear() {
|
|||
}
|
||||
|
||||
_numNodes = 0;
|
||||
_numNoReplyDomainCheckIns = 0;
|
||||
}
|
||||
|
||||
void NodeList::setNodeTypesOfInterest(const char* nodeTypesOfInterest, int numNodeTypesOfInterest) {
|
||||
|
@ -374,10 +376,9 @@ int NodeList::processDomainServerList(unsigned char* packetData, size_t dataByte
|
|||
return readNodes;
|
||||
}
|
||||
|
||||
const char ASSIGNMENT_SERVER_HOSTNAME[] = "assignment.highfidelity.io";
|
||||
const sockaddr_in assignmentServerSocket = socketForHostnameAndHostOrderPort(ASSIGNMENT_SERVER_HOSTNAME,
|
||||
ASSIGNMENT_SERVER_PORT);
|
||||
|
||||
const char GLOBAL_ASSIGNMENT_SERVER_HOSTNAME[] = "assignment.highfidelity.io";
|
||||
const sockaddr_in GLOBAL_ASSIGNMENT_SOCKET = socketForHostnameAndHostOrderPort(GLOBAL_ASSIGNMENT_SERVER_HOSTNAME,
|
||||
ASSIGNMENT_SERVER_PORT);
|
||||
void NodeList::sendAssignment(Assignment& assignment) {
|
||||
unsigned char assignmentPacket[MAX_PACKET_SIZE];
|
||||
|
||||
|
@ -387,8 +388,12 @@ void NodeList::sendAssignment(Assignment& assignment) {
|
|||
|
||||
int numHeaderBytes = populateTypeAndVersion(assignmentPacket, assignmentPacketType);
|
||||
int numAssignmentBytes = assignment.packToBuffer(assignmentPacket + numHeaderBytes);
|
||||
|
||||
_nodeSocket.send((sockaddr*) &assignmentServerSocket, assignmentPacket, numHeaderBytes + numAssignmentBytes);
|
||||
|
||||
sockaddr* assignmentServerSocket = (_assignmentServerSocket == NULL)
|
||||
? (sockaddr*) &GLOBAL_ASSIGNMENT_SOCKET
|
||||
: _assignmentServerSocket;
|
||||
|
||||
_nodeSocket.send((sockaddr*) assignmentServerSocket, assignmentPacket, numHeaderBytes + numAssignmentBytes);
|
||||
}
|
||||
|
||||
Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, char nodeType, uint16_t nodeId) {
|
||||
|
|
|
@ -99,6 +99,7 @@ public:
|
|||
void sendDomainServerCheckIn();
|
||||
int processDomainServerList(unsigned char *packetData, size_t dataBytes);
|
||||
|
||||
void setAssignmentServerSocket(sockaddr* serverSocket) { _assignmentServerSocket = serverSocket; }
|
||||
void sendAssignment(Assignment& assignment);
|
||||
|
||||
Node* nodeWithAddress(sockaddr *senderAddress);
|
||||
|
@ -151,6 +152,7 @@ private:
|
|||
pthread_t removeSilentNodesThread;
|
||||
pthread_t checkInWithDomainServerThread;
|
||||
int _numNoReplyDomainCheckIns;
|
||||
sockaddr* _assignmentServerSocket;
|
||||
|
||||
void handlePingReply(sockaddr *nodeAddress);
|
||||
void timePingReply(sockaddr *nodeAddress, unsigned char *packetData);
|
||||
|
|
Loading…
Reference in a new issue