From e438ac54d575583d250790a7cf805e599fe21b2f Mon Sep 17 00:00:00 2001 From: ZappoMan Date: Thu, 16 Nov 2017 13:16:23 -0800 Subject: [PATCH] make adds go over NLPacketList as reliable --- .../src/entities/EntityServer.cpp | 1 + .../octree/OctreeInboundPacketProcessor.cpp | 5 +- libraries/networking/src/PacketSender.cpp | 10 +++ .../src/ReceivedPacketProcessor.cpp | 4 + .../octree/src/OctreeEditPacketSender.cpp | 79 ++++++++++++++----- 5 files changed, 77 insertions(+), 22 deletions(-) diff --git a/assignment-client/src/entities/EntityServer.cpp b/assignment-client/src/entities/EntityServer.cpp index 995a5bad27..5417e3f6fe 100644 --- a/assignment-client/src/entities/EntityServer.cpp +++ b/assignment-client/src/entities/EntityServer.cpp @@ -72,6 +72,7 @@ void EntityServer::aboutToFinish() { } void EntityServer::handleEntityPacket(QSharedPointer message, SharedNodePointer senderNode) { + qDebug() << __FUNCTION__ << "from:" << senderNode->getUUID() << "type:" << message->getType(); if (_octreeInboundPacketProcessor) { _octreeInboundPacketProcessor->queueReceivedPacket(message, senderNode); } diff --git a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp index bce6e7fe44..efadb5650a 100644 --- a/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp +++ b/assignment-client/src/octree/OctreeInboundPacketProcessor.cpp @@ -76,12 +76,15 @@ void OctreeInboundPacketProcessor::midProcess() { } void OctreeInboundPacketProcessor::processPacket(QSharedPointer message, SharedNodePointer sendingNode) { + + qDebug() << __FUNCTION__ << "from:" << sendingNode->getUUID() << "type:" << message->getType(); + if (_shuttingDown) { qDebug() << "OctreeInboundPacketProcessor::processPacket() while shutting down... ignoring incoming packet"; return; } - bool debugProcessPacket = _myServer->wantsVerboseDebug(); + bool debugProcessPacket = true; // _myServer->wantsVerboseDebug(); if (debugProcessPacket) { qDebug("OctreeInboundPacketProcessor::processPacket() payload=%p payloadLength=%lld", diff --git a/libraries/networking/src/PacketSender.cpp b/libraries/networking/src/PacketSender.cpp index 01b78585c2..b7fc802883 100644 --- a/libraries/networking/src/PacketSender.cpp +++ b/libraries/networking/src/PacketSender.cpp @@ -64,6 +64,8 @@ void PacketSender::queuePacketListForSending(const SharedNodePointer& destinatio _totalPacketsQueued += packetList->getNumPackets(); _totalBytesQueued += packetList->getMessageSize(); + qDebug() << __FUNCTION__ << "to:" << destinationNode->getUUID() << "type:" << packetList->getType() << "_totalPacketsQueued:" << _totalPacketsQueued << "_totalBytesQueued:" << _totalBytesQueued; + lock(); _packets.push_back({ destinationNode, PacketOrPacketList { nullptr, std::move(packetList)} }); unlock(); @@ -287,8 +289,12 @@ bool PacketSender::nonThreadedProcess() { //PacketOrPacketList packetOrList = packetPair.second; bool sendAsPacket = packetPair.second.first.get(); if (sendAsPacket) { + + qDebug() << __FUNCTION__ << "sendUnreliablePacket() to:" << packetPair.first->getUUID() << "type:" << packetPair.second.first->getType(); DependencyManager::get()->sendUnreliablePacket(*packetPair.second.first, *packetPair.first); } else { + + qDebug() << __FUNCTION__ << "sendPacketList() to:" << packetPair.first->getUUID() << "type:" << packetPair.second.second->getType(); DependencyManager::get()->sendPacketList(*packetPair.second.second, *packetPair.first); } @@ -303,6 +309,10 @@ bool PacketSender::nonThreadedProcess() { _totalBytesSent += packetSize; emit packetSent(packetSize); // FIXME should include number of packets? + qDebug() << __FUNCTION__ << "packetsSentThisCall:" << packetsSentThisCall + << "_packetsOverCheckInterval:" << _packetsOverCheckInterval + << "_totalPacketsSent:" << _totalPacketsSent; + _lastSendTime = now; } return isStillRunning(); diff --git a/libraries/networking/src/ReceivedPacketProcessor.cpp b/libraries/networking/src/ReceivedPacketProcessor.cpp index c18d4ed1e8..9ed17ec586 100644 --- a/libraries/networking/src/ReceivedPacketProcessor.cpp +++ b/libraries/networking/src/ReceivedPacketProcessor.cpp @@ -25,6 +25,10 @@ void ReceivedPacketProcessor::terminating() { } void ReceivedPacketProcessor::queueReceivedPacket(QSharedPointer message, SharedNodePointer sendingNode) { + + qDebug() << __FUNCTION__ << "from:" << sendingNode->getUUID() << "type:" << message->getType(); + + lock(); _packets.push_back({ sendingNode, message }); _nodePacketCounts[sendingNode->getUUID()]++; diff --git a/libraries/octree/src/OctreeEditPacketSender.cpp b/libraries/octree/src/OctreeEditPacketSender.cpp index 7b612a828c..38a34d75dc 100644 --- a/libraries/octree/src/OctreeEditPacketSender.cpp +++ b/libraries/octree/src/OctreeEditPacketSender.cpp @@ -119,6 +119,8 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu // a known nodeID. void OctreeEditPacketSender::queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr packetList) { + qDebug() << __FUNCTION__ << "to:" << nodeUUID << "type:" << packetList->getType(); + bool wantDebug = false; DependencyManager::get()->eachNode([&](const SharedNodePointer& node) { // only send to the NodeTypes that are getMyNodeType() @@ -268,33 +270,65 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, QByteArray& }); } if (isMyJurisdiction) { - std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now - if (!bufferedPacket) { - bufferedPacket = initializePacket(type, node->getClockSkewUsec()); - } else { - // If we're switching type, then we send the last one and start over - if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) || - (editMessage.size() >= bufferedPacket->bytesAvailableForWrite())) { + // for edit messages, we will attempt to combine multiple edit commands where possible, we + // don't do this for add because we send those reliably + if (type == PacketType::EntityAdd) { - // create the new packet and swap it with the packet in _pendingEditPackets - auto packetToRelease = initializePacket(type, node->getClockSkewUsec()); - bufferedPacket.swap(packetToRelease); + auto newPacket = NLPacketList::create(type, QByteArray(), true, true); + auto nodeClockSkew = node->getClockSkewUsec(); - // release the previously buffered packet - releaseQueuedPacket(nodeUUID, std::move(packetToRelease)); + // pack sequence number + quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++; + newPacket->writePrimitive(sequence); + + // pack in timestamp + quint64 now = usecTimestampNow() + nodeClockSkew; + newPacket->writePrimitive(now); + + + // We call this virtual function that allows our specific type of EditPacketSender to + // fixup the buffer for any clock skew + if (nodeClockSkew != 0) { + adjustEditPacketForClockSkew(type, editMessage, nodeClockSkew); } - } - // This is really the first time we know which server/node this particular edit message - // is going to, so we couldn't adjust for clock skew till now. But here's our chance. - // We call this virtual function that allows our specific type of EditPacketSender to - // fixup the buffer for any clock skew - if (node->getClockSkewUsec() != 0) { - adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec()); - } + newPacket->write(editMessage); - bufferedPacket->write(editMessage); + // release the new packet + releaseQueuedPacketList(nodeUUID, std::move(newPacket)); + + } else { + + std::unique_ptr& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now + + if (!bufferedPacket) { + bufferedPacket = initializePacket(type, node->getClockSkewUsec()); + } else { + // If we're switching type, then we send the last one and start over + if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) || + (editMessage.size() >= bufferedPacket->bytesAvailableForWrite())) { + + // create the new packet and swap it with the packet in _pendingEditPackets + auto packetToRelease = initializePacket(type, node->getClockSkewUsec()); + bufferedPacket.swap(packetToRelease); + + // release the previously buffered packet + releaseQueuedPacket(nodeUUID, std::move(packetToRelease)); + } + } + + // This is really the first time we know which server/node this particular edit message + // is going to, so we couldn't adjust for clock skew till now. But here's our chance. + // We call this virtual function that allows our specific type of EditPacketSender to + // fixup the buffer for any clock skew + if (node->getClockSkewUsec() != 0) { + adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec()); + } + + bufferedPacket->write(editMessage); + + } } } }); @@ -346,6 +380,9 @@ void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::uniqu } void OctreeEditPacketSender::releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr packetList) { + + qDebug() << __FUNCTION__ << "to:" << nodeID << "type:" << packetList->getType(); + _releaseQueuedPacketMutex.lock(); if (packetList->getMessageSize() > 0 && packetList->getType() != PacketType::Unknown) { queuePacketListToNode(nodeID, std::move(packetList));