From cc0009976bc4eb3b78aa6fcbb9821e4442ff78dc Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Wed, 2 Oct 2013 15:02:23 -0700 Subject: [PATCH] first cut and handling buffering of edit packets when voxel servers are unknown --- animation-server/src/main.cpp | 2 +- interface/src/Application.cpp | 2 +- libraries/shared/src/PacketSender.h | 9 +- .../voxels/src/VoxelEditPacketSender.cpp | 98 +++++++++++++++++-- libraries/voxels/src/VoxelEditPacketSender.h | 29 +++++- 5 files changed, 121 insertions(+), 19 deletions(-) diff --git a/animation-server/src/main.cpp b/animation-server/src/main.cpp index 50e1da707c..56a7abdb67 100644 --- a/animation-server/src/main.cpp +++ b/animation-server/src/main.cpp @@ -618,7 +618,7 @@ void* animateVoxels(void* args) { } if (::voxelEditPacketSender) { - ::voxelEditPacketSender->flushQueue(); + ::voxelEditPacketSender->releaseQueuedMessages(); } uint64_t end = usecTimestampNow(); diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index e26ed0450d..fefc8b1295 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -1440,7 +1440,7 @@ void Application::pasteVoxels() { _sharedVoxelSystem.changeTree(&_clipboard); } - _voxelEditSender.flushQueue(); + _voxelEditSender.releaseQueuedMessages(); if (calculatedOctCode) { delete[] calculatedOctCode; diff --git a/libraries/shared/src/PacketSender.h b/libraries/shared/src/PacketSender.h index 5309043743..babcaca82b 100644 --- a/libraries/shared/src/PacketSender.h +++ b/libraries/shared/src/PacketSender.h @@ -45,12 +45,15 @@ public: virtual bool process(); virtual void processWithoutSleep(); + /// are there packets waiting in the send queue to be sent + bool hasPacketsToSend() const { return _packets.size() > 0; } + + /// how many packets are there in the send queue waiting to be sent + int packetsToSendCount() const { return _packets.size(); } + protected: int _packetsPerSecond; - bool hasPacketsToSend() const { return _packets.size() > 0; } - int packetsToSendCount() const { return _packets.size(); } - private: std::vector _packets; uint64_t _lastSendTime; diff --git a/libraries/voxels/src/VoxelEditPacketSender.cpp b/libraries/voxels/src/VoxelEditPacketSender.cpp index 5853652688..862d0663e0 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.cpp +++ b/libraries/voxels/src/VoxelEditPacketSender.cpp @@ -8,6 +8,8 @@ // Threaded or non-threaded voxel packet Sender for the Application // +#include + #include #include @@ -15,12 +17,30 @@ #include "VoxelEditPacketSender.h" +EditPacketBuffer::EditPacketBuffer(PACKET_TYPE type, unsigned char* buffer, ssize_t length, uint16_t nodeID) { + _nodeID = nodeID; + _currentType = type; + _currentSize = length; + memcpy(_currentBuffer, buffer, length); +}; + + VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) : PacketSender(notify), _shouldSend(true), - _voxelServerJurisdictions(NULL) { + _voxelServerJurisdictions(NULL), + _releaseQueuedMessagesPending(false) { } +VoxelEditPacketSender::~VoxelEditPacketSender() { + while (!_preJurisidictionPackets.empty()) { + EditPacketBuffer* packet = _preJurisidictionPackets.front(); + delete packet; + _preJurisidictionPackets.erase(_preJurisidictionPackets.begin()); + } +} + + void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { // allows app to disable sending if for example voxels have been disabled if (!_shouldSend) { @@ -31,12 +51,25 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& int sizeOut; if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){ - actuallySendMessage(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! + queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! delete[] bufferOut; } } -void VoxelEditPacketSender::actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { +bool VoxelEditPacketSender::voxelServersExist() const { + NodeList* nodeList = NodeList::getInstance(); + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER + if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) { + return true; + } + } + return false; +} + +// This method is called when the edit packet layer has determined that it has a fully formed packet destined for +// a known nodeID. However, we also want to handle the case where the +void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { NodeList* nodeList = NodeList::getInstance(); for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER @@ -63,12 +96,38 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO } } +void VoxelEditPacketSender::processPreJurisdictionPackets() { + assert(_voxelServerJurisdictions); // we should only be here if we have jurisdictions + + while (!_preJurisidictionPackets.empty()) { + EditPacketBuffer* packet = _preJurisidictionPackets.front(); + queueVoxelEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize); + delete packet; + _preJurisidictionPackets.erase(_preJurisidictionPackets.begin()); + } + + // if while waiting for the jurisdictions the caller called releaseQueuedMessages() + // then we want to honor that request now. + if (_releaseQueuedMessagesPending) { + releaseQueuedMessages(); + _releaseQueuedMessagesPending = false; + } +} + void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { if (!_shouldSend) { return; // bail early } - + + // If we don't have voxel jurisdictions, then we will simply queue up all of these packets and wait till we have + // jurisdictions for processing + if (!_voxelServerJurisdictions) { + EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length); + _preJurisidictionPackets.push_back(packet); + return; // bail early + } + // We want to filter out edit messages for voxel servers based on the server's Jurisdiction // But we can't really do that with a packed message, since each edit message could be destined // for a different voxel server... So we need to actually manage multiple queued packets... one @@ -93,7 +152,7 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha // If we're switching type, then we send the last one and start over if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) || (packetBuffer._currentSize + length >= MAX_PACKET_SIZE)) { - flushQueue(packetBuffer); + releaseQueuedPacket(packetBuffer); initializePacket(packetBuffer, type); } @@ -109,14 +168,21 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha } } -void VoxelEditPacketSender::flushQueue() { - for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - flushQueue(i->second); +void VoxelEditPacketSender::releaseQueuedMessages() { + // if we don't yet have jurisdictions then we can't actually release messages yet because we don't + // know where to send them to. Instead, just remember this request and when we eventually get jurisdictions + // call release again at that time. + if (!_voxelServerJurisdictions) { + _releaseQueuedMessagesPending = true; + } else { + for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { + releaseQueuedPacket(i->second); + } } } -void VoxelEditPacketSender::flushQueue(EditPacketBuffer& packetBuffer) { - actuallySendMessage(packetBuffer._nodeID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); +void VoxelEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { + queuePacketToNode(packetBuffer._nodeID, &packetBuffer._currentBuffer[0], packetBuffer._currentSize); packetBuffer._currentSize = 0; packetBuffer._currentType = PACKET_TYPE_UNKNOWN; } @@ -128,3 +194,15 @@ void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PAC packetBuffer._currentSize += sizeof(unsigned short int); // set to command + sequence packetBuffer._currentType = type; } + +bool VoxelEditPacketSender::process() { + + // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those + // before doing our normal process step. This processPreJurisdictionPackets() + if (_voxelServerJurisdictions && !_preJurisidictionPackets.empty()) { + processPreJurisdictionPackets(); + } + + // base class does most of the work. + return PacketSender::process(); +} diff --git a/libraries/voxels/src/VoxelEditPacketSender.h b/libraries/voxels/src/VoxelEditPacketSender.h index bda2c8006d..3c8eec3c00 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.h +++ b/libraries/voxels/src/VoxelEditPacketSender.h @@ -20,6 +20,7 @@ class EditPacketBuffer { public: EditPacketBuffer() { _currentSize = 0; _currentType = PACKET_TYPE_UNKNOWN; _nodeID = UNKNOWN_NODE_ID; } + EditPacketBuffer(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length, uint16_t nodeID = UNKNOWN_NODE_ID); uint16_t _nodeID; PACKET_TYPE _currentType; unsigned char _currentBuffer[MAX_PACKET_SIZE]; @@ -30,6 +31,7 @@ public: class VoxelEditPacketSender : public PacketSender { public: VoxelEditPacketSender(PacketSenderNotify* notify = NULL); + ~VoxelEditPacketSender(); /// Send voxel edit message immediately void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail); @@ -42,24 +44,43 @@ public: /// which voxel-server node or nodes the packet should be sent to. void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details); - /// flushes all queued packets for all nodes - void flushQueue(); + /// releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message + /// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to + /// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular + /// interval to ensure that the packets are actually sent. + void releaseQueuedMessages(); + /// are we in sending mode. If we're not in sending mode then all packets and messages will be ignored and + /// not queued and not sent bool getShouldSend() const { return _shouldSend; } + + /// set sending mode. By default we are set to shouldSend=TRUE and packets will be sent. If shouldSend=FALSE, then we'll + /// switch to not sending mode, and all packets and messages will be ignored, not queued, and not sent. This might be used + /// in an application like interface when all voxel features are disabled. void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; } + /// call this to inform the VoxelEditPacketSender of the voxel server jurisdictions. This is required for normal operation. + /// As initialized the VoxelEditPacketSender will wait until it knows of the jurisdictions before queuing and sending + /// packets to voxel servers. void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) { _voxelServerJurisdictions = voxelServerJurisdictions; } + /// if you're running in non-threaded mode, you must call this method regularly + virtual bool process(); + private: bool _shouldSend; - void actuallySendMessage(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); + void queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type); - void flushQueue(EditPacketBuffer& packetBuffer); // flushes specific queued packet + void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet + bool voxelServersExist() const; + void processPreJurisdictionPackets(); std::map _pendingEditPackets; + std::vector _preJurisidictionPackets; NodeToJurisdictionMap* _voxelServerJurisdictions; + bool _releaseQueuedMessagesPending; }; #endif // __shared__VoxelEditPacketSender__