make adds go over NLPacketList as reliable

This commit is contained in:
ZappoMan 2017-11-16 13:16:23 -08:00
parent 4c4f23e312
commit e438ac54d5
5 changed files with 77 additions and 22 deletions

View file

@ -72,6 +72,7 @@ void EntityServer::aboutToFinish() {
}
void EntityServer::handleEntityPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
qDebug() << __FUNCTION__ << "from:" << senderNode->getUUID() << "type:" << message->getType();
if (_octreeInboundPacketProcessor) {
_octreeInboundPacketProcessor->queueReceivedPacket(message, senderNode);
}

View file

@ -76,12 +76,15 @@ void OctreeInboundPacketProcessor::midProcess() {
}
void OctreeInboundPacketProcessor::processPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
qDebug() << __FUNCTION__ << "from:" << sendingNode->getUUID() << "type:" << message->getType();
if (_shuttingDown) {
qDebug() << "OctreeInboundPacketProcessor::processPacket() while shutting down... ignoring incoming packet";
return;
}
bool debugProcessPacket = _myServer->wantsVerboseDebug();
bool debugProcessPacket = true; // _myServer->wantsVerboseDebug();
if (debugProcessPacket) {
qDebug("OctreeInboundPacketProcessor::processPacket() payload=%p payloadLength=%lld",

View file

@ -64,6 +64,8 @@ void PacketSender::queuePacketListForSending(const SharedNodePointer& destinatio
_totalPacketsQueued += packetList->getNumPackets();
_totalBytesQueued += packetList->getMessageSize();
qDebug() << __FUNCTION__ << "to:" << destinationNode->getUUID() << "type:" << packetList->getType() << "_totalPacketsQueued:" << _totalPacketsQueued << "_totalBytesQueued:" << _totalBytesQueued;
lock();
_packets.push_back({ destinationNode, PacketOrPacketList { nullptr, std::move(packetList)} });
unlock();
@ -287,8 +289,12 @@ bool PacketSender::nonThreadedProcess() {
//PacketOrPacketList packetOrList = packetPair.second;
bool sendAsPacket = packetPair.second.first.get();
if (sendAsPacket) {
qDebug() << __FUNCTION__ << "sendUnreliablePacket() to:" << packetPair.first->getUUID() << "type:" << packetPair.second.first->getType();
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packetPair.second.first, *packetPair.first);
} else {
qDebug() << __FUNCTION__ << "sendPacketList() to:" << packetPair.first->getUUID() << "type:" << packetPair.second.second->getType();
DependencyManager::get<NodeList>()->sendPacketList(*packetPair.second.second, *packetPair.first);
}
@ -303,6 +309,10 @@ bool PacketSender::nonThreadedProcess() {
_totalBytesSent += packetSize;
emit packetSent(packetSize); // FIXME should include number of packets?
qDebug() << __FUNCTION__ << "packetsSentThisCall:" << packetsSentThisCall
<< "_packetsOverCheckInterval:" << _packetsOverCheckInterval
<< "_totalPacketsSent:" << _totalPacketsSent;
_lastSendTime = now;
}
return isStillRunning();

View file

@ -25,6 +25,10 @@ void ReceivedPacketProcessor::terminating() {
}
void ReceivedPacketProcessor::queueReceivedPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
qDebug() << __FUNCTION__ << "from:" << sendingNode->getUUID() << "type:" << message->getType();
lock();
_packets.push_back({ sendingNode, message });
_nodePacketCounts[sendingNode->getUUID()]++;

View file

@ -119,6 +119,8 @@ void OctreeEditPacketSender::queuePacketToNode(const QUuid& nodeUUID, std::uniqu
// a known nodeID.
void OctreeEditPacketSender::queuePacketListToNode(const QUuid& nodeUUID, std::unique_ptr<NLPacketList> packetList) {
qDebug() << __FUNCTION__ << "to:" << nodeUUID << "type:" << packetList->getType();
bool wantDebug = false;
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node) {
// only send to the NodeTypes that are getMyNodeType()
@ -268,33 +270,65 @@ void OctreeEditPacketSender::queueOctreeEditMessage(PacketType type, QByteArray&
});
}
if (isMyJurisdiction) {
std::unique_ptr<NLPacket>& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now
if (!bufferedPacket) {
bufferedPacket = initializePacket(type, node->getClockSkewUsec());
} else {
// If we're switching type, then we send the last one and start over
if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) ||
(editMessage.size() >= bufferedPacket->bytesAvailableForWrite())) {
// for edit messages, we will attempt to combine multiple edit commands where possible, we
// don't do this for add because we send those reliably
if (type == PacketType::EntityAdd) {
// create the new packet and swap it with the packet in _pendingEditPackets
auto packetToRelease = initializePacket(type, node->getClockSkewUsec());
bufferedPacket.swap(packetToRelease);
auto newPacket = NLPacketList::create(type, QByteArray(), true, true);
auto nodeClockSkew = node->getClockSkewUsec();
// release the previously buffered packet
releaseQueuedPacket(nodeUUID, std::move(packetToRelease));
// pack sequence number
quint16 sequence = _outgoingSequenceNumbers[nodeUUID]++;
newPacket->writePrimitive(sequence);
// pack in timestamp
quint64 now = usecTimestampNow() + nodeClockSkew;
newPacket->writePrimitive(now);
// We call this virtual function that allows our specific type of EditPacketSender to
// fixup the buffer for any clock skew
if (nodeClockSkew != 0) {
adjustEditPacketForClockSkew(type, editMessage, nodeClockSkew);
}
}
// This is really the first time we know which server/node this particular edit message
// is going to, so we couldn't adjust for clock skew till now. But here's our chance.
// We call this virtual function that allows our specific type of EditPacketSender to
// fixup the buffer for any clock skew
if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec());
}
newPacket->write(editMessage);
bufferedPacket->write(editMessage);
// release the new packet
releaseQueuedPacketList(nodeUUID, std::move(newPacket));
} else {
std::unique_ptr<NLPacket>& bufferedPacket = _pendingEditPackets[nodeUUID].first; //only a NLPacket for now
if (!bufferedPacket) {
bufferedPacket = initializePacket(type, node->getClockSkewUsec());
} else {
// If we're switching type, then we send the last one and start over
if ((type != bufferedPacket->getType() && bufferedPacket->getPayloadSize() > 0) ||
(editMessage.size() >= bufferedPacket->bytesAvailableForWrite())) {
// create the new packet and swap it with the packet in _pendingEditPackets
auto packetToRelease = initializePacket(type, node->getClockSkewUsec());
bufferedPacket.swap(packetToRelease);
// release the previously buffered packet
releaseQueuedPacket(nodeUUID, std::move(packetToRelease));
}
}
// This is really the first time we know which server/node this particular edit message
// is going to, so we couldn't adjust for clock skew till now. But here's our chance.
// We call this virtual function that allows our specific type of EditPacketSender to
// fixup the buffer for any clock skew
if (node->getClockSkewUsec() != 0) {
adjustEditPacketForClockSkew(type, editMessage, node->getClockSkewUsec());
}
bufferedPacket->write(editMessage);
}
}
}
});
@ -346,6 +380,9 @@ void OctreeEditPacketSender::releaseQueuedPacket(const QUuid& nodeID, std::uniqu
}
void OctreeEditPacketSender::releaseQueuedPacketList(const QUuid& nodeID, std::unique_ptr<NLPacketList> packetList) {
qDebug() << __FUNCTION__ << "to:" << nodeID << "type:" << packetList->getType();
_releaseQueuedPacketMutex.lock();
if (packetList->getMessageSize() > 0 && packetList->getType() != PacketType::Unknown) {
queuePacketListToNode(nodeID, std::move(packetList));