mirror of
https://github.com/overte-org/overte.git
synced 2025-08-09 23:14:34 +02:00
+ OctreeSendThread::traverseTreeAndSendContents()
this abstracts a portion of OctreeSendThread::packetDistributor() which will make it easier to split apart the tree traversal from the sending of packets also cleaned up some of the packet stats tracking
This commit is contained in:
parent
9d111d1f92
commit
15879b2832
2 changed files with 192 additions and 184 deletions
|
@ -128,8 +128,7 @@ AtomicUIntStat OctreeSendThread::_totalSpecialBytes { 0 };
|
||||||
AtomicUIntStat OctreeSendThread::_totalSpecialPackets { 0 };
|
AtomicUIntStat OctreeSendThread::_totalSpecialPackets { 0 };
|
||||||
|
|
||||||
|
|
||||||
int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, int& trueBytesSent,
|
int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, bool dontSuppressDuplicate) {
|
||||||
bool dontSuppressDuplicate) {
|
|
||||||
OctreeServer::didHandlePacketSend(this);
|
OctreeServer::didHandlePacketSend(this);
|
||||||
|
|
||||||
// if we're shutting down, then exit early
|
// if we're shutting down, then exit early
|
||||||
|
@ -140,15 +139,14 @@ int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode*
|
||||||
bool debug = _myServer->wantsDebugSending();
|
bool debug = _myServer->wantsDebugSending();
|
||||||
quint64 now = usecTimestampNow();
|
quint64 now = usecTimestampNow();
|
||||||
|
|
||||||
bool packetSent = false; // did we send a packet?
|
int numPackets = 0;
|
||||||
int packetsSent = 0;
|
|
||||||
|
|
||||||
// Here's where we check to see if this packet is a duplicate of the last packet. If it is, we will silently
|
// Here's where we check to see if this packet is a duplicate of the last packet. If it is, we will silently
|
||||||
// obscure the packet and not send it. This allows the callers and upper level logic to not need to know about
|
// obscure the packet and not send it. This allows the callers and upper level logic to not need to know about
|
||||||
// this rate control savings.
|
// this rate control savings.
|
||||||
if (!dontSuppressDuplicate && nodeData->shouldSuppressDuplicatePacket()) {
|
if (!dontSuppressDuplicate && nodeData->shouldSuppressDuplicatePacket()) {
|
||||||
nodeData->resetOctreePacket(); // we still need to reset it though!
|
nodeData->resetOctreePacket(); // we still need to reset it though!
|
||||||
return packetsSent; // without sending...
|
return numPackets; // without sending...
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we've got a stats message ready to send, then see if we can piggyback them together
|
// If we've got a stats message ready to send, then see if we can piggyback them together
|
||||||
|
@ -165,9 +163,12 @@ int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode*
|
||||||
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
|
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
|
||||||
// there was nothing else to send.
|
// there was nothing else to send.
|
||||||
int thisWastedBytes = 0;
|
int thisWastedBytes = 0;
|
||||||
|
int numBytes = statsPacket.getDataSize();
|
||||||
_totalWastedBytes += thisWastedBytes;
|
_totalWastedBytes += thisWastedBytes;
|
||||||
_totalBytes += statsPacket.getDataSize();
|
_totalBytes += numBytes;
|
||||||
_totalPackets++;
|
_totalPackets++;
|
||||||
|
_trueBytesSent += numBytes;
|
||||||
|
numPackets++;
|
||||||
|
|
||||||
if (debug) {
|
if (debug) {
|
||||||
NLPacket& sentPacket = nodeData->getPacket();
|
NLPacket& sentPacket = nodeData->getPacket();
|
||||||
|
@ -190,18 +191,22 @@ int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode*
|
||||||
// actually send it
|
// actually send it
|
||||||
OctreeServer::didCallWriteDatagram(this);
|
OctreeServer::didCallWriteDatagram(this);
|
||||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *node);
|
DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *node);
|
||||||
packetSent = true;
|
|
||||||
} else {
|
} else {
|
||||||
// not enough room in the packet, send two packets
|
// not enough room in the packet, send two packets
|
||||||
|
|
||||||
|
// first packet
|
||||||
OctreeServer::didCallWriteDatagram(this);
|
OctreeServer::didCallWriteDatagram(this);
|
||||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *node);
|
DependencyManager::get<NodeList>()->sendUnreliablePacket(statsPacket, *node);
|
||||||
|
|
||||||
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
|
// since a stats message is only included on end of scene, don't consider any of these bytes "wasted", since
|
||||||
// there was nothing else to send.
|
// there was nothing else to send.
|
||||||
int thisWastedBytes = 0;
|
int thisWastedBytes = 0;
|
||||||
|
int numBytes = statsPacket.getDataSize();
|
||||||
_totalWastedBytes += thisWastedBytes;
|
_totalWastedBytes += thisWastedBytes;
|
||||||
_totalBytes += statsPacket.getDataSize();
|
_totalBytes += numBytes;
|
||||||
_totalPackets++;
|
_totalPackets++;
|
||||||
|
_trueBytesSent += numBytes;
|
||||||
|
numPackets++;
|
||||||
|
|
||||||
if (debug) {
|
if (debug) {
|
||||||
NLPacket& sentPacket = nodeData->getPacket();
|
NLPacket& sentPacket = nodeData->getPacket();
|
||||||
|
@ -220,18 +225,17 @@ int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode*
|
||||||
"] wasted bytes:" << thisWastedBytes << " [" << _totalWastedBytes << "]";
|
"] wasted bytes:" << thisWastedBytes << " [" << _totalWastedBytes << "]";
|
||||||
}
|
}
|
||||||
|
|
||||||
trueBytesSent += statsPacket.getDataSize();
|
// second packet
|
||||||
packetsSent++;
|
|
||||||
|
|
||||||
OctreeServer::didCallWriteDatagram(this);
|
OctreeServer::didCallWriteDatagram(this);
|
||||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *node);
|
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *node);
|
||||||
packetSent = true;
|
|
||||||
|
|
||||||
int packetSizeWithHeader = nodeData->getPacket().getDataSize();
|
numBytes = nodeData->getPacket().getDataSize();
|
||||||
thisWastedBytes = udt::MAX_PACKET_SIZE - packetSizeWithHeader;
|
thisWastedBytes = udt::MAX_PACKET_SIZE - numBytes;
|
||||||
_totalWastedBytes += thisWastedBytes;
|
_totalWastedBytes += thisWastedBytes;
|
||||||
_totalBytes += nodeData->getPacket().getDataSize();
|
_totalBytes += numBytes;
|
||||||
_totalPackets++;
|
_totalPackets++;
|
||||||
|
_trueBytesSent += numBytes;
|
||||||
|
numPackets++;
|
||||||
|
|
||||||
if (debug) {
|
if (debug) {
|
||||||
NLPacket& sentPacket = nodeData->getPacket();
|
NLPacket& sentPacket = nodeData->getPacket();
|
||||||
|
@ -257,13 +261,13 @@ int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode*
|
||||||
// just send the octree packet
|
// just send the octree packet
|
||||||
OctreeServer::didCallWriteDatagram(this);
|
OctreeServer::didCallWriteDatagram(this);
|
||||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *node);
|
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), *node);
|
||||||
packetSent = true;
|
|
||||||
|
|
||||||
int packetSizeWithHeader = nodeData->getPacket().getDataSize();
|
int numBytes = nodeData->getPacket().getDataSize();
|
||||||
int thisWastedBytes = udt::MAX_PACKET_SIZE - packetSizeWithHeader;
|
int thisWastedBytes = udt::MAX_PACKET_SIZE - numBytes;
|
||||||
_totalWastedBytes += thisWastedBytes;
|
_totalWastedBytes += thisWastedBytes;
|
||||||
_totalBytes += packetSizeWithHeader;
|
_totalBytes += numBytes;
|
||||||
_totalPackets++;
|
_totalPackets++;
|
||||||
|
numPackets++;
|
||||||
|
|
||||||
if (debug) {
|
if (debug) {
|
||||||
NLPacket& sentPacket = nodeData->getPacket();
|
NLPacket& sentPacket = nodeData->getPacket();
|
||||||
|
@ -278,22 +282,22 @@ int OctreeSendThread::handlePacketSend(SharedNodePointer node, OctreeQueryNode*
|
||||||
|
|
||||||
qDebug() << "Sending packet at " << now << " [" << _totalPackets <<"]: sequence: " << sequence <<
|
qDebug() << "Sending packet at " << now << " [" << _totalPackets <<"]: sequence: " << sequence <<
|
||||||
" timestamp: " << timestamp <<
|
" timestamp: " << timestamp <<
|
||||||
" size: " << packetSizeWithHeader << " [" << _totalBytes <<
|
" size: " << numBytes << " [" << _totalBytes <<
|
||||||
"] wasted bytes:" << thisWastedBytes << " [" << _totalWastedBytes << "]";
|
"] wasted bytes:" << thisWastedBytes << " [" << _totalWastedBytes << "]";
|
||||||
}
|
}
|
||||||
|
_trueBytesSent += numBytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// remember to track our stats
|
// remember to track our stats
|
||||||
if (packetSent) {
|
if (numPackets > 0) {
|
||||||
nodeData->stats.packetSent(nodeData->getPacket().getPayloadSize());
|
nodeData->stats.packetSent(nodeData->getPacket().getPayloadSize());
|
||||||
trueBytesSent += nodeData->getPacket().getPayloadSize();
|
|
||||||
packetsSent++;
|
|
||||||
nodeData->octreePacketSent();
|
nodeData->octreePacketSent();
|
||||||
nodeData->resetOctreePacket();
|
nodeData->resetOctreePacket();
|
||||||
}
|
}
|
||||||
|
|
||||||
return packetsSent;
|
_truePacketsSent += numPackets;
|
||||||
|
return numPackets;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Version of octree element distributor that sends the deepest LOD level at once
|
/// Version of octree element distributor that sends the deepest LOD level at once
|
||||||
|
@ -312,13 +316,9 @@ int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode*
|
||||||
preDistributionProcessing();
|
preDistributionProcessing();
|
||||||
}
|
}
|
||||||
|
|
||||||
// calculate max number of packets that can be sent during this interval
|
_truePacketsSent = 0;
|
||||||
int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxQueryPacketsPerSecond() / INTERVALS_PER_SECOND));
|
_trueBytesSent = 0;
|
||||||
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
|
_packetsSentThisInterval = 0;
|
||||||
|
|
||||||
int truePacketsSent = 0;
|
|
||||||
int trueBytesSent = 0;
|
|
||||||
int packetsSentThisInterval = 0;
|
|
||||||
|
|
||||||
bool isFullScene = nodeData->shouldForceFullScene();
|
bool isFullScene = nodeData->shouldForceFullScene();
|
||||||
if (isFullScene) {
|
if (isFullScene) {
|
||||||
|
@ -333,9 +333,7 @@ int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode*
|
||||||
|
|
||||||
// send any waiting packet
|
// send any waiting packet
|
||||||
if (nodeData->isPacketWaiting()) {
|
if (nodeData->isPacketWaiting()) {
|
||||||
int numPackets = handlePacketSend(node, nodeData, trueBytesSent);
|
_packetsSentThisInterval += handlePacketSend(node, nodeData);
|
||||||
truePacketsSent += numPackets;
|
|
||||||
packetsSentThisInterval += numPackets;
|
|
||||||
} else {
|
} else {
|
||||||
nodeData->resetOctreePacket();
|
nodeData->resetOctreePacket();
|
||||||
}
|
}
|
||||||
|
@ -366,9 +364,7 @@ int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode*
|
||||||
//unsigned long encodeTime = nodeData->stats.getTotalEncodeTime();
|
//unsigned long encodeTime = nodeData->stats.getTotalEncodeTime();
|
||||||
//unsigned long elapsedTime = nodeData->stats.getElapsedTime();
|
//unsigned long elapsedTime = nodeData->stats.getElapsedTime();
|
||||||
|
|
||||||
int numPackets = handlePacketSend(node, nodeData, trueBytesSent, isFullScene);
|
_packetsSentThisInterval += handlePacketSend(node, nodeData, isFullScene);
|
||||||
truePacketsSent += numPackets;
|
|
||||||
packetsSentThisInterval += numPackets;
|
|
||||||
|
|
||||||
// If we're starting a full scene, then definitely we want to empty the elementBag
|
// If we're starting a full scene, then definitely we want to empty the elementBag
|
||||||
if (isFullScene) {
|
if (isFullScene) {
|
||||||
|
@ -398,165 +394,42 @@ int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode*
|
||||||
if (!nodeData->elementBag.isEmpty()) {
|
if (!nodeData->elementBag.isEmpty()) {
|
||||||
quint64 start = usecTimestampNow();
|
quint64 start = usecTimestampNow();
|
||||||
|
|
||||||
int extraPackingAttempts = 0;
|
traverseTreeAndSendContents(node, nodeData, viewFrustumChanged, isFullScene);
|
||||||
bool completedScene = false;
|
|
||||||
|
|
||||||
bool somethingToSend = true; // assume we have something
|
|
||||||
while (somethingToSend && packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) {
|
|
||||||
float lockWaitElapsedUsec = OctreeServer::SKIP_TIME;
|
|
||||||
float encodeElapsedUsec = OctreeServer::SKIP_TIME;
|
|
||||||
float compressAndWriteElapsedUsec = OctreeServer::SKIP_TIME;
|
|
||||||
float packetSendingElapsedUsec = OctreeServer::SKIP_TIME;
|
|
||||||
|
|
||||||
quint64 startInside = usecTimestampNow();
|
|
||||||
|
|
||||||
bool lastNodeDidntFit = false; // assume each node fits
|
|
||||||
if (!nodeData->elementBag.isEmpty()) {
|
|
||||||
|
|
||||||
quint64 lockWaitStart = usecTimestampNow();
|
|
||||||
_myServer->getOctree()->withReadLock([&]{
|
|
||||||
quint64 lockWaitEnd = usecTimestampNow();
|
|
||||||
lockWaitElapsedUsec = (float)(lockWaitEnd - lockWaitStart);
|
|
||||||
quint64 encodeStart = usecTimestampNow();
|
|
||||||
|
|
||||||
OctreeElementPointer subTree = nodeData->elementBag.extract();
|
|
||||||
if (!subTree) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
float octreeSizeScale = nodeData->getOctreeSizeScale();
|
|
||||||
int boundaryLevelAdjustClient = nodeData->getBoundaryLevelAdjust();
|
|
||||||
|
|
||||||
int boundaryLevelAdjust = boundaryLevelAdjustClient +
|
|
||||||
(viewFrustumChanged ? LOW_RES_MOVING_ADJUST : NO_BOUNDARY_ADJUST);
|
|
||||||
|
|
||||||
EncodeBitstreamParams params(INT_MAX, WANT_EXISTS_BITS, DONT_CHOP,
|
|
||||||
viewFrustumChanged, boundaryLevelAdjust, octreeSizeScale,
|
|
||||||
isFullScene, _myServer->getJurisdiction(), nodeData);
|
|
||||||
nodeData->copyCurrentViewFrustum(params.viewFrustum);
|
|
||||||
if (viewFrustumChanged) {
|
|
||||||
nodeData->copyLastKnownViewFrustum(params.lastViewFrustum);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Our trackSend() function is implemented by the server subclass, and will be called back
|
|
||||||
// during the encodeTreeBitstream() as new entities/data elements are sent
|
|
||||||
params.trackSend = [this, node](const QUuid& dataID, quint64 dataEdited) {
|
|
||||||
_myServer->trackSend(dataID, dataEdited, node->getUUID());
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: should this include the lock time or not? This stat is sent down to the client,
|
|
||||||
// it seems like it may be a good idea to include the lock time as part of the encode time
|
|
||||||
// are reported to client. Since you can encode without the lock
|
|
||||||
nodeData->stats.encodeStarted();
|
|
||||||
|
|
||||||
_myServer->getOctree()->encodeTreeBitstream(subTree, &_packetData, nodeData->elementBag, params);
|
|
||||||
|
|
||||||
quint64 encodeEnd = usecTimestampNow();
|
|
||||||
encodeElapsedUsec = (float)(encodeEnd - encodeStart);
|
|
||||||
|
|
||||||
// If after calling encodeTreeBitstream() there are no nodes left to send, then we know we've
|
|
||||||
// sent the entire scene. We want to know this below so we'll actually write this content into
|
|
||||||
// the packet and send it
|
|
||||||
completedScene = nodeData->elementBag.isEmpty();
|
|
||||||
|
|
||||||
if (params.stopReason == EncodeBitstreamParams::DIDNT_FIT) {
|
|
||||||
lastNodeDidntFit = true;
|
|
||||||
extraPackingAttempts++;
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeData->stats.encodeStopped();
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
somethingToSend = false; // this will cause us to drop out of the loop...
|
|
||||||
}
|
|
||||||
|
|
||||||
if (completedScene || lastNodeDidntFit) {
|
|
||||||
// we probably want to flush what has accumulated in nodeData but:
|
|
||||||
// do we have more data to send? and is there room?
|
|
||||||
if (_packetData.hasContent()) {
|
|
||||||
// yes, more data to send
|
|
||||||
quint64 compressAndWriteStart = usecTimestampNow();
|
|
||||||
unsigned int additionalSize = _packetData.getFinalizedSize() + sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE);
|
|
||||||
if (additionalSize > nodeData->getAvailable()) {
|
|
||||||
// no room --> flush what we've got
|
|
||||||
int numPackets = handlePacketSend(node, nodeData, trueBytesSent);
|
|
||||||
truePacketsSent += numPackets;
|
|
||||||
packetsSentThisInterval += numPackets;
|
|
||||||
}
|
|
||||||
|
|
||||||
// either there is room, or we've flushed and reset nodeData's data buffer
|
|
||||||
// so we can transfer whatever is in _packetData to nodeData
|
|
||||||
nodeData->writeToPacket(_packetData.getFinalizedData(), _packetData.getFinalizedSize());
|
|
||||||
compressAndWriteElapsedUsec = (float)(usecTimestampNow()- compressAndWriteStart);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool sendNow = completedScene ||
|
|
||||||
nodeData->getAvailable() < MINIMUM_ATTEMPT_MORE_PACKING ||
|
|
||||||
extraPackingAttempts > REASONABLE_NUMBER_OF_PACKING_ATTEMPTS;
|
|
||||||
|
|
||||||
int targetSize = MAX_OCTREE_PACKET_DATA_SIZE;
|
|
||||||
if (sendNow) {
|
|
||||||
quint64 packetSendingStart = usecTimestampNow();
|
|
||||||
int numPackets = handlePacketSend(node, nodeData, trueBytesSent);
|
|
||||||
truePacketsSent += numPackets;
|
|
||||||
packetsSentThisInterval += numPackets;
|
|
||||||
quint64 packetSendingEnd = usecTimestampNow();
|
|
||||||
packetSendingElapsedUsec = (float)(packetSendingEnd - packetSendingStart);
|
|
||||||
|
|
||||||
targetSize = nodeData->getAvailable() - sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE);
|
|
||||||
extraPackingAttempts = 0;
|
|
||||||
} else {
|
|
||||||
// We want to see if we have room for more in this wire packet but we've copied the _packetData,
|
|
||||||
// so we want to start a new section. We will do that by resetting the packet settings with the max
|
|
||||||
// size of our current available space in the wire packet plus room for our section header and a
|
|
||||||
// little bit of padding.
|
|
||||||
targetSize = nodeData->getAvailable() - sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE) - COMPRESS_PADDING;
|
|
||||||
}
|
|
||||||
_packetData.changeSettings(true, targetSize); // will do reset - NOTE: Always compressed
|
|
||||||
}
|
|
||||||
OctreeServer::trackTreeWaitTime(lockWaitElapsedUsec);
|
|
||||||
OctreeServer::trackEncodeTime(encodeElapsedUsec);
|
|
||||||
OctreeServer::trackCompressAndWriteTime(compressAndWriteElapsedUsec);
|
|
||||||
OctreeServer::trackPacketSendingTime(packetSendingElapsedUsec);
|
|
||||||
|
|
||||||
quint64 endInside = usecTimestampNow();
|
|
||||||
quint64 elapsedInsideUsecs = endInside - startInside;
|
|
||||||
OctreeServer::trackInsideTime((float)elapsedInsideUsecs);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (somethingToSend && _myServer->wantsVerboseDebug()) {
|
|
||||||
qCDebug(octree) << "Hit PPS Limit, packetsSentThisInterval =" << packetsSentThisInterval
|
|
||||||
<< " maxPacketsPerInterval = " << maxPacketsPerInterval
|
|
||||||
<< " clientMaxPacketsPerInterval = " << clientMaxPacketsPerInterval;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Here's where we can/should allow the server to send other data...
|
// Here's where we can/should allow the server to send other data...
|
||||||
// send the environment packet
|
// send the environment packet
|
||||||
// TODO: should we turn this into a while loop to better handle sending multiple special packets
|
// TODO: should we turn this into a while loop to better handle sending multiple special packets
|
||||||
if (_myServer->hasSpecialPacketsToSend(node) && !nodeData->isShuttingDown()) {
|
if (_myServer->hasSpecialPacketsToSend(node) && !nodeData->isShuttingDown()) {
|
||||||
int specialPacketsSent = 0;
|
int specialPacketsSent = 0;
|
||||||
trueBytesSent += _myServer->sendSpecialPackets(node, nodeData, specialPacketsSent);
|
int specialBytesSent = _myServer->sendSpecialPackets(node, nodeData, specialPacketsSent);
|
||||||
nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed
|
nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed
|
||||||
truePacketsSent += specialPacketsSent;
|
_truePacketsSent += specialPacketsSent;
|
||||||
packetsSentThisInterval += specialPacketsSent;
|
_trueBytesSent += specialBytesSent;
|
||||||
|
_packetsSentThisInterval += specialPacketsSent;
|
||||||
|
|
||||||
_totalPackets += specialPacketsSent;
|
_totalPackets += specialPacketsSent;
|
||||||
_totalBytes += trueBytesSent;
|
_totalBytes += specialBytesSent;
|
||||||
|
|
||||||
_totalSpecialPackets += specialPacketsSent;
|
_totalSpecialPackets += specialPacketsSent;
|
||||||
_totalSpecialBytes += trueBytesSent;
|
_totalSpecialBytes += specialBytesSent;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// calculate max number of packets that can be sent during this interval
|
||||||
|
int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxQueryPacketsPerSecond() / INTERVALS_PER_SECOND));
|
||||||
|
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
|
||||||
|
|
||||||
// Re-send packets that were nacked by the client
|
// Re-send packets that were nacked by the client
|
||||||
while (nodeData->hasNextNackedPacket() && packetsSentThisInterval < maxPacketsPerInterval) {
|
while (nodeData->hasNextNackedPacket() && _packetsSentThisInterval < maxPacketsPerInterval) {
|
||||||
const NLPacket* packet = nodeData->getNextNackedPacket();
|
const NLPacket* packet = nodeData->getNextNackedPacket();
|
||||||
if (packet) {
|
if (packet) {
|
||||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packet, *node);
|
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packet, *node);
|
||||||
truePacketsSent++;
|
int numBytes = packet->getDataSize();
|
||||||
packetsSentThisInterval++;
|
_truePacketsSent++;
|
||||||
|
_trueBytesSent += numBytes;
|
||||||
|
_packetsSentThisInterval++;
|
||||||
|
|
||||||
_totalBytes += packet->getDataSize();
|
|
||||||
_totalPackets++;
|
_totalPackets++;
|
||||||
|
_totalBytes += numBytes;
|
||||||
_totalWastedBytes += udt::MAX_PACKET_SIZE - packet->getDataSize();
|
_totalWastedBytes += udt::MAX_PACKET_SIZE - packet->getDataSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -574,16 +447,147 @@ int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode*
|
||||||
// If this was a full scene then make sure we really send out a stats packet at this point so that
|
// If this was a full scene then make sure we really send out a stats packet at this point so that
|
||||||
// the clients will know the scene is stable
|
// the clients will know the scene is stable
|
||||||
if (isFullScene) {
|
if (isFullScene) {
|
||||||
int thisTrueBytesSent = 0;
|
|
||||||
nodeData->stats.sceneCompleted();
|
nodeData->stats.sceneCompleted();
|
||||||
int numPackets = handlePacketSend(node, nodeData, thisTrueBytesSent, true);
|
handlePacketSend(node, nodeData, true);
|
||||||
_totalBytes += thisTrueBytesSent;
|
|
||||||
_totalPackets += numPackets;
|
|
||||||
truePacketsSent += numPackets;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // end if bag wasn't empty, and so we sent stuff...
|
} // end if bag wasn't empty, and so we sent stuff...
|
||||||
|
|
||||||
return truePacketsSent;
|
return _truePacketsSent;
|
||||||
|
}
|
||||||
|
|
||||||
|
void OctreeSendThread::traverseTreeAndSendContents(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged, bool isFullScene) {
|
||||||
|
// calculate max number of packets that can be sent during this interval
|
||||||
|
int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxQueryPacketsPerSecond() / INTERVALS_PER_SECOND));
|
||||||
|
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
|
||||||
|
|
||||||
|
int extraPackingAttempts = 0;
|
||||||
|
bool completedScene = false;
|
||||||
|
|
||||||
|
bool somethingToSend = true; // assume we have something
|
||||||
|
while (somethingToSend && _packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) {
|
||||||
|
float lockWaitElapsedUsec = OctreeServer::SKIP_TIME;
|
||||||
|
float encodeElapsedUsec = OctreeServer::SKIP_TIME;
|
||||||
|
float compressAndWriteElapsedUsec = OctreeServer::SKIP_TIME;
|
||||||
|
float packetSendingElapsedUsec = OctreeServer::SKIP_TIME;
|
||||||
|
|
||||||
|
quint64 startInside = usecTimestampNow();
|
||||||
|
|
||||||
|
bool lastNodeDidntFit = false; // assume each node fits
|
||||||
|
if (!nodeData->elementBag.isEmpty()) {
|
||||||
|
|
||||||
|
quint64 lockWaitStart = usecTimestampNow();
|
||||||
|
_myServer->getOctree()->withReadLock([&]{
|
||||||
|
quint64 lockWaitEnd = usecTimestampNow();
|
||||||
|
lockWaitElapsedUsec = (float)(lockWaitEnd - lockWaitStart);
|
||||||
|
quint64 encodeStart = usecTimestampNow();
|
||||||
|
|
||||||
|
OctreeElementPointer subTree = nodeData->elementBag.extract();
|
||||||
|
if (!subTree) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
float octreeSizeScale = nodeData->getOctreeSizeScale();
|
||||||
|
int boundaryLevelAdjustClient = nodeData->getBoundaryLevelAdjust();
|
||||||
|
|
||||||
|
int boundaryLevelAdjust = boundaryLevelAdjustClient +
|
||||||
|
(viewFrustumChanged ? LOW_RES_MOVING_ADJUST : NO_BOUNDARY_ADJUST);
|
||||||
|
|
||||||
|
EncodeBitstreamParams params(INT_MAX, WANT_EXISTS_BITS, DONT_CHOP,
|
||||||
|
viewFrustumChanged, boundaryLevelAdjust, octreeSizeScale,
|
||||||
|
isFullScene, _myServer->getJurisdiction(), nodeData);
|
||||||
|
nodeData->copyCurrentViewFrustum(params.viewFrustum);
|
||||||
|
if (viewFrustumChanged) {
|
||||||
|
nodeData->copyLastKnownViewFrustum(params.lastViewFrustum);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Our trackSend() function is implemented by the server subclass, and will be called back
|
||||||
|
// during the encodeTreeBitstream() as new entities/data elements are sent
|
||||||
|
params.trackSend = [this, node](const QUuid& dataID, quint64 dataEdited) {
|
||||||
|
_myServer->trackSend(dataID, dataEdited, node->getUUID());
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: should this include the lock time or not? This stat is sent down to the client,
|
||||||
|
// it seems like it may be a good idea to include the lock time as part of the encode time
|
||||||
|
// are reported to client. Since you can encode without the lock
|
||||||
|
nodeData->stats.encodeStarted();
|
||||||
|
|
||||||
|
// NOTE: this is where the tree "contents" are actaully packed
|
||||||
|
_myServer->getOctree()->encodeTreeBitstream(subTree, &_packetData, nodeData->elementBag, params);
|
||||||
|
|
||||||
|
quint64 encodeEnd = usecTimestampNow();
|
||||||
|
encodeElapsedUsec = (float)(encodeEnd - encodeStart);
|
||||||
|
|
||||||
|
// If after calling encodeTreeBitstream() there are no nodes left to send, then we know we've
|
||||||
|
// sent the entire scene. We want to know this below so we'll actually write this content into
|
||||||
|
// the packet and send it
|
||||||
|
completedScene = nodeData->elementBag.isEmpty();
|
||||||
|
|
||||||
|
if (params.stopReason == EncodeBitstreamParams::DIDNT_FIT) {
|
||||||
|
lastNodeDidntFit = true;
|
||||||
|
extraPackingAttempts++;
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeData->stats.encodeStopped();
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
somethingToSend = false; // this will cause us to drop out of the loop...
|
||||||
|
}
|
||||||
|
|
||||||
|
if (completedScene || lastNodeDidntFit) {
|
||||||
|
// we probably want to flush what has accumulated in nodeData but:
|
||||||
|
// do we have more data to send? and is there room?
|
||||||
|
if (_packetData.hasContent()) {
|
||||||
|
// yes, more data to send
|
||||||
|
quint64 compressAndWriteStart = usecTimestampNow();
|
||||||
|
unsigned int additionalSize = _packetData.getFinalizedSize() + sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE);
|
||||||
|
if (additionalSize > nodeData->getAvailable()) {
|
||||||
|
// no room --> flush what we've got
|
||||||
|
_packetsSentThisInterval += handlePacketSend(node, nodeData);
|
||||||
|
}
|
||||||
|
|
||||||
|
// either there is room, or we've flushed and reset nodeData's data buffer
|
||||||
|
// so we can transfer whatever is in _packetData to nodeData
|
||||||
|
nodeData->writeToPacket(_packetData.getFinalizedData(), _packetData.getFinalizedSize());
|
||||||
|
compressAndWriteElapsedUsec = (float)(usecTimestampNow()- compressAndWriteStart);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool sendNow = completedScene ||
|
||||||
|
nodeData->getAvailable() < MINIMUM_ATTEMPT_MORE_PACKING ||
|
||||||
|
extraPackingAttempts > REASONABLE_NUMBER_OF_PACKING_ATTEMPTS;
|
||||||
|
|
||||||
|
int targetSize = MAX_OCTREE_PACKET_DATA_SIZE;
|
||||||
|
if (sendNow) {
|
||||||
|
quint64 packetSendingStart = usecTimestampNow();
|
||||||
|
_packetsSentThisInterval += handlePacketSend(node, nodeData);
|
||||||
|
quint64 packetSendingEnd = usecTimestampNow();
|
||||||
|
packetSendingElapsedUsec = (float)(packetSendingEnd - packetSendingStart);
|
||||||
|
|
||||||
|
targetSize = nodeData->getAvailable() - sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE);
|
||||||
|
extraPackingAttempts = 0;
|
||||||
|
} else {
|
||||||
|
// We want to see if we have room for more in this wire packet but we've copied the _packetData,
|
||||||
|
// so we want to start a new section. We will do that by resetting the packet settings with the max
|
||||||
|
// size of our current available space in the wire packet plus room for our section header and a
|
||||||
|
// little bit of padding.
|
||||||
|
targetSize = nodeData->getAvailable() - sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE) - COMPRESS_PADDING;
|
||||||
|
}
|
||||||
|
_packetData.changeSettings(true, targetSize); // will do reset - NOTE: Always compressed
|
||||||
|
}
|
||||||
|
OctreeServer::trackTreeWaitTime(lockWaitElapsedUsec);
|
||||||
|
OctreeServer::trackEncodeTime(encodeElapsedUsec);
|
||||||
|
OctreeServer::trackCompressAndWriteTime(compressAndWriteElapsedUsec);
|
||||||
|
OctreeServer::trackPacketSendingTime(packetSendingElapsedUsec);
|
||||||
|
|
||||||
|
quint64 endInside = usecTimestampNow();
|
||||||
|
quint64 elapsedInsideUsecs = endInside - startInside;
|
||||||
|
OctreeServer::trackInsideTime((float)elapsedInsideUsecs);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (somethingToSend && _myServer->wantsVerboseDebug()) {
|
||||||
|
qCDebug(octree) << "Hit PPS Limit, packetsSentThisInterval =" << _packetsSentThisInterval
|
||||||
|
<< " maxPacketsPerInterval = " << maxPacketsPerInterval
|
||||||
|
<< " clientMaxPacketsPerInterval = " << clientMaxPacketsPerInterval;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,12 +53,13 @@ protected:
|
||||||
|
|
||||||
/// Called before a packetDistributor pass to allow for pre-distribution processing
|
/// Called before a packetDistributor pass to allow for pre-distribution processing
|
||||||
virtual void preDistributionProcessing() {};
|
virtual void preDistributionProcessing() {};
|
||||||
|
virtual void traverseTreeAndSendContents(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged, bool isFullScene);
|
||||||
|
|
||||||
OctreeServer* _myServer { nullptr };
|
OctreeServer* _myServer { nullptr };
|
||||||
QWeakPointer<Node> _node;
|
QWeakPointer<Node> _node;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, int& trueBytesSent, bool dontSuppressDuplicate = false);
|
int handlePacketSend(SharedNodePointer node, OctreeQueryNode* nodeData, bool dontSuppressDuplicate = false);
|
||||||
int packetDistributor(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged);
|
int packetDistributor(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged);
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,6 +67,9 @@ private:
|
||||||
|
|
||||||
OctreePacketData _packetData;
|
OctreePacketData _packetData;
|
||||||
|
|
||||||
|
int _truePacketsSent { 0 }; // available for debug stats
|
||||||
|
int _trueBytesSent { 0 }; // available for debug stats
|
||||||
|
int _packetsSentThisInterval { 0 }; // used for bandwidth throttle condition
|
||||||
bool _isShuttingDown { false };
|
bool _isShuttingDown { false };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue