From 1a6efea5c098e0079d63831f455387921bcfd152 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Sun, 10 Nov 2013 10:39:56 -0800 Subject: [PATCH] improvements to PPS behavior of PacketSender --- animation-server/src/main.cpp | 209 +++++++--- libraries/shared/src/PacketSender.cpp | 363 ++++++++++++++---- libraries/shared/src/PacketSender.h | 62 ++- .../voxels/src/VoxelEditPacketSender.cpp | 29 +- 4 files changed, 521 insertions(+), 142 deletions(-) diff --git a/animation-server/src/main.cpp b/animation-server/src/main.cpp index 2e13636eda..d1269044ce 100644 --- a/animation-server/src/main.cpp +++ b/animation-server/src/main.cpp @@ -41,13 +41,20 @@ bool includeBlinkingVoxel = false; bool includeDanceFloor = true; bool buildStreet = false; bool nonThreadedPacketSender = false; +int packetsPerSecond = PacketSender::DEFAULT_PACKETS_PER_SECOND; +bool waitForVoxelServer = true; const int ANIMATION_LISTEN_PORT = 40107; -const int ACTUAL_FPS = 60; -const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS -const int FUDGE_USECS = 10; // a little bit of fudge to actually do some processing -const int ANIMATE_VOXELS_INTERVAL_USECS = (OUR_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs +int ANIMATE_FPS = 60; +double ANIMATE_FPS_IN_MILLISECONDS = 1000.0/ANIMATE_FPS; // determines FPS from our desired FPS +int ANIMATE_VOXELS_INTERVAL_USECS = (ANIMATE_FPS_IN_MILLISECONDS * 1000.0); // converts from milliseconds to usecs + + +int PROCESSING_FPS = 60; +double PROCESSING_FPS_IN_MILLISECONDS = 1000.0/PROCESSING_FPS; // determines FPS from our desired FPS +int FUDGE_USECS = 650; // a little bit of fudge to actually do some processing +int PROCESSING_INTERVAL_USECS = (PROCESSING_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs bool wantLocalDomain = false; @@ -361,7 +368,7 @@ unsigned char danceFloorOnColorB[DANCE_FLOOR_COLORS][3] = { float danceFloorGradient = 0.5f; const float BEATS_PER_MINUTE = 118.0f; const float SECONDS_PER_MINUTE = 60.0f; -const float FRAMES_PER_BEAT = (SECONDS_PER_MINUTE * ACTUAL_FPS) / BEATS_PER_MINUTE; +const float FRAMES_PER_BEAT = (SECONDS_PER_MINUTE * ANIMATE_FPS) / BEATS_PER_MINUTE; float danceFloorGradientIncrement = 1.0f / FRAMES_PER_BEAT; const float DANCE_FLOOR_MAX_GRADIENT = 1.0f; const float DANCE_FLOOR_MIN_GRADIENT = 0.0f; @@ -591,72 +598,119 @@ double start = 0; void* animateVoxels(void* args) { - - timeval lastSendTime; + + uint64_t lastAnimateTime = 0; + uint64_t lastProcessTime = 0; + int processesPerAnimate = 0; bool firstTime = true; + + std::cout << "Setting PPS to " << ::packetsPerSecond << "\n"; + ::voxelEditPacketSender->setPacketsPerSecond(::packetsPerSecond); + + std::cout << "PPS set to " << ::voxelEditPacketSender->getPacketsPerSecond() << "\n"; while (true) { - gettimeofday(&lastSendTime, NULL); + + // If we're asked to wait for voxel servers, and there isn't one available yet, then + // let the voxelEditPacketSender process and move on. + if (::waitForVoxelServer && !::voxelEditPacketSender->voxelServersExist()) { + if (::nonThreadedPacketSender) { + ::voxelEditPacketSender->process(); + } + } else { + if (firstTime) { + lastAnimateTime = usecTimestampNow(); + firstTime = false; + } + lastProcessTime = usecTimestampNow(); + + int packetsStarting = 0; + int packetsEnding = 0; - int packetsStarting = ::voxelEditPacketSender->packetsToSendCount(); + // The while loop will be running at PROCESSING_FPS, but we only want to call these animation functions at + // ANIMATE_FPS. So we check out last animate time and only call these if we've elapsed that time. + uint64_t now = usecTimestampNow(); + uint64_t animationElapsed = now - lastAnimateTime; + int withinAnimationTarget = ANIMATE_VOXELS_INTERVAL_USECS - animationElapsed; - // some animations - //sendVoxelBlinkMessage(); + int animateLoopsPerAnimate = 0; + while (withinAnimationTarget < 2000) { + processesPerAnimate = 0; + animateLoopsPerAnimate++; + + lastAnimateTime = now; + packetsStarting = ::voxelEditPacketSender->packetsToSendCount(); + + // some animations + //sendVoxelBlinkMessage(); - if (::includeBillboard) { - sendBillboard(); - } - if (::includeBorderTracer) { - sendBlinkingStringOfLights(); - } - if (::includeMovingBug) { - renderMovingBug(); - } - if (::includeBlinkingVoxel) { - sendVoxelBlinkMessage(); - } - if (::includeDanceFloor) { - sendDanceFloor(); - } + if (::includeBillboard) { + sendBillboard(); + } + if (::includeBorderTracer) { + sendBlinkingStringOfLights(); + } + if (::includeMovingBug) { + renderMovingBug(); + } + if (::includeBlinkingVoxel) { + sendVoxelBlinkMessage(); + } + if (::includeDanceFloor) { + sendDanceFloor(); + } - if (::buildStreet) { - doBuildStreet(); - } + if (::buildStreet) { + doBuildStreet(); + } - ::voxelEditPacketSender->releaseQueuedMessages(); - int packetsEnding = ::voxelEditPacketSender->packetsToSendCount(); + packetsEnding = ::voxelEditPacketSender->packetsToSendCount(); + + if (animationElapsed > ANIMATE_VOXELS_INTERVAL_USECS) { + animationElapsed -= ANIMATE_VOXELS_INTERVAL_USECS; // credit ourselves one animation frame + } else { + animationElapsed = 0; + } + withinAnimationTarget = ANIMATE_VOXELS_INTERVAL_USECS - animationElapsed; + + ::voxelEditPacketSender->releaseQueuedMessages(); + } - if (firstTime) { - int packetsPerSecond = std::max(ACTUAL_FPS, (packetsEnding - packetsStarting) * (ACTUAL_FPS)); - - std::cout << "Setting PPS to " << packetsPerSecond << "\n"; - - ::voxelEditPacketSender->setPacketsPerSecond(packetsPerSecond); - firstTime = false; - } - - - if (::nonThreadedPacketSender) { - ::voxelEditPacketSender->process(); - } + if (::nonThreadedPacketSender) { + ::voxelEditPacketSender->process(); + } + processesPerAnimate++; - uint64_t end = usecTimestampNow(); - uint64_t elapsedSeconds = (end - ::start) / 1000000; - if (::shouldShowPacketsPerSecond) { - printf("packetsSent=%ld, bytesSent=%ld pps=%f bps=%f\n",packetsSent,bytesSent, - (float)(packetsSent/elapsedSeconds),(float)(bytesSent/elapsedSeconds)); + if (::shouldShowPacketsPerSecond) { + float lifetimeSeconds = ::voxelEditPacketSender->getLifetimeInSeconds(); + int targetPPS = ::voxelEditPacketSender->getPacketsPerSecond(); + float lifetimePPS = ::voxelEditPacketSender->getLifetimePPS(); + float lifetimeBPS = ::voxelEditPacketSender->getLifetimeBPS(); + uint64_t totalPacketsSent = ::voxelEditPacketSender->getLifetimePacketsSent(); + uint64_t totalBytesSent = ::voxelEditPacketSender->getLifetimeBytesSent(); + + float lifetimePPSQueued = ::voxelEditPacketSender->getLifetimePPSQueued(); + float lifetimeBPSQueued = ::voxelEditPacketSender->getLifetimeBPSQueued(); + uint64_t totalPacketsQueued = ::voxelEditPacketSender->getLifetimePacketsQueued(); + uint64_t totalBytesQueued = ::voxelEditPacketSender->getLifetimeBytesQueued(); + + uint64_t packetsPending = ::voxelEditPacketSender->packetsToSendCount(); + + printf("lifetime=%f secs packetsSent=%lld, bytesSent=%lld targetPPS=%d pps=%f bps=%f\n", + lifetimeSeconds, totalPacketsSent, totalBytesSent, targetPPS, lifetimePPS, lifetimeBPS); + printf("packetsPending=%lld packetsQueued=%lld, bytesQueued=%lld ppsQueued=%f bpsQueued=%f\n", + packetsPending, totalPacketsQueued, totalBytesQueued, lifetimePPSQueued, lifetimeBPSQueued); + } } // dynamically sleep until we need to fire off the next set of voxels - uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime)); - if (usecToSleep > ANIMATE_VOXELS_INTERVAL_USECS) { - usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS; + uint64_t usecToSleep = PROCESSING_INTERVAL_USECS - (usecTimestampNow() - lastProcessTime); + if (usecToSleep > PROCESSING_INTERVAL_USECS) { + usecToSleep = PROCESSING_INTERVAL_USECS; } - + if (usecToSleep > 0) { usleep(usecToSleep); - } else { - std::cout << "Last send took too much time, not sleeping!\n"; } } @@ -718,6 +772,49 @@ int main(int argc, const char * argv[]) NodeList::getInstance()->setDomainHostname(domainHostname); } + const char* packetsPerSecondCommand = getCmdOption(argc, argv, "--pps"); + if (packetsPerSecondCommand) { + ::packetsPerSecond = atoi(packetsPerSecondCommand); + } + printf("packetsPerSecond=%d\n",packetsPerSecond); + + const char* animateFPSCommand = getCmdOption(argc, argv, "--AnimateFPS"); + const char* animateIntervalCommand = getCmdOption(argc, argv, "--AnimateInterval"); + if (animateFPSCommand || animateIntervalCommand) { + if (animateIntervalCommand) { + ::ANIMATE_FPS_IN_MILLISECONDS = atoi(animateIntervalCommand); + ::ANIMATE_VOXELS_INTERVAL_USECS = (ANIMATE_FPS_IN_MILLISECONDS * 1000.0); // converts from milliseconds to usecs + ::ANIMATE_FPS = PacketSender::USECS_PER_SECOND / ::ANIMATE_VOXELS_INTERVAL_USECS; + } else { + ::ANIMATE_FPS = atoi(animateFPSCommand); + ::ANIMATE_FPS_IN_MILLISECONDS = 1000.0/ANIMATE_FPS; // determines FPS from our desired FPS + ::ANIMATE_VOXELS_INTERVAL_USECS = (ANIMATE_FPS_IN_MILLISECONDS * 1000.0); // converts from milliseconds to usecs + } + } + printf("ANIMATE_FPS=%d\n",ANIMATE_FPS); + printf("ANIMATE_VOXELS_INTERVAL_USECS=%d\n",ANIMATE_VOXELS_INTERVAL_USECS); + + const char* processingFPSCommand = getCmdOption(argc, argv, "--ProcessingFPS"); + const char* processingIntervalCommand = getCmdOption(argc, argv, "--ProcessingInterval"); + if (processingFPSCommand || processingIntervalCommand) { + if (processingIntervalCommand) { + ::PROCESSING_FPS_IN_MILLISECONDS = atoi(processingIntervalCommand); + ::PROCESSING_INTERVAL_USECS = ::PROCESSING_FPS_IN_MILLISECONDS * 1000.0; + ::PROCESSING_FPS = PacketSender::USECS_PER_SECOND / ::PROCESSING_INTERVAL_USECS; + } else { + ::PROCESSING_FPS = atoi(processingFPSCommand); + ::PROCESSING_FPS_IN_MILLISECONDS = 1000.0/PROCESSING_FPS; // determines FPS from our desired FPS + ::PROCESSING_INTERVAL_USECS = (PROCESSING_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs + } + } + printf("PROCESSING_FPS=%d\n",PROCESSING_FPS); + printf("PROCESSING_INTERVAL_USECS=%d\n",PROCESSING_INTERVAL_USECS); + + if (cmdOptionExists(argc, argv, "--quickExit")) { + return 0; + } + + nodeList->linkedDataCreateCallback = NULL; // do we need a callback? nodeList->startSilentNodeRemovalThread(); @@ -735,7 +832,7 @@ int main(int argc, const char * argv[]) ::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions()); } if (::nonThreadedPacketSender) { - ::voxelEditPacketSender->setProcessCallIntervalHint(ANIMATE_VOXELS_INTERVAL_USECS); + ::voxelEditPacketSender->setProcessCallIntervalHint(PROCESSING_INTERVAL_USECS); } srand((unsigned)time(0)); diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index f2a104634e..aa9f11ead1 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -15,18 +15,31 @@ #include "PacketSender.h" #include "SharedUtil.h" -const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200; +const uint64_t PacketSender::USECS_PER_SECOND = 1000 * 1000; +const uint64_t PacketSender::SENDING_INTERVAL_ADJUST = 200; // approaximate 200us +const int PacketSender::TARGET_FPS = 60; +const int PacketSender::MAX_SLEEP_INTERVAL = PacketSender::USECS_PER_SECOND; + +const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 30; const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1; +const int PacketSender::MINIMAL_SLEEP_INTERVAL = (USECS_PER_SECOND / TARGET_FPS) / 2; const int AVERAGE_CALL_TIME_SAMPLES = 10; PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : _packetsPerSecond(packetsPerSecond), _usecsPerProcessCallHint(0), - _lastProcessCallTime(usecTimestampNow()), + _lastProcessCallTime(0), _averageProcessCallTime(AVERAGE_CALL_TIME_SAMPLES), - _lastSendTime(usecTimestampNow()), - _notify(notify) + _lastSendTime(0), // Note: we set this to 0 to indicate we haven't yet sent something + _notify(notify), + _lastPPSCheck(0), + _packetsOverCheckInterval(0), + _started(usecTimestampNow()), + _totalPacketsSent(0), + _totalBytesSent(0), + _totalPacketsQueued(0), + _totalBytesQueued(0) { } @@ -36,66 +49,281 @@ void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packe lock(); _packets.push_back(packet); unlock(); + _totalPacketsQueued++; + _totalBytesQueued += packetLength; } bool PacketSender::process() { + if (isThreaded()) { + return threadedProcess(); + } + return nonThreadedProcess(); +} + + +bool PacketSender::threadedProcess() { bool hasSlept = false; - uint64_t USECS_PER_SECOND = 1000 * 1000; - uint64_t USECS_SMALL_ADJUST = 2 * 1000; // approaximate 2ms - uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); - uint64_t INTERVAL_SLEEP_USECS = (SEND_INTERVAL_USECS > USECS_SMALL_ADJUST) ? - SEND_INTERVAL_USECS - USECS_SMALL_ADJUST : SEND_INTERVAL_USECS; - // keep track of our process call times, so we have a reliable account of how often our caller calls us - uint64_t now = usecTimestampNow(); - uint64_t elapsedSinceLastCall = now - _lastProcessCallTime; - _lastProcessCallTime = now; - _averageProcessCallTime.updateAverage(elapsedSinceLastCall); + if (_lastSendTime == 0) { + _lastSendTime = usecTimestampNow(); + } - if (_packets.size() == 0) { - if (isThreaded()) { - usleep(INTERVAL_SLEEP_USECS); + // in threaded mode, we keep running and just empty our packet queue sleeping enough to keep our PPS on target + while (_packets.size() > 0) { + // Recalculate our SEND_INTERVAL_USECS each time, in case the caller has changed it on us.. + int packetsPerSecondTarget = (_packetsPerSecond > MINIMUM_PACKETS_PER_SECOND) + ? _packetsPerSecond : MINIMUM_PACKETS_PER_SECOND; + + uint64_t intervalBetweenSends = USECS_PER_SECOND / packetsPerSecondTarget; + uint64_t sleepInterval = (intervalBetweenSends > SENDING_INTERVAL_ADJUST) ? + intervalBetweenSends - SENDING_INTERVAL_ADJUST : intervalBetweenSends; + + // We'll sleep before we send, this way, we can set our last send time to be our ACTUAL last send time + uint64_t now = usecTimestampNow(); + uint64_t elapsed = now - _lastSendTime; + int usecToSleep = sleepInterval - elapsed; + + // If we've never sent, or it's been a long time since we sent, then our elapsed time will be quite large + // and therefore usecToSleep will be less than 0 and we won't sleep before sending... + if (usecToSleep > 0) { + if (usecToSleep > MAX_SLEEP_INTERVAL) { + usecToSleep = MAX_SLEEP_INTERVAL; + } + usleep(usecToSleep); hasSlept = true; - } else { - // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us - return isStillRunning(); + } + + // call our non-threaded version of ourselves + bool keepRunning = nonThreadedProcess(); + + if (!keepRunning) { + break; } } - int packetsPerCall = _packets.size(); // in threaded mode, we just empty this! - int packetsThisCall = 0; + // if threaded and we haven't slept? We want to sleep a little so we don't hog the CPU, but + // we don't want to sleep too long because how ever much we sleep will delay any future unsent + // packets that arrive while we're sleeping. So we sleep 1/2 of our target fps interval + if (!hasSlept) { + usleep(MINIMAL_SLEEP_INTERVAL); + } + + return isStillRunning(); +} + + +// We may be called more frequently than we get packets or need to send packets, we may also get called less frequently. +// +// If we're called more often then out target PPS then we will space our our actual sends to be a single packet for multiple +// calls to process. Those calls to proces in which we do not need to send a packet to keep up with our target PPS we will +// just track our call rate (in order to predict our sends per call) but we won't actually send any packets. +// +// When we are called less frequently than we have packets to send, we will send enough packets per call to keep up with our +// target PPS. +bool PacketSender::nonThreadedProcess() { + uint64_t now = usecTimestampNow(); + + if (_lastProcessCallTime == 0) { + _lastProcessCallTime = now - _usecsPerProcessCallHint; + } + + const uint64_t MINIMUM_POSSIBLE_CALL_TIME = 10; // in usecs + const uint64_t USECS_PER_SECOND = 1000 * 1000; + const float ZERO_RESET_CALLS_PER_SECOND = 1; // used in guard against divide by zero - // if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process + bool wantDebugging = false; + if (wantDebugging) { + printf("\n\nPacketSender::nonThreadedProcess() _packets.size()=%ld\n",_packets.size()); + } + + // keep track of our process call times, so we have a reliable account of how often our caller calls us + uint64_t elapsedSinceLastCall = now - _lastProcessCallTime; + _lastProcessCallTime = now; + _averageProcessCallTime.updateAverage(elapsedSinceLastCall); + + float averageCallTime = 0; + const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2; + if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) { + averageCallTime = _averageProcessCallTime.getAverage(); + + if (wantDebugging) { + printf("averageCallTime = _averageProcessCallTime.getAverage() =%f\n",averageCallTime); + } + + } else { + averageCallTime = _usecsPerProcessCallHint; + + if (wantDebugging) { + printf("averageCallTime = _usecsPerProcessCallHint =%f\n",averageCallTime); + } + + } + + if (wantDebugging) { + printf("elapsedSinceLastCall=%llu averageCallTime=%f\n",elapsedSinceLastCall, averageCallTime); + } + + if (_packets.size() == 0) { + // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us + return isStillRunning(); + } + + // This only happens once, the first time we get this far... so we can use it as an accurate initialization + // point for these important timing variables + if (_lastPPSCheck == 0) { + _lastPPSCheck = now; + _started = now - (uint64_t)averageCallTime; + } + + + float averagePacketsPerCall = 0; // might be less than 1, if our caller calls us more frequently than the target PPS + int packetsSentThisCall = 0; + int packetsToSendThisCall = 0; + + // Since we're in non-threaded mode, we need to determine how many packets to send per call to process // based on how often we get called... We do this by keeping a running average of our call times, and we determine // how many packets to send per call - if (!isThreaded()) { - int averageCallTime; - const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2; - if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) { - averageCallTime = _averageProcessCallTime.getAverage(); - } else { - averageCallTime = _usecsPerProcessCallHint; + // We assume you can't possibly call us less than MINIMUM_POSSIBLE_CALL_TIME apart + if (averageCallTime <= 0) { + averageCallTime = MINIMUM_POSSIBLE_CALL_TIME; + } + + // we can determine how many packets we need to send per call to achieve our desired + // packets per second send rate. + float callsPerSecond = USECS_PER_SECOND / averageCallTime; + + if (wantDebugging) { + printf("PacketSender::process() USECS_PER_SECOND=%llu averageCallTime=%f callsPerSecond=%f\n", + USECS_PER_SECOND, averageCallTime, callsPerSecond ); + } + + // theoretically we could get called less than 1 time per second... but since we're using floats, it really shouldn't be + // possible to get 0 calls per second, but we will guard agains that here, just in case. + if (callsPerSecond == 0) { + callsPerSecond = ZERO_RESET_CALLS_PER_SECOND; + qDebug("PacketSender::nonThreadedProcess() UNEXPECTED:callsPerSecond==0, assuming ZERO_RESET_CALLS_PER_SECOND\n"); + } + + // This is the average number of packets per call... + averagePacketsPerCall = _packetsPerSecond / callsPerSecond; + packetsToSendThisCall = averagePacketsPerCall; + if (wantDebugging) { + printf("PacketSender::process() averageCallTime=%f averagePacketsPerCall=%f packetsToSendThisCall=%d\n", + averageCallTime, averagePacketsPerCall, packetsToSendThisCall); + } + + // if we get called more than 1 per second, we want to mostly divide the packets evenly across the calls... + // but we want to track the remainder and make sure over the course of a second, we are sending the target PPS + // e.g. + // 200pps called 60 times per second... + // 200/60 = 3.333... so really... + // each call we should send 3 + // every 3rd call we should send 4... + // 3,3,4,3,3,4...3,3,4 = 200... + + // if we get called less than 1 per second, then we want to send more than our PPS each time... + // e.g. + // 200pps called ever 1332.5ms + // 200 / (1000/1332.5) = 200/(0.7505) = 266.5 packets per call + // so... + // every other call we should send 266 packets + // then on the next call we should send 267 packets + + // So no mater whether or not we're getting called more or less than once per second, we still need to do some bookkeeping + // to make sure we send a few extra packets to even out our flow rate. + // + // So how do we do that reasonably? + // + // keep a _lastPPSCheck + // keep a _packetsSinceLastPPSCheck + // + // check elapsed since _lastTimeCheck + uint64_t elapsedSinceLastCheck = now - _lastPPSCheck; + + // if we get called more than once per second then check our PPS each time elapsedSinceLastPPSCheck > 1 second + // if we get called less than once per second, then check out PPS over ever 2 calls to process + const float CALL_INTERVALS_TO_CHECK = 1; + const float MIN_CALL_INTERVALS_PER_RESET = 5; + float callIntervalsPerReset = fmax(callsPerSecond, MIN_CALL_INTERVALS_PER_RESET); + + if (wantDebugging) { + printf("about to check interval... callsPerSecond=%f elapsedSinceLastPPSCheck=%llu checkInterval=%f\n", + callsPerSecond, elapsedSinceLastCheck, (averageCallTime * CALL_INTERVALS_TO_CHECK)); + } + + // ((callsPerSecond > 1 && elapsedSinceLastPPSCheck > USECS_PER_SECOND) + if (elapsedSinceLastCheck > (averageCallTime * CALL_INTERVALS_TO_CHECK)) { + + if (wantDebugging) { + printf(">>>>>>>>>>>>>>> check interval... _packetsOverCheckInterval=%d elapsedSinceLastPPSCheck=%llu\n", + _packetsOverCheckInterval, elapsedSinceLastCheck); } - // we can determine how many packets we need to send per call to achieve our desired - // packets per second send rate. - int callsPerSecond = USECS_PER_SECOND / averageCallTime; + float ppsOverCheckInterval = (float)_packetsOverCheckInterval; + float ppsExpectedForCheckInterval = (float)_packetsPerSecond * ((float)elapsedSinceLastCheck / (float)USECS_PER_SECOND); + + if (wantDebugging) { + printf(">>>>>>>>>>>>>>> check interval... ppsOverCheckInterval=%f ppsExpectedForCheckInterval=%f\n", + ppsOverCheckInterval, ppsExpectedForCheckInterval); + } + + if (ppsOverCheckInterval < ppsExpectedForCheckInterval) { + int adjust = ppsExpectedForCheckInterval - ppsOverCheckInterval; + packetsToSendThisCall += adjust; + if (wantDebugging) { + qDebug("Lower pps [%f] than expected [%f] over check interval [%llu], adjusting UP by %d.\n", + ppsOverCheckInterval, ppsExpectedForCheckInterval, elapsedSinceLastCheck, adjust); + } + } else if (ppsOverCheckInterval > ppsExpectedForCheckInterval) { + int adjust = ppsOverCheckInterval - ppsExpectedForCheckInterval; + packetsToSendThisCall -= adjust; + if (wantDebugging) { + qDebug("Higher pps [%f] than expected [%f] over check interval [%llu], adjusting DOWN by %d.\n", + ppsOverCheckInterval, ppsExpectedForCheckInterval, elapsedSinceLastCheck, adjust); + } + } else { + if (wantDebugging) { + qDebug("pps [%f] is expected [%f] over check interval [%llu], NO ADJUSTMENT.\n", + ppsOverCheckInterval, ppsExpectedForCheckInterval, elapsedSinceLastCheck); + } + } - // make sure our number of calls per second doesn't cause a divide by zero - callsPerSecond = glm::clamp(callsPerSecond, 1, _packetsPerSecond); + // now, do we want to adjust the check interval? don't want to completely reset, because we would still have + // a rounding error. instead, we check to see that we've passed the reset interval (which is much larger than + // the check interval. + if (wantDebugging) { + printf(">>>>>>>>>>> RESET >>>>>>>>>>>>>>> Should we reset? callsPerSecond=%f elapsedSinceLastPPSCheck=%llu resetInterval=%f\n", + callsPerSecond, elapsedSinceLastCheck, (averageCallTime * callIntervalsPerReset)); + } - packetsPerCall = ceil(_packetsPerSecond / callsPerSecond); - - // send at least one packet per call, if we have it - if (packetsPerCall < 1) { - packetsPerCall = 1; + if (elapsedSinceLastCheck > (averageCallTime * callIntervalsPerReset)) { + + if (wantDebugging) { + printf(">>>>>>>>>>> RESET >>>>>>>>>>>>>>> elapsedSinceLastCheck/2=%llu _packetsOverCheckInterval/2=%d\n", + (elapsedSinceLastCheck/2), (_packetsOverCheckInterval/2)); + } + + + _lastPPSCheck += (elapsedSinceLastCheck/2); + _packetsOverCheckInterval = (_packetsOverCheckInterval/2); + + elapsedSinceLastCheck = now - _lastPPSCheck; + + if (wantDebugging) { + printf(">>>>>>>>>>> RESET >>>>>>>>>>>>>>> NEW _lastPPSCheck=%llu elapsedSinceLastCheck=%llu _packetsOverCheckInterval=%d\n", + _lastPPSCheck, elapsedSinceLastCheck, _packetsOverCheckInterval); + } } } int packetsLeft = _packets.size(); - bool keepGoing = packetsLeft > 0; - while (keepGoing) { + + if (wantDebugging) { + printf("packetsSentThisCall=%d packetsToSendThisCall=%d packetsLeft=%d\n", + packetsSentThisCall, packetsToSendThisCall, packetsLeft); + } + while ((packetsSentThisCall < packetsToSendThisCall) && (packetsLeft > 0)) { lock(); NetworkPacket& packet = _packets.front(); NetworkPacket temporary = packet; // make a copy @@ -107,52 +335,19 @@ bool PacketSender::process() { UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); nodeSocket->send(&temporary.getAddress(), temporary.getData(), temporary.getLength()); - packetsThisCall++; + packetsSentThisCall++; + _packetsOverCheckInterval++; + _totalPacketsSent++; + _totalBytesSent += temporary.getLength(); + if (wantDebugging) { + printf("nodeSocket->send()... packetsSentThisCall=%d _packetsOverCheckInterval=%d\n", + packetsSentThisCall, _packetsOverCheckInterval); + } if (_notify) { _notify->packetSentNotification(temporary.getLength()); } - - - // in threaded mode, we go till we're empty - if (isThreaded()) { - keepGoing = packetsLeft > 0; - - // dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode - if (keepGoing) { - now = usecTimestampNow(); - uint64_t elapsed = now - _lastSendTime; - int usecToSleep = INTERVAL_SLEEP_USECS - elapsed; - - // we only sleep in non-threaded mode - if (usecToSleep > 0) { - if (usecToSleep > INTERVAL_SLEEP_USECS) { - usecToSleep = INTERVAL_SLEEP_USECS; - } - usleep(usecToSleep); - hasSlept = true; - } - } - - } else { - // in non-threaded mode, we send as many packets as we need per expected call to process() - keepGoing = (packetsThisCall < packetsPerCall) && (packetsLeft > 0); - } - - // if threaded and we haven't slept? We want to sleep.... - if (isThreaded() && !hasSlept) { - now = usecTimestampNow(); - uint64_t elapsed = now - _lastSendTime; - int usecToSleep = INTERVAL_SLEEP_USECS - elapsed; - if (usecToSleep > 0) { - if (usecToSleep > INTERVAL_SLEEP_USECS) { - usecToSleep = INTERVAL_SLEEP_USECS; - } - usleep(usecToSleep); - } - } - _lastSendTime = now; } return isStillRunning(); -} +} \ No newline at end of file diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index 9197c3c7d7..6ca6e19be2 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -13,6 +13,7 @@ #include "GenericThread.h" #include "NetworkPacket.h" +#include "SharedUtil.h" /// Notification Hook for packets being sent by a PacketSender class PacketSenderNotify { @@ -24,8 +25,15 @@ public: /// Generalized threaded processor for queueing and sending of outbound packets. class PacketSender : public virtual GenericThread { public: + + static const uint64_t USECS_PER_SECOND; + static const uint64_t SENDING_INTERVAL_ADJUST; + static const int TARGET_FPS; + static const int MAX_SLEEP_INTERVAL; + static const int DEFAULT_PACKETS_PER_SECOND; static const int MINIMUM_PACKETS_PER_SECOND; + static const int MINIMAL_SLEEP_INTERVAL; PacketSender(PacketSenderNotify* notify = NULL, int packetsPerSecond = DEFAULT_PACKETS_PER_SECOND); @@ -36,7 +44,12 @@ public: /// \thread any thread, typically the application thread void queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength); - void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::max(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); } + void setPacketsPerSecond(int packetsPerSecond) { + _packetsPerSecond = std::max(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); + if (!isThreaded()) { + printf("setPacketsPerSecond()... this=%p _packetsPerSecond=%d\n", this, _packetsPerSecond); + } + } int getPacketsPerSecond() const { return _packetsPerSecond; } void setPacketSenderNotify(PacketSenderNotify* notify) { _notify = notify; } @@ -55,6 +68,40 @@ public: /// \param int usecsPerProcessCall expected number of usecs between calls to process in non-threaded mode. void setProcessCallIntervalHint(int usecsPerProcessCall) { _usecsPerProcessCallHint = usecsPerProcessCall; } + /// returns the packets per second send rate of this object over its lifetime + float getLifetimePPS() const + { return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalPacketsSent / getLifetimeInSeconds()); } + + /// returns the bytes per second send rate of this object over its lifetime + float getLifetimeBPS() const + { return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalBytesSent / getLifetimeInSeconds()); } + + /// returns the packets per second queued rate of this object over its lifetime + float getLifetimePPSQueued() const + { return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalPacketsQueued / getLifetimeInSeconds()); } + + /// returns the bytes per second queued rate of this object over its lifetime + float getLifetimeBPSQueued() const + { return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalBytesQueued / getLifetimeInSeconds()); } + + /// returns lifetime of this object from first packet sent to now in usecs + uint64_t getLifetimeInUsecs() const { return (usecTimestampNow() - _started); } + + /// returns lifetime of this object from first packet sent to now in usecs + float getLifetimeInSeconds() const { return ((float)getLifetimeInUsecs() / (float)USECS_PER_SECOND); } + + /// returns the total packets sent by this object over its lifetime + uint64_t getLifetimePacketsSent() const { return _totalPacketsSent; } + + /// returns the total bytes sent by this object over its lifetime + uint64_t getLifetimeBytesSent() const { return _totalBytesSent; } + + /// returns the total packets queued by this object over its lifetime + uint64_t getLifetimePacketsQueued() const { return _totalPacketsQueued; } + + /// returns the total bytes queued by this object over its lifetime + uint64_t getLifetimeBytesQueued() const { return _totalBytesQueued; } + protected: int _packetsPerSecond; int _usecsPerProcessCallHint; @@ -65,6 +112,19 @@ private: std::vector _packets; uint64_t _lastSendTime; PacketSenderNotify* _notify; + + bool threadedProcess(); + bool nonThreadedProcess(); + + uint64_t _lastPPSCheck; + int _packetsOverCheckInterval; + + uint64_t _started; + uint64_t _totalPacketsSent; + uint64_t _totalBytesSent; + + uint64_t _totalPacketsQueued; + uint64_t _totalBytesQueued; }; #endif // __shared__PacketSender__ diff --git a/libraries/voxels/src/VoxelEditPacketSender.cpp b/libraries/voxels/src/VoxelEditPacketSender.cpp index 125d80e61a..cc9f25133b 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.cpp +++ b/libraries/voxels/src/VoxelEditPacketSender.cpp @@ -110,6 +110,31 @@ void VoxelEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned ch if (nodeList->getNodeActiveSocketOrPing(&(*node))) { sockaddr* nodeAddress = node->getActiveSocket(); queuePacketForSending(*nodeAddress, buffer, length); + + // debugging output... + bool wantDebugging = false; + if (wantDebugging) { + int numBytesPacketHeader = numBytesForPacketHeader(buffer); + unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader))); + uint64_t createdAt = (*((uint64_t*)(buffer + numBytesPacketHeader + sizeof(sequence)))); + uint64_t queuedAt = usecTimestampNow(); + uint64_t transitTime = queuedAt - createdAt; + + const char* messageName; + switch (buffer[0]) { + case PACKET_TYPE_SET_VOXEL: + messageName = "PACKET_TYPE_SET_VOXEL"; + break; + case PACKET_TYPE_SET_VOXEL_DESTRUCTIVE: + messageName = "PACKET_TYPE_SET_VOXEL_DESTRUCTIVE"; + break; + case PACKET_TYPE_ERASE_VOXEL: + messageName = "PACKET_TYPE_ERASE_VOXEL"; + break; + } + printf("VoxelEditPacketSender::queuePacketToNode() queued %s - command to node bytes=%ld sequence=%d transitTimeSoFar=%llu usecs\n", + messageName, length, sequence, transitTime); + } } } } @@ -267,7 +292,9 @@ void VoxelEditPacketSender::releaseQueuedMessages() { } void VoxelEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { - queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); + if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PACKET_TYPE_UNKNOWN) { + queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); + } packetBuffer._currentSize = 0; packetBuffer._currentType = PACKET_TYPE_UNKNOWN; }