bug fixes to JurisdictionListener, PacketSender, addition of packetsToSendCount() to Voxels JS

This commit is contained in:
ZappoMan 2013-11-08 02:22:10 -08:00
parent f1704e9d79
commit 8043970df7
7 changed files with 56 additions and 15 deletions

View file

@ -40,4 +40,9 @@ void VoxelScriptingInterface::queueVoxelDelete(float x, float y, float z, float
VoxelDetail deleteVoxelDetail = {x, y, z, scale, 0, 0, 0}; VoxelDetail deleteVoxelDetail = {x, y, z, scale, 0, 0, 0};
_voxelPacketSender.queueVoxelEditMessages(PACKET_TYPE_ERASE_VOXEL, 1, &deleteVoxelDetail); _voxelPacketSender.queueVoxelEditMessages(PACKET_TYPE_ERASE_VOXEL, 1, &deleteVoxelDetail);
} }
int VoxelScriptingInterface::packetsToSendCount() const {
return _voxelPacketSender.packetsToSendCount();
}

View file

@ -49,6 +49,10 @@ public slots:
/// \param z the z-coordinate of the voxel (in VS space) /// \param z the z-coordinate of the voxel (in VS space)
/// \param scale the scale of the voxel (in VS space) /// \param scale the scale of the voxel (in VS space)
void queueVoxelDelete(float x, float y, float z, float scale); void queueVoxelDelete(float x, float y, float z, float scale);
/// get the current number of pending, queued, but unsent packets
int packetsToSendCount() const;
private: private:
/// attached VoxelEditPacketSender that handles queuing and sending of packets to VS /// attached VoxelEditPacketSender that handles queuing and sending of packets to VS
VoxelEditPacketSender _voxelPacketSender; VoxelEditPacketSender _voxelPacketSender;

View file

