improvements to PPS behavior of PacketSender

This commit is contained in:
ZappoMan 2013-11-10 10:39:56 -08:00
parent e47abf2f4c
commit 1a6efea5c0
4 changed files with 521 additions and 142 deletions
animation-server/src
libraries

View file

@ -41,13 +41,20 @@ bool includeBlinkingVoxel = false;
bool includeDanceFloor = true;
bool buildStreet = false;
bool nonThreadedPacketSender = false;
int packetsPerSecond = PacketSender::DEFAULT_PACKETS_PER_SECOND;
bool waitForVoxelServer = true;
const int ANIMATION_LISTEN_PORT = 40107;
const int ACTUAL_FPS = 60;
const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS
const int FUDGE_USECS = 10; // a little bit of fudge to actually do some processing
const int ANIMATE_VOXELS_INTERVAL_USECS = (OUR_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs
int ANIMATE_FPS = 60;
double ANIMATE_FPS_IN_MILLISECONDS = 1000.0/ANIMATE_FPS; // determines FPS from our desired FPS
int ANIMATE_VOXELS_INTERVAL_USECS = (ANIMATE_FPS_IN_MILLISECONDS * 1000.0); // converts from milliseconds to usecs
int PROCESSING_FPS = 60;
double PROCESSING_FPS_IN_MILLISECONDS = 1000.0/PROCESSING_FPS; // determines FPS from our desired FPS
int FUDGE_USECS = 650; // a little bit of fudge to actually do some processing
int PROCESSING_INTERVAL_USECS = (PROCESSING_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs
bool wantLocalDomain = false;
@ -361,7 +368,7 @@ unsigned char danceFloorOnColorB[DANCE_FLOOR_COLORS][3] = {
float danceFloorGradient = 0.5f;
const float BEATS_PER_MINUTE = 118.0f;
const float SECONDS_PER_MINUTE = 60.0f;
const float FRAMES_PER_BEAT = (SECONDS_PER_MINUTE * ACTUAL_FPS) / BEATS_PER_MINUTE;
const float FRAMES_PER_BEAT = (SECONDS_PER_MINUTE * ANIMATE_FPS) / BEATS_PER_MINUTE;
float danceFloorGradientIncrement = 1.0f / FRAMES_PER_BEAT;
const float DANCE_FLOOR_MAX_GRADIENT = 1.0f;
const float DANCE_FLOOR_MIN_GRADIENT = 0.0f;
@ -591,72 +598,119 @@ double start = 0;
void* animateVoxels(void* args) {
timeval lastSendTime;
uint64_t lastAnimateTime = 0;
uint64_t lastProcessTime = 0;
int processesPerAnimate = 0;
bool firstTime = true;
std::cout << "Setting PPS to " << ::packetsPerSecond << "\n";
::voxelEditPacketSender->setPacketsPerSecond(::packetsPerSecond);
std::cout << "PPS set to " << ::voxelEditPacketSender->getPacketsPerSecond() << "\n";
while (true) {
gettimeofday(&lastSendTime, NULL);
// If we're asked to wait for voxel servers, and there isn't one available yet, then
// let the voxelEditPacketSender process and move on.
if (::waitForVoxelServer && !::voxelEditPacketSender->voxelServersExist()) {
if (::nonThreadedPacketSender) {
::voxelEditPacketSender->process();
}
} else {
if (firstTime) {
lastAnimateTime = usecTimestampNow();
firstTime = false;
}
lastProcessTime = usecTimestampNow();
int packetsStarting = 0;
int packetsEnding = 0;
int packetsStarting = ::voxelEditPacketSender->packetsToSendCount();
// The while loop will be running at PROCESSING_FPS, but we only want to call these animation functions at
// ANIMATE_FPS. So we check out last animate time and only call these if we've elapsed that time.
uint64_t now = usecTimestampNow();
uint64_t animationElapsed = now - lastAnimateTime;
int withinAnimationTarget = ANIMATE_VOXELS_INTERVAL_USECS - animationElapsed;
// some animations
//sendVoxelBlinkMessage();
int animateLoopsPerAnimate = 0;
while (withinAnimationTarget < 2000) {
processesPerAnimate = 0;
animateLoopsPerAnimate++;
lastAnimateTime = now;
packetsStarting = ::voxelEditPacketSender->packetsToSendCount();
// some animations
//sendVoxelBlinkMessage();
if (::includeBillboard) {
sendBillboard();
}
if (::includeBorderTracer) {
sendBlinkingStringOfLights();
}
if (::includeMovingBug) {
renderMovingBug();
}
if (::includeBlinkingVoxel) {
sendVoxelBlinkMessage();
}
if (::includeDanceFloor) {
sendDanceFloor();
}
if (::includeBillboard) {
sendBillboard();
}
if (::includeBorderTracer) {
sendBlinkingStringOfLights();
}
if (::includeMovingBug) {
renderMovingBug();
}
if (::includeBlinkingVoxel) {
sendVoxelBlinkMessage();
}
if (::includeDanceFloor) {
sendDanceFloor();
}
if (::buildStreet) {
doBuildStreet();
}
if (::buildStreet) {
doBuildStreet();
}
::voxelEditPacketSender->releaseQueuedMessages();
int packetsEnding = ::voxelEditPacketSender->packetsToSendCount();
packetsEnding = ::voxelEditPacketSender->packetsToSendCount();
if (animationElapsed > ANIMATE_VOXELS_INTERVAL_USECS) {
animationElapsed -= ANIMATE_VOXELS_INTERVAL_USECS; // credit ourselves one animation frame
} else {
animationElapsed = 0;
}
withinAnimationTarget = ANIMATE_VOXELS_INTERVAL_USECS - animationElapsed;
::voxelEditPacketSender->releaseQueuedMessages();
}
if (firstTime) {
int packetsPerSecond = std::max(ACTUAL_FPS, (packetsEnding - packetsStarting) * (ACTUAL_FPS));
std::cout << "Setting PPS to " << packetsPerSecond << "\n";
::voxelEditPacketSender->setPacketsPerSecond(packetsPerSecond);
firstTime = false;
}
if (::nonThreadedPacketSender) {
::voxelEditPacketSender->process();
}
if (::nonThreadedPacketSender) {
::voxelEditPacketSender->process();
}
processesPerAnimate++;
uint64_t end = usecTimestampNow();
uint64_t elapsedSeconds = (end - ::start) / 1000000;
if (::shouldShowPacketsPerSecond) {
printf("packetsSent=%ld, bytesSent=%ld pps=%f bps=%f\n",packetsSent,bytesSent,
(float)(packetsSent/elapsedSeconds),(float)(bytesSent/elapsedSeconds));
if (::shouldShowPacketsPerSecond) {
float lifetimeSeconds = ::voxelEditPacketSender->getLifetimeInSeconds();
int targetPPS = ::voxelEditPacketSender->getPacketsPerSecond();
float lifetimePPS = ::voxelEditPacketSender->getLifetimePPS();
float lifetimeBPS = ::voxelEditPacketSender->getLifetimeBPS();
uint64_t totalPacketsSent = ::voxelEditPacketSender->getLifetimePacketsSent();
uint64_t totalBytesSent = ::voxelEditPacketSender->getLifetimeBytesSent();
float lifetimePPSQueued = ::voxelEditPacketSender->getLifetimePPSQueued();
float lifetimeBPSQueued = ::voxelEditPacketSender->getLifetimeBPSQueued();
uint64_t totalPacketsQueued = ::voxelEditPacketSender->getLifetimePacketsQueued();
uint64_t totalBytesQueued = ::voxelEditPacketSender->getLifetimeBytesQueued();
uint64_t packetsPending = ::voxelEditPacketSender->packetsToSendCount();
printf("lifetime=%f secs packetsSent=%lld, bytesSent=%lld targetPPS=%d pps=%f bps=%f\n",
lifetimeSeconds, totalPacketsSent, totalBytesSent, targetPPS, lifetimePPS, lifetimeBPS);
printf("packetsPending=%lld packetsQueued=%lld, bytesQueued=%lld ppsQueued=%f bpsQueued=%f\n",
packetsPending, totalPacketsQueued, totalBytesQueued, lifetimePPSQueued, lifetimeBPSQueued);
}
}
// dynamically sleep until we need to fire off the next set of voxels
uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime));
if (usecToSleep > ANIMATE_VOXELS_INTERVAL_USECS) {
usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS;
uint64_t usecToSleep = PROCESSING_INTERVAL_USECS - (usecTimestampNow() - lastProcessTime);
if (usecToSleep > PROCESSING_INTERVAL_USECS) {
usecToSleep = PROCESSING_INTERVAL_USECS;
}
if (usecToSleep > 0) {
usleep(usecToSleep);
} else {
std::cout << "Last send took too much time, not sleeping!\n";
}
}
@ -718,6 +772,49 @@ int main(int argc, const char * argv[])
NodeList::getInstance()->setDomainHostname(domainHostname);
}
const char* packetsPerSecondCommand = getCmdOption(argc, argv, "--pps");
if (packetsPerSecondCommand) {
::packetsPerSecond = atoi(packetsPerSecondCommand);
}
printf("packetsPerSecond=%d\n",packetsPerSecond);
const char* animateFPSCommand = getCmdOption(argc, argv, "--AnimateFPS");
const char* animateIntervalCommand = getCmdOption(argc, argv, "--AnimateInterval");
if (animateFPSCommand || animateIntervalCommand) {
if (animateIntervalCommand) {
::ANIMATE_FPS_IN_MILLISECONDS = atoi(animateIntervalCommand);
::ANIMATE_VOXELS_INTERVAL_USECS = (ANIMATE_FPS_IN_MILLISECONDS * 1000.0); // converts from milliseconds to usecs
::ANIMATE_FPS = PacketSender::USECS_PER_SECOND / ::ANIMATE_VOXELS_INTERVAL_USECS;
} else {
::ANIMATE_FPS = atoi(animateFPSCommand);
::ANIMATE_FPS_IN_MILLISECONDS = 1000.0/ANIMATE_FPS; // determines FPS from our desired FPS
::ANIMATE_VOXELS_INTERVAL_USECS = (ANIMATE_FPS_IN_MILLISECONDS * 1000.0); // converts from milliseconds to usecs
}
}
printf("ANIMATE_FPS=%d\n",ANIMATE_FPS);
printf("ANIMATE_VOXELS_INTERVAL_USECS=%d\n",ANIMATE_VOXELS_INTERVAL_USECS);
const char* processingFPSCommand = getCmdOption(argc, argv, "--ProcessingFPS");
const char* processingIntervalCommand = getCmdOption(argc, argv, "--ProcessingInterval");
if (processingFPSCommand || processingIntervalCommand) {
if (processingIntervalCommand) {
::PROCESSING_FPS_IN_MILLISECONDS = atoi(processingIntervalCommand);
::PROCESSING_INTERVAL_USECS = ::PROCESSING_FPS_IN_MILLISECONDS * 1000.0;
::PROCESSING_FPS = PacketSender::USECS_PER_SECOND / ::PROCESSING_INTERVAL_USECS;
} else {
::PROCESSING_FPS = atoi(processingFPSCommand);
::PROCESSING_FPS_IN_MILLISECONDS = 1000.0/PROCESSING_FPS; // determines FPS from our desired FPS
::PROCESSING_INTERVAL_USECS = (PROCESSING_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs
}
}
printf("PROCESSING_FPS=%d\n",PROCESSING_FPS);
printf("PROCESSING_INTERVAL_USECS=%d\n",PROCESSING_INTERVAL_USECS);
if (cmdOptionExists(argc, argv, "--quickExit")) {
return 0;
}
nodeList->linkedDataCreateCallback = NULL; // do we need a callback?
nodeList->startSilentNodeRemovalThread();
@ -735,7 +832,7 @@ int main(int argc, const char * argv[])
::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions());
}
if (::nonThreadedPacketSender) {
::voxelEditPacketSender->setProcessCallIntervalHint(ANIMATE_VOXELS_INTERVAL_USECS);
::voxelEditPacketSender->setProcessCallIntervalHint(PROCESSING_INTERVAL_USECS);
}
srand((unsigned)time(0));

View file

@ -15,18 +15,31 @@
#include "PacketSender.h"
#include "SharedUtil.h"
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200;
const uint64_t PacketSender::USECS_PER_SECOND = 1000 * 1000;
const uint64_t PacketSender::SENDING_INTERVAL_ADJUST = 200; // approaximate 200us
const int PacketSender::TARGET_FPS = 60;
const int PacketSender::MAX_SLEEP_INTERVAL = PacketSender::USECS_PER_SECOND;
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 30;
const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1;
const int PacketSender::MINIMAL_SLEEP_INTERVAL = (USECS_PER_SECOND / TARGET_FPS) / 2;
const int AVERAGE_CALL_TIME_SAMPLES = 10;
PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
_packetsPerSecond(packetsPerSecond),
_usecsPerProcessCallHint(0),
_lastProcessCallTime(usecTimestampNow()),
_lastProcessCallTime(0),
_averageProcessCallTime(AVERAGE_CALL_TIME_SAMPLES),
_lastSendTime(usecTimestampNow()),
_notify(notify)
_lastSendTime(0), // Note: we set this to 0 to indicate we haven't yet sent something
_notify(notify),
_lastPPSCheck(0),
_packetsOverCheckInterval(0),
_started(usecTimestampNow()),
_totalPacketsSent(0),
_totalBytesSent(0),
_totalPacketsQueued(0),
_totalBytesQueued(0)
{
}
@ -36,66 +49,281 @@ void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packe
lock();
_packets.push_back(packet);
unlock();
_totalPacketsQueued++;
_totalBytesQueued += packetLength;
}
bool PacketSender::process() {
if (isThreaded()) {
return threadedProcess();
}
return nonThreadedProcess();
}
bool PacketSender::threadedProcess() {
bool hasSlept = false;
uint64_t USECS_PER_SECOND = 1000 * 1000;
uint64_t USECS_SMALL_ADJUST = 2 * 1000; // approaximate 2ms
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
uint64_t INTERVAL_SLEEP_USECS = (SEND_INTERVAL_USECS > USECS_SMALL_ADJUST) ?
SEND_INTERVAL_USECS - USECS_SMALL_ADJUST : SEND_INTERVAL_USECS;
// keep track of our process call times, so we have a reliable account of how often our caller calls us
uint64_t now = usecTimestampNow();
uint64_t elapsedSinceLastCall = now - _lastProcessCallTime;
_lastProcessCallTime = now;
_averageProcessCallTime.updateAverage(elapsedSinceLastCall);
if (_lastSendTime == 0) {
_lastSendTime = usecTimestampNow();
}
if (_packets.size() == 0) {
if (isThreaded()) {
usleep(INTERVAL_SLEEP_USECS);
// in threaded mode, we keep running and just empty our packet queue sleeping enough to keep our PPS on target
while (_packets.size() > 0) {
// Recalculate our SEND_INTERVAL_USECS each time, in case the caller has changed it on us..
int packetsPerSecondTarget = (_packetsPerSecond > MINIMUM_PACKETS_PER_SECOND)
? _packetsPerSecond : MINIMUM_PACKETS_PER_SECOND;
uint64_t intervalBetweenSends = USECS_PER_SECOND / packetsPerSecondTarget;
uint64_t sleepInterval = (intervalBetweenSends > SENDING_INTERVAL_ADJUST) ?
intervalBetweenSends - SENDING_INTERVAL_ADJUST : intervalBetweenSends;
// We'll sleep before we send, this way, we can set our last send time to be our ACTUAL last send time
uint64_t now = usecTimestampNow();
uint64_t elapsed = now - _lastSendTime;
int usecToSleep = sleepInterval - elapsed;
// If we've never sent, or it's been a long time since we sent, then our elapsed time will be quite large
// and therefore usecToSleep will be less than 0 and we won't sleep before sending...
if (usecToSleep > 0) {
if (usecToSleep > MAX_SLEEP_INTERVAL) {
usecToSleep = MAX_SLEEP_INTERVAL;
}
usleep(usecToSleep);
hasSlept = true;
} else {
// in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us
return isStillRunning();
}
// call our non-threaded version of ourselves
bool keepRunning = nonThreadedProcess();
if (!keepRunning) {
break;
}
}
int packetsPerCall = _packets.size(); // in threaded mode, we just empty this!
int packetsThisCall = 0;
// if threaded and we haven't slept? We want to sleep a little so we don't hog the CPU, but
// we don't want to sleep too long because how ever much we sleep will delay any future unsent
// packets that arrive while we're sleeping. So we sleep 1/2 of our target fps interval
if (!hasSlept) {
usleep(MINIMAL_SLEEP_INTERVAL);
}
return isStillRunning();
}
// We may be called more frequently than we get packets or need to send packets, we may also get called less frequently.
//
// If we're called more often then out target PPS then we will space our our actual sends to be a single packet for multiple
// calls to process. Those calls to proces in which we do not need to send a packet to keep up with our target PPS we will
// just track our call rate (in order to predict our sends per call) but we won't actually send any packets.
//
// When we are called less frequently than we have packets to send, we will send enough packets per call to keep up with our
// target PPS.
bool PacketSender::nonThreadedProcess() {
uint64_t now = usecTimestampNow();
if (_lastProcessCallTime == 0) {
_lastProcessCallTime = now - _usecsPerProcessCallHint;
}
const uint64_t MINIMUM_POSSIBLE_CALL_TIME = 10; // in usecs
const uint64_t USECS_PER_SECOND = 1000 * 1000;
const float ZERO_RESET_CALLS_PER_SECOND = 1; // used in guard against divide by zero
// if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process
bool wantDebugging = false;
if (wantDebugging) {
printf("\n\nPacketSender::nonThreadedProcess() _packets.size()=%ld\n",_packets.size());
}
// keep track of our process call times, so we have a reliable account of how often our caller calls us
uint64_t elapsedSinceLastCall = now - _lastProcessCallTime;
_lastProcessCallTime = now;
_averageProcessCallTime.updateAverage(elapsedSinceLastCall);
float averageCallTime = 0;
const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2;
if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) {
averageCallTime = _averageProcessCallTime.getAverage();
if (wantDebugging) {
printf("averageCallTime = _averageProcessCallTime.getAverage() =%f\n",averageCallTime);
}
} else {
averageCallTime = _usecsPerProcessCallHint;
if (wantDebugging) {
printf("averageCallTime = _usecsPerProcessCallHint =%f\n",averageCallTime);
}
}
if (wantDebugging) {
printf("elapsedSinceLastCall=%llu averageCallTime=%f\n",elapsedSinceLastCall, averageCallTime);
}
if (_packets.size() == 0) {
// in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us
return isStillRunning();
}
// This only happens once, the first time we get this far... so we can use it as an accurate initialization
// point for these important timing variables
if (_lastPPSCheck == 0) {
_lastPPSCheck = now;
_started = now - (uint64_t)averageCallTime;
}
float averagePacketsPerCall = 0; // might be less than 1, if our caller calls us more frequently than the target PPS
int packetsSentThisCall = 0;
int packetsToSendThisCall = 0;
// Since we're in non-threaded mode, we need to determine how many packets to send per call to process
// based on how often we get called... We do this by keeping a running average of our call times, and we determine
// how many packets to send per call
if (!isThreaded()) {
int averageCallTime;
const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2;
if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) {
averageCallTime = _averageProcessCallTime.getAverage();
} else {
averageCallTime = _usecsPerProcessCallHint;
// We assume you can't possibly call us less than MINIMUM_POSSIBLE_CALL_TIME apart
if (averageCallTime <= 0) {
averageCallTime = MINIMUM_POSSIBLE_CALL_TIME;
}
// we can determine how many packets we need to send per call to achieve our desired
// packets per second send rate.
float callsPerSecond = USECS_PER_SECOND / averageCallTime;
if (wantDebugging) {
printf("PacketSender::process() USECS_PER_SECOND=%llu averageCallTime=%f callsPerSecond=%f\n",
USECS_PER_SECOND, averageCallTime, callsPerSecond );
}
// theoretically we could get called less than 1 time per second... but since we're using floats, it really shouldn't be
// possible to get 0 calls per second, but we will guard agains that here, just in case.
if (callsPerSecond == 0) {
callsPerSecond = ZERO_RESET_CALLS_PER_SECOND;
qDebug("PacketSender::nonThreadedProcess() UNEXPECTED:callsPerSecond==0, assuming ZERO_RESET_CALLS_PER_SECOND\n");
}
// This is the average number of packets per call...
averagePacketsPerCall = _packetsPerSecond / callsPerSecond;
packetsToSendThisCall = averagePacketsPerCall;
if (wantDebugging) {
printf("PacketSender::process() averageCallTime=%f averagePacketsPerCall=%f packetsToSendThisCall=%d\n",
averageCallTime, averagePacketsPerCall, packetsToSendThisCall);
}
// if we get called more than 1 per second, we want to mostly divide the packets evenly across the calls...
// but we want to track the remainder and make sure over the course of a second, we are sending the target PPS
// e.g.
// 200pps called 60 times per second...
// 200/60 = 3.333... so really...
// each call we should send 3
// every 3rd call we should send 4...
// 3,3,4,3,3,4...3,3,4 = 200...
// if we get called less than 1 per second, then we want to send more than our PPS each time...
// e.g.
// 200pps called ever 1332.5ms
// 200 / (1000/1332.5) = 200/(0.7505) = 266.5 packets per call
// so...
// every other call we should send 266 packets
// then on the next call we should send 267 packets
// So no mater whether or not we're getting called more or less than once per second, we still need to do some bookkeeping
// to make sure we send a few extra packets to even out our flow rate.
//
// So how do we do that reasonably?
//
// keep a _lastPPSCheck
// keep a _packetsSinceLastPPSCheck
//
// check elapsed since _lastTimeCheck
uint64_t elapsedSinceLastCheck = now - _lastPPSCheck;
// if we get called more than once per second then check our PPS each time elapsedSinceLastPPSCheck > 1 second
// if we get called less than once per second, then check out PPS over ever 2 calls to process
const float CALL_INTERVALS_TO_CHECK = 1;
const float MIN_CALL_INTERVALS_PER_RESET = 5;
float callIntervalsPerReset = fmax(callsPerSecond, MIN_CALL_INTERVALS_PER_RESET);
if (wantDebugging) {
printf("about to check interval... callsPerSecond=%f elapsedSinceLastPPSCheck=%llu checkInterval=%f\n",
callsPerSecond, elapsedSinceLastCheck, (averageCallTime * CALL_INTERVALS_TO_CHECK));
}
// ((callsPerSecond > 1 && elapsedSinceLastPPSCheck > USECS_PER_SECOND)
if (elapsedSinceLastCheck > (averageCallTime * CALL_INTERVALS_TO_CHECK)) {
if (wantDebugging) {
printf(">>>>>>>>>>>>>>> check interval... _packetsOverCheckInterval=%d elapsedSinceLastPPSCheck=%llu\n",
_packetsOverCheckInterval, elapsedSinceLastCheck);
}
// we can determine how many packets we need to send per call to achieve our desired
// packets per second send rate.
int callsPerSecond = USECS_PER_SECOND / averageCallTime;
float ppsOverCheckInterval = (float)_packetsOverCheckInterval;
float ppsExpectedForCheckInterval = (float)_packetsPerSecond * ((float)elapsedSinceLastCheck / (float)USECS_PER_SECOND);
if (wantDebugging) {
printf(">>>>>>>>>>>>>>> check interval... ppsOverCheckInterval=%f ppsExpectedForCheckInterval=%f\n",
ppsOverCheckInterval, ppsExpectedForCheckInterval);
}
if (ppsOverCheckInterval < ppsExpectedForCheckInterval) {
int adjust = ppsExpectedForCheckInterval - ppsOverCheckInterval;
packetsToSendThisCall += adjust;
if (wantDebugging) {
qDebug("Lower pps [%f] than expected [%f] over check interval [%llu], adjusting UP by %d.\n",
ppsOverCheckInterval, ppsExpectedForCheckInterval, elapsedSinceLastCheck, adjust);
}
} else if (ppsOverCheckInterval > ppsExpectedForCheckInterval) {
int adjust = ppsOverCheckInterval - ppsExpectedForCheckInterval;
packetsToSendThisCall -= adjust;
if (wantDebugging) {
qDebug("Higher pps [%f] than expected [%f] over check interval [%llu], adjusting DOWN by %d.\n",
ppsOverCheckInterval, ppsExpectedForCheckInterval, elapsedSinceLastCheck, adjust);
}
} else {
if (wantDebugging) {
qDebug("pps [%f] is expected [%f] over check interval [%llu], NO ADJUSTMENT.\n",
ppsOverCheckInterval, ppsExpectedForCheckInterval, elapsedSinceLastCheck);
}
}
// make sure our number of calls per second doesn't cause a divide by zero
callsPerSecond = glm::clamp(callsPerSecond, 1, _packetsPerSecond);
// now, do we want to adjust the check interval? don't want to completely reset, because we would still have
// a rounding error. instead, we check to see that we've passed the reset interval (which is much larger than
// the check interval.
if (wantDebugging) {
printf(">>>>>>>>>>> RESET >>>>>>>>>>>>>>> Should we reset? callsPerSecond=%f elapsedSinceLastPPSCheck=%llu resetInterval=%f\n",
callsPerSecond, elapsedSinceLastCheck, (averageCallTime * callIntervalsPerReset));
}
packetsPerCall = ceil(_packetsPerSecond / callsPerSecond);
// send at least one packet per call, if we have it
if (packetsPerCall < 1) {
packetsPerCall = 1;
if (elapsedSinceLastCheck > (averageCallTime * callIntervalsPerReset)) {
if (wantDebugging) {
printf(">>>>>>>>>>> RESET >>>>>>>>>>>>>>> elapsedSinceLastCheck/2=%llu _packetsOverCheckInterval/2=%d\n",
(elapsedSinceLastCheck/2), (_packetsOverCheckInterval/2));
}
_lastPPSCheck += (elapsedSinceLastCheck/2);
_packetsOverCheckInterval = (_packetsOverCheckInterval/2);
elapsedSinceLastCheck = now - _lastPPSCheck;
if (wantDebugging) {
printf(">>>>>>>>>>> RESET >>>>>>>>>>>>>>> NEW _lastPPSCheck=%llu elapsedSinceLastCheck=%llu _packetsOverCheckInterval=%d\n",
_lastPPSCheck, elapsedSinceLastCheck, _packetsOverCheckInterval);
}
}
}
int packetsLeft = _packets.size();
bool keepGoing = packetsLeft > 0;
while (keepGoing) {
if (wantDebugging) {
printf("packetsSentThisCall=%d packetsToSendThisCall=%d packetsLeft=%d\n",
packetsSentThisCall, packetsToSendThisCall, packetsLeft);
}
while ((packetsSentThisCall < packetsToSendThisCall) && (packetsLeft > 0)) {
lock();
NetworkPacket& packet = _packets.front();
NetworkPacket temporary = packet; // make a copy
@ -107,52 +335,19 @@ bool PacketSender::process() {
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
nodeSocket->send(&temporary.getAddress(), temporary.getData(), temporary.getLength());
packetsThisCall++;
packetsSentThisCall++;
_packetsOverCheckInterval++;
_totalPacketsSent++;
_totalBytesSent += temporary.getLength();
if (wantDebugging) {
printf("nodeSocket->send()... packetsSentThisCall=%d _packetsOverCheckInterval=%d\n",
packetsSentThisCall, _packetsOverCheckInterval);
}
if (_notify) {
_notify->packetSentNotification(temporary.getLength());
}
// in threaded mode, we go till we're empty
if (isThreaded()) {
keepGoing = packetsLeft > 0;
// dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode
if (keepGoing) {
now = usecTimestampNow();
uint64_t elapsed = now - _lastSendTime;
int usecToSleep = INTERVAL_SLEEP_USECS - elapsed;
// we only sleep in non-threaded mode
if (usecToSleep > 0) {
if (usecToSleep > INTERVAL_SLEEP_USECS) {
usecToSleep = INTERVAL_SLEEP_USECS;
}
usleep(usecToSleep);
hasSlept = true;
}
}
} else {
// in non-threaded mode, we send as many packets as we need per expected call to process()
keepGoing = (packetsThisCall < packetsPerCall) && (packetsLeft > 0);
}
// if threaded and we haven't slept? We want to sleep....
if (isThreaded() && !hasSlept) {
now = usecTimestampNow();
uint64_t elapsed = now - _lastSendTime;
int usecToSleep = INTERVAL_SLEEP_USECS - elapsed;
if (usecToSleep > 0) {
if (usecToSleep > INTERVAL_SLEEP_USECS) {
usecToSleep = INTERVAL_SLEEP_USECS;
}
usleep(usecToSleep);
}
}
_lastSendTime = now;
}
return isStillRunning();
}
}

View file

@ -13,6 +13,7 @@
#include "GenericThread.h"
#include "NetworkPacket.h"
#include "SharedUtil.h"
/// Notification Hook for packets being sent by a PacketSender
class PacketSenderNotify {
@ -24,8 +25,15 @@ public:
/// Generalized threaded processor for queueing and sending of outbound packets.
class PacketSender : public virtual GenericThread {
public:
static const uint64_t USECS_PER_SECOND;
static const uint64_t SENDING_INTERVAL_ADJUST;
static const int TARGET_FPS;
static const int MAX_SLEEP_INTERVAL;
static const int DEFAULT_PACKETS_PER_SECOND;
static const int MINIMUM_PACKETS_PER_SECOND;
static const int MINIMAL_SLEEP_INTERVAL;
PacketSender(PacketSenderNotify* notify = NULL, int packetsPerSecond = DEFAULT_PACKETS_PER_SECOND);
@ -36,7 +44,12 @@ public:
/// \thread any thread, typically the application thread
void queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength);
void setPacketsPerSecond(int packetsPerSecond) { _packetsPerSecond = std::max(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond); }
void setPacketsPerSecond(int packetsPerSecond) {
_packetsPerSecond = std::max(MINIMUM_PACKETS_PER_SECOND, packetsPerSecond);
if (!isThreaded()) {
printf("setPacketsPerSecond()... this=%p _packetsPerSecond=%d\n", this, _packetsPerSecond);
}
}
int getPacketsPerSecond() const { return _packetsPerSecond; }
void setPacketSenderNotify(PacketSenderNotify* notify) { _notify = notify; }
@ -55,6 +68,40 @@ public:
/// \param int usecsPerProcessCall expected number of usecs between calls to process in non-threaded mode.
void setProcessCallIntervalHint(int usecsPerProcessCall) { _usecsPerProcessCallHint = usecsPerProcessCall; }
/// returns the packets per second send rate of this object over its lifetime
float getLifetimePPS() const
{ return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalPacketsSent / getLifetimeInSeconds()); }
/// returns the bytes per second send rate of this object over its lifetime
float getLifetimeBPS() const
{ return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalBytesSent / getLifetimeInSeconds()); }
/// returns the packets per second queued rate of this object over its lifetime
float getLifetimePPSQueued() const
{ return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalPacketsQueued / getLifetimeInSeconds()); }
/// returns the bytes per second queued rate of this object over its lifetime
float getLifetimeBPSQueued() const
{ return getLifetimeInSeconds() == 0 ? 0 : (float)((float)_totalBytesQueued / getLifetimeInSeconds()); }
/// returns lifetime of this object from first packet sent to now in usecs
uint64_t getLifetimeInUsecs() const { return (usecTimestampNow() - _started); }
/// returns lifetime of this object from first packet sent to now in usecs
float getLifetimeInSeconds() const { return ((float)getLifetimeInUsecs() / (float)USECS_PER_SECOND); }
/// returns the total packets sent by this object over its lifetime
uint64_t getLifetimePacketsSent() const { return _totalPacketsSent; }
/// returns the total bytes sent by this object over its lifetime
uint64_t getLifetimeBytesSent() const { return _totalBytesSent; }
/// returns the total packets queued by this object over its lifetime
uint64_t getLifetimePacketsQueued() const { return _totalPacketsQueued; }
/// returns the total bytes queued by this object over its lifetime
uint64_t getLifetimeBytesQueued() const { return _totalBytesQueued; }
protected:
int _packetsPerSecond;
int _usecsPerProcessCallHint;
@ -65,6 +112,19 @@ private:
std::vector<NetworkPacket> _packets;
uint64_t _lastSendTime;
PacketSenderNotify* _notify;
bool threadedProcess();
bool nonThreadedProcess();
uint64_t _lastPPSCheck;
int _packetsOverCheckInterval;
uint64_t _started;
uint64_t _totalPacketsSent;
uint64_t _totalBytesSent;
uint64_t _totalPacketsQueued;
uint64_t _totalBytesQueued;
};
#endif // __shared__PacketSender__

View file

@ -110,6 +110,31 @@ void VoxelEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned ch
if (nodeList->getNodeActiveSocketOrPing(&(*node))) {
sockaddr* nodeAddress = node->getActiveSocket();
queuePacketForSending(*nodeAddress, buffer, length);
// debugging output...
bool wantDebugging = false;
if (wantDebugging) {
int numBytesPacketHeader = numBytesForPacketHeader(buffer);
unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader)));
uint64_t createdAt = (*((uint64_t*)(buffer + numBytesPacketHeader + sizeof(sequence))));
uint64_t queuedAt = usecTimestampNow();
uint64_t transitTime = queuedAt - createdAt;
const char* messageName;
switch (buffer[0]) {
case PACKET_TYPE_SET_VOXEL:
messageName = "PACKET_TYPE_SET_VOXEL";
break;
case PACKET_TYPE_SET_VOXEL_DESTRUCTIVE:
messageName = "PACKET_TYPE_SET_VOXEL_DESTRUCTIVE";
break;
case PACKET_TYPE_ERASE_VOXEL:
messageName = "PACKET_TYPE_ERASE_VOXEL";
break;
}
printf("VoxelEditPacketSender::queuePacketToNode() queued %s - command to node bytes=%ld sequence=%d transitTimeSoFar=%llu usecs\n",
messageName, length, sequence, transitTime);
}
}
}
}
@ -267,7 +292,9 @@ void VoxelEditPacketSender::releaseQueuedMessages() {
}
void VoxelEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) {
queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PACKET_TYPE_UNKNOWN) {
queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize);
}
packetBuffer._currentSize = 0;
packetBuffer._currentType = PACKET_TYPE_UNKNOWN;
}