diff --git a/animation-server/src/main.cpp b/animation-server/src/main.cpp index 56a7abdb67..a2e190c7f3 100644 --- a/animation-server/src/main.cpp +++ b/animation-server/src/main.cpp @@ -6,6 +6,7 @@ // Copyright (c) 2012 High Fidelity, Inc. All rights reserved. // +#include #include #include #include @@ -39,12 +40,14 @@ bool includeMovingBug = true; bool includeBlinkingVoxel = false; bool includeDanceFloor = true; bool buildStreet = false; +bool nonThreadedPacketSender = false; const int ANIMATION_LISTEN_PORT = 40107; const int ACTUAL_FPS = 60; const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS -const int ANIMATE_VOXELS_INTERVAL_USECS = OUR_FPS_IN_MILLISECONDS * 1000.0; // converts from milliseconds to usecs +const int FUDGE_USECS = 10.0; // a little bit of fudge to actually do some processing +const int ANIMATE_VOXELS_INTERVAL_USECS = (OUR_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs bool wantLocalDomain = false; @@ -619,6 +622,9 @@ void* animateVoxels(void* args) { if (::voxelEditPacketSender) { ::voxelEditPacketSender->releaseQueuedMessages(); + if (::nonThreadedPacketSender) { + ::voxelEditPacketSender->process(); + } } uint64_t end = usecTimestampNow(); @@ -629,6 +635,9 @@ void* animateVoxels(void* args) { } // dynamically sleep until we need to fire off the next set of voxels uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime)); + if (usecToSleep > ANIMATE_VOXELS_INTERVAL_USECS) { + usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS; + } if (usecToSleep > 0) { usleep(usecToSleep); @@ -648,6 +657,11 @@ int main(int argc, const char * argv[]) NodeList* nodeList = NodeList::createInstance(NODE_TYPE_ANIMATION_SERVER, ANIMATION_LISTEN_PORT); setvbuf(stdout, NULL, _IOLBF, 0); + // Handle Local Domain testing with the --local command line + const char* NON_THREADED_PACKETSENDER = "--NonThreadedPacketSender"; + ::nonThreadedPacketSender = cmdOptionExists(argc, argv, NON_THREADED_PACKETSENDER); + printf("nonThreadedPacketSender=%s\n", debug::valueOf(::nonThreadedPacketSender)); + // Handle Local Domain testing with the --local command line const char* NO_BILLBOARD = "--NoBillboard"; ::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD); @@ -702,10 +716,13 @@ int main(int argc, const char * argv[]) // Create out VoxelEditPacketSender ::voxelEditPacketSender = new VoxelEditPacketSender; if (::voxelEditPacketSender) { - ::voxelEditPacketSender->initialize(true); + ::voxelEditPacketSender->initialize(!::nonThreadedPacketSender); if (::jurisdictionListener) { ::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions()); } + if (::nonThreadedPacketSender) { + ::voxelEditPacketSender->setProcessCallIntervalHint(ANIMATE_VOXELS_INTERVAL_USECS); + } } srand((unsigned)time(0)); diff --git a/assignment-client/src/Agent.cpp b/assignment-client/src/Agent.cpp index c0ca8b18a1..56eea450cb 100644 --- a/assignment-client/src/Agent.cpp +++ b/assignment-client/src/Agent.cpp @@ -149,9 +149,11 @@ void Agent::run() { qDebug() << "Uncaught exception at line" << line << ":" << engine.uncaughtException().toString() << "\n"; } - // flush the queue of packets and then process them so they are all sent off + // release the queue of edit voxel messages. voxelScripter.getVoxelPacketSender()->releaseQueuedMessages(); - voxelScripter.getVoxelPacketSender()->processWithoutSleep(); + + // since we're in non-threaded mode, call process so that the packets are sent + voxelScripter.getVoxelPacketSender()->process(); } while (NodeList::getInstance()->getNodeSocket()->receive((sockaddr*) &senderAddress, receivedData, &receivedBytes)) { diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index e363d16178..013b7a7936 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -42,13 +42,15 @@ protected: void unlock() { pthread_mutex_unlock(&_mutex); } bool isStillRunning() const { return !_stopThread; } + + bool isThreaded() const { return _isThreaded; } private: pthread_mutex_t _mutex; - bool _stopThread; - bool _isThreaded; - pthread_t _thread; + bool _stopThread; + bool _isThreaded; + pthread_t _thread; }; extern "C" void* GenericThreadEntry(void* arg); diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index d496b3e1c7..4a2b879e74 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -8,6 +8,7 @@ // Threaded or non-threaded packet sender. // +#include #include #include "NodeList.h" @@ -17,9 +18,13 @@ const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200; const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1; +const int AVERAGE_CALL_TIME_SAMPLES = 10; PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : _packetsPerSecond(packetsPerSecond), + _usecsPerProcessCallHint(0), + _lastProcessCallTime(usecTimestampNow()), + _averageProcessCallTime(AVERAGE_CALL_TIME_SAMPLES), _lastSendTime(usecTimestampNow()), _notify(notify) { @@ -27,7 +32,6 @@ PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { - //printf("PacketSender::queuePacketForSending packetLength=%ld\n",packetLength); NetworkPacket packet(address, packetData, packetLength); lock(); _packets.push_back(packet); @@ -35,20 +39,67 @@ void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packe } bool PacketSender::process() { - printf("PacketSender::process() _packets.size()=%ld\n",_packets.size()); uint64_t USECS_PER_SECOND = 1000 * 1000; uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); + // keep track of our process call times, so we have a reliable account of how often our caller calls us + uint64_t now = usecTimestampNow(); + uint64_t elapsedSinceLastCall = now - _lastProcessCallTime; + _lastProcessCallTime = now; + _averageProcessCallTime.updateAverage(elapsedSinceLastCall); + if (_packets.size() == 0) { - usleep(SEND_INTERVAL_USECS); + if (isThreaded()) { + usleep(SEND_INTERVAL_USECS); + } else { + return isStillRunning(); // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us + } } - while (_packets.size() > 0) { + + int packetsPerCall = _packets.size(); // in threaded mode, we just empty this! + int packetsThisCall = 0; + + // if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process + // based on how often we get called... We do this by keeping a running average of our call times, and we determine + // how many packets to send per call + if (!isThreaded()) { + int averageCallTime; + const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2; + if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) { + averageCallTime = _averageProcessCallTime.getAverage(); + } else { + averageCallTime = _usecsPerProcessCallHint; + } + + // we can determine how many packets we need to send per call to achieve our desired + // packets per second send rate. + int callsPerSecond = USECS_PER_SECOND / averageCallTime; + packetsPerCall = ceil(_packetsPerSecond / callsPerSecond); + + // send at least one packet per call, if we have it + if (packetsPerCall < 1) { + packetsPerCall = 1; + } + } + + bool keepGoing = _packets.size() > 0; + while (keepGoing) { + + // in threaded mode, we go till we're empty + if (isThreaded()) { + keepGoing = _packets.size() > 0; + } else { + // in non-threaded mode, we send as many packets as we need per expected call to process() + keepGoing = (packetsThisCall < packetsPerCall) && (_packets.size() > 0); + } + NetworkPacket& packet = _packets.front(); // send the packet through the NodeList... UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); + packetsThisCall++; if (_notify) { _notify->packetSentNotification(packet.getLength()); @@ -58,34 +109,18 @@ bool PacketSender::process() { _packets.erase(_packets.begin()); unlock(); - uint64_t now = usecTimestampNow(); - // dynamically sleep until we need to fire off the next set of voxels - uint64_t elapsed = now - _lastSendTime; - int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed); - _lastSendTime = now; - if (usecToSleep > 0) { - usleep(usecToSleep); + // dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode + if (isThreaded()) { + uint64_t elapsed = now - _lastSendTime; + int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed); + + // we only sleep in non-threaded mode + if (usecToSleep > 0) { + usleep(usecToSleep); + } } - + _lastSendTime = now; } + return isStillRunning(); // keep running till they terminate us } - -void PacketSender::processWithoutSleep() { - while (_packets.size() > 0) { - NetworkPacket& packet = _packets.front(); - - // send the packet through the NodeList... - UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); - - nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); - - if (_notify) { - _notify->packetSentNotification(packet.getLength()); - } - - lock(); - _packets.erase(_packets.begin()); - unlock(); - } -} diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index babcaca82b..84ae6ae9ca 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -43,7 +43,6 @@ public: PacketSenderNotify* getPacketSenderNotify() const { return _notify; } virtual bool process(); - virtual void processWithoutSleep(); /// are there packets waiting in the send queue to be sent bool hasPacketsToSend() const { return _packets.size() > 0; } @@ -51,8 +50,16 @@ public: /// how many packets are there in the send queue waiting to be sent int packetsToSendCount() const { return _packets.size(); } + /// If you're running in non-threaded mode, call this to give us a hint as to how frequently you will call process. + /// This has no effect in threaded mode. This is only considered a hint in non-threaded mode. + /// \param int usecsPerProcessCall expected number of usecs between calls to process in non-threaded mode. + void setProcessCallIntervalHint(int usecsPerProcessCall) { _usecsPerProcessCallHint = usecsPerProcessCall; } + protected: int _packetsPerSecond; + int _usecsPerProcessCallHint; + uint64_t _lastProcessCallTime; + SimpleMovingAverage _averageProcessCallTime; private: std::vector _packets; diff --git a/libraries/voxels/src/VoxelEditPacketSender.cpp b/libraries/voxels/src/VoxelEditPacketSender.cpp index a2190440d2..fbf171eb58 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.cpp +++ b/libraries/voxels/src/VoxelEditPacketSender.cpp @@ -48,8 +48,6 @@ VoxelEditPacketSender::~VoxelEditPacketSender() { void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { - //printf("void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail)\n"); - // allows app to disable sending if for example voxels have been disabled if (!_shouldSend) { return; // bail early @@ -72,7 +70,6 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& // if we've saved MORE than out max, then clear out the oldest packet... int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); if (allPendingMessages > _maxPendingMessages) { - printf("Pending messages exceed Max Pending Messages, discarding oldest message.\n"); EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); delete packet; _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); @@ -102,9 +99,6 @@ bool VoxelEditPacketSender::voxelServersExist() const { // This method is called when the edit packet layer has determined that it has a fully formed packet destined for // a known nodeID. However, we also want to handle the case where the void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { - -//printf("void VoxelEditPacketSender::queuePacketToNode(nodeID=%d)\n",nodeID); - NodeList* nodeList = NodeList::getInstance(); for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER @@ -117,9 +111,6 @@ void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bu } void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) { - -//printf("void VoxelEditPacketSender::queueVoxelEditMessages()\n"); - if (!_shouldSend) { return; // bail early } @@ -135,10 +126,6 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO } void VoxelEditPacketSender::processPreServerExistsPackets() { - -printf("void VoxelEditPacketSender::processPreServerExistsPackets()\n"); - - assert(voxelServersExist()); // we should only be here if we have jurisdictions // First send out all the single message packets... @@ -166,9 +153,6 @@ printf("void VoxelEditPacketSender::processPreServerExistsPackets()\n"); } void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) { - -printf("void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, length=%ld)\n", length); - if (!_shouldSend) { return; // bail early } @@ -200,9 +184,6 @@ printf("void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { - -//printf("void VoxelEditPacketSender::queueVoxelEditMessage(unsigned char* codeColorBuffer, length=%ld)\n", length); - if (!_shouldSend) { return; // bail early } @@ -217,7 +198,6 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha // if we've saved MORE than out max, then clear out the oldest packet... int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); if (allPendingMessages > _maxPendingMessages) { - printf("Pending messages exceed Max Pending Messages, discarding oldest message.\n"); EditPacketBuffer* packet = _preServerPackets.front(); delete packet; _preServerPackets.erase(_preServerPackets.begin()); @@ -267,17 +247,13 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha } void VoxelEditPacketSender::releaseQueuedMessages() { - //printf("void VoxelEditPacketSender::releaseQueuedMessages()\n"); - // if we don't yet have jurisdictions then we can't actually release messages yet because we don't // know where to send them to. Instead, just remember this request and when we eventually get jurisdictions // call release again at that time. if (!voxelServersExist()) { - //printf("...no voxel servers... _releaseQueuedMessagesPending=true\n"); _releaseQueuedMessagesPending = true; } else { for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - //printf("...actually calling releaseQueuedPacket()\n"); releaseQueuedPacket(i->second); } } @@ -298,12 +274,9 @@ void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PAC } bool VoxelEditPacketSender::process() { - printf("VoxelEditPacketSender::process()\n"); - // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those // before doing our normal process step. This processPreJurisdictionPackets() if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) { - printf("processPreServerExistsPackets()<<<<<<<<<<<<<<<<<<<<<\n"); processPreServerExistsPackets(); }