implement support for process interval hints and handling sending only some packets per call to process in non-threaded mode

This commit is contained in:
ZappoMan 2013-10-02 20:39:58 -07:00
parent 4271fc5d41
commit 9d231a50b6
6 changed files with 102 additions and 66 deletions

View file

@ -6,6 +6,7 @@
// Copyright (c) 2012 High Fidelity, Inc. All rights reserved. // Copyright (c) 2012 High Fidelity, Inc. All rights reserved.
// //
#include <algorithm>
#include <cmath> #include <cmath>
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
@ -39,12 +40,14 @@ bool includeMovingBug = true;
bool includeBlinkingVoxel = false; bool includeBlinkingVoxel = false;
bool includeDanceFloor = true; bool includeDanceFloor = true;
bool buildStreet = false; bool buildStreet = false;
bool nonThreadedPacketSender = false;
const int ANIMATION_LISTEN_PORT = 40107; const int ANIMATION_LISTEN_PORT = 40107;
const int ACTUAL_FPS = 60; const int ACTUAL_FPS = 60;
const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS
const int ANIMATE_VOXELS_INTERVAL_USECS = OUR_FPS_IN_MILLISECONDS * 1000.0; // converts from milliseconds to usecs const int FUDGE_USECS = 10.0; // a little bit of fudge to actually do some processing
const int ANIMATE_VOXELS_INTERVAL_USECS = (OUR_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs
bool wantLocalDomain = false; bool wantLocalDomain = false;
@ -619,6 +622,9 @@ void* animateVoxels(void* args) {
if (::voxelEditPacketSender) { if (::voxelEditPacketSender) {
::voxelEditPacketSender->releaseQueuedMessages(); ::voxelEditPacketSender->releaseQueuedMessages();
if (::nonThreadedPacketSender) {
::voxelEditPacketSender->process();
}
} }
uint64_t end = usecTimestampNow(); uint64_t end = usecTimestampNow();
@ -629,6 +635,9 @@ void* animateVoxels(void* args) {
} }
// dynamically sleep until we need to fire off the next set of voxels // dynamically sleep until we need to fire off the next set of voxels
uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime)); uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime));
if (usecToSleep > ANIMATE_VOXELS_INTERVAL_USECS) {
usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS;
}
if (usecToSleep > 0) { if (usecToSleep > 0) {
usleep(usecToSleep); usleep(usecToSleep);
@ -648,6 +657,11 @@ int main(int argc, const char * argv[])
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_ANIMATION_SERVER, ANIMATION_LISTEN_PORT); NodeList* nodeList = NodeList::createInstance(NODE_TYPE_ANIMATION_SERVER, ANIMATION_LISTEN_PORT);
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
// Handle Local Domain testing with the --local command line
const char* NON_THREADED_PACKETSENDER = "--NonThreadedPacketSender";
::nonThreadedPacketSender = cmdOptionExists(argc, argv, NON_THREADED_PACKETSENDER);
printf("nonThreadedPacketSender=%s\n", debug::valueOf(::nonThreadedPacketSender));
// Handle Local Domain testing with the --local command line // Handle Local Domain testing with the --local command line
const char* NO_BILLBOARD = "--NoBillboard"; const char* NO_BILLBOARD = "--NoBillboard";
::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD); ::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD);
@ -702,10 +716,13 @@ int main(int argc, const char * argv[])
// Create out VoxelEditPacketSender // Create out VoxelEditPacketSender
::voxelEditPacketSender = new VoxelEditPacketSender; ::voxelEditPacketSender = new VoxelEditPacketSender;
if (::voxelEditPacketSender) { if (::voxelEditPacketSender) {
::voxelEditPacketSender->initialize(true); ::voxelEditPacketSender->initialize(!::nonThreadedPacketSender);
if (::jurisdictionListener) { if (::jurisdictionListener) {
::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions()); ::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions());
} }
if (::nonThreadedPacketSender) {
::voxelEditPacketSender->setProcessCallIntervalHint(ANIMATE_VOXELS_INTERVAL_USECS);
}
} }
srand((unsigned)time(0)); srand((unsigned)time(0));

View file

