From 90c64ab25cc5a066bbe8b05b75b5334f81d1e395 Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Wed, 2 Oct 2013 16:33:06 -0700 Subject: [PATCH] more work on handleing pending packets in VoxelEditPacketSender --- libraries/shared/src/PacketSender.cpp | 2 + .../voxels/src/VoxelEditPacketSender.cpp | 139 +++++++++++++++--- libraries/voxels/src/VoxelEditPacketSender.h | 47 ++++-- 3 files changed, 158 insertions(+), 30 deletions(-) diff --git a/libraries/shared/src/PacketSender.cpp b/libraries/shared/src/PacketSender.cpp index ebf3b50060..707fbed825 100644 --- a/libraries/shared/src/PacketSender.cpp +++ b/libraries/shared/src/PacketSender.cpp @@ -27,6 +27,7 @@ PacketSender::PacketSender(PacketSenderNotify* notify, int packetsPerSecond) : void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packetData, ssize_t packetLength) { + //printf("PacketSender::queuePacketForSending packetLength=%ld\n",packetLength); NetworkPacket packet(address, packetData, packetLength); lock(); _packets.push_back(packet); @@ -34,6 +35,7 @@ void PacketSender::queuePacketForSending(sockaddr& address, unsigned char* packe } bool PacketSender::process() { + printf("PacketSender::process() _packets.size()=%ld\n",_packets.size()); uint64_t USECS_PER_SECOND = 1000 * 1000; uint64_t SEND_INTERVAL_USECS = (_packetsPerSecond == 0) ? USECS_PER_SECOND : (USECS_PER_SECOND / _packetsPerSecond); diff --git a/libraries/voxels/src/VoxelEditPacketSender.cpp b/libraries/voxels/src/VoxelEditPacketSender.cpp index 862d0663e0..974be4262e 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.cpp +++ b/libraries/voxels/src/VoxelEditPacketSender.cpp @@ -28,20 +28,29 @@ EditPacketBuffer::EditPacketBuffer(PACKET_TYPE type, unsigned char* buffer, ssiz VoxelEditPacketSender::VoxelEditPacketSender(PacketSenderNotify* notify) : PacketSender(notify), _shouldSend(true), - _voxelServerJurisdictions(NULL), - _releaseQueuedMessagesPending(false) { + _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), + _releaseQueuedMessagesPending(false), + _voxelServerJurisdictions(NULL) { } VoxelEditPacketSender::~VoxelEditPacketSender() { - while (!_preJurisidictionPackets.empty()) { - EditPacketBuffer* packet = _preJurisidictionPackets.front(); + while (!_preServerSingleMessagePackets.empty()) { + EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); delete packet; - _preJurisidictionPackets.erase(_preJurisidictionPackets.begin()); + _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + } + while (!_preServerPackets.empty()) { + EditPacketBuffer* packet = _preServerPackets.front(); + delete packet; + _preServerPackets.erase(_preServerPackets.begin()); } } void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail) { + +printf("void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail)\n"); + // allows app to disable sending if for example voxels have been disabled if (!_shouldSend) { return; // bail early @@ -50,8 +59,32 @@ void VoxelEditPacketSender::sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& unsigned char* bufferOut; int sizeOut; + // This encodes the voxel edit message into a buffer... if (createVoxelEditMessage(type, 0, 1, &detail, bufferOut, sizeOut)){ - queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! + + // If we don't have voxel jurisdictions, then we will simply queue up these packets and wait till we have + // jurisdictions for processing + if (!voxelServersExist()) { + + // If we're asked to save messages while waiting for voxel servers to arrive, then do so... + if (_maxPendingMessages > 0) { + EditPacketBuffer* packet = new EditPacketBuffer(type, bufferOut, sizeOut); + _preServerSingleMessagePackets.push_back(packet); + // if we've saved MORE than out max, then clear out the oldest packet... + int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); + if (allPendingMessages > _maxPendingMessages) { + printf("Pending messages exceed Max Pending Messages, discarding oldest message.\n"); + EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); + delete packet; + _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + } + } + return; // bail early + } else { + queuePacketToNode(UNKNOWN_NODE_ID, bufferOut, sizeOut); // sends to all servers... not ideal! + } + + // either way, clean up the created buffer delete[] bufferOut; } } @@ -70,6 +103,9 @@ bool VoxelEditPacketSender::voxelServersExist() const { // This method is called when the edit packet layer has determined that it has a fully formed packet destined for // a known nodeID. However, we also want to handle the case where the void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut) { + +//printf("void VoxelEditPacketSender::queuePacketToNode(nodeID=%d)\n",nodeID); + NodeList* nodeList = NodeList::getInstance(); for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER @@ -82,6 +118,9 @@ void VoxelEditPacketSender::queuePacketToNode(uint16_t nodeID, unsigned char* bu } void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details) { + +//printf("void VoxelEditPacketSender::queueVoxelEditMessages()\n"); + if (!_shouldSend) { return; // bail early } @@ -96,14 +135,27 @@ void VoxelEditPacketSender::queueVoxelEditMessages(PACKET_TYPE type, int numberO } } -void VoxelEditPacketSender::processPreJurisdictionPackets() { - assert(_voxelServerJurisdictions); // we should only be here if we have jurisdictions +void VoxelEditPacketSender::processPreServerExistsPackets() { - while (!_preJurisidictionPackets.empty()) { - EditPacketBuffer* packet = _preJurisidictionPackets.front(); +printf("void VoxelEditPacketSender::processPreServerExistsPackets()\n"); + + + assert(voxelServersExist()); // we should only be here if we have jurisdictions + + // First send out all the single message packets... + while (!_preServerSingleMessagePackets.empty()) { + EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); + queuePacketToNode(UNKNOWN_NODE_ID, &packet->_currentBuffer[0], packet->_currentSize); + delete packet; + _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + } + + // Then "process" all the packable messages... + while (!_preServerPackets.empty()) { + EditPacketBuffer* packet = _preServerPackets.front(); queueVoxelEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize); delete packet; - _preJurisidictionPackets.erase(_preJurisidictionPackets.begin()); + _preServerPackets.erase(_preServerPackets.begin()); } // if while waiting for the jurisdictions the caller called releaseQueuedMessages() @@ -114,17 +166,64 @@ void VoxelEditPacketSender::processPreJurisdictionPackets() { } } +void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length) { + +printf("void VoxelEditPacketSender::queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, length=%ld)\n", length); + + if (!_shouldSend) { + return; // bail early + } + + assert(voxelServersExist()); // we must have jurisdictions to be here!! + + // We want to filter out edit messages for voxel servers based on the server's Jurisdiction + // But we can't really do that with a packed message, since each edit message could be destined + // for a different voxel server... So we need to actually manage multiple queued packets... one + // for each voxel server + NodeList* nodeList = NodeList::getInstance(); + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { + // only send to the NodeTypes that are NODE_TYPE_VOXEL_SERVER + if (node->getActiveSocket() != NULL && node->getType() == NODE_TYPE_VOXEL_SERVER) { + uint16_t nodeID = node->getNodeID(); + bool isMyJurisdiction = true; + + // we need to get the jurisdiction for this + // here we need to get the "pending packet" for this server + const JurisdictionMap& map = (*_voxelServerJurisdictions)[nodeID]; + isMyJurisdiction = (map.isMyJurisdiction(codeColorBuffer, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN); + + if (isMyJurisdiction) { + queuePacketToNode(nodeID, codeColorBuffer, length); + } + } + } +} + + void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length) { +//printf("void VoxelEditPacketSender::queueVoxelEditMessage(unsigned char* codeColorBuffer, length=%ld)\n", length); + if (!_shouldSend) { return; // bail early } // If we don't have voxel jurisdictions, then we will simply queue up all of these packets and wait till we have // jurisdictions for processing - if (!_voxelServerJurisdictions) { - EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length); - _preJurisidictionPackets.push_back(packet); + if (!voxelServersExist()) { + if (_maxPendingMessages > 0) { + EditPacketBuffer* packet = new EditPacketBuffer(type, codeColorBuffer, length); + _preServerPackets.push_back(packet); + + // if we've saved MORE than out max, then clear out the oldest packet... + int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); + if (allPendingMessages > _maxPendingMessages) { + printf("Pending messages exceed Max Pending Messages, discarding oldest message.\n"); + EditPacketBuffer* packet = _preServerPackets.front(); + delete packet; + _preServerPackets.erase(_preServerPackets.begin()); + } + } return; // bail early } @@ -169,13 +268,17 @@ void VoxelEditPacketSender::queueVoxelEditMessage(PACKET_TYPE type, unsigned cha } void VoxelEditPacketSender::releaseQueuedMessages() { + //printf("void VoxelEditPacketSender::releaseQueuedMessages()\n"); + // if we don't yet have jurisdictions then we can't actually release messages yet because we don't // know where to send them to. Instead, just remember this request and when we eventually get jurisdictions // call release again at that time. - if (!_voxelServerJurisdictions) { + if (!voxelServersExist()) { + //printf("...no voxel servers... _releaseQueuedMessagesPending=true\n"); _releaseQueuedMessagesPending = true; } else { for (std::map::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { + //printf("...actually calling releaseQueuedPacket()\n"); releaseQueuedPacket(i->second); } } @@ -196,11 +299,13 @@ void VoxelEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PAC } bool VoxelEditPacketSender::process() { + printf("VoxelEditPacketSender::process()\n"); // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those // before doing our normal process step. This processPreJurisdictionPackets() - if (_voxelServerJurisdictions && !_preJurisidictionPackets.empty()) { - processPreJurisdictionPackets(); + if (voxelServersExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) { + printf("processPreServerExistsPackets()<<<<<<<<<<<<<<<<<<<<<\n"); + processPreServerExistsPackets(); } // base class does most of the work. diff --git a/libraries/voxels/src/VoxelEditPacketSender.h b/libraries/voxels/src/VoxelEditPacketSender.h index 3c8eec3c00..4b9ffa377e 100644 --- a/libraries/voxels/src/VoxelEditPacketSender.h +++ b/libraries/voxels/src/VoxelEditPacketSender.h @@ -27,8 +27,8 @@ public: ssize_t _currentSize; }; -/// Threaded processor for queueing and sending of outbound edit voxel packets. -class VoxelEditPacketSender : public PacketSender { +/// Utility for processing, packing, queueing and sending of outbound edit voxel messages. +class VoxelEditPacketSender : public virtual PacketSender { public: VoxelEditPacketSender(PacketSenderNotify* notify = NULL); ~VoxelEditPacketSender(); @@ -36,18 +36,22 @@ public: /// Send voxel edit message immediately void sendVoxelEditMessage(PACKET_TYPE type, VoxelDetail& detail); - /// Queues a voxel edit message. Will potentially sends a pending multi-command packet. Determines which voxel-server - /// node or nodes the packet should be sent to. + /// Queues a single voxel edit message. Will potentially send a pending multi-command packet. Determines which voxel-server + /// node or nodes the packet should be sent to. Can be called even before voxel servers are known, in which case up to + /// MaxPendingMessages will be buffered and processed when voxel servers are known. void queueVoxelEditMessage(PACKET_TYPE type, unsigned char* codeColorBuffer, ssize_t length); /// Queues an array of several voxel edit messages. Will potentially send a pending multi-command packet. Determines - /// which voxel-server node or nodes the packet should be sent to. + /// which voxel-server node or nodes the packet should be sent to. Can be called even before voxel servers are known, in + /// which case up to MaxPendingMessages will be buffered and processed when voxel servers are known. void queueVoxelEditMessages(PACKET_TYPE type, int numberOfDetails, VoxelDetail* details); - /// releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message + /// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message /// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to /// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular - /// interval to ensure that the packets are actually sent. + /// interval to ensure that the packets are actually sent. Can be called even before voxel servers are known, in + /// which case up to MaxPendingMessages of the released messages will be buffered and actually released when + /// voxel servers are known. void releaseQueuedMessages(); /// are we in sending mode. If we're not in sending mode then all packets and messages will be ignored and @@ -60,27 +64,44 @@ public: void setShouldSend(bool shouldSend) { _shouldSend = shouldSend; } /// call this to inform the VoxelEditPacketSender of the voxel server jurisdictions. This is required for normal operation. - /// As initialized the VoxelEditPacketSender will wait until it knows of the jurisdictions before queuing and sending - /// packets to voxel servers. + /// The internal contents of the jurisdiction map may change throughout the lifetime of the VoxelEditPacketSender. This map + /// can be set prior to voxel servers being present, so long as the contents of the map accurately reflect the current + /// known jurisdictions. void setVoxelServerJurisdictions(NodeToJurisdictionMap* voxelServerJurisdictions) { - _voxelServerJurisdictions = voxelServerJurisdictions; + _voxelServerJurisdictions = voxelServerJurisdictions; } /// if you're running in non-threaded mode, you must call this method regularly virtual bool process(); + /// Set the desired number of pending messages that the VoxelEditPacketSender should attempt to queue even if voxel + /// servers are not present. This only applies to how the VoxelEditPacketSender will manage messages when no voxel + /// servers are present. By default, this value is the same as the default packets that will be sent in one second. + /// Which means the VoxelEditPacketSender will not buffer all messages given to it if no voxel servers are present. + /// This is the maximum number of queued messages and single messages. + void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; } + + // the default number of pending messages we will store if no voxel servers are available + const int DEFAULT_MAX_PENDING_MESSAGES = PacketSender::DEFAULT_PACKETS_PER_SECOND; + private: bool _shouldSend; void queuePacketToNode(uint16_t nodeID, unsigned char* bufferOut, ssize_t sizeOut); + void queueVoxelEditMessageToNodes(unsigned char* codeColorBuffer, ssize_t length); void initializePacket(EditPacketBuffer& packetBuffer, PACKET_TYPE type); void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet bool voxelServersExist() const; - void processPreJurisdictionPackets(); + void processPreServerExistsPackets(); + // These are packets which are destined from know servers but haven't been released because they're still too small std::map _pendingEditPackets; - std::vector _preJurisidictionPackets; + + // These are packets that are waiting to be processed because we don't yet know if there are voxel servers + int _maxPendingMessages; + bool _releaseQueuedMessagesPending; + std::vector _preServerPackets; // these will get packed into other larger packets + std::vector _preServerSingleMessagePackets; // these will go out as is NodeToJurisdictionMap* _voxelServerJurisdictions; - bool _releaseQueuedMessagesPending; }; #endif // __shared__VoxelEditPacketSender__