From d8611e408ef9304a9bf38fffa6288fa4244378f4 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Fri, 6 Sep 2013 10:59:20 -0700 Subject: [PATCH] clear stale assignments from queue, fix CPU usage in client --- assignment-client/src/main.cpp | 14 +++++--------- assignment-server/src/main.cpp | 12 +++++++++++- libraries/shared/src/Assignment.cpp | 8 ++++++++ libraries/shared/src/Assignment.h | 4 +++- libraries/shared/src/SharedUtil.cpp | 2 +- libraries/shared/src/SharedUtil.h | 2 +- libraries/shared/src/UDPSocket.cpp | 12 +++++++----- libraries/shared/src/UDPSocket.h | 5 +++++ 8 files changed, 41 insertions(+), 18 deletions(-) diff --git a/assignment-client/src/main.cpp b/assignment-client/src/main.cpp index 98808a03a5..f3d6b8c0cd 100644 --- a/assignment-client/src/main.cpp +++ b/assignment-client/src/main.cpp @@ -24,9 +24,9 @@ int main(int argc, char* const argv[]) { // create a NodeList as an unassigned client NodeList* nodeList = NodeList::createInstance(NODE_TYPE_UNASSIGNED); - nodeList->getNodeSocket()->setBlocking(false); - timeval lastRequest = {}; + // change the timeout on the nodelist socket to be as often as we want to re-request + nodeList->getNodeSocket()->setBlockingReceiveTimeoutInUsecs(ASSIGNMENT_REQUEST_INTERVAL_USECS); unsigned char packetData[MAX_PACKET_SIZE]; ssize_t receivedBytes = 0; @@ -51,13 +51,9 @@ int main(int argc, char* const argv[]) { Assignment requestAssignment(Assignment::Request, Assignment::All, assignmentPool); while (true) { - if (usecTimestampNow() - usecTimestamp(&lastRequest) >= ASSIGNMENT_REQUEST_INTERVAL_USECS) { - gettimeofday(&lastRequest, NULL); - - // send an assignment request to the Nodelist - qDebug("Sending assignment request.\n"); - nodeList->sendAssignment(requestAssignment); - } + // if we're here we have no assignment, so send a request + qDebug() << "Sending an assignment request -" << requestAssignment; + nodeList->sendAssignment(requestAssignment); while (nodeList->getNodeSocket()->receive(packetData, &receivedBytes)) { if (packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) { diff --git a/assignment-server/src/main.cpp b/assignment-server/src/main.cpp index 0daba0938f..dbbd6b2607 100644 --- a/assignment-server/src/main.cpp +++ b/assignment-server/src/main.cpp @@ -18,6 +18,7 @@ #include const int MAX_PACKET_SIZE_BYTES = 1400; +const long long NUM_DEFAULT_ASSIGNMENT_STALENESS_USECS = 10 * 1000 * 1000; int main(int argc, const char* argv[]) { @@ -49,6 +50,15 @@ int main(int argc, const char* argv[]) { // enumerate assignments until we find one to give this client (if possible) while (assignment != assignmentQueue.end()) { + // if this assignment is stale then get rid of it and check the next one + if (usecTimestampNow() - usecTimestamp(&((*assignment)->getTime())) + >= NUM_DEFAULT_ASSIGNMENT_STALENESS_USECS) { + delete *assignment; + assignment = assignmentQueue.erase(assignment); + + continue; + } + bool eitherHasPool = ((*assignment)->getPool() || requestAssignment.getPool()); bool bothHavePool = ((*assignment)->getPool() && requestAssignment.getPool()); @@ -83,7 +93,7 @@ int main(int argc, const char* argv[]) { // construct the create assignment from the packet data Assignment* createdAssignment = new Assignment(senderData, receivedBytes); - qDebug() << "Received a created assignment:" << createdAssignment; + qDebug() << "Received a created assignment:" << *createdAssignment; qDebug() << "Current queue size is" << assignmentQueue.size(); // assignment server is on a public server diff --git a/libraries/shared/src/Assignment.cpp b/libraries/shared/src/Assignment.cpp index e3d9c31e47..20c88700d6 100644 --- a/libraries/shared/src/Assignment.cpp +++ b/libraries/shared/src/Assignment.cpp @@ -6,6 +6,8 @@ // Copyright (c) 2013 HighFidelity, Inc. All rights reserved. // +#include + #include "PacketHeaders.h" #include "Assignment.h" @@ -16,6 +18,9 @@ Assignment::Assignment(Assignment::Direction direction, Assignment::Type type, c _pool(NULL), _domainSocket(NULL) { + // set the create time on this assignment + gettimeofday(&_time, NULL); + // copy the pool, if we got one if (pool) { int poolLength = strlen(pool); @@ -30,6 +35,9 @@ Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) : _pool(NULL), _domainSocket(NULL) { + // set the create time on this assignment + gettimeofday(&_time, NULL); + int numBytesRead = 0; if (dataBuffer[0] == PACKET_TYPE_REQUEST_ASSIGNMENT) { diff --git a/libraries/shared/src/Assignment.h b/libraries/shared/src/Assignment.h index e9b19866c2..720169e55b 100644 --- a/libraries/shared/src/Assignment.h +++ b/libraries/shared/src/Assignment.h @@ -34,6 +34,7 @@ public: Assignment::Direction getDirection() const { return _direction; } Assignment::Type getType() const { return _type; } const char* getPool() const { return _pool; } + const timeval& getTime() const { return _time; } const sockaddr* getDomainSocket() { return _domainSocket; } void setDomainSocket(const sockaddr* domainSocket); @@ -44,7 +45,8 @@ private: Assignment::Direction _direction; Assignment::Type _type; char* _pool; - sockaddr* _domainSocket; + sockaddr* _domainSocket; + timeval _time; }; QDebug operator<<(QDebug debug, const Assignment &assignment); diff --git a/libraries/shared/src/SharedUtil.cpp b/libraries/shared/src/SharedUtil.cpp index c34663c077..359a154c4f 100644 --- a/libraries/shared/src/SharedUtil.cpp +++ b/libraries/shared/src/SharedUtil.cpp @@ -26,7 +26,7 @@ #include "PacketHeaders.h" #include "SharedUtil.h" -uint64_t usecTimestamp(timeval *time) { +uint64_t usecTimestamp(const timeval *time) { return (time->tv_sec * 1000000 + time->tv_usec); } diff --git a/libraries/shared/src/SharedUtil.h b/libraries/shared/src/SharedUtil.h index 32553b967b..4b73835f21 100644 --- a/libraries/shared/src/SharedUtil.h +++ b/libraries/shared/src/SharedUtil.h @@ -38,7 +38,7 @@ static const float DECIMETER = 0.1f; static const float CENTIMETER = 0.01f; static const float MILLIIMETER = 0.001f; -uint64_t usecTimestamp(timeval *time); +uint64_t usecTimestamp(const timeval *time); uint64_t usecTimestampNow(); float randFloat(); diff --git a/libraries/shared/src/UDPSocket.cpp b/libraries/shared/src/UDPSocket.cpp index b32a66ccfb..ab2460dd7f 100644 --- a/libraries/shared/src/UDPSocket.cpp +++ b/libraries/shared/src/UDPSocket.cpp @@ -167,11 +167,8 @@ UDPSocket::UDPSocket(unsigned short int listeningPort) : _listeningPort = ntohs(bind_address.sin_port); } - // set timeout on socket recieve to 0.5 seconds - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 500000; - setsockopt(handle, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof tv); + const int DEFAULT_BLOCKING_SOCKET_TIMEOUT_USECS = 0.5 * 1000000; + setBlockingReceiveTimeoutInUsecs(DEFAULT_BLOCKING_SOCKET_TIMEOUT_USECS); qDebug("Created UDP socket listening on port %hu.\n", _listeningPort); } @@ -228,6 +225,11 @@ void UDPSocket::setBlocking(bool blocking) { #endif } +void UDPSocket::setBlockingReceiveTimeoutInUsecs(int timeoutUsecs) { + struct timeval tv = {timeoutUsecs / 1000000, timeoutUsecs % 1000000}; + setsockopt(handle, SOL_SOCKET, SO_RCVTIMEO, (char *)&tv, sizeof(tv)); +} + // Receive data on this socket with retrieving address of sender bool UDPSocket::receive(void* receivedData, ssize_t* receivedBytes) const { return receive((sockaddr*) &senderAddress, receivedData, receivedBytes); diff --git a/libraries/shared/src/UDPSocket.h b/libraries/shared/src/UDPSocket.h index eec6c9bf43..1b627dbd41 100644 --- a/libraries/shared/src/UDPSocket.h +++ b/libraries/shared/src/UDPSocket.h @@ -22,12 +22,17 @@ class UDPSocket { public: UDPSocket(unsigned short int listeningPort); ~UDPSocket(); + bool init(); unsigned short int getListeningPort() const { return _listeningPort; } + void setBlocking(bool blocking); bool isBlocking() const { return blocking; } + void setBlockingReceiveTimeoutInUsecs(int timeoutUsecs); + int send(sockaddr* destAddress, const void* data, size_t byteLength) const; int send(char* destAddress, int destPort, const void* data, size_t byteLength) const; + bool receive(void* receivedData, ssize_t* receivedBytes) const; bool receive(sockaddr* recvAddress, void* receivedData, ssize_t* receivedBytes) const; private: