diff --git a/libraries/entities/src/EntityEditPacketSender.cpp b/libraries/entities/src/EntityEditPacketSender.cpp index 168b0cd446..90740948ce 100644 --- a/libraries/entities/src/EntityEditPacketSender.cpp +++ b/libraries/entities/src/EntityEditPacketSender.cpp @@ -93,6 +93,11 @@ void EntityEditPacketSender::queueEditEntityMessage(PacketType type, QByteArray bufferOut(NLPacket::maxPayloadSize(type), 0); + if (type == PacketType::EntityAdd) { + auto MAX_ADD_DATA_SIZE = NLPacket::maxPayloadSize(type) * 10; // a really big buffer + bufferOut.resize(MAX_ADD_DATA_SIZE); + } + OctreeElement::AppendState encodeResult = OctreeElement::PARTIAL; // start the loop assuming there's more to send auto nodeList = DependencyManager::get(); @@ -115,6 +120,7 @@ void EntityEditPacketSender::queueEditEntityMessage(PacketType type, qCDebug(entities) << " id:" << entityItemID; qCDebug(entities) << " properties:" << properties; #endif + queueOctreeEditMessage(type, bufferOut); if (type == PacketType::EntityAdd && !properties.getCertificateID().isEmpty()) { emit addingEntityWithCertificate(properties.getCertificateID(), DependencyManager::get()->getPlaceName()); diff --git a/libraries/networking/src/NodeList.h b/libraries/networking/src/NodeList.h index b3a12153e5..0ebc1f0b22 100644 --- a/libraries/networking/src/NodeList.h +++ b/libraries/networking/src/NodeList.h @@ -40,6 +40,9 @@ const quint64 DOMAIN_SERVER_CHECK_IN_MSECS = 1 * 1000; const int MAX_SILENT_DOMAIN_SERVER_CHECK_INS = 5; +using PacketOrPacketList = std::pair, std::unique_ptr>; +using NodePacketOrPacketListPair = std::pair; + using NodePacketPair = std::pair>; using NodeSharedPacketPair = std::pair>; using NodeSharedReceivedMessagePair = std::pair>; diff --git a/libraries/networking/src/PacketSender.cpp b/libraries/networking/src/PacketSender.cpp index 0cfd67cc4e..02c4815f1f 100644 --- a/libraries/networking/src/PacketSender.cpp +++ b/libraries/networking/src/PacketSender.cpp @@ -53,7 +53,19 @@ void PacketSender::queuePacketForSending(const SharedNodePointer& destinationNod _totalBytesQueued += packet->getDataSize(); lock(); - _packets.push_back({destinationNode, std::move(packet)}); + _packets.push_back({destinationNode, PacketOrPacketList { std::move(packet), nullptr} }); + unlock(); + + // Make sure to wake our actual processing thread because we now have packets for it to process. + _hasPackets.wakeAll(); +} + +void PacketSender::queuePacketListForSending(const SharedNodePointer& destinationNode, std::unique_ptr packetList) { + _totalPacketsQueued += packetList->getNumPackets(); + _totalBytesQueued += packetList->getMessageSize(); + + lock(); + _packets.push_back({ destinationNode, PacketOrPacketList { nullptr, std::move(packetList)} }); unlock(); // Make sure to wake our actual processing thread because we now have packets for it to process. @@ -178,8 +190,8 @@ bool PacketSender::nonThreadedProcess() { float averagePacketsPerCall = 0; // might be less than 1, if our caller calls us more frequently than the target PPS - int packetsSentThisCall = 0; - int packetsToSendThisCall = 0; + size_t packetsSentThisCall = 0; + size_t packetsToSendThisCall = 0; // Since we're in non-threaded mode, we need to determine how many packets to send per call to process // based on how often we get called... We do this by keeping a running average of our call times, and we determine @@ -265,24 +277,32 @@ bool PacketSender::nonThreadedProcess() { while ((packetsSentThisCall < packetsToSendThisCall) && (packetsLeft > 0)) { lock(); - NodePacketPair packetPair = std::move(_packets.front()); + NodePacketOrPacketListPair packetPair = std::move(_packets.front()); _packets.pop_front(); packetsLeft = _packets.size(); unlock(); // send the packet through the NodeList... - DependencyManager::get()->sendUnreliablePacket(*packetPair.second, *packetPair.first); + //PacketOrPacketList packetOrList = packetPair.second; + bool sendAsPacket = packetPair.second.first.get(); + size_t packetSize = sendAsPacket ? packetPair.second.first->getDataSize() : packetPair.second.second->getMessageSize(); + size_t packetCount = sendAsPacket ? 1 : packetPair.second.second->getNumPackets(); - packetsSentThisCall++; - _packetsOverCheckInterval++; - _totalPacketsSent++; + if (sendAsPacket) { + DependencyManager::get()->sendUnreliablePacket(*packetPair.second.first, *packetPair.first); + } else { + DependencyManager::get()->sendPacketList(std::move(packetPair.second.second), *packetPair.first); + } + + + packetsSentThisCall += packetCount; + _packetsOverCheckInterval += packetCount; + _totalPacketsSent += packetCount; - int packetSize = packetPair.second->getDataSize(); _totalBytesSent += packetSize; - emit packetSent(packetSize); - + emit packetSent(packetSize); // FIXME should include number of packets? _lastSendTime = now; } return isStillRunning(); diff --git a/libraries/networking/src/PacketSender.h b/libraries/networking/src/PacketSender.h index 68faeaca47..fead49df72 100644 --- a/libraries/networking/src/PacketSender.h +++ b/libraries/networking/src/PacketSender.h @@ -39,6 +39,7 @@ public: /// Add packet to outbound queue. void queuePacketForSending(const SharedNodePointer& destinationNode, std::unique_ptr packet); + void queuePacketListForSending(const SharedNodePointer& destinationNode, std::unique_ptr packetList); void setPacketsPerSecond(int packetsPerSecond); int getPacketsPerSecond() const { return _packetsPerSecond; } @@ -99,14 +100,14 @@ protected: SimpleMovingAverage _averageProcessCallTime; private: - std::list _packets; + std::list _packets; quint64 _lastSendTime; bool threadedProcess(); bool nonThreadedProcess(); quint64 _lastPPSCheck; - int _packetsOverCheckInterval; + size_t _packetsOverCheckInterval; quint64 _started; quint64 _totalPacketsSent; diff --git a/libraries/networking/src/ReceivedMessage.cpp b/libraries/networking/src/ReceivedMessage.cpp index 6ca249fb22..00b16908ce 100644 --- a/libraries/networking/src/ReceivedMessage.cpp +++ b/libraries/networking/src/ReceivedMessage.cpp @@ -53,7 +53,6 @@ ReceivedMessage::ReceivedMessage(QByteArray byteArray, PacketType packetType, Pa _senderSockAddr(senderSockAddr), _isComplete(true) { - } void ReceivedMessage::setFailed() { diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index f3c9ece9fe..9cb383df41 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -115,6 +115,22 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu }); } +// This method is called when the edit packet layer has determined that it has a fully formed packet destined for +// a known nodeID. +void OctreeEditPacketSender::queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr packetList) { + DependencyManager::get()->eachNode([&](const SharedNodePointer& node) { + // only send to the NodeTypes that are getMyNodeType() + if (node->getType() == getMyNodeType() + && ((node->getUUID() == nodeUUID) || (nodeUUID.isNull())) + && node->getActiveSocket()) { + + // NOTE: unlike packets, the packet lists don't get rewritten sequence numbers. + // or do history for resend + queuePacketListForSending(node, std::move(packetList)); + } + }); +} + void OctreeEditPacketSender::processPreServerExistsPackets() { assert(serversExist()); // we should only be here if we have jurisdictions @@ -247,33 +263,65 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, QByteArray& }); } if (isMyJurisdiction) { - std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID]; - if (!bufferedPacket) { - bufferedPacket = initializePacket(type, node->getClockSkewUsec()); - } else { - // If we're switching type, then we send the last one and start over - if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) || - (editMessage.size() >= bufferedPacket->bytesAvailableForWrite())) { + // for edit messages, we will attempt to combine multiple edit commands where possible, we + // don't do this for add because we send those reliably + if (type == PacketType::EntityAdd) { - // create the new packet and swap it with the packet in _pendingEditPackets - auto packetToRelease = initializePacket(type, node->getClockSkewUsec()); - bufferedPacket.swap(packetToRelease); + auto newPacket = NLPacketList::create(type, QByteArray(), true, true); + auto nodeClockSkew = node->getClockSkewUsec(); - // release the previously buffered packet - releaseQueuedPacket(nodeUUID, std::move(packetToRelease)); + // pack sequence number + quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++; + newPacket->writePrimitive(sequence); + + // pack in timestamp + quint64 now = usecTimestampNow() + nodeClockSkew; + newPacket->writePrimitive(now); + + + // We call this virtual function that allows our specific type of EditPacketSender to + // fixup the buffer for any clock skew + if (nodeClockSkew != 0) { + adjustEditPacketForClockSkew(type, editMessage, nodeClockSkew); } - } - // This is really the first time we know which server/node this particular edit message - // is going to, so we couldn't adjust for clock skew till now. But here's our chance. - // We call this virtual function that allows our specific type of EditPacketSender to - // fixup the buffer for any clock skew - if (node->getClockSkewUsec() != 0) { - adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec()); - } + newPacket->write(editMessage); - bufferedPacket->write(editMessage); + // release the new packet + releaseQueuedPacketList(nodeUUID, std::move(newPacket)); + + } else { + + std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now + + if (!bufferedPacket) { + bufferedPacket = initializePacket(type, node->getClockSkewUsec()); + } else { + // If we're switching type, then we send the last one and start over + if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) || + (editMessage.size() >= bufferedPacket->bytesAvailableForWrite())) { + + // create the new packet and swap it with the packet in _pendingEditPackets + auto packetToRelease = initializePacket(type, node->getClockSkewUsec()); + bufferedPacket.swap(packetToRelease); + + // release the previously buffered packet + releaseQueuedPacket(nodeUUID, std::move(packetToRelease)); + } + } + + // This is really the first time we know which server/node this particular edit message + // is going to, so we couldn't adjust for clock skew till now. But here's our chance. + // We call this virtual function that allows our specific type of EditPacketSender to + // fixup the buffer for any clock skew + if (node->getClockSkewUsec() != 0) { + adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec()); + } + + bufferedPacket->write(editMessage); + + } } } }); @@ -291,15 +339,24 @@ void OctreeEditPacketSender::releaseQueuedMessages() { } else { _packetsQueueLock.lock(); for (auto& i : _pendingEditPackets) { - if (i.second) { + if (i.second.first) { // construct a null unique_ptr to an NL packet std::unique_ptr releasedPacket; // swap the null ptr with the packet we want to release - i.second.swap(releasedPacket); + i.second.first.swap(releasedPacket); // move and release the queued packet releaseQueuedPacket(i.first, std::move(releasedPacket)); + } else if (i.second.second) { + // construct a null unique_ptr to an NLPacketList + std::unique_ptr releasedPacketList; + + // swap the null ptr with the NLPacketList we want to release + i.second.second.swap(releasedPacketList); + + // move and release the queued NLPacketList + releaseQueuedPacketList(i.first, std::move(releasedPacketList)); } } @@ -315,6 +372,14 @@ void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::uniqu _releaseQueuedPacketMutex.unlock(); } +void OctreeEditPacketSender::releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr packetList) { + _releaseQueuedPacketMutex.lock(); + if (packetList->getMessageSize() > 0 && packetList->getType() != PacketType::Unknown) { + queuePacketListToNode(nodeID, std::move(packetList)); + } + _releaseQueuedPacketMutex.unlock(); +} + std::unique_ptr OctreeEditPacketSender::initializePacket(PacketType type, qint64 nodeClockSkew) { auto newPacket = NLPacket::create(type); diff --git a/libraries/octree/src/OctreeEditPacketSender.h b/libraries/octree/src/OctreeEditPacketSender.h index fd8cc85f91..79c363bec5 100644 --- a/libraries/octree/src/OctreeEditPacketSender.h +++ b/libraries/octree/src/OctreeEditPacketSender.h @@ -87,15 +87,18 @@ protected: bool _shouldSend; void queuePacketToNode(const QUuid& nodeID, std::unique_ptr packet); + void queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr packetList); + void queuePendingPacketToNodes(std::unique_ptr packet); void queuePacketToNodes(std::unique_ptr packet); std::unique_ptr initializePacket(PacketType type, qint64 nodeClockSkew); void releaseQueuedPacket(const QUuid& nodeUUID, std::unique_ptr packetBuffer); // releases specific queued packet + void releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr packetList); void processPreServerExistsPackets(); // These are packets which are destined from know servers but haven't been released because they're still too small - std::unordered_map> _pendingEditPackets; + std::unordered_map _pendingEditPackets; // These are packets that are waiting to be processed because we don't yet know if there are servers int _maxPendingMessages; diff --git a/libraries/octree/src/OctreePacketData.cpp b/libraries/octree/src/OctreePacketData.cpp index b5b4a161ef..a2aad33058 100644 --- a/libraries/octree/src/OctreePacketData.cpp +++ b/libraries/octree/src/OctreePacketData.cpp @@ -35,7 +35,13 @@ OctreePacketData::OctreePacketData(bool enableCompression, int targetSize) { void OctreePacketData::changeSettings(bool enableCompression, unsigned int targetSize) { _enableCompression = enableCompression; - _targetSize = std::min(MAX_OCTREE_UNCOMRESSED_PACKET_SIZE, targetSize); + _targetSize = targetSize; + _uncompressedByteArray.resize(_targetSize); + _compressedByteArray.resize(_targetSize); + + _uncompressed = (unsigned char*)_uncompressedByteArray.data(); + _compressed = (unsigned char*)_compressedByteArray.data(); + reset(); } @@ -689,6 +695,8 @@ int OctreePacketData::unpackDataFromBytes(const unsigned char *dataBytes, QVecto uint16_t length; memcpy(&length, dataBytes, sizeof(uint16_t)); dataBytes += sizeof(length); + + // FIXME - this size check is wrong if we allow larger packets if (length * sizeof(glm::vec3) > MAX_OCTREE_UNCOMRESSED_PACKET_SIZE) { result.resize(0); return sizeof(uint16_t); @@ -702,6 +710,8 @@ int OctreePacketData::unpackDataFromBytes(const unsigned char *dataBytes, QVecto uint16_t length; memcpy(&length, dataBytes, sizeof(uint16_t)); dataBytes += sizeof(length); + + // FIXME - this size check is wrong if we allow larger packets if (length * sizeof(glm::quat) > MAX_OCTREE_UNCOMRESSED_PACKET_SIZE) { result.resize(0); return sizeof(uint16_t); @@ -720,6 +730,8 @@ int OctreePacketData::unpackDataFromBytes(const unsigned char* dataBytes, QVecto uint16_t length; memcpy(&length, dataBytes, sizeof(uint16_t)); dataBytes += sizeof(length); + + // FIXME - this size check is wrong if we allow larger packets if (length * sizeof(float) > MAX_OCTREE_UNCOMRESSED_PACKET_SIZE) { result.resize(0); return sizeof(uint16_t); @@ -733,6 +745,8 @@ int OctreePacketData::unpackDataFromBytes(const unsigned char* dataBytes, QVecto uint16_t length; memcpy(&length, dataBytes, sizeof(uint16_t)); dataBytes += sizeof(length); + + // FIXME - this size check is wrong if we allow larger packets if (length / 8 > MAX_OCTREE_UNCOMRESSED_PACKET_SIZE) { result.resize(0); return sizeof(uint16_t); diff --git a/libraries/octree/src/OctreePacketData.h b/libraries/octree/src/OctreePacketData.h index 37c171504b..09eb134124 100644 --- a/libraries/octree/src/OctreePacketData.h +++ b/libraries/octree/src/OctreePacketData.h @@ -279,7 +279,8 @@ private: unsigned int _targetSize; bool _enableCompression; - unsigned char _uncompressed[MAX_OCTREE_UNCOMRESSED_PACKET_SIZE]; + QByteArray _uncompressedByteArray; + unsigned char* _uncompressed { nullptr }; int _bytesInUse; int _bytesAvailable; int _subTreeAt; @@ -288,7 +289,8 @@ private: bool compressContent(); - unsigned char _compressed[MAX_OCTREE_UNCOMRESSED_PACKET_SIZE]; + QByteArray _compressedByteArray; + unsigned char* _compressed { nullptr }; int _compressedBytes; int _bytesInUseLastCheck; bool _dirty;