@ -39,8 +39,11 @@ void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packe
} }
bool PacketSender::process() { bool PacketSender::process() {
bool hasSlept = false;
uint64_t USECS_PER_SECOND = 1000 * 1000; uint64_t USECS_PER_SECOND = 1000 * 1000;
uint64_t USECS_SMALL_ADJUST = 2 * 1000; // approaximate 2ms
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
uint64_t INTERVAL_SLEEP_USECS = (SEND_INTERVAL_USECS > USECS_SMALL_ADJUST) ? SEND_INTERVAL_USECS - USECS_SMALL_ADJUST : SEND_INTERVAL_USECS;
// keep track of our process call times, so we have a reliable account of how often our caller calls us // keep track of our process call times, so we have a reliable account of how often our caller calls us
uint64_t now = usecTimestampNow(); uint64_t now = usecTimestampNow();
@ -50,7 +53,8 @@ bool PacketSender::process() {
if (_packets.size() == 0) { if (_packets.size() == 0) {
if (isThreaded()) { if (isThreaded()) {
usleep(SEND_INTERVAL_USECS); usleep(INTERVAL_SLEEP_USECS);
hasSlept = true;
} else { } else {
return isStillRunning(); // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us return isStillRunning(); // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us
} }
@ -62,6 +66,7 @@ bool PacketSender::process() {
// if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process // if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process
// based on how often we get called... We do this by keeping a running average of our call times, and we determine // based on how often we get called... We do this by keeping a running average of our call times, and we determine
// how many packets to send per call // how many packets to send per call
if (!isThreaded()) { if (!isThreaded()) {
int averageCallTime; int averageCallTime;
const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2; const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2;
@ -89,8 +94,6 @@ bool PacketSender::process() {
int packetsLeft = _packets.size(); int packetsLeft = _packets.size();
bool keepGoing = packetsLeft > 0; bool keepGoing = packetsLeft > 0;
while (keepGoing) { while (keepGoing) {
uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond);
lock(); lock();
NetworkPacket& packet = _packets.front(); NetworkPacket& packet = _packets.front();
NetworkPacket temporary = packet; // make a copy NetworkPacket temporary = packet; // make a copy
@ -117,14 +120,15 @@ bool PacketSender::process() {
if (keepGoing) { if (keepGoing) {
now = usecTimestampNow(); now = usecTimestampNow();
uint64_t elapsed = now - _lastSendTime; uint64_t elapsed = now - _lastSendTime;
int usecToSleep = SEND_INTERVAL_USECS - elapsed; int usecToSleep = INTERVAL_SLEEP_USECS - elapsed;
// we only sleep in non-threaded mode // we only sleep in non-threaded mode
if (usecToSleep > 0) { if (usecToSleep > 0) {
if (usecToSleep > SEND_INTERVAL_USECS) { if (usecToSleep > INTERVAL_SLEEP_USECS) {
usecToSleep = SEND_INTERVAL_USECS; usecToSleep = INTERVAL_SLEEP_USECS;
} }
usleep(usecToSleep); usleep(usecToSleep);
hasSlept = true;
} }
} }
@ -133,8 +137,21 @@ bool PacketSender::process() {
keepGoing = (packetsThisCall < packetsPerCall) && (packetsLeft > 0); keepGoing = (packetsThisCall < packetsPerCall) && (packetsLeft > 0);
} }
// if threaded and we only sent one packet, we still want to sleep....
if (isThreaded() && !hasSlept) {
now = usecTimestampNow();
uint64_t elapsed = now - _lastSendTime;
int usecToSleep = INTERVAL_SLEEP_USECS - elapsed;
// we only sleep in non-threaded mode
if (usecToSleep > 0) {
if (usecToSleep > INTERVAL_SLEEP_USECS) {
usecToSleep = INTERVAL_SLEEP_USECS;
}
usleep(usecToSleep);
}
}
_lastSendTime = now; _lastSendTime = now;
} }
return isStillRunning();
return isStillRunning(); // keep running till they terminate us
} }

View file

@ -12,6 +12,10 @@
#include "ReceivedPacketProcessor.h" #include "ReceivedPacketProcessor.h"
#include "SharedUtil.h" #include "SharedUtil.h"
ReceivedPacketProcessor::ReceivedPacketProcessor() {
_dontSleep = false;
}
void ReceivedPacketProcessor::queueReceivedPacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { void ReceivedPacketProcessor::queueReceivedPacket(sockaddr& address, unsigned char* packetData, ssize_t packetLength) {
// Make sure our Node and NodeList knows we've heard from this node. // Make sure our Node and NodeList knows we've heard from this node.
Node* node = NodeList::getInstance()->nodeWithAddress(&address); Node* node = NodeList::getInstance()->nodeWithAddress(&address);
@ -26,7 +30,10 @@ void ReceivedPacketProcessor::queueReceivedPacket(sockaddr& address, unsigned ch
} }
bool ReceivedPacketProcessor::process() { bool ReceivedPacketProcessor::process() {
if (_packets.size() == 0) {
// If a derived class handles process sleeping, like the JurisdiciontListener, then it can set
// this _dontSleep member and we will honor that request.
if (_packets.size() == 0 && !_dontSleep) {
const uint64_t RECEIVED_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps const uint64_t RECEIVED_THREAD_SLEEP_INTERVAL = (1000 * 1000)/60; // check at 60fps
usleep(RECEIVED_THREAD_SLEEP_INTERVAL); usleep(RECEIVED_THREAD_SLEEP_INTERVAL);
} }

View file

@ -17,6 +17,7 @@
/// Generalized threaded processor for handling received inbound packets. /// Generalized threaded processor for handling received inbound packets.
class ReceivedPacketProcessor : public virtual GenericThread { class ReceivedPacketProcessor : public virtual GenericThread {
public: public:
ReceivedPacketProcessor();
/// Add packet from network receive thread to the processing queue. /// Add packet from network receive thread to the processing queue.
/// \param sockaddr& senderAddress the address of the sender /// \param sockaddr& senderAddress the address of the sender
@ -30,7 +31,7 @@ public:
/// How many received packets waiting are to be processed /// How many received packets waiting are to be processed
int packetsToProcessCount() const { return _packets.size(); } int packetsToProcessCount() const { return _packets.size(); }
protected: protected:
/// Callback for processing of recieved packets. Implement this to process the incoming packets. /// Callback for processing of recieved packets. Implement this to process the incoming packets.
/// \param sockaddr& senderAddress the address of the sender /// \param sockaddr& senderAddress the address of the sender
@ -42,6 +43,8 @@ protected:
/// Implements generic processing behavior for this thread. /// Implements generic processing behavior for this thread.
virtual bool process(); virtual bool process();
bool _dontSleep;
private: private:
std::vector<NetworkPacket> _packets; std::vector<NetworkPacket> _packets;

View file

@ -19,6 +19,7 @@
JurisdictionListener::JurisdictionListener(PacketSenderNotify* notify) : JurisdictionListener::JurisdictionListener(PacketSenderNotify* notify) :
PacketSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND) PacketSender(notify, JurisdictionListener::DEFAULT_PACKETS_PER_SECOND)
{ {
ReceivedPacketProcessor::_dontSleep = true; // we handle sleeping so this class doesn't need to
NodeList* nodeList = NodeList::getInstance(); NodeList* nodeList = NodeList::getInstance();
nodeList->addHook(this); nodeList->addHook(this);
} }
@ -53,8 +54,11 @@ bool JurisdictionListener::queueJurisdictionRequest() {
} }
} }
// set our packets per second to be the number of nodes if (nodeCount > 0){
setPacketsPerSecond(nodeCount); setPacketsPerSecond(nodeCount);
} else {
setPacketsPerSecond(NO_SERVER_CHECK_RATE);
}
// keep going if still running // keep going if still running
return isStillRunning(); return isStillRunning();
@ -84,5 +88,6 @@ bool JurisdictionListener::process() {
// NOTE: This will sleep if there are no pending packets to process // NOTE: This will sleep if there are no pending packets to process
continueProcessing = ReceivedPacketProcessor::process(); continueProcessing = ReceivedPacketProcessor::process();
} }
return continueProcessing; return continueProcessing;
} }

View file

@ -24,6 +24,7 @@
class JurisdictionListener : public NodeListHook, public PacketSender, public ReceivedPacketProcessor { class JurisdictionListener : public NodeListHook, public PacketSender, public ReceivedPacketProcessor {
public: public:
static const int DEFAULT_PACKETS_PER_SECOND = 1; static const int DEFAULT_PACKETS_PER_SECOND = 1;
static const int NO_SERVER_CHECK_RATE=60;
JurisdictionListener(PacketSenderNotify* notify = NULL); JurisdictionListener(PacketSenderNotify* notify = NULL);
~JurisdictionListener(); ~JurisdictionListener();
@ -50,6 +51,5 @@ private:
NodeToJurisdictionMap _jurisdictions; NodeToJurisdictionMap _jurisdictions;
bool queueJurisdictionRequest(); bool queueJurisdictionRequest();
}; };
#endif // __shared__JurisdictionListener__ #endif // __shared__JurisdictionListener__