From e13360b1b6faebce3d592943f621b344b7b3749d Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 8 Jul 2015 12:18:38 -0700 Subject: [PATCH] initial changes to OctreeEditPacketSender for new API --- .../networking/src/SentPacketHistory.cpp | 8 +- libraries/networking/src/SentPacketHistory.h | 2 +- libraries/octree/src/EditPacketBuffer.cpp | 30 ---- libraries/octree/src/EditPacketBuffer.h | 34 ---- .../octree/src/OctreeEditPacketSender.cpp | 169 ++++++++---------- libraries/octree/src/OctreeEditPacketSender.h | 67 +++---- .../octree/src/OctreeScriptingInterface.h | 14 +- 7 files changed, 110 insertions(+), 214 deletions(-) delete mode 100644 libraries/octree/src/EditPacketBuffer.cpp delete mode 100644 libraries/octree/src/EditPacketBuffer.h diff --git a/libraries/networking/src/SentPacketHistory.cpp b/libraries/networking/src/SentPacketHistory.cpp index 2a90f0f094..770973f8d8 100644 --- a/libraries/networking/src/SentPacketHistory.cpp +++ b/libraries/networking/src/SentPacketHistory.cpp @@ -24,7 +24,7 @@ SentPacketHistory::SentPacketHistory(int size) } -void SentPacketHistory::packetSent(uint16_t sequenceNumber, const NLPacket& packet) { +void SentPacketHistory::packetSent(uint16_t sequenceNumber, const std::unique_ptr& packet) { // check if given seq number has the expected value. if not, something's wrong with // the code calling this function @@ -34,10 +34,8 @@ void SentPacketHistory::packetSent(uint16_t sequenceNumber, const NLPacket& pack << "Expected:" << expectedSequenceNumber << "Actual:" << sequenceNumber; } _newestSequenceNumber = sequenceNumber; - - auto temp = std::unique_ptr(const_cast(&packet)); - _sentPackets.insert(NLPacket::createCopy(temp)); - temp.release(); + + _sentPackets.insert(NLPacket::createCopy(packet)); } const std::unique_ptr& SentPacketHistory::getPacket(uint16_t sequenceNumber) const { diff --git a/libraries/networking/src/SentPacketHistory.h b/libraries/networking/src/SentPacketHistory.h index 08aa2f9764..ad03f87f88 100644 --- a/libraries/networking/src/SentPacketHistory.h +++ b/libraries/networking/src/SentPacketHistory.h @@ -24,7 +24,7 @@ class SentPacketHistory { public: SentPacketHistory(int size = MAX_REASONABLE_SEQUENCE_GAP); - void packetSent(uint16_t sequenceNumber, const NLPacket& packet); + void packetSent(uint16_t sequenceNumber, const std::unique_ptr& packet); const std::unique_ptr& getPacket(uint16_t sequenceNumber) const; private: diff --git a/libraries/octree/src/EditPacketBuffer.cpp b/libraries/octree/src/EditPacketBuffer.cpp deleted file mode 100644 index 5e2f74549c..0000000000 --- a/libraries/octree/src/EditPacketBuffer.cpp +++ /dev/null @@ -1,30 +0,0 @@ -// -// EditPacketBuffer.cpp -// libraries/octree/src -// -// Created by Stephen Birarda on 2014-07-30. -// Copyright 2014 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include "EditPacketBuffer.h" - -EditPacketBuffer::EditPacketBuffer() : - _nodeUUID(), - _currentType(PacketType::Unknown), - _currentSize(0), - _satoshiCost(0) -{ - -} - -EditPacketBuffer::EditPacketBuffer(PacketType::Value type, unsigned char* buffer, size_t length, qint64 satoshiCost, QUuid nodeUUID) : - _nodeUUID(nodeUUID), - _currentType(type), - _currentSize(length), - _satoshiCost(satoshiCost) -{ - memcpy(_currentBuffer, buffer, length); -} \ No newline at end of file diff --git a/libraries/octree/src/EditPacketBuffer.h b/libraries/octree/src/EditPacketBuffer.h deleted file mode 100644 index 9a7ec47b9a..0000000000 --- a/libraries/octree/src/EditPacketBuffer.h +++ /dev/null @@ -1,34 +0,0 @@ -// -// EditPacketBuffer.h -// libraries/octree/src -// -// Created by Stephen Birarda on 2014-07-30. -// Copyright 2014 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#ifndef hifi_EditPacketBuffer_h -#define hifi_EditPacketBuffer_h - -#include - -#include -#include - -/// Used for construction of edit packets -class EditPacketBuffer { -public: - EditPacketBuffer(); - EditPacketBuffer(PacketType::Value type, unsigned char* codeColorBuffer, size_t length, - qint64 satoshiCost = 0, const QUuid nodeUUID = QUuid()); - - QUuid _nodeUUID; - PacketType::Value _currentType; - unsigned char _currentBuffer[MAX_PACKET_SIZE]; - size_t _currentSize; - qint64 _satoshiCost; -}; - -#endif // hifi_EditPacketBuffer_h \ No newline at end of file diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 260f49e4b8..0b1bd02cf7 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -27,7 +27,6 @@ OctreeEditPacketSender::OctreeEditPacketSender() : _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), _releaseQueuedMessagesPending(false), _serverJurisdictions(NULL), - _maxPacketSize(MAX_PACKET_SIZE), _destinationWalletUUID() { @@ -35,16 +34,8 @@ OctreeEditPacketSender::OctreeEditPacketSender() : OctreeEditPacketSender::~OctreeEditPacketSender() { _pendingPacketsLock.lock(); - while (!_preServerSingleMessagePackets.empty()) { - EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); - delete packet; - _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); - } - while (!_preServerPackets.empty()) { - EditPacketBuffer* packet = _preServerPackets.front(); - delete packet; - _preServerPackets.erase(_preServerPackets.begin()); - } + _preServerSingleMessagePackets.clear(); + _preServerEdits.clear(); _pendingPacketsLock.unlock(); } @@ -82,8 +73,7 @@ bool OctreeEditPacketSender::serversExist() const { // This method is called when the edit packet layer has determined that it has a fully formed packet destined for // a known nodeID. -void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned char* buffer, - size_t length, qint64 satoshiCost) { +void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::unique_ptr packet) { bool wantDebug = false; DependencyManager::get()->eachNode([&](const SharedNodePointer& node){ @@ -92,40 +82,38 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, unsigned c && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull())) && node->getActiveSocket()) { + // jump to the beginning of the payload + packet->seek(0); + // pack sequence number - int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast(buffer)); - unsigned char* sequenceAt = buffer + numBytesPacketHeader; quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++; - memcpy(sequenceAt, &sequence, sizeof(quint16)); + packet->write(reinterpret_cast(&sequence), sizeof(sequence)); - // send packet - QByteArray packet(reinterpret_cast(buffer), length); + // debugging output... + if (wantDebug) { + unsigned short int sequence; + quint64 createdAt; - queuePacketForSending(node, packet); + packet->seek(0); - if (hasDestinationWalletUUID() && satoshiCost > 0) { - // if we have a destination wallet UUID and a cost associated with this packet, signal that it - // needs to be sent - emit octreePaymentRequired(satoshiCost, nodeUUID, _destinationWalletUUID); + // read the sequence number and createdAt + packet->read(&sequence); + packet->read(&createdAt); + + quint64 queuedAt = usecTimestampNow(); + quint64 transitTime = queuedAt - createdAt; + + qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] << + " - command to node bytes=" << length << + " satoshiCost=" << satoshiCost << + " sequence=" << sequence << + " transitTimeSoFar=" << transitTime << " usecs"; } // add packet to history _sentPacketHistories[nodeUUID].packetSent(sequence, packet); - // debugging output... - if (wantDebug) { - int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast(buffer)); - unsigned short int sequence = (*((unsigned short int*)(buffer + numBytesPacketHeader))); - quint64 createdAt = (*((quint64*)(buffer + numBytesPacketHeader + sizeof(sequence)))); - quint64 queuedAt = usecTimestampNow(); - quint64 transitTime = queuedAt - createdAt; - - qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] << - " - command to node bytes=" << length << - " satoshiCost=" << satoshiCost << - " sequence=" << sequence << - " transitTimeSoFar=" << transitTime << " usecs"; - } + queuePacketForSending(node, packet); } }); } @@ -136,19 +124,18 @@ void OctreeEditPacketSender::processPreServerExistsPackets() { // First send out all the single message packets... _pendingPacketsLock.lock(); while (!_preServerSingleMessagePackets.empty()) { - EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); - queuePacketToNodes(&packet->_currentBuffer[0], packet->_currentSize, packet->_satoshiCost); - delete packet; - _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + std::unique_ptr packet = std::move(_preServerSingleMessagePackets.front()); + queuePacketToNodes(std::move(packet)); + _preServerSingleMessagePackets.pop_front(); } // Then "process" all the packable messages... while (!_preServerPackets.empty()) { - EditPacketBuffer* packet = _preServerPackets.front(); - queueOctreeEditMessage(packet->_currentType, &packet->_currentBuffer[0], packet->_currentSize); - delete packet; - _preServerPackets.erase(_preServerPackets.begin()); + EditMessageTuple editMessage = std::move(_preServerEdits.front()); + queueOctreeEditMessage(editMessage.first(), editMessage.second(), editMessage.third()); + _preServerPackets.pop_front(); } + _pendingPacketsLock.unlock(); // if while waiting for the jurisdictions the caller called releaseQueuedMessages() @@ -159,34 +146,29 @@ void OctreeEditPacketSender::processPreServerExistsPackets() { } } -void OctreeEditPacketSender::queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, - size_t length, qint64 satoshiCost) { +void OctreeEditPacketSender::queuePendingPacketToNodes(std::unique_ptr packet) { // If we're asked to save messages while waiting for voxel servers to arrive, then do so... if (_maxPendingMessages > 0) { - EditPacketBuffer* packet = new EditPacketBuffer(type, buffer, length, satoshiCost); _pendingPacketsLock.lock(); _preServerSingleMessagePackets.push_back(packet); // if we've saved MORE than our max, then clear out the oldest packet... int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); if (allPendingMessages > _maxPendingMessages) { - EditPacketBuffer* packet = _preServerSingleMessagePackets.front(); - delete packet; - _preServerSingleMessagePackets.erase(_preServerSingleMessagePackets.begin()); + _preServerSingleMessagePackets.pop_front(); } _pendingPacketsLock.unlock(); } } -void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t length, qint64 satoshiCost) { +void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr packet) { if (!_shouldSend) { return; // bail early } assert(serversExist()); // we must have jurisdictions to be here!! - int headerBytes = numBytesForPacketHeader(reinterpret_cast(buffer)) + sizeof(short) + sizeof(quint64); - unsigned char* octCode = buffer + headerBytes; // skip the packet header to get to the octcode + const unsigned char* octCode = reinterpret_cast(packet->getPayload()) + sizeof(short) + sizeof(quint64); // We want to filter out edit messages for servers based on the server's Jurisdiction // But we can't really do that with a packed message, since each edit message could be destined @@ -204,8 +186,11 @@ void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t le const JurisdictionMap& map = (*_serverJurisdictions)[nodeUUID]; isMyJurisdiction = (map.isMyJurisdiction(octCode, CHECK_NODE_ONLY) == JurisdictionMap::WITHIN); _serverJurisdictions->unlock(); + if (isMyJurisdiction) { - queuePacketToNode(nodeUUID, buffer, length, satoshiCost); + // make a copy of this packet for this node and queue + auto packetCopy = NLPacket::createCopy(packet); + queuePacketToNode(std::move(packetCopy)); } } }); @@ -213,8 +198,7 @@ void OctreeEditPacketSender::queuePacketToNodes(unsigned char* buffer, size_t le // NOTE: editPacketBuffer - is JUST the octcode/color and does not contain the packet header! -void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, - size_t length, qint64 satoshiCost) { +void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, size_t length) { if (!_shouldSend) { return; // bail early @@ -224,16 +208,14 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi // jurisdictions for processing if (!serversExist()) { if (_maxPendingMessages > 0) { - EditPacketBuffer* packet = new EditPacketBuffer(type, editPacketBuffer, length); + EditMessageTuple messageTuple { type, editPacketBuffer, length }; _pendingPacketsLock.lock(); - _preServerPackets.push_back(packet); + _preServerEdits.push_back(messageTuple); // if we've saved MORE than out max, then clear out the oldest packet... int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); if (allPendingMessages > _maxPendingMessages) { - EditPacketBuffer* packet = _preServerPackets.front(); - delete packet; - _preServerPackets.erase(_preServerPackets.begin()); + _preServerEdits.pop_front(); } _pendingPacketsLock.unlock(); } @@ -267,19 +249,22 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi _serverJurisdictions->unlock(); } if (isMyJurisdiction) { - EditPacketBuffer& packetBuffer = _pendingEditPackets[nodeUUID]; - packetBuffer._nodeUUID = nodeUUID; + std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID]; - // If we're switching type, then we send the last one and start over - if ((type != packetBuffer._currentType && packetBuffer._currentSize > 0) || - (packetBuffer._currentSize + length >= (size_t)_maxPacketSize)) { - releaseQueuedPacket(packetBuffer); - initializePacket(packetBuffer, type, node->getClockSkewUsec()); - } + if (!bufferedPacket) { + bufferedPacket = NLPacket::create(type); + } else { + // If we're switching type, then we send the last one and start over + if ((type != bufferedPacket->getType() && bufferedPacket->getSizeUsed() > 0) || + (length >= bufferedPacket->bytesAvailable())) { - // If the buffer is empty and not correctly initialized for our type... - if (type != packetBuffer._currentType && packetBuffer._currentSize == 0) { - initializePacket(packetBuffer, type, node->getClockSkewUsec()); + // create the new packet and swap it with the packet in _pendingEditPackets + auto packetToRelease = initializePacket(type, node->getClockSkewUsec()); + bufferedPacket.swap(packetToRelease); + + // release the previously buffered packet + releaseQueuedPacket(packetToRelease); + } } // This is really the first time we know which server/node this particular edit message @@ -290,9 +275,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi adjustEditPacketForClockSkew(type, editPacketBuffer, length, node->getClockSkewUsec()); } - memcpy(&packetBuffer._currentBuffer[packetBuffer._currentSize], editPacketBuffer, length); - packetBuffer._currentSize += length; - packetBuffer._satoshiCost += satoshiCost; + bufferedPacket->write(reinterpret_cast(editPacketBuffer), length); } } }); @@ -309,47 +292,45 @@ void OctreeEditPacketSender::releaseQueuedMessages() { _releaseQueuedMessagesPending = true; } else { _packetsQueueLock.lock(); - for (QHash::iterator i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { - releaseQueuedPacket(i.value()); + for (auto i = _pendingEditPackets.begin(); i != _pendingEditPackets.end(); i++) { + // construct a null unique_ptr to an NL packet + std::unique_ptr releasedPacket; + + // swap the null ptr with the packet we want to release + i.value().swap(releasedPacket); + + // move and release the queued packet + releaseQueuedPacket(i.key(), std::move(releasedPacket)); } _packetsQueueLock.unlock(); } } -void OctreeEditPacketSender::releaseQueuedPacket(EditPacketBuffer& packetBuffer) { +void OctreeEditPacketSender::releaseQueuedPacket(std::unique_ptr packet) { _releaseQueuedPacketMutex.lock(); if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketType::Unknown) { - queuePacketToNode(packetBuffer._nodeUUID, &packetBuffer._currentBuffer[0], - packetBuffer._currentSize, packetBuffer._satoshiCost); + queuePacketToNode(std::move(packet)); packetBuffer._currentSize = 0; packetBuffer._currentType = PacketType::Unknown; } _releaseQueuedPacketMutex.unlock(); } -void OctreeEditPacketSender::initializePacket(EditPacketBuffer& packetBuffer, PacketType::Value type, int nodeClockSkew) { - packetBuffer._currentSize = - DependencyManager::get()->populatePacketHeader(reinterpret_cast(&packetBuffer._currentBuffer[0]), type); +std::unique_ptr OctreeEditPacketSender::initializePacket(PacketType::Value type, int nodeClockSkew) { + auto newPacket = NLPacket::create(type); // skip over sequence number for now; will be packed when packet is ready to be sent out - packetBuffer._currentSize += sizeof(quint16); + newPacket->seek(sizeof(quint16)); // pack in timestamp quint64 now = usecTimestampNow() + nodeClockSkew; - quint64* timeAt = (quint64*)&packetBuffer._currentBuffer[packetBuffer._currentSize]; - *timeAt = now; - packetBuffer._currentSize += sizeof(quint64); // nudge past timestamp - - packetBuffer._currentType = type; - - // reset cost for packet to 0 - packetBuffer._satoshiCost = 0; + newPacket->write(reinterpret_cast(&now), sizeof(now)); } bool OctreeEditPacketSender::process() { // if we have server jurisdiction details, and we have pending pre-jurisdiction packets, then process those // before doing our normal process step. This processPreJurisdictionPackets() - if (serversExist() && (!_preServerPackets.empty() || !_preServerSingleMessagePackets.empty() )) { + if (serversExist() && (!_preServerEdits.empty() || !_preServerSingleMessagePackets.empty() )) { processPreServerExistsPackets(); } diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index caeeea5508..fdc4bfc189 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -16,7 +16,6 @@ #include #include -#include "EditPacketBuffer.h" #include "JurisdictionMap.h" #include "SentPacketHistory.h" @@ -26,17 +25,17 @@ class OctreeEditPacketSender : public PacketSender { public: OctreeEditPacketSender(); ~OctreeEditPacketSender(); - - /// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server - /// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to - /// MaxPendingMessages will be buffered and processed when servers are known. - void queueOctreeEditMessage(PacketType::Value type, unsigned char* buffer, size_t length, qint64 satoshiCost = 0); - /// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message + /// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server + /// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to + /// MaxPendingMessages will be buffered and processed when servers are known. + void queueOctreeEditMessage(EditMessageTuple); + + /// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message /// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to /// have these packets get sent. If running in non-threaded mode, the caller must still call process() on a regular - /// interval to ensure that the packets are actually sent. Can be called even before servers are known, in - /// which case up to MaxPendingMessages of the released messages will be buffered and actually released when + /// interval to ensure that the packets are actually sent. Can be called even before servers are known, in + /// which case up to MaxPendingMessages of the released messages will be buffered and actually released when /// servers are known. void releaseQueuedMessages(); @@ -53,7 +52,7 @@ public: /// The internal contents of the jurisdiction map may change throughout the lifetime of the OctreeEditPacketSender. This map /// can be set prior to servers being present, so long as the contents of the map accurately reflect the current /// known jurisdictions. - void setServerJurisdictions(NodeToJurisdictionMap* serverJurisdictions) { + void setServerJurisdictions(NodeToJurisdictionMap* serverJurisdictions) { _serverJurisdictions = serverJurisdictions; } @@ -61,33 +60,23 @@ public: virtual bool process(); /// Set the desired number of pending messages that the OctreeEditPacketSender should attempt to queue even if - /// servers are not present. This only applies to how the OctreeEditPacketSender will manage messages when no + /// servers are not present. This only applies to how the OctreeEditPacketSender will manage messages when no /// servers are present. By default, this value is the same as the default packets that will be sent in one second. - /// Which means the OctreeEditPacketSender will not buffer all messages given to it if no servers are present. + /// Which means the OctreeEditPacketSender will not buffer all messages given to it if no servers are present. /// This is the maximum number of queued messages and single messages. void setMaxPendingMessages(int maxPendingMessages) { _maxPendingMessages = maxPendingMessages; } // the default number of pending messages we will store if no servers are available static const int DEFAULT_MAX_PENDING_MESSAGES; - // is there an octree server available to send packets to + // is there an octree server available to send packets to bool serversExist() const; - /// Set the desired max packet size in bytes that the OctreeEditPacketSender should create - void setMaxPacketSize(int maxPacketSize) { _maxPacketSize = maxPacketSize; } - - /// returns the current desired max packet size in bytes that the OctreeEditPacketSender will create - int getMaxPacketSize() const { return _maxPacketSize; } - // you must override these... virtual char getMyNodeType() const = 0; - virtual void adjustEditPacketForClockSkew(PacketType::Value type, + virtual void adjustEditPacketForClockSkew(PacketType::Value type, unsigned char* editPacketBuffer, size_t length, int clockSkew) { } - - bool hasDestinationWalletUUID() const { return !_destinationWalletUUID.isNull(); } - void setDestinationWalletUUID(const QUuid& destinationWalletUUID) { _destinationWalletUUID = destinationWalletUUID; } - const QUuid& getDestinationWalletUUID() { return _destinationWalletUUID; } - + void processNackPacket(const QByteArray& packet); public slots: @@ -95,38 +84,36 @@ public slots: signals: void octreePaymentRequired(qint64 satoshiAmount, const QUuid& nodeUUID, const QUuid& destinationWalletUUID); - + protected: + using EditMessageTuple = std::tuple; + bool _shouldSend; - void queuePacketToNode(const QUuid& nodeID, unsigned char* buffer, size_t length, qint64 satoshiCost = 0); - void queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, size_t length, qint64 satoshiCost = 0); - void queuePacketToNodes(unsigned char* buffer, size_t length, qint64 satoshiCost = 0); - void initializePacket(EditPacketBuffer& packetBuffer, PacketType::Value type, int nodeClockSkew); - void releaseQueuedPacket(EditPacketBuffer& packetBuffer); // releases specific queued packet - + void queuePacketToNode(const QUuid& nodeID, std::unique_ptr packet); + void queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, size_t length); + void queuePacketToNodes(std::unique_ptr packet); + std::unique_ptr initializePacket(PacketType::Value type, int nodeClockSkew); + void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr packetBuffer); // releases specific queued packet + void processPreServerExistsPackets(); // These are packets which are destined from know servers but haven't been released because they're still too small - QHash _pendingEditPackets; - + QHash> _pendingEditPackets; + // These are packets that are waiting to be processed because we don't yet know if there are servers int _maxPendingMessages; bool _releaseQueuedMessagesPending; QMutex _pendingPacketsLock; QMutex _packetsQueueLock; // don't let different threads release the queue while another thread is writing to it - QVector _preServerPackets; // these will get packed into other larger packets - QVector _preServerSingleMessagePackets; // these will go out as is + std::list _preServerEdits; // these will get packed into other larger packets + std::list> _preServerSingleMessagePackets; // these will go out as is NodeToJurisdictionMap* _serverJurisdictions; - - int _maxPacketSize; QMutex _releaseQueuedPacketMutex; // TODO: add locks for this and _pendingEditPackets QHash _sentPacketHistories; QHash _outgoingSequenceNumbers; - - QUuid _destinationWalletUUID; }; #endif // hifi_OctreeEditPacketSender_h diff --git a/libraries/octree/src/OctreeScriptingInterface.h b/libraries/octree/src/OctreeScriptingInterface.h index 52d7c8be34..ea897bbb4f 100644 --- a/libraries/octree/src/OctreeScriptingInterface.h +++ b/libraries/octree/src/OctreeScriptingInterface.h @@ -21,7 +21,7 @@ class OctreeScriptingInterface : public QObject { Q_OBJECT public: - OctreeScriptingInterface(OctreeEditPacketSender* packetSender = NULL, + OctreeScriptingInterface(OctreeEditPacketSender* packetSender = NULL, JurisdictionListener* jurisdictionListener = NULL); ~OctreeScriptingInterface(); @@ -31,20 +31,14 @@ public: void setPacketSender(OctreeEditPacketSender* packetSender); void setJurisdictionListener(JurisdictionListener* jurisdictionListener); void init(); - + virtual NodeType_t getServerNodeType() const = 0; virtual OctreeEditPacketSender* createPacketSender() = 0; private slots: void cleanupManagedObjects(); - + public slots: - /// Set the desired max packet size in bytes that should be created - void setMaxPacketSize(int maxPacketSize) { return _packetSender->setMaxPacketSize(maxPacketSize); } - - /// returns the current desired max packet size in bytes that will be created - int getMaxPacketSize() const { return _packetSender->getMaxPacketSize(); } - /// set the max packets per second send rate void setPacketsPerSecond(int packetsPerSecond) { return _packetSender->setPacketsPerSecond(packetsPerSecond); } @@ -65,7 +59,7 @@ public slots: /// returns the bytes per second send rate of this object over its lifetime float getLifetimeBPS() const { return _packetSender->getLifetimeBPS(); } - + /// returns the packets per second queued rate of this object over its lifetime float getLifetimePPSQueued() const { return _packetSender->getLifetimePPSQueued(); }