more networking API changes in octree classes

This commit is contained in:
Stephen Birarda 2015-07-07 12:31:22 -07:00
parent fca23cc69b
commit 01b1c46237
5 changed files with 68 additions and 65 deletions

View file

@ -365,7 +365,7 @@ bool OctreeQueryNode::hasNextNackedPacket() const {
return !_nackedSequenceNumbers.isEmpty();
}
NLPacket* OctreeQueryNode::getNextNackedPacket() {
const NLPacket* OctreeQueryNode::getNextNackedPacket() {
if (!_nackedSequenceNumbers.isEmpty()) {
// could return null if packet is not in the history
return _sentPacketHistory.getPacket(_nackedSequenceNumbers.dequeue());

View file

@ -111,7 +111,7 @@ public:
void parseNackPacket(const QByteArray& packet);
bool hasNextNackedPacket() const;
NLPacket* getNextNackedPacket();
const NLPacket* getNextNackedPacket();
private slots:
void sendThreadFinished();

View file

@ -30,10 +30,10 @@ OctreeSendThread::OctreeSendThread(OctreeServer* myServer, const SharedNodePoint
_isShuttingDown(false)
{
QString safeServerName("Octree");
// set our QThread object name so we can identify this thread while debugging
setObjectName(QString("Octree Send Thread (%1)").arg(uuidStringWithoutCurlyBraces(node->getUUID())));
if (_myServer) {
safeServerName = _myServer->getMyServerName();
}
@ -49,7 +49,7 @@ OctreeSendThread::~OctreeSendThread() {
if (_myServer) {
safeServerName = _myServer->getMyServerName();
}
qDebug() << qPrintable(safeServerName) << "server [" << _myServer << "]: client disconnected "
"- ending sending thread [" << this << "]";
@ -121,18 +121,18 @@ quint64 OctreeSendThread::_totalPackets = 0;
int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytesSent, int& truePacketsSent) {
OctreeServer::didHandlePacketSend(this);
// if we're shutting down, then exit early
// if we're shutting down, then exit early
if (nodeData->isShuttingDown()) {
return 0;
}
bool debug = _myServer->wantsDebugSending();
quint64 now = usecTimestampNow();
bool packetSent = false; // did we send a packet?
int packetsSent = 0;
// Here's where we check to see if this packet is a duplicate of the last packet. If it is, we will silently
// obscure the packet and not send it. This allows the callers and upper level logic to not need to know about
// this rate control savings.
@ -202,7 +202,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
OCTREE_PACKET_SENT_TIME timestamp = (*(OCTREE_PACKET_SENT_TIME*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
qDebug() << "Sending separate stats packet at " << now << " [" << _totalPackets <<"]: sequence: " << sequence <<
" timestamp: " << timestamp <<
" size: " << statsMessageLength << " [" << _totalBytes <<
@ -230,7 +230,7 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
OCTREE_PACKET_SENT_TIME timestamp = (*(OCTREE_PACKET_SENT_TIME*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
qDebug() << "Sending packet at " << now << " [" << _totalPackets <<"]: sequence: " << sequence <<
" timestamp: " << timestamp <<
" size: " << nodeData->getPacketLength() << " [" << _totalBytes <<
@ -243,23 +243,25 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
if (nodeData->isPacketWaiting() && !nodeData->isShuttingDown()) {
// just send the octree packet
OctreeServer::didCallWriteDatagram(this);
DependencyManager::get<NodeList>()->writeDatagram((char*)nodeData->getPacket(), nodeData->getPacketLength(), _node);
DependencyManager::get<NodeList>()->sendUnreliablePacket(nodeData->getPacket(), _node);
packetSent = true;
int thisWastedBytes = MAX_PACKET_SIZE - nodeData->getPacketLength();
_totalWastedBytes += thisWastedBytes;
_totalBytes += nodeData->getPacketLength();
_totalPackets++;
if (debug) {
const unsigned char* messageData = nodeData->getPacket();
int numBytesPacketHeader = numBytesForPacketHeader(reinterpret_cast<const char*>(messageData));
const unsigned char* dataAt = messageData + numBytesPacketHeader;
dataAt += sizeof(OCTREE_PACKET_FLAGS);
OCTREE_PACKET_SEQUENCE sequence = (*(OCTREE_PACKET_SEQUENCE*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SEQUENCE);
OCTREE_PACKET_SENT_TIME timestamp = (*(OCTREE_PACKET_SENT_TIME*)dataAt);
dataAt += sizeof(OCTREE_PACKET_SENT_TIME);
NLPacket& sentPacket = nodeData->getPacket();
sentPacket->seek(sizeof(OCTREE_PACKET_FLAGS));
OCTREE_PACKET_SEQUENCE sequence;
sentPacket->read(&sequence, sizeof(sequence));
OCTREE_PACKET_SENT_TIME timestamp;
sentPacket->read(&timestamp, sizeof(timestamp));
qDebug() << "Sending packet at " << now << " [" << _totalPackets <<"]: sequence: " << sequence <<
" timestamp: " << timestamp <<
" size: " << nodeData->getPacketLength() << " [" << _totalBytes <<
@ -267,10 +269,11 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
}
}
}
// remember to track our stats
if (packetSent) {
nodeData->stats.packetSent(nodeData->getPacketLength());
trueBytesSent += nodeData->getPacketLength();
nodeData->stats.packetSent(nodeData->getPacket()->getSizeUsed());
trueBytesSent += nodeData->getPacket()->getSizeUsed();
truePacketsSent++;
packetsSent++;
nodeData->octreePacketSent();
@ -282,14 +285,14 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
/// Version of octree element distributor that sends the deepest LOD level at once
int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) {
OctreeServer::didPacketDistributor(this);
// if shutting down, exit early
if (nodeData->isShuttingDown()) {
return 0;
}
// calculate max number of packets that can be sent during this interval
int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxQueryPacketsPerSecond() / INTERVALS_PER_SECOND));
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
@ -297,7 +300,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
int truePacketsSent = 0;
int trueBytesSent = 0;
int packetsSentThisInterval = 0;
bool isFullScene = ((!viewFrustumChanged || !nodeData->getWantDelta()) && nodeData->getViewFrustumJustStoppedChanging())
bool isFullScene = ((!viewFrustumChanged || !nodeData->getWantDelta()) && nodeData->getViewFrustumJustStoppedChanging())
|| nodeData->hasLodChanged();
bool somethingToSend = true; // assume we have something
@ -395,18 +398,18 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
int extraPackingAttempts = 0;
bool completedScene = false;
while (somethingToSend && packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) {
float lockWaitElapsedUsec = OctreeServer::SKIP_TIME;
float encodeElapsedUsec = OctreeServer::SKIP_TIME;
float compressAndWriteElapsedUsec = OctreeServer::SKIP_TIME;
float packetSendingElapsedUsec = OctreeServer::SKIP_TIME;
quint64 startInside = usecTimestampNow();
quint64 startInside = usecTimestampNow();
bool lastNodeDidntFit = false; // assume each node fits
if (!nodeData->elementBag.isEmpty()) {
quint64 lockWaitStart = usecTimestampNow();
_myServer->getOctree()->lockForRead();
quint64 lockWaitEnd = usecTimestampNow();
@ -419,15 +422,15 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// going to result in any packets being sent...
//
// If our node is root, and the root hasn't changed, and our view hasn't changed,
// and we've already seen at least one duplicate packet, then we probably don't need
// to lock the tree and encode, because the result should be that no bytes will be
// and we've already seen at least one duplicate packet, then we probably don't need
// to lock the tree and encode, because the result should be that no bytes will be
// encoded, and this will be a duplicate packet from the last one we sent...
OctreeElement* root = _myServer->getOctree()->getRoot();
bool skipEncode = false;
if (
(subTree == root)
&& (nodeData->getLastRootTimestamp() == root->getLastChanged())
&& !viewFrustumChanged
&& !viewFrustumChanged
&& (nodeData->getDuplicatePacketCount() > 0)
) {
qDebug() << "is root, root not changed, view not changed, already seen a duplicate!"
@ -438,13 +441,13 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
bool wantOcclusionCulling = nodeData->getWantOcclusionCulling();
CoverageMap* coverageMap = wantOcclusionCulling ? &nodeData->map : IGNORE_COVERAGE_MAP;
float octreeSizeScale = nodeData->getOctreeSizeScale();
int boundaryLevelAdjustClient = nodeData->getBoundaryLevelAdjust();
int boundaryLevelAdjust = boundaryLevelAdjustClient + (viewFrustumChanged && nodeData->getWantLowResMoving()
? LOW_RES_MOVING_ADJUST : NO_BOUNDARY_ADJUST);
EncodeBitstreamParams params(INT_MAX, &nodeData->getCurrentViewFrustum(), wantColor,
WANT_EXISTS_BITS, DONT_CHOP, wantDelta, lastViewFrustum,
wantOcclusionCulling, coverageMap, boundaryLevelAdjust, octreeSizeScale,
@ -461,7 +464,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
quint64 encodeEnd = usecTimestampNow();
encodeElapsedUsec = (float)(encodeEnd - encodeStart);
// If after calling encodeTreeBitstream() there are no nodes left to send, then we know we've
// sent the entire scene. We want to know this below so we'll actually write this content into
// the packet and send it
@ -502,7 +505,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
if (_packetData.hasContent()) {
quint64 compressAndWriteStart = usecTimestampNow();
// if for some reason the finalized size is greater than our available size, then probably the "compressed"
// form actually inflated beyond our padding, and in this case we will send the current packet, then
// write to out new packet...
@ -522,7 +525,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// If we're not running compressed, then we know we can just send now. Or if we're running compressed, but
// the packet doesn't have enough space to bother attempting to pack more...
bool sendNow = true;
if (nodeData->getCurrentPacketIsCompressed() &&
nodeData->getAvailable() >= MINIMUM_ATTEMPT_MORE_PACKING &&
extraPackingAttempts <= REASONABLE_NUMBER_OF_PACKING_ATTEMPTS) {
@ -555,7 +558,7 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
OctreeServer::trackEncodeTime(encodeElapsedUsec);
OctreeServer::trackCompressAndWriteTime(compressAndWriteElapsedUsec);
OctreeServer::trackPacketSendingTime(packetSendingElapsedUsec);
quint64 endInside = usecTimestampNow();
quint64 elapsedInsideUsecs = endInside - startInside;
OctreeServer::trackInsideTime((float)elapsedInsideUsecs);
@ -582,20 +585,20 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// Re-send packets that were nacked by the client
while (nodeData->hasNextNackedPacket() && packetsSentThisInterval < maxPacketsPerInterval) {
const QByteArray* packet = nodeData->getNextNackedPacket();
const NLPacket* packet = nodeData->getNextNackedPacket();
if (packet) {
DependencyManager::get<NodeList>()->writeDatagram(*packet, _node);
DependencyManager::get<NodeList>()->sendUnreliablePacket(*packet, _node);
truePacketsSent++;
packetsSentThisInterval++;
_totalBytes += packet->size();
_totalBytes += packet->getSizeWithHeader();
_totalPackets++;
_totalWastedBytes += MAX_PACKET_SIZE - packet->size();
_totalWastedBytes += MAX_PACKET_SIZE - packet->getSizeWithHeader();
}
}
quint64 end = usecTimestampNow();
int elapsedmsec = (end - start)/USECS_PER_MSEC;
int elapsedmsec = (end - start) / USECS_PER_MSEC;
OctreeServer::trackLoopTime(elapsedmsec);
// TODO: add these to stats page

View file

@ -23,7 +23,7 @@ public:
SentPacketHistory(int size = MAX_REASONABLE_SEQUENCE_GAP);
void packetSent(uint16_t sequenceNumber, const NLPacket& packet);
const QByteArray* getPacket(uint16_t sequenceNumber) const;
const NLPacket* getPacket(uint16_t sequenceNumber) const;
private:
RingBufferHistory<NLPacket*> _sentPackets; // circular buffer

View file

@ -32,10 +32,10 @@ public:
OctreeSceneStats();
~OctreeSceneStats();
void reset();
OctreeSceneStats(const OctreeSceneStats& other); // copy constructor
OctreeSceneStats& operator= (const OctreeSceneStats& other); // copy assignment
/// Call when beginning the computation of a scene. Initializes internal structures
void sceneStarted(bool fullScene, bool moving, OctreeElement* root, JurisdictionMap* jurisdictionMap);
bool getIsSceneStarted() const { return _isStarted; }
@ -44,7 +44,7 @@ public:
void sceneCompleted();
void printDebugDetails();
/// Track that a packet was sent as part of the scene.
void packetSent(int bytes);
@ -53,7 +53,7 @@ public:
/// Tracks the ending of an encode pass during scene calculation.
void encodeStopped();
/// Track that a element was traversed as part of computation of a scene.
void traversed(const OctreeElement* element);
@ -133,7 +133,7 @@ public:
int detailsCount;
const char* detailsLabels;
};
/// Returns details about items tracked by OctreeSceneStats
/// \param Item item The item from the stats you're interested in.
ItemInfo& getItemInfo(Item item) { return _ITEMS[item]; }
@ -147,7 +147,7 @@ public:
/// Returns list of OctCodes for end elements of the jurisdiction of this particular octree server
const std::vector<unsigned char*>& getJurisdictionEndNodes() const { return _jurisdictionEndNodes; }
bool isMoving() const { return _isMoving; }
quint64 getTotalElements() const { return _totalElements; }
quint64 getTotalInternal() const { return _totalInternal; }
@ -164,10 +164,10 @@ public:
void trackIncomingOctreePacket(const QByteArray& packet, bool wasStatsPacket, int nodeClockSkewUsec);
quint32 getIncomingPackets() const { return _incomingPacket; }
quint64 getIncomingBytes() const { return _incomingBytes; }
quint64 getIncomingBytes() const { return _incomingBytes; }
quint64 getIncomingWastedBytes() const { return _incomingWastedBytes; }
float getIncomingFlightTimeAverage() { return _incomingFlightTimeAverage.getAverage(); }
const SequenceNumberStats& getIncomingOctreeSequenceNumberStats() const { return _incomingOctreeSequenceNumberStats; }
SequenceNumberStats& getIncomingOctreeSequenceNumberStats() { return _incomingOctreeSequenceNumberStats; }
@ -177,7 +177,7 @@ private:
bool _isReadyToSend;
unsigned char _statsMessage[MAX_PACKET_SIZE];
qint32 _statsMessageLength;
// scene timing data in usecs
@ -190,13 +190,13 @@ private:
quint64 _lastFullTotalEncodeTime;
quint32 _lastFullTotalPackets;
quint64 _lastFullTotalBytes;
SimpleMovingAverage _elapsedAverage;
SimpleMovingAverage _bitsPerOctreeAverage;
quint64 _totalEncodeTime;
quint64 _encodeStart;
// scene octree related data
quint64 _totalElements;
quint64 _totalInternal;
@ -205,7 +205,7 @@ private:
quint64 _traversed;
quint64 _internal;
quint64 _leaves;
quint64 _skippedDistance;
quint64 _internalSkippedDistance;
quint64 _leavesSkippedDistance;
@ -241,24 +241,24 @@ private:
// Accounting Notes:
//
// 1) number of octrees sent can be calculated as _colorSent + _colorBitsWritten. This works because each internal
// 1) number of octrees sent can be calculated as _colorSent + _colorBitsWritten. This works because each internal
// element in a packet will have a _colorBitsWritten included for it and each "leaf" in the packet will have a
// _colorSent written for it. Note that these "leaf" elements in the packets may not be actual leaves in the full
// tree, because LOD may cause us to send an average color for an internal element instead of recursing deeper to
// the leaves.
//
// 2) the stats balance if: (working assumption)
// if _colorSent > 0
// 2) the stats balance if: (working assumption)
// if _colorSent > 0
// _traversed = all skipped + _colorSent + _colorBitsWritten
// else
// _traversed = all skipped + _colorSent + _colorBitsWritten + _treesRemoved
//
// scene network related data
quint32 _packets;
quint64 _bytes;
quint32 _passes;
// incoming packets stats
quint32 _incomingPacket;
quint64 _incomingBytes;
@ -267,7 +267,7 @@ private:
SequenceNumberStats _incomingOctreeSequenceNumberStats;
SimpleMovingAverage _incomingFlightTimeAverage;
// features related items
bool _isMoving;
bool _isFullScene;
@ -276,7 +276,7 @@ private:
static ItemInfo _ITEMS[];
static const int MAX_ITEM_VALUE_LENGTH = 128;
char _itemValueBuffer[MAX_ITEM_VALUE_LENGTH];
unsigned char* _jurisdictionRoot;
std::vector<unsigned char*> _jurisdictionEndNodes;
};