@ -149,9 +149,11 @@ void Agent::run() {
qDebug() << "Uncaught exception at line" << line << ":" << engine.uncaughtException().toString() << "\n"; qDebug() << "Uncaught exception at line" << line << ":" << engine.uncaughtException().toString() << "\n";
} }
// flush the queue of packets and then process them so they are all sent off // release the queue of edit voxel messages.
voxelScripter.getVoxelPacketSender()->releaseQueuedMessages(); voxelScripter.getVoxelPacketSender()->releaseQueuedMessages();
voxelScripter.getVoxelPacketSender()->processWithoutSleep();
// since we're in non-threaded mode, call process so that the packets are sent
voxelScripter.getVoxelPacketSender()->process();
} }
while (NodeList::getInstance()->getNodeSocket()->receive((sockaddr*) &senderAddress, receivedData, &receivedBytes)) { while (NodeList::getInstance()->getNodeSocket()->receive((sockaddr*) &senderAddress, receivedData, &receivedBytes)) {

View file

@ -42,13 +42,15 @@ protected:
void unlock() { pthread_mutex_unlock(&_mutex); } void unlock() { pthread_mutex_unlock(&_mutex); }
bool isStillRunning() const { return !_stopThread; } bool isStillRunning() const { return !_stopThread; }
bool isThreaded() const { return _isThreaded; }
private: private:
pthread_mutex_t _mutex; pthread_mutex_t _mutex;
bool _stopThread; bool _stopThread;
bool _isThreaded; bool _isThreaded;
pthread_t _thread; pthread_t _thread;
}; };
extern "C" void* GenericThreadEntry(void* arg); extern "C" void* GenericThreadEntry(void* arg);

View file

@ -8,6 +8,7 @@
// Threaded or non-threaded packet sender. // Threaded or non-threaded packet sender.
// //
#include <math.h>
#include <stdint.h> #include <stdint.h>
#include "NodeList.h" #include "NodeList.h"
@ -17,9 +18,13 @@
const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200; const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200;
const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1; const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1;
const int AVERAGE_CALL_TIME_SAMPLES = 10;
PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
_packetsPerSecond(packetsPerSecond), _packetsPerSecond(packetsPerSecond),
_usecsPerProcessCallHint(0),
_lastProcessCallTime(usecTimestampNow()),
_averageProcessCallTime(AVERAGE_CALL_TIME_SAMPLES),
_lastSendTime(usecTimestampNow()), _lastSendTime(usecTimestampNow()),
_notify(notify) _notify(notify)
{ {
@ -27,7 +32,6 @@ PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) :
void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
//printf("PacketSender::queuePacketForSending packetLength=%ld\n",packetLength);
NetworkPacket packet(address, packetData, packetLength); NetworkPacket packet(address, packetData, packetLength);
lock(); lock();
_packets.push_back(packet); _packets.push_back(packet);
@ -35,20 +39,67 @@ void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packe
} }
bool PacketSender::process() { bool PacketSender::process() {
printf("PacketSender::process() _packets.size()=%ld\n",_packets.size());
uint64_t USECS_PER_SECOND = 1000 * 1000; uint64_t USECS_PER_SECOND = 1000 * 1000;
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
// keep track of our process call times, so we have a reliable account of how often our caller calls us
uint64_t now = usecTimestampNow();
uint64_t elapsedSinceLastCall = now - _lastProcessCallTime;
_lastProcessCallTime = now;
_averageProcessCallTime.updateAverage(elapsedSinceLastCall);
if (_packets.size() == 0) { if (_packets.size() == 0) {
usleep(SEND_INTERVAL_USECS); if (isThreaded()) {
usleep(SEND_INTERVAL_USECS);
} else {
return isStillRunning(); // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us
}
} }
while (_packets.size() > 0) {
int packetsPerCall = _packets.size(); // in threaded mode, we just empty this!
int packetsThisCall = 0;
// if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process
// based on how often we get called... We do this by keeping a running average of our call times, and we determine
// how many packets to send per call
if (!isThreaded()) {
int averageCallTime;
const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2;
if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) {
averageCallTime = _averageProcessCallTime.getAverage();
} else {
averageCallTime = _usecsPerProcessCallHint;
}
// we can determine how many packets we need to send per call to achieve our desired
// packets per second send rate.
int callsPerSecond = USECS_PER_SECOND / averageCallTime;
packetsPerCall = ceil(_packetsPerSecond / callsPerSecond);
// send at least one packet per call, if we have it
if (packetsPerCall < 1) {
packetsPerCall = 1;
}
}
bool keepGoing = _packets.size() > 0;
while (keepGoing) {
// in threaded mode, we go till we're empty
if (isThreaded()) {
keepGoing = _packets.size() > 0;
} else {
// in non-threaded mode, we send as many packets as we need per expected call to process()
keepGoing = (packetsThisCall < packetsPerCall) && (_packets.size() > 0);
}
NetworkPacket& packet = _packets.front(); NetworkPacket& packet = _packets.front();
// send the packet through the NodeList... // send the packet through the NodeList...
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
packetsThisCall++;
if (_notify) { if (_notify) {
_notify->packetSentNotification(packet.getLength()); _notify->packetSentNotification(packet.getLength());
@ -58,34 +109,18 @@ bool PacketSender::process() {
_packets.erase(_packets.begin()); _packets.erase(_packets.begin());
unlock(); unlock();
uint64_t now = usecTimestampNow(); // dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode
// dynamically sleep until we need to fire off the next set of voxels if (isThreaded()) {
uint64_t elapsed = now - _lastSendTime; uint64_t elapsed = now - _lastSendTime;
int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed); int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed);
_lastSendTime = now;
if (usecToSleep > 0) { // we only sleep in non-threaded mode
usleep(usecToSleep); if (usecToSleep > 0) {
usleep(usecToSleep);
}
} }
_lastSendTime = now;
} }
return isStillRunning(); // keep running till they terminate us return isStillRunning(); // keep running till they terminate us
} }
void PacketSender::processWithoutSleep() {
while (_packets.size() > 0) {
NetworkPacket& packet = _packets.front();
// send the packet through the NodeList...
UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket();
nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength());
if (_notify) {
_notify->packetSentNotification(packet.getLength());
}
lock();
_packets.erase(_packets.begin());
unlock();
}
}

