mirror of
https://github.com/overte-org/overte.git
synced 2025-04-20 11:45:36 +02:00
VoxelPacketProcessor -> OctreePacketProcessor
added nodes bookkeeping in ReceivedPacketProcessor; added check in sendNack() to not send NACKs to nodes that have sent packets that are waiting in the message queue.
This commit is contained in:
parent
ffda98fe0b
commit
d84beee3e4
11 changed files with 56 additions and 41 deletions
|
@ -158,7 +158,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
|
|||
_mousePressed(false),
|
||||
_audio(STARTUP_JITTER_SAMPLES),
|
||||
_enableProcessVoxelsThread(true),
|
||||
_voxelProcessor(),
|
||||
_octreeProcessor(),
|
||||
_voxelHideShowThread(&_voxels),
|
||||
_packetsPerSecond(0),
|
||||
_bytesPerSecond(0),
|
||||
|
@ -418,7 +418,7 @@ Application::~Application() {
|
|||
_audio.thread()->quit();
|
||||
_audio.thread()->wait();
|
||||
|
||||
_voxelProcessor.terminate();
|
||||
_octreeProcessor.terminate();
|
||||
_voxelHideShowThread.terminate();
|
||||
_voxelEditSender.terminate();
|
||||
_particleEditSender.terminate();
|
||||
|
@ -517,7 +517,7 @@ void Application::initializeGL() {
|
|||
qDebug( "init() complete.");
|
||||
|
||||
// create thread for parsing of voxel data independent of the main network and rendering threads
|
||||
_voxelProcessor.initialize(_enableProcessVoxelsThread);
|
||||
_octreeProcessor.initialize(_enableProcessVoxelsThread);
|
||||
_voxelEditSender.initialize(_enableProcessVoxelsThread);
|
||||
_voxelHideShowThread.initialize(_enableProcessVoxelsThread);
|
||||
_particleEditSender.initialize(_enableProcessVoxelsThread);
|
||||
|
@ -1884,7 +1884,7 @@ void Application::updateThreads(float deltaTime) {
|
|||
|
||||
// parse voxel packets
|
||||
if (!_enableProcessVoxelsThread) {
|
||||
_voxelProcessor.threadRoutine();
|
||||
_octreeProcessor.threadRoutine();
|
||||
_voxelHideShowThread.threadRoutine();
|
||||
_voxelEditSender.threadRoutine();
|
||||
_particleEditSender.threadRoutine();
|
||||
|
@ -2125,6 +2125,12 @@ void Application::sendNack() {
|
|||
|| node->getType() == NodeType::ModelServer)
|
||||
) {
|
||||
|
||||
// if there are octree packets from this node that are waiting to be processed,
|
||||
// don't send a NACK since the missing packets may be among those waiting packets.
|
||||
if (_octreeProcessor.hasPacketsToProcessFrom(node)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
QUuid nodeUUID = node->getUUID();
|
||||
|
||||
_octreeSceneStatsLock.lockForRead();
|
||||
|
|
|
@ -88,7 +88,7 @@
|
|||
#include "voxels/VoxelFade.h"
|
||||
#include "voxels/VoxelHideShowThread.h"
|
||||
#include "voxels/VoxelImporter.h"
|
||||
#include "voxels/VoxelPacketProcessor.h"
|
||||
#include "voxels/OctreePacketProcessor.h"
|
||||
#include "voxels/VoxelSystem.h"
|
||||
|
||||
|
||||
|
@ -129,7 +129,7 @@ static const float MIRROR_FIELD_OF_VIEW = 30.0f;
|
|||
class Application : public QApplication {
|
||||
Q_OBJECT
|
||||
|
||||
friend class VoxelPacketProcessor;
|
||||
friend class OctreePacketProcessor;
|
||||
friend class VoxelEditPacketSender;
|
||||
friend class DatagramProcessor;
|
||||
|
||||
|
@ -192,7 +192,7 @@ public:
|
|||
ViewFrustum* getShadowViewFrustum() { return &_shadowViewFrustum; }
|
||||
VoxelSystem* getVoxels() { return &_voxels; }
|
||||
VoxelTree* getVoxelTree() { return _voxels.getTree(); }
|
||||
const VoxelPacketProcessor& getVoxelPacketProcessor() const { return _voxelProcessor; }
|
||||
const OctreePacketProcessor& getOctreePacketProcessor() const { return _octreeProcessor; }
|
||||
ParticleTreeRenderer* getParticles() { return &_particles; }
|
||||
MetavoxelSystem* getMetavoxels() { return &_metavoxels; }
|
||||
ModelTreeRenderer* getModels() { return &_models; }
|
||||
|
@ -533,7 +533,7 @@ private:
|
|||
Audio _audio;
|
||||
|
||||
bool _enableProcessVoxelsThread;
|
||||
VoxelPacketProcessor _voxelProcessor;
|
||||
OctreePacketProcessor _octreeProcessor;
|
||||
VoxelHideShowThread _voxelHideShowThread;
|
||||
VoxelEditPacketSender _voxelEditSender;
|
||||
ParticleEditPacketSender _particleEditSender;
|
||||
|
|
|
@ -71,7 +71,7 @@ void DatagramProcessor::processDatagrams() {
|
|||
case PacketTypeOctreeStats:
|
||||
case PacketTypeEnvironmentData: {
|
||||
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||
"Application::networkReceive()... _voxelProcessor.queueReceivedPacket()");
|
||||
"Application::networkReceive()... _octreeProcessor.queueReceivedPacket()");
|
||||
|
||||
bool wantExtraDebugging = application->getLogger()->extraDebugging();
|
||||
if (wantExtraDebugging && packetTypeForPacket(incomingPacket) == PacketTypeVoxelData) {
|
||||
|
@ -92,7 +92,7 @@ void DatagramProcessor::processDatagrams() {
|
|||
|
||||
if (matchedNode) {
|
||||
// add this packet to our list of voxel packets and process them on the voxel processing
|
||||
application->_voxelProcessor.queueReceivedPacket(matchedNode, incomingPacket);
|
||||
application->_octreeProcessor.queueReceivedPacket(matchedNode, incomingPacket);
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
|
@ -45,7 +45,7 @@ void ApplicationOverlay::renderOverlay(bool renderToTexture) {
|
|||
QGLWidget* glWidget = application->getGLWidget();
|
||||
MyAvatar* myAvatar = application->getAvatar();
|
||||
Audio* audio = application->getAudio();
|
||||
const VoxelPacketProcessor& voxelPacketProcessor = application->getVoxelPacketProcessor();
|
||||
const OctreePacketProcessor& octreePacketProcessor = application->getOctreePacketProcessor();
|
||||
BandwidthMeter* bandwidthMeter = application->getBandwidthMeter();
|
||||
NodeBounds& nodeBoundsDisplay = application->getNodeBoundsDisplay();
|
||||
|
||||
|
@ -200,7 +200,7 @@ void ApplicationOverlay::renderOverlay(bool renderToTexture) {
|
|||
if (Menu::getInstance()->isOptionChecked(MenuOption::Stats)) {
|
||||
// let's set horizontal offset to give stats some margin to mirror
|
||||
int horizontalOffset = MIRROR_VIEW_WIDTH + MIRROR_VIEW_LEFT_PADDING * 2;
|
||||
int voxelPacketsToProcess = voxelPacketProcessor.packetsToProcessCount();
|
||||
int voxelPacketsToProcess = octreePacketProcessor.packetsToProcessCount();
|
||||
// Onscreen text about position, servers, etc
|
||||
Stats::getInstance()->display(WHITE_TEXT, horizontalOffset, application->getFps(), application->getPacketsPerSecond(), application->getBytesPerSecond(), voxelPacketsToProcess);
|
||||
// Bandwidth meter
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// VoxelPacketProcessor.cpp
|
||||
// OctreePacketProcessor.cpp
|
||||
// interface/src/voxels
|
||||
//
|
||||
// Created by Brad Hefta-Gaub on 8/12/13.
|
||||
|
@ -13,18 +13,18 @@
|
|||
|
||||
#include "Application.h"
|
||||
#include "Menu.h"
|
||||
#include "VoxelPacketProcessor.h"
|
||||
#include "OctreePacketProcessor.h"
|
||||
|
||||
void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
|
||||
void OctreePacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
|
||||
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||
"VoxelPacketProcessor::processPacket()");
|
||||
"OctreePacketProcessor::processPacket()");
|
||||
|
||||
QByteArray mutablePacket = packet;
|
||||
|
||||
const int WAY_BEHIND = 300;
|
||||
|
||||
if (packetsToProcessCount() > WAY_BEHIND && Application::getInstance()->getLogger()->extraDebugging()) {
|
||||
qDebug("VoxelPacketProcessor::processPacket() packets to process=%d", packetsToProcessCount());
|
||||
qDebug("OctreePacketProcessor::processPacket() packets to process=%d", packetsToProcessCount());
|
||||
}
|
||||
ssize_t messageLength = mutablePacket.size();
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// VoxelPacketProcessor.h
|
||||
// OctreePacketProcessor.h
|
||||
// interface/src/voxels
|
||||
//
|
||||
// Created by Brad Hefta-Gaub on 8/12/13.
|
||||
|
@ -9,16 +9,16 @@
|
|||
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||
//
|
||||
|
||||
#ifndef hifi_VoxelPacketProcessor_h
|
||||
#define hifi_VoxelPacketProcessor_h
|
||||
#ifndef hifi_OctreePacketProcessor_h
|
||||
#define hifi_OctreePacketProcessor_h
|
||||
|
||||
#include <ReceivedPacketProcessor.h>
|
||||
|
||||
/// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes
|
||||
/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket()
|
||||
class VoxelPacketProcessor : public ReceivedPacketProcessor {
|
||||
class OctreePacketProcessor : public ReceivedPacketProcessor {
|
||||
Q_OBJECT
|
||||
protected:
|
||||
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
|
||||
};
|
||||
#endif // hifi_VoxelPacketProcessor_h
|
||||
#endif // hifi_OctreePacketProcessor_h
|
|
@ -17,9 +17,9 @@
|
|||
|
||||
#include "NetworkPacket.h"
|
||||
|
||||
void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const QByteArray& packet) {
|
||||
void NetworkPacket::copyContents(const SharedNodePointer& sendingNode, const QByteArray& packet) {
|
||||
if (packet.size() && packet.size() <= MAX_PACKET_SIZE) {
|
||||
_destinationNode = destinationNode;
|
||||
_sendingNode = sendingNode;
|
||||
_byteArray = packet;
|
||||
} else {
|
||||
qDebug(">>> NetworkPacket::copyContents() unexpected length = %d", packet.size());
|
||||
|
@ -27,28 +27,28 @@ void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const
|
|||
}
|
||||
|
||||
NetworkPacket::NetworkPacket(const NetworkPacket& packet) {
|
||||
copyContents(packet.getDestinationNode(), packet.getByteArray());
|
||||
copyContents(packet.getSendingNode(), packet.getByteArray());
|
||||
}
|
||||
|
||||
NetworkPacket::NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) {
|
||||
copyContents(destinationNode, packet);
|
||||
NetworkPacket::NetworkPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
|
||||
copyContents(sendingNode, packet);
|
||||
};
|
||||
|
||||
// copy assignment
|
||||
NetworkPacket& NetworkPacket::operator=(NetworkPacket const& other) {
|
||||
copyContents(other.getDestinationNode(), other.getByteArray());
|
||||
copyContents(other.getSendingNode(), other.getByteArray());
|
||||
return *this;
|
||||
}
|
||||
|
||||
#ifdef HAS_MOVE_SEMANTICS
|
||||
// move, same as copy, but other packet won't be used further
|
||||
NetworkPacket::NetworkPacket(NetworkPacket && packet) {
|
||||
copyContents(packet.getDestinationNode(), packet.getByteArray());
|
||||
copyContents(packet.getSendingNode(), packet.getByteArray());
|
||||
}
|
||||
|
||||
// move assignment
|
||||
NetworkPacket& NetworkPacket::operator=(NetworkPacket&& other) {
|
||||
copyContents(other.getDestinationNode(), other.getByteArray());
|
||||
copyContents(other.getSendingNode(), other.getByteArray());
|
||||
return *this;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -34,15 +34,15 @@ public:
|
|||
NetworkPacket& operator= (NetworkPacket&& other); // move assignment
|
||||
#endif
|
||||
|
||||
NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& byteArray);
|
||||
NetworkPacket(const SharedNodePointer& sendingNode, const QByteArray& byteArray);
|
||||
|
||||
const SharedNodePointer& getDestinationNode() const { return _destinationNode; }
|
||||
const SharedNodePointer& getSendingNode() const { return _sendingNode; }
|
||||
const QByteArray& getByteArray() const { return _byteArray; }
|
||||
|
||||
private:
|
||||
void copyContents(const SharedNodePointer& destinationNode, const QByteArray& byteArray);
|
||||
void copyContents(const SharedNodePointer& sendingNode, const QByteArray& byteArray);
|
||||
|
||||
SharedNodePointer _destinationNode;
|
||||
SharedNodePointer _sendingNode;
|
||||
QByteArray _byteArray;
|
||||
};
|
||||
|
||||
|
|
|
@ -271,7 +271,7 @@ bool PacketSender::nonThreadedProcess() {
|
|||
unlock();
|
||||
|
||||
// send the packet through the NodeList...
|
||||
NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getDestinationNode());
|
||||
NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getSendingNode());
|
||||
packetsSentThisCall++;
|
||||
_packetsOverCheckInterval++;
|
||||
_totalPacketsSent++;
|
||||
|
|
|
@ -17,13 +17,14 @@ void ReceivedPacketProcessor::terminating() {
|
|||
_hasPackets.wakeAll();
|
||||
}
|
||||
|
||||
void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) {
|
||||
void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
|
||||
// Make sure our Node and NodeList knows we've heard from this node.
|
||||
destinationNode->setLastHeardMicrostamp(usecTimestampNow());
|
||||
sendingNode->setLastHeardMicrostamp(usecTimestampNow());
|
||||
|
||||
NetworkPacket networkPacket(destinationNode, packet);
|
||||
NetworkPacket networkPacket(sendingNode, packet);
|
||||
lock();
|
||||
_packets.push_back(networkPacket);
|
||||
_nodePacketCounts[sendingNode->getUUID()]++;
|
||||
unlock();
|
||||
|
||||
// Make sure to wake our actual processing thread because we now have packets for it to process.
|
||||
|
@ -42,8 +43,9 @@ bool ReceivedPacketProcessor::process() {
|
|||
NetworkPacket& packet = _packets.front(); // get the oldest packet
|
||||
NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us
|
||||
_packets.erase(_packets.begin()); // remove the oldest packet
|
||||
_nodePacketCounts[temporary.getSendingNode()->getUUID()]--;
|
||||
unlock(); // let others add to the packets
|
||||
processPacket(temporary.getDestinationNode(), temporary.getByteArray()); // process our temporary copy
|
||||
processPacket(temporary.getSendingNode(), temporary.getByteArray()); // process our temporary copy
|
||||
}
|
||||
return isStillRunning(); // keep running till they terminate us
|
||||
}
|
||||
|
|
|
@ -28,11 +28,16 @@ public:
|
|||
/// \param packetData pointer to received data
|
||||
/// \param ssize_t packetLength size of received data
|
||||
/// \thread network receive thread
|
||||
void queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet);
|
||||
void queueReceivedPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
|
||||
|
||||
/// Are there received packets waiting to be processed
|
||||
bool hasPacketsToProcess() const { return _packets.size() > 0; }
|
||||
|
||||
/// Are there received packets waiting to be processed from a certain node
|
||||
bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const {
|
||||
return _nodePacketCounts[sendingNode->getUUID()] > 0;
|
||||
}
|
||||
|
||||
/// How many received packets waiting are to be processed
|
||||
int packetsToProcessCount() const { return _packets.size(); }
|
||||
|
||||
|
@ -51,7 +56,9 @@ protected:
|
|||
|
||||
private:
|
||||
|
||||
std::vector<NetworkPacket> _packets;
|
||||
QVector<NetworkPacket> _packets;
|
||||
QHash<QUuid, int> _nodePacketCounts;
|
||||
|
||||
QWaitCondition _hasPackets;
|
||||
QMutex _waitingOnPacketsMutex;
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue