touchups to DS with static assignment file

This commit is contained in:
Stephen Birarda 2013-09-30 13:11:46 -07:00
parent a8d55644e9
commit 0e5c5886b6
4 changed files with 120 additions and 64 deletions

View file

@ -17,6 +17,7 @@ DomainServer* DomainServer::domainServerInstance = NULL;
void DomainServer::signalHandler(int signal) {
domainServerInstance->cleanup();
exit(1);
}
void DomainServer::setDomainServerInstance(DomainServer* domainServer) {
@ -74,7 +75,7 @@ void DomainServer::civetwebUploadHandler(struct mg_connection *connection, const
}
void DomainServer::nodeAdded(Node* node) {
NodeList::getInstance()->increaseNodeID();
// do nothing - ID is incremented in run()
}
void DomainServer::nodeKilled(Node* node) {
@ -82,6 +83,8 @@ void DomainServer::nodeKilled(Node* node) {
if (node->getLinkedData()) {
Assignment* nodeAssignment = (Assignment*) node->getLinkedData();
qDebug() << "Adding assignment" << &nodeAssignment << "back to queue.\n";
// find this assignment in the static file
for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) {
if (_staticAssignments[i].getUUID() == nodeAssignment->getUUID()) {
@ -118,7 +121,8 @@ DomainServer::DomainServer(int argc, char* argv[]) :
_assignmentQueue(),
_staticAssignmentFile(QString("%1/config.ds").arg(QCoreApplication::applicationDirPath())),
_staticAssignmentFileData(NULL),
_voxelServerConfig(NULL)
_voxelServerConfig(NULL),
_hasCompletedRestartHold(false)
{
DomainServer::setDomainServerInstance(this);
@ -191,30 +195,42 @@ void DomainServer::prepopulateStaticAssignmentFile() {
_staticAssignmentFile.close();
}
int DomainServer::indexForMatchingStaticAssignment(NODE_TYPE nodeType, const uchar* checkInData) {
Assignment* DomainServer::matchingStaticAssignmentForCheckIn(NODE_TYPE nodeType, const uchar* checkInData) {
// pull the UUID passed with the check in
QUuid checkInUUID = QUuid::fromRfc4122(QByteArray((const char*) checkInData + numBytesForPacketHeader(checkInData) +
sizeof(NODE_TYPE),
NUM_BYTES_RFC4122_UUID));
int staticAssignmentIndex = 0;
while (staticAssignmentIndex < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS
&& !_staticAssignments[staticAssignmentIndex].getUUID().isNull()) {
Assignment* staticAssignment = &_staticAssignments[staticAssignmentIndex];
if (_hasCompletedRestartHold) {
_assignmentQueueMutex.lock();
if (staticAssignment->getType() == Assignment::typeForNodeType(nodeType)
&& staticAssignment->getUUID() == checkInUUID) {
// return index of match
return staticAssignmentIndex;
} else {
// no match, keep looking
staticAssignmentIndex++;
// iterate the assignment queue to check for a match
std::deque<Assignment*>::iterator assignment = _assignmentQueue.begin();
while (assignment != _assignmentQueue.end()) {
if ((*assignment)->getUUID() == checkInUUID) {
// return the matched assignment
_assignmentQueueMutex.unlock();
return *assignment;
} else {
// no match, push deque iterator forwards
assignment++;
}
}
_assignmentQueueMutex.unlock();
} else {
for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) {
if (_staticAssignments[i].getUUID() == checkInUUID) {
// return matched assignment
return &_staticAssignments[i];
} else if (_staticAssignments[i].getUUID().isNull()) {
// end of static assignments, no match - return NULL
return NULL;
}
}
}
// return -1 for no match
return -1;
return NULL;
}
Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssignment) {
@ -229,6 +245,8 @@ Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssi
if (requestAssignment.getType() == Assignment::AllTypes ||
(*assignment)->getType() == requestAssignment.getType()) {
Assignment* deployableAssignment = *assignment;
if ((*assignment)->getType() == Assignment::AgentType) {
// if this is a script assignment we need to delete it to avoid a memory leak
// or if there is more than one instance to send out, simpy decrease the number of instances
@ -239,22 +257,21 @@ Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssi
delete *assignment;
}
} else {
Assignment *sentAssignment = *assignment;
// remove the assignment from the queue
_assignmentQueue.erase(assignment);
if (sentAssignment->getType() != Assignment::VoxelServerType) {
if (deployableAssignment->getType() != Assignment::VoxelServerType) {
// keep audio-mixer and avatar-mixer assignments in the queue
// until we get a check-in from that GUID
// but stick it at the back so the others have a chance to go out
_assignmentQueue.push_back(sentAssignment);
_assignmentQueue.push_back(deployableAssignment);
}
}
// stop looping, we've handed out an assignment
_assignmentQueueMutex.unlock();
return *assignment;
return deployableAssignment;
} else {
// push forward the iterator to check the next assignment
assignment++;
@ -265,6 +282,25 @@ Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssi
return NULL;
}
void DomainServer::removeAssignmentFromQueue(Assignment* removableAssignment) {
_assignmentQueueMutex.lock();
std::deque<Assignment*>::iterator assignment = _assignmentQueue.begin();
while (assignment != _assignmentQueue.end()) {
if ((*assignment)->getUUID() == removableAssignment->getUUID()) {
_assignmentQueue.erase(assignment);
break;
} else {
// push forward the iterator to check the next assignment
assignment++;
}
}
_assignmentQueueMutex.unlock();
}
void DomainServer::cleanup() {
_staticAssignmentFile.unmap(_staticAssignmentFileData);
_staticAssignmentFile.close();
@ -303,6 +339,9 @@ int DomainServer::run() {
_staticAssignments = (Assignment*)
(_staticAssignmentFileData + sizeof(*_numAssignmentsInStaticFile));
timeval startTime;
gettimeofday(&startTime, NULL);
while (true) {
while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
@ -332,20 +371,29 @@ int DomainServer::run() {
NODE_TYPE_VOXEL_SERVER
};
int matchingStaticAssignmentIndex = -1;
Assignment* matchingStaticAssignment = NULL;
if (memchr(STATICALLY_ASSIGNED_NODES, nodeType, sizeof(STATICALLY_ASSIGNED_NODES)) == NULL ||
(matchingStaticAssignmentIndex = indexForMatchingStaticAssignment(nodeType, packetData)) != -1) {
(matchingStaticAssignment = matchingStaticAssignmentForCheckIn(nodeType, packetData))) {
Node* checkInNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress,
(sockaddr*) &nodeLocalAddress,
nodeType,
nodeList->getLastNodeID());
if (matchingStaticAssignmentIndex != -1) {
// set the linked data for this node to the matching assignment from the static file
if (checkInNode->getNodeID() == nodeList->getLastNodeID() && matchingStaticAssignment) {
// this was a newly added node
NodeList::getInstance()->increaseNodeID();
if (_hasCompletedRestartHold) {
// remove the matching assignment from the assignment queue so we don't take the next check in
removeAssignmentFromQueue(matchingStaticAssignment);
}
// set the linked data for this node to a copy of the matching assignment
// so we can re-queue it should the node die
checkInNode->setLinkedData(&_staticAssignments[matchingStaticAssignmentIndex]);
Assignment* nodeCopyOfMatchingAssignment = new Assignment(*matchingStaticAssignment);
checkInNode->setLinkedData(nodeCopyOfMatchingAssignment);
}
int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN);
@ -434,6 +482,45 @@ int DomainServer::run() {
}
}
}
const long long RESTART_HOLD_TIME_USECS = 5 * 1000 * 1000;
if (!_hasCompletedRestartHold && usecTimestampNow() - usecTimestamp(&startTime) > RESTART_HOLD_TIME_USECS) {
_hasCompletedRestartHold = true;
// pull anything in the static assignment file that isn't spoken for and add to the assignment queue
for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) {
if (_staticAssignments[i].getUUID().isNull()) {
// reached the end of static assignments, bail
break;
}
bool foundMatchingAssignment = false;
// enumerate the nodes and check if there is one with an attached assignment with matching UUID
// if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
Assignment* linkedAssignment = (Assignment*) node->getLinkedData();
if (linkedAssignment->getUUID() == _staticAssignments[i].getUUID()) {
foundMatchingAssignment = true;
break;
}
}
}
if (!foundMatchingAssignment) {
// this assignment has not been fulfilled - reset the UUID and add it to the assignment queue
_staticAssignments[i].resetUUID();
qDebug() << "Adding static assignment to queue -" << _staticAssignments[i] << "\n";
_assignmentQueueMutex.lock();
_assignmentQueue.push_back(&_staticAssignments[i]);
_assignmentQueueMutex.unlock();
}
}
}
}
this->cleanup();

View file

@ -42,8 +42,9 @@ private:
static DomainServer* domainServerInstance;
void prepopulateStaticAssignmentFile();
int indexForMatchingStaticAssignment(NODE_TYPE nodeType, const uchar* checkInUUID);
Assignment* matchingStaticAssignmentForCheckIn(NODE_TYPE nodeType, const uchar* checkInUUID);
Assignment* deployableAssignmentForRequest(Assignment& requestAssignment);
void removeAssignmentFromQueue(Assignment* removableAssignment);
void cleanup();
@ -59,6 +60,8 @@ private:
Assignment* _staticAssignments;
const char* _voxelServerConfig;
bool _hasCompletedRestartHold;
};
#endif /* defined(__hifi__DomainServer__) */

View file

@ -168,36 +168,7 @@ void Assignment::run() {
}
QDebug operator<<(QDebug debug, const Assignment &assignment) {
debug << "T:" << assignment.getType();
debug << "UUID:" << assignment.getUUIDStringWithoutCurlyBraces().toStdString().c_str() <<
", Type:" << assignment.getType();
return debug.nospace();
}
QDataStream& operator<<(QDataStream& out, const Assignment& assignment) {
out << assignment._uuid;
out << (uchar) assignment._command;
out << (uchar) assignment._type;
out << (uchar) assignment._location;
out << assignment._numberOfInstances;
out << assignment._numPayloadBytes;
if (assignment._numPayloadBytes > 0) {
out.writeBytes((char*) assignment._payload, assignment._numPayloadBytes);
}
return out;
}
QDataStream& operator>>(QDataStream& in, Assignment& assignment) {
in >> assignment._uuid;
in >> (uchar&) assignment._command;
in >> (uchar&) assignment._type;
in >> (uchar&) assignment._location;
in >> assignment._numberOfInstances;
in >> assignment._numPayloadBytes;
if (assignment._numPayloadBytes > 0) {
in.readRawData((char*) assignment._payload, assignment._numPayloadBytes);
}
return in;
}

View file

@ -439,12 +439,7 @@ Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, c
}
}
if (node == end()) {
// if we already had this node AND it's a solo type then bust out of here
if (soloNodeOfType(nodeType)) {
return NULL;
}
if (node == end()) {
// we didn't have this node, so add them
Node* newNode = new Node(publicSocket, localSocket, nodeType, nodeId);