View file

@ -43,7 +43,6 @@ public:
PacketSenderNotify* getPacketSenderNotify() const { return _notify; } PacketSenderNotify* getPacketSenderNotify() const { return _notify; }
virtual bool process(); virtual bool process();
virtual void processWithoutSleep();
/// are there packets waiting in the send queue to be sent /// are there packets waiting in the send queue to be sent
bool hasPacketsToSend() const { return _packets.size() > 0; } bool hasPacketsToSend() const { return _packets.size() > 0; }
@ -51,8 +50,16 @@ public:
/// how many packets are there in the send queue waiting to be sent /// how many packets are there in the send queue waiting to be sent
int packetsToSendCount() const { return _packets.size(); } int packetsToSendCount() const { return _packets.size(); }
/// If you're running in non-threaded mode, call this to give us a hint as to how frequently you will call process.
/// This has no effect in threaded mode. This is only considered a hint in non-threaded mode.
/// \param int usecsPerProcessCall expected number of usecs between calls to process in non-threaded mode.
void setProcessCallIntervalHint(int usecsPerProcessCall) { _usecsPerProcessCallHint = usecsPerProcessCall; }
protected: protected:
int _packetsPerSecond; int _packetsPerSecond;
int _usecsPerProcessCallHint;
uint64_t _lastProcessCallTime;
SimpleMovingAverage _averageProcessCallTime;
private: private:
std::vector<NetworkPacket> _packets; std::vector<NetworkPacket> _packets;

View file

