From c6947dd165691887ac64e0fae778be96af0d6b4e Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Thu, 9 Nov 2017 18:20:06 -0800 Subject: [PATCH] some side by side plumbing for NLPackets and NLPacketLists --- libraries/networking/src/NodeList.h | 3 ++ libraries/networking/src/PacketSender.cpp | 38 ++++++++++++---- libraries/networking/src/PacketSender.h | 5 ++- .../octree/src/OctreeEditPacketSender.cpp | 44 +++++++++++++++++-- libraries/octree/src/OctreeEditPacketSender.h | 5 ++- 5 files changed, 80 insertions(+), 15 deletions(-) diff --git a/libraries/networking/src/NodeList.h b/libraries/networking/src/NodeList.h index b3a12153e5..0ebc1f0b22 100644 --- a/libraries/networking/src/NodeList.h +++ b/libraries/networking/src/NodeList.h @@ -40,6 +40,9 @@ const quint64 DOMAIN_SERVER_CHECK_IN_MSECS = 1 * 1000; const int MAX_SILENT_DOMAIN_SERVER_CHECK_INS = 5; +using PacketOrPacketList = std::pair, std::unique_ptr>; +using NodePacketOrPacketListPair = std::pair; + using NodePacketPair = std::pair>; using NodeSharedPacketPair = std::pair>; using NodeSharedReceivedMessagePair = std::pair>; diff --git a/libraries/networking/src/PacketSender.cpp b/libraries/networking/src/PacketSender.cpp index 0cfd67cc4e..01b78585c2 100644 --- a/libraries/networking/src/PacketSender.cpp +++ b/libraries/networking/src/PacketSender.cpp @@ -53,7 +53,19 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod _totalBytesQueued += packet->getDataSize(); lock(); - _packets.push_back({destinationNode, std::move(packet)}); + _packets.push_back({destinationNode, PacketOrPacketList { std::move(packet), nullptr} }); + unlock(); + + // Make sure to wake our actual processing thread because we now have packets for it to process. + _hasPackets.wakeAll(); +} + +void PacketSender::queuePacketListForSending(const SharedNodePointer& destinationNode, std::unique_ptr packetList) { + _totalPacketsQueued += packetList->getNumPackets(); + _totalBytesQueued += packetList->getMessageSize(); + + lock(); + _packets.push_back({ destinationNode, PacketOrPacketList { nullptr, std::move(packetList)} }); unlock(); // Make sure to wake our actual processing thread because we now have packets for it to process. @@ -178,7 +190,7 @@ bool PacketSender::nonThreadedProcess() { float averagePacketsPerCall = 0; // might be less than 1, if our caller calls us more frequently than the target PPS - int packetsSentThisCall = 0; + size_t packetsSentThisCall = 0; int packetsToSendThisCall = 0; // Since we're in non-threaded mode, we need to determine how many packets to send per call to process @@ -265,23 +277,31 @@ bool PacketSender::nonThreadedProcess() { while ((packetsSentThisCall < packetsToSendThisCall) && (packetsLeft > 0)) { lock(); - NodePacketPair packetPair = std::move(_packets.front()); + NodePacketOrPacketListPair packetPair = std::move(_packets.front()); _packets.pop_front(); packetsLeft = _packets.size(); unlock(); // send the packet through the NodeList... - DependencyManager::get()->sendUnreliablePacket(*packetPair.second, *packetPair.first); + //PacketOrPacketList packetOrList = packetPair.second; + bool sendAsPacket = packetPair.second.first.get(); + if (sendAsPacket) { + DependencyManager::get()->sendUnreliablePacket(*packetPair.second.first, *packetPair.first); + } else { + DependencyManager::get()->sendPacketList(*packetPair.second.second, *packetPair.first); + } - packetsSentThisCall++; - _packetsOverCheckInterval++; - _totalPacketsSent++; + size_t packetSize = sendAsPacket ? packetPair.second.first->getDataSize() : packetPair.second.second->getMessageSize(); + size_t packetCount = sendAsPacket ? 1 : packetPair.second.second->getNumPackets(); + + packetsSentThisCall += packetCount; + _packetsOverCheckInterval += packetCount; + _totalPacketsSent += packetCount; - int packetSize = packetPair.second->getDataSize(); _totalBytesSent += packetSize; - emit packetSent(packetSize); + emit packetSent(packetSize); // FIXME should include number of packets? _lastSendTime = now; } diff --git a/libraries/networking/src/PacketSender.h b/libraries/networking/src/PacketSender.h index 68faeaca47..fead49df72 100644 --- a/libraries/networking/src/PacketSender.h +++ b/libraries/networking/src/PacketSender.h @@ -39,6 +39,7 @@ public: /// Add packet to outbound queue. void queuePacketForSending(const SharedNodePointer& destinationNode, std::unique_ptr packet); + void queuePacketListForSending(const SharedNodePointer& destinationNode, std::unique_ptr packetList); void setPacketsPerSecond(int packetsPerSecond); int getPacketsPerSecond() const { return _packetsPerSecond; } @@ -99,14 +100,14 @@ protected: SimpleMovingAverage _averageProcessCallTime; private: - std::list _packets; + std::list _packets; quint64 _lastSendTime; bool threadedProcess(); bool nonThreadedProcess(); quint64 _lastPPSCheck; - int _packetsOverCheckInterval; + size_t _packetsOverCheckInterval; quint64 _started; quint64 _totalPacketsSent; diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index f3c9ece9fe..7b612a828c 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -115,6 +115,27 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu }); } +// This method is called when the edit packet layer has determined that it has a fully formed packet destined for +// a known nodeID. +void OctreeEditPacketSender::queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr packetList) { + + bool wantDebug = false; + DependencyManager::get()->eachNode([&](const SharedNodePointer& node) { + // only send to the NodeTypes that are getMyNodeType() + if (node->getType() == getMyNodeType() + && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull())) + && node->getActiveSocket()) { + + // NOTE: unlike packets, the packet lists don't get rewritten sequence numbers. + + // add packet to history -- we don't keep track of sent PacketLists + //_sentPacketHistories[nodeUUID].packetSent(sequence, *packet); + + queuePacketListForSending(node, std::move(packetList)); + } + }); +} + void OctreeEditPacketSender::processPreServerExistsPackets() { assert(serversExist()); // we should only be here if we have jurisdictions @@ -247,7 +268,7 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, QByteArray& }); } if (isMyJurisdiction) { - std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID]; + std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now if (!bufferedPacket) { bufferedPacket = initializePacket(type, node->getClockSkewUsec()); @@ -291,15 +312,24 @@ void OctreeEditPacketSender::releaseQueuedMessages() { } else { _packetsQueueLock.lock(); for (auto& i : _pendingEditPackets) { - if (i.second) { + if (i.second.first) { // construct a null unique_ptr to an NL packet std::unique_ptr releasedPacket; // swap the null ptr with the packet we want to release - i.second.swap(releasedPacket); + i.second.first.swap(releasedPacket); // move and release the queued packet releaseQueuedPacket(i.first, std::move(releasedPacket)); + } else if (i.second.second) { + // construct a null unique_ptr to an NLPacketList + std::unique_ptr releasedPacketList; + + // swap the null ptr with the NLPacketList we want to release + i.second.second.swap(releasedPacketList); + + // move and release the queued NLPacketList + releaseQueuedPacketList(i.first, std::move(releasedPacketList)); } } @@ -315,6 +345,14 @@ void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::uniqu _releaseQueuedPacketMutex.unlock(); } +void OctreeEditPacketSender::releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr packetList) { + _releaseQueuedPacketMutex.lock(); + if (packetList->getMessageSize() > 0 && packetList->getType() != PacketType::Unknown) { + queuePacketListToNode(nodeID, std::move(packetList)); + } + _releaseQueuedPacketMutex.unlock(); +} + std::unique_ptr OctreeEditPacketSender::initializePacket(PacketType type, qint64 nodeClockSkew) { auto newPacket = NLPacket::create(type); diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index fd8cc85f91..79c363bec5 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -87,15 +87,18 @@ protected: bool _shouldSend; void queuePacketToNode(const QUuid& nodeID, std::unique_ptr packet); + void queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr packetList); + void queuePendingPacketToNodes(std::unique_ptr packet); void queuePacketToNodes(std::unique_ptr packet); std::unique_ptr initializePacket(PacketType type, qint64 nodeClockSkew); void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr packetBuffer); // releases specific queued packet + void releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr packetList); void processPreServerExistsPackets(); // These are packets which are destined from know servers but haven't been released because they're still too small - std::unordered_map> _pendingEditPackets; + std::unordered_map _pendingEditPackets; // These are packets that are waiting to be processed because we don't yet know if there are servers int _maxPendingMessages;