From 0e5c5886b6d11f237f61b1783b7779289a1526ac Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 30 Sep 2013 13:11:46 -0700 Subject: [PATCH] touchups to DS with static assignment file --- domain-server/src/DomainServer.cpp | 139 ++++++++++++++++++++++------ domain-server/src/DomainServer.h | 5 +- libraries/shared/src/Assignment.cpp | 33 +------ libraries/shared/src/NodeList.cpp | 7 +- 4 files changed, 120 insertions(+), 64 deletions(-) diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index dc325e579b..371644e3b8 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -17,6 +17,7 @@ DomainServer* DomainServer::domainServerInstance = NULL; void DomainServer::signalHandler(int signal) { domainServerInstance->cleanup(); + exit(1); } void DomainServer::setDomainServerInstance(DomainServer* domainServer) { @@ -74,7 +75,7 @@ void DomainServer::civetwebUploadHandler(struct mg_connection *connection, const } void DomainServer::nodeAdded(Node* node) { - NodeList::getInstance()->increaseNodeID(); + // do nothing - ID is incremented in run() } void DomainServer::nodeKilled(Node* node) { @@ -82,6 +83,8 @@ void DomainServer::nodeKilled(Node* node) { if (node->getLinkedData()) { Assignment* nodeAssignment = (Assignment*) node->getLinkedData(); + qDebug() << "Adding assignment" << &nodeAssignment << "back to queue.\n"; + // find this assignment in the static file for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) { if (_staticAssignments[i].getUUID() == nodeAssignment->getUUID()) { @@ -118,7 +121,8 @@ DomainServer::DomainServer(int argc, char* argv[]) : _assignmentQueue(), _staticAssignmentFile(QString("%1/config.ds").arg(QCoreApplication::applicationDirPath())), _staticAssignmentFileData(NULL), - _voxelServerConfig(NULL) + _voxelServerConfig(NULL), + _hasCompletedRestartHold(false) { DomainServer::setDomainServerInstance(this); @@ -191,30 +195,42 @@ void DomainServer::prepopulateStaticAssignmentFile() { _staticAssignmentFile.close(); } -int DomainServer::indexForMatchingStaticAssignment(NODE_TYPE nodeType, const uchar* checkInData) { +Assignment* DomainServer::matchingStaticAssignmentForCheckIn(NODE_TYPE nodeType, const uchar* checkInData) { // pull the UUID passed with the check in QUuid checkInUUID = QUuid::fromRfc4122(QByteArray((const char*) checkInData + numBytesForPacketHeader(checkInData) + sizeof(NODE_TYPE), NUM_BYTES_RFC4122_UUID)); - int staticAssignmentIndex = 0; - while (staticAssignmentIndex < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS - && !_staticAssignments[staticAssignmentIndex].getUUID().isNull()) { - Assignment* staticAssignment = &_staticAssignments[staticAssignmentIndex]; + if (_hasCompletedRestartHold) { + _assignmentQueueMutex.lock(); - if (staticAssignment->getType() == Assignment::typeForNodeType(nodeType) - && staticAssignment->getUUID() == checkInUUID) { - // return index of match - return staticAssignmentIndex; - } else { - // no match, keep looking - staticAssignmentIndex++; + // iterate the assignment queue to check for a match + std::deque::iterator assignment = _assignmentQueue.begin(); + while (assignment != _assignmentQueue.end()) { + if ((*assignment)->getUUID() == checkInUUID) { + // return the matched assignment + _assignmentQueueMutex.unlock(); + return *assignment; + } else { + // no match, push deque iterator forwards + assignment++; + } + } + + _assignmentQueueMutex.unlock(); + } else { + for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) { + if (_staticAssignments[i].getUUID() == checkInUUID) { + // return matched assignment + return &_staticAssignments[i]; + } else if (_staticAssignments[i].getUUID().isNull()) { + // end of static assignments, no match - return NULL + return NULL; + } } } - // return -1 for no match - return -1; - + return NULL; } Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssignment) { @@ -229,6 +245,8 @@ Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssi if (requestAssignment.getType() == Assignment::AllTypes || (*assignment)->getType() == requestAssignment.getType()) { + Assignment* deployableAssignment = *assignment; + if ((*assignment)->getType() == Assignment::AgentType) { // if this is a script assignment we need to delete it to avoid a memory leak // or if there is more than one instance to send out, simpy decrease the number of instances @@ -239,22 +257,21 @@ Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssi delete *assignment; } } else { - Assignment *sentAssignment = *assignment; // remove the assignment from the queue _assignmentQueue.erase(assignment); - if (sentAssignment->getType() != Assignment::VoxelServerType) { + if (deployableAssignment->getType() != Assignment::VoxelServerType) { // keep audio-mixer and avatar-mixer assignments in the queue // until we get a check-in from that GUID // but stick it at the back so the others have a chance to go out - _assignmentQueue.push_back(sentAssignment); + _assignmentQueue.push_back(deployableAssignment); } } // stop looping, we've handed out an assignment _assignmentQueueMutex.unlock(); - return *assignment; + return deployableAssignment; } else { // push forward the iterator to check the next assignment assignment++; @@ -265,6 +282,25 @@ Assignment* DomainServer::deployableAssignmentForRequest(Assignment& requestAssi return NULL; } +void DomainServer::removeAssignmentFromQueue(Assignment* removableAssignment) { + + _assignmentQueueMutex.lock(); + + std::deque::iterator assignment = _assignmentQueue.begin(); + + while (assignment != _assignmentQueue.end()) { + if ((*assignment)->getUUID() == removableAssignment->getUUID()) { + _assignmentQueue.erase(assignment); + break; + } else { + // push forward the iterator to check the next assignment + assignment++; + } + } + + _assignmentQueueMutex.unlock(); +} + void DomainServer::cleanup() { _staticAssignmentFile.unmap(_staticAssignmentFileData); _staticAssignmentFile.close(); @@ -303,6 +339,9 @@ int DomainServer::run() { _staticAssignments = (Assignment*) (_staticAssignmentFileData + sizeof(*_numAssignmentsInStaticFile)); + timeval startTime; + gettimeofday(&startTime, NULL); + while (true) { while (nodeList->getNodeSocket()->receive((sockaddr *)&nodePublicAddress, packetData, &receivedBytes) && packetVersionMatch(packetData)) { @@ -332,20 +371,29 @@ int DomainServer::run() { NODE_TYPE_VOXEL_SERVER }; - int matchingStaticAssignmentIndex = -1; + Assignment* matchingStaticAssignment = NULL; if (memchr(STATICALLY_ASSIGNED_NODES, nodeType, sizeof(STATICALLY_ASSIGNED_NODES)) == NULL || - (matchingStaticAssignmentIndex = indexForMatchingStaticAssignment(nodeType, packetData)) != -1) { + (matchingStaticAssignment = matchingStaticAssignmentForCheckIn(nodeType, packetData))) { Node* checkInNode = nodeList->addOrUpdateNode((sockaddr*) &nodePublicAddress, (sockaddr*) &nodeLocalAddress, nodeType, nodeList->getLastNodeID()); - if (matchingStaticAssignmentIndex != -1) { - // set the linked data for this node to the matching assignment from the static file + if (checkInNode->getNodeID() == nodeList->getLastNodeID() && matchingStaticAssignment) { + // this was a newly added node + NodeList::getInstance()->increaseNodeID(); + + if (_hasCompletedRestartHold) { + // remove the matching assignment from the assignment queue so we don't take the next check in + removeAssignmentFromQueue(matchingStaticAssignment); + } + + // set the linked data for this node to a copy of the matching assignment // so we can re-queue it should the node die - checkInNode->setLinkedData(&_staticAssignments[matchingStaticAssignmentIndex]); + Assignment* nodeCopyOfMatchingAssignment = new Assignment(*matchingStaticAssignment); + checkInNode->setLinkedData(nodeCopyOfMatchingAssignment); } int numHeaderBytes = populateTypeAndVersion(broadcastPacket, PACKET_TYPE_DOMAIN); @@ -434,6 +482,45 @@ int DomainServer::run() { } } } + + const long long RESTART_HOLD_TIME_USECS = 5 * 1000 * 1000; + + if (!_hasCompletedRestartHold && usecTimestampNow() - usecTimestamp(&startTime) > RESTART_HOLD_TIME_USECS) { + _hasCompletedRestartHold = true; + + // pull anything in the static assignment file that isn't spoken for and add to the assignment queue + for (int i = 0; i < MAX_STATIC_ASSIGNMENT_FILE_ASSIGNMENTS; i++) { + if (_staticAssignments[i].getUUID().isNull()) { + // reached the end of static assignments, bail + break; + } + + bool foundMatchingAssignment = false; + + // enumerate the nodes and check if there is one with an attached assignment with matching UUID + // if the node has sent no types of interest, assume they want nothing but their own ID back + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + if (node->getLinkedData()) { + Assignment* linkedAssignment = (Assignment*) node->getLinkedData(); + if (linkedAssignment->getUUID() == _staticAssignments[i].getUUID()) { + foundMatchingAssignment = true; + break; + } + } + } + + if (!foundMatchingAssignment) { + // this assignment has not been fulfilled - reset the UUID and add it to the assignment queue + _staticAssignments[i].resetUUID(); + + qDebug() << "Adding static assignment to queue -" << _staticAssignments[i] << "\n"; + + _assignmentQueueMutex.lock(); + _assignmentQueue.push_back(&_staticAssignments[i]); + _assignmentQueueMutex.unlock(); + } + } + } } this->cleanup(); diff --git a/domain-server/src/DomainServer.h b/domain-server/src/DomainServer.h index 0a366e12e8..e5166b2580 100644 --- a/domain-server/src/DomainServer.h +++ b/domain-server/src/DomainServer.h @@ -42,8 +42,9 @@ private: static DomainServer* domainServerInstance; void prepopulateStaticAssignmentFile(); - int indexForMatchingStaticAssignment(NODE_TYPE nodeType, const uchar* checkInUUID); + Assignment* matchingStaticAssignmentForCheckIn(NODE_TYPE nodeType, const uchar* checkInUUID); Assignment* deployableAssignmentForRequest(Assignment& requestAssignment); + void removeAssignmentFromQueue(Assignment* removableAssignment); void cleanup(); @@ -59,6 +60,8 @@ private: Assignment* _staticAssignments; const char* _voxelServerConfig; + + bool _hasCompletedRestartHold; }; #endif /* defined(__hifi__DomainServer__) */ diff --git a/libraries/shared/src/Assignment.cpp b/libraries/shared/src/Assignment.cpp index 8aa678873d..9bd2e12b46 100644 --- a/libraries/shared/src/Assignment.cpp +++ b/libraries/shared/src/Assignment.cpp @@ -168,36 +168,7 @@ void Assignment::run() { } QDebug operator<<(QDebug debug, const Assignment &assignment) { - debug << "T:" << assignment.getType(); + debug << "UUID:" << assignment.getUUIDStringWithoutCurlyBraces().toStdString().c_str() << + ", Type:" << assignment.getType(); return debug.nospace(); } - -QDataStream& operator<<(QDataStream& out, const Assignment& assignment) { - out << assignment._uuid; - out << (uchar) assignment._command; - out << (uchar) assignment._type; - out << (uchar) assignment._location; - out << assignment._numberOfInstances; - out << assignment._numPayloadBytes; - - if (assignment._numPayloadBytes > 0) { - out.writeBytes((char*) assignment._payload, assignment._numPayloadBytes); - } - - return out; -} - -QDataStream& operator>>(QDataStream& in, Assignment& assignment) { - in >> assignment._uuid; - in >> (uchar&) assignment._command; - in >> (uchar&) assignment._type; - in >> (uchar&) assignment._location; - in >> assignment._numberOfInstances; - in >> assignment._numPayloadBytes; - - if (assignment._numPayloadBytes > 0) { - in.readRawData((char*) assignment._payload, assignment._numPayloadBytes); - } - - return in; -} diff --git a/libraries/shared/src/NodeList.cpp b/libraries/shared/src/NodeList.cpp index 4125c9352a..4307810db3 100644 --- a/libraries/shared/src/NodeList.cpp +++ b/libraries/shared/src/NodeList.cpp @@ -439,12 +439,7 @@ Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, c } } - if (node == end()) { - // if we already had this node AND it's a solo type then bust out of here - if (soloNodeOfType(nodeType)) { - return NULL; - } - + if (node == end()) { // we didn't have this node, so add them Node* newNode = new Node(publicSocket, localSocket, nodeType, nodeId);