diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 7fc93b4f64..1eb687fe82 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -12,6 +12,7 @@ #ifndef hifi_LimitedNodeList_h #define hifi_LimitedNodeList_h +#include #include #include #include @@ -145,14 +146,20 @@ public: // const HifiSockAddr& overridenSockAddr = HifiSockAddr()); // - qint64 sendUnreliablePacket(std::unique_ptr& packet, const SharedNodePointer& destinationNode) {}; - qint64 sendUnreliablePacket(std::unique_ptr& packet, const HifiSockAddr& sockAddr) {}; + qint64 sendUnreliablePacket(std::unique_ptr& packet, const SharedNodePointer& destinationNode) + { assert(false); return 0; } + qint64 sendUnreliablePacket(std::unique_ptr& packet, const HifiSockAddr& sockAddr) + { assert(false); return 0; } - qint64 sendPacket(std::unique_ptr packet, const SharedNodePointer& destinationNode) {}; - qint64 sendPacket(std::unique_ptr packet, const HifiSockAddr& sockAddr) {}; + qint64 sendPacket(std::unique_ptr packet, const SharedNodePointer& destinationNode) + { assert(false); return 0; } + qint64 sendPacket(std::unique_ptr packet, const HifiSockAddr& sockAddr) + { assert(false); return 0; } - qint64 sendPacketList(NLPacketList& packetList, const SharedNodePointer& destinationNode) {}; - qint64 sendPacketList(NLPacketList& packetList, const HifiSockAddr& sockAddr) {}; + qint64 sendPacketList(NLPacketList& packetList, const SharedNodePointer& destinationNode) + { assert(false); return 0; } + qint64 sendPacketList(NLPacketList& packetList, const HifiSockAddr& sockAddr) + { assert(false); return 0; } void (*linkedDataCreateCallback)(Node *); @@ -177,7 +184,8 @@ public: int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet); int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet); - unsigned broadcastToNodes(std::unique_ptr packet, const NodeSet& destinationNodeTypes) {}; + unsigned broadcastToNodes(std::unique_ptr packet, const NodeSet& destinationNodeTypes) + { assert(false); return 0; } SharedNodePointer soloNodeOfType(char nodeType); void getPacketStats(float &packetsPerSecond, float &bytesPerSecond); @@ -293,7 +301,8 @@ protected: const QUuid& peerRequestID = QUuid()); qint64 sendPacket(std::unique_ptr packet, const SharedNodePointer& destinationNode, - const HifiSockAddr& overridenSockAddr) {}; + const HifiSockAddr& overridenSockAddr) + { assert(false); return 0; } QUuid _sessionUUID; diff --git a/libraries/octree/src/JurisdictionMap.h b/libraries/octree/src/JurisdictionMap.h index ad6e4ee833..eccaef0a60 100644 --- a/libraries/octree/src/JurisdictionMap.h +++ b/libraries/octree/src/JurisdictionMap.h @@ -20,6 +20,7 @@ #include #include +#include #include class JurisdictionMap { @@ -29,7 +30,7 @@ public: WITHIN, BELOW }; - + // standard constructors JurisdictionMap(NodeType_t type = NodeType::EntityServer); // default constructor JurisdictionMap(const JurisdictionMap& other); // copy constructor @@ -42,8 +43,8 @@ public: JurisdictionMap(JurisdictionMap&& other); // move constructor JurisdictionMap& operator= (JurisdictionMap&& other); // move assignment #endif - - // application constructors + + // application constructors JurisdictionMap(const char* filename); JurisdictionMap(unsigned char* rootOctalCode, const std::vector& endNodes); JurisdictionMap(const char* rootHextString, const char* endNodesHextString); @@ -62,15 +63,15 @@ public: int unpackFromMessage(const unsigned char* sourceBuffer, int availableBytes); std::unique_ptr packIntoMessage(); - + /// Available to pack an empty or unknown jurisdiction into a network packet, used when no JurisdictionMap is available static std::unique_ptr packEmptyJurisdictionIntoMessage(NodeType_t type); void displayDebugDetails() const; - + NodeType_t getNodeType() const { return _nodeType; } void setNodeType(NodeType_t type) { _nodeType = type; } - + private: void copyContents(const JurisdictionMap& other); // use assignment instead void clear(); @@ -81,7 +82,7 @@ private: NodeType_t _nodeType; }; -/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are +/// Map between node IDs and their reported JurisdictionMap. Typically used by classes that need to know which nodes are /// managing which jurisdictions. class NodeToJurisdictionMap : public QMap, public QReadWriteLock {}; typedef QMap::iterator NodeToJurisdictionMapIterator; diff --git a/libraries/octree/src/JurisdictionSender.cpp b/libraries/octree/src/JurisdictionSender.cpp index 9493061354..e9aa8f99bf 100644 --- a/libraries/octree/src/JurisdictionSender.cpp +++ b/libraries/octree/src/JurisdictionSender.cpp @@ -56,7 +56,7 @@ bool JurisdictionSender::process() { SharedNodePointer node = DependencyManager::get()->nodeWithUUID(nodeUUID); if (node && node->getActiveSocket()) { - _packetSender.queuePacketForSending(node, packet); + _packetSender.queuePacketForSending(node, std::move(packet)); nodeCount++; } } diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 0b1bd02cf7..77bc6a9c70 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -26,8 +26,7 @@ OctreeEditPacketSender::OctreeEditPacketSender() : _shouldSend(true), _maxPendingMessages(DEFAULT_MAX_PENDING_MESSAGES), _releaseQueuedMessagesPending(false), - _serverJurisdictions(NULL), - _destinationWalletUUID() + _serverJurisdictions(NULL) { } @@ -103,17 +102,15 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu quint64 queuedAt = usecTimestampNow(); quint64 transitTime = queuedAt - createdAt; - qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << buffer[0] << - " - command to node bytes=" << length << - " satoshiCost=" << satoshiCost << - " sequence=" << sequence << - " transitTimeSoFar=" << transitTime << " usecs"; + qCDebug(octree) << "OctreeEditPacketSender::queuePacketToNode() queued " << packet->readType() + << " - command to node bytes=" << packet->getSizeWithHeader() + << " sequence=" << sequence << " transitTimeSoFar=" << transitTime << " usecs"; } // add packet to history _sentPacketHistories[nodeUUID].packetSent(sequence, packet); - queuePacketForSending(node, packet); + queuePacketForSending(node, std::move(packet)); } }); } @@ -130,10 +127,10 @@ void OctreeEditPacketSender::processPreServerExistsPackets() { } // Then "process" all the packable messages... - while (!_preServerPackets.empty()) { + while (!_preServerEdits.empty()) { EditMessageTuple editMessage = std::move(_preServerEdits.front()); - queueOctreeEditMessage(editMessage.first(), editMessage.second(), editMessage.third()); - _preServerPackets.pop_front(); + queueOctreeEditMessage(std::get<0>(editMessage), std::get<1>(editMessage), std::get<2>(editMessage)); + _preServerEdits.pop_front(); } _pendingPacketsLock.unlock(); @@ -153,7 +150,7 @@ void OctreeEditPacketSender::queuePendingPacketToNodes(std::unique_ptr _pendingPacketsLock.lock(); _preServerSingleMessagePackets.push_back(packet); // if we've saved MORE than our max, then clear out the oldest packet... - int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); + int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerEdits.size(); if (allPendingMessages > _maxPendingMessages) { _preServerSingleMessagePackets.pop_front(); } @@ -168,7 +165,7 @@ void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr packet assert(serversExist()); // we must have jurisdictions to be here!! - const unsigned char* octCode = reinterpret_cast(packet->getPayload()) + sizeof(short) + sizeof(quint64); + const unsigned char* octCode = reinterpret_cast(packet->getPayload()) + sizeof(short) + sizeof(quint64); // We want to filter out edit messages for servers based on the server's Jurisdiction // But we can't really do that with a packed message, since each edit message could be destined @@ -190,7 +187,7 @@ void OctreeEditPacketSender::queuePacketToNodes(std::unique_ptr packet if (isMyJurisdiction) { // make a copy of this packet for this node and queue auto packetCopy = NLPacket::createCopy(packet); - queuePacketToNode(std::move(packetCopy)); + queuePacketToNode(nodeUUID, std::move(packetCopy)); } } }); @@ -213,7 +210,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi _preServerEdits.push_back(messageTuple); // if we've saved MORE than out max, then clear out the oldest packet... - int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerPackets.size(); + int allPendingMessages = _preServerSingleMessagePackets.size() + _preServerEdits.size(); if (allPendingMessages > _maxPendingMessages) { _preServerEdits.pop_front(); } @@ -255,7 +252,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi bufferedPacket = NLPacket::create(type); } else { // If we're switching type, then we send the last one and start over - if ((type != bufferedPacket->getType() && bufferedPacket->getSizeUsed() > 0) || + if ((type != bufferedPacket->readType() && bufferedPacket->getSizeUsed() > 0) || (length >= bufferedPacket->bytesAvailable())) { // create the new packet and swap it with the packet in _pendingEditPackets @@ -263,7 +260,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType::Value type, unsi bufferedPacket.swap(packetToRelease); // release the previously buffered packet - releaseQueuedPacket(packetToRelease); + releaseQueuedPacket(nodeUUID, std::move(packetToRelease)); } } @@ -306,12 +303,10 @@ void OctreeEditPacketSender::releaseQueuedMessages() { } } -void OctreeEditPacketSender::releaseQueuedPacket(std::unique_ptr packet) { +void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::unique_ptr packet) { _releaseQueuedPacketMutex.lock(); - if (packetBuffer._currentSize > 0 && packetBuffer._currentType != PacketType::Unknown) { - queuePacketToNode(std::move(packet)); - packetBuffer._currentSize = 0; - packetBuffer._currentType = PacketType::Unknown; + if (packet->getSizeUsed() > 0 && packet->readType() != PacketType::Unknown) { + queuePacketToNode(nodeID, std::move(packet)); } _releaseQueuedPacketMutex.unlock(); } @@ -363,10 +358,10 @@ void OctreeEditPacketSender::processNackPacket(const QByteArray& packet) { dataAt += sizeof(unsigned short int); // retrieve packet from history - const QByteArray* packet = sentPacketHistory.getPacket(sequenceNumber); + const std::unique_ptr& packet = sentPacketHistory.getPacket(sequenceNumber); if (packet) { const SharedNodePointer& node = DependencyManager::get()->nodeWithUUID(sendingNodeUUID); - queuePacketForSending(node, *packet); + queuePacketForSending(node, NLPacket::createCopy(packet)); } } } diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index fdc4bfc189..22ad9c6679 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -29,7 +29,7 @@ public: /// Queues a single edit message. Will potentially send a pending multi-command packet. Determines which server /// node or nodes the packet should be sent to. Can be called even before servers are known, in which case up to /// MaxPendingMessages will be buffered and processed when servers are known. - void queueOctreeEditMessage(EditMessageTuple); + void queueOctreeEditMessage(PacketType::Value type, unsigned char* editPacketBuffer, size_t length); /// Releases all queued messages even if those messages haven't filled an MTU packet. This will move the packed message /// packets onto the send queue. If running in threaded mode, the caller does not need to do any further processing to @@ -82,15 +82,12 @@ public: public slots: void nodeKilled(SharedNodePointer node); -signals: - void octreePaymentRequired(qint64 satoshiAmount, const QUuid& nodeUUID, const QUuid& destinationWalletUUID); - protected: using EditMessageTuple = std::tuple; bool _shouldSend; void queuePacketToNode(const QUuid& nodeID, std::unique_ptr packet); - void queuePendingPacketToNodes(PacketType::Value type, unsigned char* buffer, size_t length); + void queuePendingPacketToNodes(std::unique_ptr packet); void queuePacketToNodes(std::unique_ptr packet); std::unique_ptr initializePacket(PacketType::Value type, int nodeClockSkew); void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr packetBuffer); // releases specific queued packet