@ -48,8 +48,6 @@ VoxelEditPacketSender::~VoxelEditPacketSender() {
void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) {
//printf("void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail)\n");
// allows app to disable sending if for example voxels have been disabled // allows app to disable sending if for example voxels have been disabled
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
@ -72,7 +70,6 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail&
// if we've saved MORE than out max, then clear out the oldest packet... // if we've saved MORE than out max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
if (allPendingMessages > _maxPendingMessages) { if (allPendingMessages > _maxPendingMessages) {
printf("Pending messages exceed Max Pending Messages, discarding oldest message.\n");
EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); EditPacketBuffer* packet = _preServerSingleMessagePackets.front();
delete packet; delete packet;
_preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin());
@ -102,9 +99,6 @@ bool VoxelEditPacketSender::voxelServersExist() const {
// This method is called when the edit packet layer has determined that it has a fully formed packet destined for // This method is called when the edit packet layer has determined that it has a fully formed packet destined for
// a known nodeID. However, we also want to handle the case where the // a known nodeID. However, we also want to handle the case where the
void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) {
//printf("void VoxelEditPacketSender::queuePacketToNode(nodeID=%d)\n",nodeID);
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
// only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER
@ -117,9 +111,6 @@ void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bu
} }
void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) { void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) {
//printf("void VoxelEditPacketSender::queueVoxelEditMessages()\n");
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
} }
@ -135,10 +126,6 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO
} }
void VoxelEditPacketSender::processPreServerExistsPackets() { void VoxelEditPacketSender::processPreServerExistsPackets() {
printf("void VoxelEditPacketSender::processPreServerExistsPackets()\n");
assert(voxelServersExist()); // we should only be here if we have jurisdictions assert(voxelServersExist()); // we should only be here if we have jurisdictions
// First send out all the single message packets... // First send out all the single message packets...
@ -166,9 +153,6 @@ printf("void VoxelEditPacketSender::processPreServerExistsPackets()\n");
} }
void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) { void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) {
printf("void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, length=%ld)\n", length);
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
} }
@ -200,9 +184,6 @@ printf("void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char*
void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) {
//printf("void VoxelEditPacketSender::queueVoxelEditMessage(unsigned char* codeColorBuffer, length=%ld)\n", length);
if (!_shouldSend) { if (!_shouldSend) {
return; // bail early return; // bail early
} }
@ -217,7 +198,6 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha
// if we've saved MORE than out max, then clear out the oldest packet... // if we've saved MORE than out max, then clear out the oldest packet...
int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size();
if (allPendingMessages > _maxPendingMessages) { if (allPendingMessages > _maxPendingMessages) {
printf("Pending messages exceed Max Pending Messages, discarding oldest message.\n");
EditPacketBuffer* packet = _preServerPackets.front(); EditPacketBuffer* packet = _preServerPackets.front();
delete packet; delete packet;
_preServerPackets.erase(_preServerPackets.begin()); _preServerPackets.erase(_preServerPackets.begin());
@ -267,17 +247,13 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha
} }
void VoxelEditPacketSender::releaseQueuedMessages() { void VoxelEditPacketSender::releaseQueuedMessages() {
//printf("void VoxelEditPacketSender::releaseQueuedMessages()\n");
// if we don't yet have jurisdictions then we can't actually release messages yet because we don't // if we don't yet have jurisdictions then we can't actually release messages yet because we don't
// know where to send them to. Instead, just remember this request and when we eventually get jurisdictions // know where to send them to. Instead, just remember this request and when we eventually get jurisdictions
// call release again at that time. // call release again at that time.
if (!voxelServersExist()) { if (!voxelServersExist()) {
//printf("...no voxel servers... _releaseQueuedMessagesPending=true\n");
_releaseQueuedMessagesPending = true; _releaseQueuedMessagesPending = true;
} else { } else {
for (std::map<uint16_t,EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { for (std::map<uint16_t,EditPacketBuffer>::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) {
//printf("...actually calling releaseQueuedPacket()\n");
releaseQueuedPacket(i->second); releaseQueuedPacket(i->second);
} }
} }
@ -298,12 +274,9 @@ void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PAC
} }
bool VoxelEditPacketSender::process() { bool VoxelEditPacketSender::process() {
printf("VoxelEditPacketSender::process()\n");
// if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those
// before doing our normal process step. This processPreJurisdictionPackets() // before doing our normal process step. This processPreJurisdictionPackets()
if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) { if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) {
printf("processPreServerExistsPackets()<<<<<<<<<<<<<<<<<<<<<\n");
processPreServerExistsPackets(); processPreServerExistsPackets();
} }