add OctreeSendThread::traverseAndBuildPacket()

cleanup and split data packing from bandwidth management
This commit is contained in:
Andrew Meadows 2017-08-18 11:32:05 -07:00
parent 075b8574fb
commit 65cf7b4f61
2 changed files with 64 additions and 70 deletions

View file

@ -458,84 +458,77 @@ int OctreeSendThread::packetDistributor(SharedNodePointer node, OctreeQueryNode*
return _truePacketsSent; return _truePacketsSent;
} }
bool OctreeSendThread::traverseAndBuildPacket(EncodeBitstreamParams& params) {
bool somethingToSend = false;
OctreeQueryNode* nodeData = static_cast<OctreeQueryNode*>(params.nodeData);
if (!nodeData->elementBag.isEmpty()) {
quint64 encodeStart = usecTimestampNow();
quint64 lockWaitStart = encodeStart;
_myServer->getOctree()->withReadLock([&]{
OctreeServer::trackTreeWaitTime((float)(usecTimestampNow() - lockWaitStart));
OctreeElementPointer subTree = nodeData->elementBag.extract();
if (subTree) {
// NOTE: this is where the tree "contents" are actually packed
nodeData->stats.encodeStarted();
_myServer->getOctree()->encodeTreeBitstream(subTree, &_packetData, nodeData->elementBag, params);
nodeData->stats.encodeStopped();
somethingToSend = true;
}
});
OctreeServer::trackEncodeTime((float)(usecTimestampNow() - encodeStart));
}
return somethingToSend;
}
void OctreeSendThread::traverseTreeAndSendContents(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged, bool isFullScene) { void OctreeSendThread::traverseTreeAndSendContents(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged, bool isFullScene) {
// calculate max number of packets that can be sent during this interval // calculate max number of packets that can be sent during this interval
int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxQueryPacketsPerSecond() / INTERVALS_PER_SECOND)); int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxQueryPacketsPerSecond() / INTERVALS_PER_SECOND));
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval()); int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
int extraPackingAttempts = 0; int extraPackingAttempts = 0;
bool completedScene = false;
bool somethingToSend = true; // assume we have something // init params once outside the while loop
while (somethingToSend && _packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) {
float lockWaitElapsedUsec = OctreeServer::SKIP_TIME;
float encodeElapsedUsec = OctreeServer::SKIP_TIME;
float compressAndWriteElapsedUsec = OctreeServer::SKIP_TIME;
float packetSendingElapsedUsec = OctreeServer::SKIP_TIME;
quint64 startInside = usecTimestampNow();
bool lastNodeDidntFit = false; // assume each node fits
if (!nodeData->elementBag.isEmpty()) {
quint64 lockWaitStart = usecTimestampNow();
_myServer->getOctree()->withReadLock([&]{
quint64 lockWaitEnd = usecTimestampNow();
lockWaitElapsedUsec = (float)(lockWaitEnd - lockWaitStart);
quint64 encodeStart = usecTimestampNow();
OctreeElementPointer subTree = nodeData->elementBag.extract();
if (!subTree) {
return;
}
float octreeSizeScale = nodeData->getOctreeSizeScale();
int boundaryLevelAdjustClient = nodeData->getBoundaryLevelAdjust(); int boundaryLevelAdjustClient = nodeData->getBoundaryLevelAdjust();
int boundaryLevelAdjust = boundaryLevelAdjustClient + int boundaryLevelAdjust = boundaryLevelAdjustClient +
(viewFrustumChanged ? LOW_RES_MOVING_ADJUST : NO_BOUNDARY_ADJUST); (viewFrustumChanged ? LOW_RES_MOVING_ADJUST : NO_BOUNDARY_ADJUST);
float octreeSizeScale = nodeData->getOctreeSizeScale();
EncodeBitstreamParams params(INT_MAX, WANT_EXISTS_BITS, DONT_CHOP, EncodeBitstreamParams params(INT_MAX, WANT_EXISTS_BITS, DONT_CHOP,
viewFrustumChanged, boundaryLevelAdjust, octreeSizeScale, viewFrustumChanged, boundaryLevelAdjust, octreeSizeScale,
isFullScene, _myServer->getJurisdiction(), nodeData); isFullScene, _myServer->getJurisdiction(), nodeData);
nodeData->copyCurrentViewFrustum(params.viewFrustum);
if (viewFrustumChanged) {
nodeData->copyLastKnownViewFrustum(params.lastViewFrustum);
}
// Our trackSend() function is implemented by the server subclass, and will be called back // Our trackSend() function is implemented by the server subclass, and will be called back
// during the encodeTreeBitstream() as new entities/data elements are sent // during the encodeTreeBitstream() as new entities/data elements are sent
params.trackSend = [this](const QUuid& dataID, quint64 dataEdited) { params.trackSend = [this](const QUuid& dataID, quint64 dataEdited) {
_myServer->trackSend(dataID, dataEdited, _nodeUuid); _myServer->trackSend(dataID, dataEdited, _nodeUuid);
}; };
nodeData->copyCurrentViewFrustum(params.viewFrustum);
if (viewFrustumChanged) {
nodeData->copyLastKnownViewFrustum(params.lastViewFrustum);
}
// TODO: should this include the lock time or not? This stat is sent down to the client, bool somethingToSend = true; // assume we have something
// it seems like it may be a good idea to include the lock time as part of the encode time bool bagHadSomething = !nodeData->elementBag.isEmpty();
// are reported to client. Since you can encode without the lock while (somethingToSend && _packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) {
nodeData->stats.encodeStarted(); float compressAndWriteElapsedUsec = 0.0f;
float packetSendingElapsedUsec = 0.0f;
// NOTE: this is where the tree "contents" are actaully packed quint64 startInside = usecTimestampNow();
_myServer->getOctree()->encodeTreeBitstream(subTree, &_packetData, nodeData->elementBag, params);
quint64 encodeEnd = usecTimestampNow(); bool lastNodeDidntFit = false; // assume each node fits
encodeElapsedUsec = (float)(encodeEnd - encodeStart); params.stopReason = EncodeBitstreamParams::UNKNOWN; // reset params.stopReason before traversal
// If after calling encodeTreeBitstream() there are no nodes left to send, then we know we've somethingToSend = traverseAndBuildPacket(params);
// sent the entire scene. We want to know this below so we'll actually write this content into
// the packet and send it
completedScene = nodeData->elementBag.isEmpty();
if (params.stopReason == EncodeBitstreamParams::DIDNT_FIT) { if (params.stopReason == EncodeBitstreamParams::DIDNT_FIT) {
lastNodeDidntFit = true; lastNodeDidntFit = true;
extraPackingAttempts++; extraPackingAttempts++;
} }
nodeData->stats.encodeStopped(); // If the bag had contents but is now empty then we know we've sent the entire scene.
}); bool completedScene = bagHadSomething && nodeData->elementBag.isEmpty();
} else {
somethingToSend = false; // this will cause us to drop out of the loop...
}
if (completedScene || lastNodeDidntFit) { if (completedScene || lastNodeDidntFit) {
// we probably want to flush what has accumulated in nodeData but: // we probably want to flush what has accumulated in nodeData but:
// do we have more data to send? and is there room? // do we have more data to send? and is there room?
@ -562,8 +555,7 @@ void OctreeSendThread::traverseTreeAndSendContents(SharedNodePointer node, Octre
if (sendNow) { if (sendNow) {
quint64 packetSendingStart = usecTimestampNow(); quint64 packetSendingStart = usecTimestampNow();
_packetsSentThisInterval += handlePacketSend(node, nodeData); _packetsSentThisInterval += handlePacketSend(node, nodeData);
quint64 packetSendingEnd = usecTimestampNow(); packetSendingElapsedUsec = (float)(usecTimestampNow() - packetSendingStart);
packetSendingElapsedUsec = (float)(packetSendingEnd - packetSendingStart);
targetSize = nodeData->getAvailable() - sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE); targetSize = nodeData->getAvailable() - sizeof(OCTREE_PACKET_INTERNAL_SECTION_SIZE);
extraPackingAttempts = 0; extraPackingAttempts = 0;
@ -576,14 +568,14 @@ void OctreeSendThread::traverseTreeAndSendContents(SharedNodePointer node, Octre
} }
_packetData.changeSettings(true, targetSize); // will do reset - NOTE: Always compressed _packetData.changeSettings(true, targetSize); // will do reset - NOTE: Always compressed
} }
OctreeServer::trackTreeWaitTime(lockWaitElapsedUsec); if (!somethingToSend) {
OctreeServer::trackEncodeTime(encodeElapsedUsec); // these stats were not updated so we record zero to record this fact
OctreeServer::trackTreeWaitTime(0.0f);
OctreeServer::trackEncodeTime(0.0f);
}
OctreeServer::trackCompressAndWriteTime(compressAndWriteElapsedUsec); OctreeServer::trackCompressAndWriteTime(compressAndWriteElapsedUsec);
OctreeServer::trackPacketSendingTime(packetSendingElapsedUsec); OctreeServer::trackPacketSendingTime(packetSendingElapsedUsec);
OctreeServer::trackInsideTime((float)(usecTimestampNow() - startInside));
quint64 endInside = usecTimestampNow();
quint64 elapsedInsideUsecs = endInside - startInside;
OctreeServer::trackInsideTime((float)elapsedInsideUsecs);
} }
if (somethingToSend && _myServer->wantsVerboseDebug()) { if (somethingToSend && _myServer->wantsVerboseDebug()) {

View file

@ -53,7 +53,9 @@ protected:
/// Called before a packetDistributor pass to allow for pre-distribution processing /// Called before a packetDistributor pass to allow for pre-distribution processing
virtual void preDistributionProcessing() {}; virtual void preDistributionProcessing() {};
virtual void traverseTreeAndSendContents(SharedNodePointer node, OctreeQueryNode* nodeData, bool viewFrustumChanged, bool isFullScene); virtual void traverseTreeAndSendContents(SharedNodePointer node, OctreeQueryNode* nodeData,
bool viewFrustumChanged, bool isFullScene);
virtual bool traverseAndBuildPacket(EncodeBitstreamParams& params);
OctreeServer* _myServer { nullptr }; OctreeServer* _myServer { nullptr };
QWeakPointer<Node> _node; QWeakPointer<Node> _node;