diff --git a/animation-server/src/main.cpp b/animation-server/src/main.cpp index 50e1da707c..2ffc46ee2e 100644 --- a/animation-server/src/main.cpp +++ b/animation-server/src/main.cpp @@ -6,6 +6,7 @@ // Copyright (c) 2012 High Fidelity, Inc. All rights reserved. // +#include #include #include #include @@ -39,12 +40,14 @@ bool includeMovingBug = true; bool includeBlinkingVoxel = false; bool includeDanceFloor = true; bool buildStreet = false; +bool nonThreadedPacketSender = false; const int ANIMATION_LISTEN_PORT = 40107; const int ACTUAL_FPS = 60; const double OUR_FPS_IN_MILLISECONDS = 1000.0/ACTUAL_FPS; // determines FPS from our desired FPS -const int ANIMATE_VOXELS_INTERVAL_USECS = OUR_FPS_IN_MILLISECONDS * 1000.0; // converts from milliseconds to usecs +const int FUDGE_USECS = 10; // a little bit of fudge to actually do some processing +const int ANIMATE_VOXELS_INTERVAL_USECS = (OUR_FPS_IN_MILLISECONDS * 1000.0) - FUDGE_USECS; // converts from milliseconds to usecs bool wantLocalDomain = false; @@ -618,7 +621,10 @@ void* animateVoxels(void* args) { } if (::voxelEditPacketSender) { - ::voxelEditPacketSender->flushQueue(); + ::voxelEditPacketSender->releaseQueuedMessages(); + if (::nonThreadedPacketSender) { + ::voxelEditPacketSender->process(); + } } uint64_t end = usecTimestampNow(); @@ -629,6 +635,9 @@ void* animateVoxels(void* args) { } // dynamically sleep until we need to fire off the next set of voxels uint64_t usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSendTime)); + if (usecToSleep > ANIMATE_VOXELS_INTERVAL_USECS) { + usecToSleep = ANIMATE_VOXELS_INTERVAL_USECS; + } if (usecToSleep > 0) { usleep(usecToSleep); @@ -648,6 +657,11 @@ int main(int argc, const char * argv[]) NodeList* nodeList = NodeList::createInstance(NODE_TYPE_ANIMATION_SERVER, ANIMATION_LISTEN_PORT); setvbuf(stdout, NULL, _IOLBF, 0); + // Handle Local Domain testing with the --local command line + const char* NON_THREADED_PACKETSENDER = "--NonThreadedPacketSender"; + ::nonThreadedPacketSender = cmdOptionExists(argc, argv, NON_THREADED_PACKETSENDER); + printf("nonThreadedPacketSender=%s\n", debug::valueOf(::nonThreadedPacketSender)); + // Handle Local Domain testing with the --local command line const char* NO_BILLBOARD = "--NoBillboard"; ::includeBillboard = !cmdOptionExists(argc, argv, NO_BILLBOARD); @@ -702,10 +716,13 @@ int main(int argc, const char * argv[]) // Create out VoxelEditPacketSender ::voxelEditPacketSender = new VoxelEditPacketSender; if (::voxelEditPacketSender) { - ::voxelEditPacketSender->initialize(true); + ::voxelEditPacketSender->initialize(!::nonThreadedPacketSender); if (::jurisdictionListener) { ::voxelEditPacketSender->setVoxelServerJurisdictions(::jurisdictionListener->getJurisdictions()); } + if (::nonThreadedPacketSender) { + ::voxelEditPacketSender->setProcessCallIntervalHint(ANIMATE_VOXELS_INTERVAL_USECS); + } } srand((unsigned)time(0)); diff --git a/assignment-client/src/Agent.cpp b/assignment-client/src/Agent.cpp index c9d5dd3d09..2906233d78 100644 --- a/assignment-client/src/Agent.cpp +++ b/assignment-client/src/Agent.cpp @@ -94,6 +94,9 @@ void Agent::run() { engine.globalObject().setProperty("TREE_SCALE", treeScaleValue); const long long VISUAL_DATA_SEND_INTERVAL_USECS = (1 / 60.0f) * 1000 * 1000; + + // let the VoxelPacketSender know how frequently we plan to call it + voxelScripter.getVoxelPacketSender()->setProcessCallIntervalHint(VISUAL_DATA_SEND_INTERVAL_USECS); QScriptValue visualSendIntervalValue = engine.newVariant((QVariant(VISUAL_DATA_SEND_INTERVAL_USECS / 1000))); engine.globalObject().setProperty("VISUAL_DATA_SEND_INTERVAL_MS", visualSendIntervalValue); @@ -149,9 +152,11 @@ void Agent::run() { qDebug() << "Uncaught exception at line" << line << ":" << engine.uncaughtException().toString() << "\n"; } - // flush the queue of packets and then process them so they are all sent off - voxelScripter.getVoxelPacketSender()->flushQueue(); - voxelScripter.getVoxelPacketSender()->processWithoutSleep(); + // release the queue of edit voxel messages. + voxelScripter.getVoxelPacketSender()->releaseQueuedMessages(); + + // since we're in non-threaded mode, call process so that the packets are sent + voxelScripter.getVoxelPacketSender()->process(); } while (NodeList::getInstance()->getNodeSocket()->receive((sockaddr*) &senderAddress, receivedData, &receivedBytes)) { diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index daef58da24..615fcfaa01 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -1440,7 +1440,7 @@ void Application::pasteVoxels() { _sharedVoxelSystem.changeTree(&_clipboard); } - _voxelEditSender.flushQueue(); + _voxelEditSender.releaseQueuedMessages(); if (calculatedOctCode) { delete[] calculatedOctCode; @@ -3354,6 +3354,10 @@ void Application::deleteVoxelUnderCursor() { if (_mouseVoxel.s != 0) { // sending delete to the server is sufficient, server will send new version so we see updates soon enough _voxelEditSender.sendVoxelEditMessage(PACKET_TYPE_ERASE_VOXEL, _mouseVoxel); + + // delete it locally to see the effect immediately (and in case no voxel server is present) + _voxels.deleteVoxelAt(_mouseVoxel.x, _mouseVoxel.y, _mouseVoxel.z, _mouseVoxel.s); + AudioInjector* voxelInjector = AudioInjectionManager::injectorWithCapacity(5000); if (voxelInjector) { diff --git a/libraries/shared/src/GenericThread.h b/libraries/shared/src/GenericThread.h index e363d16178..013b7a7936 100644 --- a/libraries/shared/src/GenericThread.h +++ b/libraries/shared/src/GenericThread.h @@ -42,13 +42,15 @@ protected: void unlock() { pthread_mutex_unlock(&_mutex); } bool isStillRunning() const { return !_stopThread; } + + bool isThreaded() const { return _isThreaded; } private: pthread_mutex_t _mutex; - bool _stopThread; - bool _isThreaded; - pthread_t _thread; + bool _stopThread; + bool _isThreaded; + pthread_t _thread; }; extern "C" void* GenericThreadEntry(void* arg); diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index ebf3b50060..4a2b879e74 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -8,6 +8,7 @@ // Threaded or non-threaded packet sender. // +#include #include #include "NodeList.h" @@ -17,9 +18,13 @@ const int PacketSender::DEFAULT_PACKETS_PER_SECOND = 200; const int PacketSender::MINIMUM_PACKETS_PER_SECOND = 1; +const int AVERAGE_CALL_TIME_SAMPLES = 10; PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : _packetsPerSecond(packetsPerSecond), + _usecsPerProcessCallHint(0), + _lastProcessCallTime(usecTimestampNow()), + _averageProcessCallTime(AVERAGE_CALL_TIME_SAMPLES), _lastSendTime(usecTimestampNow()), _notify(notify) { @@ -37,16 +42,64 @@ bool PacketSender::process() { uint64_t USECS_PER_SECOND = 1000 * 1000; uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); + // keep track of our process call times, so we have a reliable account of how often our caller calls us + uint64_t now = usecTimestampNow(); + uint64_t elapsedSinceLastCall = now - _lastProcessCallTime; + _lastProcessCallTime = now; + _averageProcessCallTime.updateAverage(elapsedSinceLastCall); + if (_packets.size() == 0) { - usleep(SEND_INTERVAL_USECS); + if (isThreaded()) { + usleep(SEND_INTERVAL_USECS); + } else { + return isStillRunning(); // in non-threaded mode, if there's nothing to do, just return, keep running till they terminate us + } } - while (_packets.size() > 0) { + + int packetsPerCall = _packets.size(); // in threaded mode, we just empty this! + int packetsThisCall = 0; + + // if we're in non-threaded mode, then we actually need to determine how many packets to send per call to process + // based on how often we get called... We do this by keeping a running average of our call times, and we determine + // how many packets to send per call + if (!isThreaded()) { + int averageCallTime; + const int TRUST_AVERAGE_AFTER = AVERAGE_CALL_TIME_SAMPLES * 2; + if (_usecsPerProcessCallHint == 0 || _averageProcessCallTime.getSampleCount() > TRUST_AVERAGE_AFTER) { + averageCallTime = _averageProcessCallTime.getAverage(); + } else { + averageCallTime = _usecsPerProcessCallHint; + } + + // we can determine how many packets we need to send per call to achieve our desired + // packets per second send rate. + int callsPerSecond = USECS_PER_SECOND / averageCallTime; + packetsPerCall = ceil(_packetsPerSecond / callsPerSecond); + + // send at least one packet per call, if we have it + if (packetsPerCall < 1) { + packetsPerCall = 1; + } + } + + bool keepGoing = _packets.size() > 0; + while (keepGoing) { + + // in threaded mode, we go till we're empty + if (isThreaded()) { + keepGoing = _packets.size() > 0; + } else { + // in non-threaded mode, we send as many packets as we need per expected call to process() + keepGoing = (packetsThisCall < packetsPerCall) && (_packets.size() > 0); + } + NetworkPacket& packet = _packets.front(); // send the packet through the NodeList... UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); + packetsThisCall++; if (_notify) { _notify->packetSentNotification(packet.getLength()); @@ -56,34 +109,18 @@ bool PacketSender::process() { _packets.erase(_packets.begin()); unlock(); - uint64_t now = usecTimestampNow(); - // dynamically sleep until we need to fire off the next set of voxels - uint64_t elapsed = now - _lastSendTime; - int usecToSleep = SEND_INTERVAL_USECS - elapsed; - _lastSendTime = now; - if (usecToSleep > 0) { - usleep(usecToSleep); + // dynamically sleep until we need to fire off the next set of voxels we only sleep in threaded mode + if (isThreaded()) { + uint64_t elapsed = now - _lastSendTime; + int usecToSleep = std::max(SEND_INTERVAL_USECS, SEND_INTERVAL_USECS - elapsed); + + // we only sleep in non-threaded mode + if (usecToSleep > 0) { + usleep(usecToSleep); + } } - + _lastSendTime = now; } + return isStillRunning(); // keep running till they terminate us } - -void PacketSender::processWithoutSleep() { - while (_packets.size() > 0) { - NetworkPacket& packet = _packets.front(); - - // send the packet through the NodeList... - UDPSocket* nodeSocket = NodeList::getInstance()->getNodeSocket(); - - nodeSocket->send(&packet.getAddress(), packet.getData(), packet.getLength()); - - if (_notify) { - _notify->packetSentNotification(packet.getLength()); - } - - lock(); - _packets.erase(_packets.begin()); - unlock(); - } -} diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index 5309043743..84ae6ae9ca 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -43,14 +43,24 @@ public: PacketSenderNotify* getPacketSenderNotify() const { return _notify; } virtual bool process(); - virtual void processWithoutSleep(); + + /// are there packets waiting in the send queue to be sent + bool hasPacketsToSend() const { return _packets.size() > 0; } + + /// how many packets are there in the send queue waiting to be sent + int packetsToSendCount() const { return _packets.size(); } + + /// If you're running in non-threaded mode, call this to give us a hint as to how frequently you will call process. + /// This has no effect in threaded mode. This is only considered a hint in non-threaded mode. + /// \param int usecsPerProcessCall expected number of usecs between calls to process in non-threaded mode. + void setProcessCallIntervalHint(int usecsPerProcessCall) { _usecsPerProcessCallHint = usecsPerProcessCall; } protected: int _packetsPerSecond; + int _usecsPerProcessCallHint; + uint64_t _lastProcessCallTime; + SimpleMovingAverage _averageProcessCallTime; - bool hasPacketsToSend() const { return _packets.size() > 0; } - int packetsToSendCount() const { return _packets.size(); } - private: std::vector _packets; uint64_t _lastSendTime; diff --git a/libraries/voxels/src/VoxelEditPacketSender.cpp b/libraries/voxels/src/VoxelEditPacketSender.cpp index 5853652688..de61a67c44 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.cpp +++ b/libraries/voxels/src/VoxelEditPacketSender.cpp @@ -8,6 +8,8 @@ // Threaded or non-threaded voxel packet Sender for the Application // +#include + #include #include @@ -15,12 +17,38 @@ #include "VoxelEditPacketSender.h" +EditPacketBuffer::EditPacketBuffer(PACKET_TYPE type, unsigned char* buffer, ssize_t length, uint16_t nodeID) { + _nodeID = nodeID; + _currentType = type; + _currentSize = length; + memcpy(_currentBuffer, buffer, length); +}; + +const int VoxelEditPacketSender::DEFAULT_MAX_PENDING_MESSAGES = PacketSender::DEFAULT_PACKETS_PER_SECOND; + + VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) : PacketSender(notify), _shouldSend(true), + _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), + _releaseQueuedMessagesPending(false), _voxelServerJurisdictions(NULL) { } +VoxelEditPacketSender::~VoxelEditPacketSender() { + while (!_preServerSingleMessagePackets.empty()) { + EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); + delete packet; + _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + } + while (!_preServerPackets.empty()) { + EditPacketBuffer* packet = _preServerPackets.front(); + delete packet; + _preServerPackets.erase(_preServerPackets.begin()); + } +} + + void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { // allows app to disable sending if for example voxels have been disabled if (!_shouldSend) { @@ -30,13 +58,49 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& unsigned char* bufferOut; int sizeOut; + // This encodes the voxel edit message into a buffer... if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){ - actuallySendMessage(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! + + // If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have + // jurisdictions for processing + if (!voxelServersExist()) { + + // If we're asked to save messages while waiting for voxel servers to arrive, then do so... + if (_maxPendingMessages > 0) { + EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut); + _preServerSingleMessagePackets.push_back(packet); + // if we've saved MORE than out max, then clear out the oldest packet... + int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); + if (allPendingMessages > _maxPendingMessages) { + EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); + delete packet; + _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + } + } + return; // bail early + } else { + queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! + } + + // either way, clean up the created buffer delete[] bufferOut; } } -void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { +bool VoxelEditPacketSender::voxelServersExist() const { + NodeList* nodeList = NodeList::getInstance(); + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER + if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) { + return true; + } + } + return false; +} + +// This method is called when the edit packet layer has determined that it has a fully formed packet destined for +// a known nodeID. However, we also want to handle the case where the +void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { NodeList* nodeList = NodeList::getInstance(); for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER @@ -63,12 +127,87 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO } } -void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { +void VoxelEditPacketSender::processPreServerExistsPackets() { + assert(voxelServersExist()); // we should only be here if we have jurisdictions + + // First send out all the single message packets... + while (!_preServerSingleMessagePackets.empty()) { + EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); + queuePacketToNode(UNKNOWN_NODE_ID, &packet->_currentBuffer[0], packet->_currentSize); + delete packet; + _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + } + // Then "process" all the packable messages... + while (!_preServerPackets.empty()) { + EditPacketBuffer* packet = _preServerPackets.front(); + queueVoxelEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize); + delete packet; + _preServerPackets.erase(_preServerPackets.begin()); + } + + // if while waiting for the jurisdictions the caller called releaseQueuedMessages() + // then we want to honor that request now. + if (_releaseQueuedMessagesPending) { + releaseQueuedMessages(); + _releaseQueuedMessagesPending = false; + } +} + +void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) { if (!_shouldSend) { return; // bail early } + + assert(voxelServersExist()); // we must have jurisdictions to be here!! + + // We want to filter out edit messages for voxel servers based on the server's Jurisdiction + // But we can't really do that with a packed message, since each edit message could be destined + // for a different voxel server... So we need to actually manage multiple queued packets... one + // for each voxel server + NodeList* nodeList = NodeList::getInstance(); + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER + if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) { + uint16_t nodeID = node->getNodeID(); + bool isMyJurisdiction = true; + // we need to get the jurisdiction for this + // here we need to get the "pending packet" for this server + const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID]; + isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN); + + if (isMyJurisdiction) { + queuePacketToNode(nodeID, codeColorBuffer, length); + } + } + } +} + + +void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { + if (!_shouldSend) { + return; // bail early + } + + // If we don't have voxel jurisdictions, then we will simply queue up all of these packets and wait till we have + // jurisdictions for processing + if (!voxelServersExist()) { + if (_maxPendingMessages > 0) { + EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length); + _preServerPackets.push_back(packet); + + // if we've saved MORE than out max, then clear out the oldest packet... + int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); + if (allPendingMessages > _maxPendingMessages) { + EditPacketBuffer* packet = _preServerPackets.front(); + delete packet; + _preServerPackets.erase(_preServerPackets.begin()); + } + } + return; // bail early + } + // We want to filter out edit messages for voxel servers based on the server's Jurisdiction // But we can't really do that with a packed message, since each edit message could be destined // for a different voxel server... So we need to actually manage multiple queued packets... one @@ -93,7 +232,7 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha // If we're switching type, then we send the last one and start over if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) || (packetBuffer._currentSize + length >= MAX_PACKET_SIZE)) { - flushQueue(packetBuffer); + releaseQueuedPacket(packetBuffer); initializePacket(packetBuffer, type); } @@ -109,14 +248,21 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha } } -void VoxelEditPacketSender::flushQueue() { - for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - flushQueue(i->second); +void VoxelEditPacketSender::releaseQueuedMessages() { + // if we don't yet have jurisdictions then we can't actually release messages yet because we don't + // know where to send them to. Instead, just remember this request and when we eventually get jurisdictions + // call release again at that time. + if (!voxelServersExist()) { + _releaseQueuedMessagesPending = true; + } else { + for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { + releaseQueuedPacket(i->second); + } } } -void VoxelEditPacketSender::flushQueue(EditPacketBuffer& packetBuffer) { - actuallySendMessage(packetBuffer._nodeID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); +void VoxelEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { + queuePacketToNode(packetBuffer._nodeID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); packetBuffer._currentSize = 0; packetBuffer._currentType = PACKET_TYPE_UNKNOWN; } @@ -128,3 +274,14 @@ void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PAC packetBuffer._currentSize += sizeof(unsigned short int); // set to command + sequence packetBuffer._currentType = type; } + +bool VoxelEditPacketSender::process() { + // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those + // before doing our normal process step. This processPreJurisdictionPackets() + if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) { + processPreServerExistsPackets(); + } + + // base class does most of the work. + return PacketSender::process(); +} diff --git a/libraries/voxels/src/VoxelEditPacketSender.h b/libraries/voxels/src/VoxelEditPacketSender.h index bda2c8006d..a065f3f53c 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.h +++ b/libraries/voxels/src/VoxelEditPacketSender.h @@ -20,45 +20,87 @@ class EditPacketBuffer { public: EditPacketBuffer() { _currentSize = 0; _currentType = PACKET_TYPE_UNKNOWN; _nodeID = UNKNOWN_NODE_ID; } + EditPacketBuffer(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length, uint16_t nodeID = UNKNOWN_NODE_ID); uint16_t _nodeID; PACKET_TYPE _currentType; unsigned char _currentBuffer[MAX_PACKET_SIZE]; ssize_t _currentSize; }; -/// Threaded processor for queueing and sending of outbound edit voxel packets. -class VoxelEditPacketSender : public PacketSender { +/// Utility for processing, packing, queueing and sending of outbound edit voxel messages. +class VoxelEditPacketSender : public virtual PacketSender { public: VoxelEditPacketSender(PacketSenderNotify* notify = NULL); + ~VoxelEditPacketSender(); /// Send voxel edit message immediately void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail); - /// Queues a voxel edit message. Will potentially sends a pending multi-command packet. Determines which voxel-server - /// node or nodes the packet should be sent to. + /// Queues a single voxel edit message. Will potentially send a pending multi-command packet. Determines which voxel-server + /// node or nodes the packet should be sent to. Can be called even before voxel servers are known, in which case up to + /// MaxPendingMessages will be buffered and processed when voxel servers are known. void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length); /// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines - /// which voxel-server node or nodes the packet should be sent to. + /// which voxel-server node or nodes the packet should be sent to. Can be called even before voxel servers are known, in + /// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known. void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details); - /// flushes all queued packets for all nodes - void flushQueue(); + /// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message + /// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to + /// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular + /// interval to ensure that the packets are actually sent. Can be called even before voxel servers are known, in + /// which case up to MaxPendingMessages of the released messages will be buffered and actually released when + /// voxel servers are known. + void releaseQueuedMessages(); + /// are we in sending mode. If we're not in sending mode then all packets and messages will be ignored and + /// not queued and not sent bool getShouldSend() const { return _shouldSend; } + + /// set sending mode. By default we are set to shouldSend=TRUE and packets will be sent. If shouldSend=FALSE, then we'll + /// switch to not sending mode, and all packets and messages will be ignored, not queued, and not sent. This might be used + /// in an application like interface when all voxel features are disabled. void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; } + /// call this to inform the VoxelEditPacketSender of the voxel server jurisdictions. This is required for normal operation. + /// The internal contents of the jurisdiction map may change throughout the lifetime of the VoxelEditPacketSender. This map + /// can be set prior to voxel servers being present, so long as the contents of the map accurately reflect the current + /// known jurisdictions. void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) { - _voxelServerJurisdictions = voxelServerJurisdictions; + _voxelServerJurisdictions = voxelServerJurisdictions; } + /// if you're running in non-threaded mode, you must call this method regularly + virtual bool process(); + + /// Set the desired number of pending messages that the VoxelEditPacketSender should attempt to queue even if voxel + /// servers are not present. This only applies to how the VoxelEditPacketSender will manage messages when no voxel + /// servers are present. By default, this value is the same as the default packets that will be sent in one second. + /// Which means the VoxelEditPacketSender will not buffer all messages given to it if no voxel servers are present. + /// This is the maximum number of queued messages and single messages. + void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; } + + // the default number of pending messages we will store if no voxel servers are available + static const int DEFAULT_MAX_PENDING_MESSAGES; + private: bool _shouldSend; - void actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); + void queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); + void queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length); void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type); - void flushQueue(EditPacketBuffer& packetBuffer); // flushes specific queued packet + void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet + bool voxelServersExist() const; + void processPreServerExistsPackets(); + // These are packets which are destined from know servers but haven't been released because they're still too small std::map _pendingEditPackets; + + // These are packets that are waiting to be processed because we don't yet know if there are voxel servers + int _maxPendingMessages; + bool _releaseQueuedMessagesPending; + std::vector _preServerPackets; // these will get packed into other larger packets + std::vector _preServerSingleMessagePackets; // these will go out as is NodeToJurisdictionMap* _voxelServerJurisdictions; };