Merge pull request #3010 from wangyix/master

NACK packets are not sent to nodes with unprocessed packets; re-sent packets will be counted towards PPS
This commit is contained in:
AndrewMeadows 2014-06-11 11:10:47 -07:00
commit d9b4032ca0
20 changed files with 101 additions and 83 deletions

View file

@ -86,7 +86,7 @@ bool ModelServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedModels; return shouldSendDeletedModels;
} }
int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { int ModelServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) {
unsigned char outputBuffer[MAX_PACKET_SIZE]; unsigned char outputBuffer[MAX_PACKET_SIZE];
size_t packetLength = 0; size_t packetLength = 0;
@ -99,6 +99,7 @@ int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP
bool hasMoreToSend = true; bool hasMoreToSend = true;
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 models? // TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 models?
packetsSent = 0;
while (hasMoreToSend) { while (hasMoreToSend) {
hasMoreToSend = tree->encodeModelsDeletedSince(queryNode->getSequenceNumber(), deletedModelsSentAt, hasMoreToSend = tree->encodeModelsDeletedSince(queryNode->getSequenceNumber(), deletedModelsSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength); outputBuffer, MAX_PACKET_SIZE, packetLength);
@ -107,6 +108,7 @@ int ModelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node)); NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
queryNode->packetSent(outputBuffer, packetLength); queryNode->packetSent(outputBuffer, packetLength);
packetsSent++;
} }
nodeData->setLastDeletedModelsSentAt(deletePacketSentAt); nodeData->setLastDeletedModelsSentAt(deletePacketSentAt);

View file

@ -37,7 +37,7 @@ public:
// subclass may implement these method // subclass may implement these method
virtual void beforeRun(); virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node); virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent);
virtual void modelCreated(const ModelItem& newModel, const SharedNodePointer& senderNode); virtual void modelCreated(const ModelItem& newModel, const SharedNodePointer& senderNode);

View file

@ -85,7 +85,6 @@ bool OctreeSendThread::process() {
if (nodeData && !nodeData->isShuttingDown()) { if (nodeData && !nodeData->isShuttingDown()) {
bool viewFrustumChanged = nodeData->updateCurrentViewFrustum(); bool viewFrustumChanged = nodeData->updateCurrentViewFrustum();
packetDistributor(nodeData, viewFrustumChanged); packetDistributor(nodeData, viewFrustumChanged);
resendNackedPackets(nodeData);
} }
} }
} }
@ -281,26 +280,6 @@ int OctreeSendThread::handlePacketSend(OctreeQueryNode* nodeData, int& trueBytes
return packetsSent; return packetsSent;
} }
int OctreeSendThread::resendNackedPackets(OctreeQueryNode* nodeData) {
const int MAX_PACKETS_RESEND = 10;
int packetsSent = 0;
const QByteArray* packet;
while (nodeData->hasNextNackedPacket() && packetsSent < MAX_PACKETS_RESEND) {
packet = nodeData->getNextNackedPacket();
if (packet) {
NodeList::getInstance()->writeDatagram(*packet, _node);
packetsSent++;
_totalBytes += packet->size();
_totalPackets++;
_totalWastedBytes += MAX_PACKET_SIZE - packet->size();
}
}
return packetsSent;
}
/// Version of voxel distributor that sends the deepest LOD level at once /// Version of voxel distributor that sends the deepest LOD level at once
int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) { int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrustumChanged) {
@ -311,6 +290,10 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
return 0; return 0;
} }
// calculate max number of packets that can be sent during this interval
int clientMaxPacketsPerInterval = std::max(1, (nodeData->getMaxOctreePacketsPerSecond() / INTERVALS_PER_SECOND));
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
int truePacketsSent = 0; int truePacketsSent = 0;
int trueBytesSent = 0; int trueBytesSent = 0;
int packetsSentThisInterval = 0; int packetsSentThisInterval = 0;
@ -408,9 +391,6 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
//quint64 startCompressTimeMsecs = OctreePacketData::getCompressContentTime() / 1000; //quint64 startCompressTimeMsecs = OctreePacketData::getCompressContentTime() / 1000;
//quint64 startCompressCalls = OctreePacketData::getCompressContentCalls(); //quint64 startCompressCalls = OctreePacketData::getCompressContentCalls();
int clientMaxPacketsPerInterval = std::max(1,(nodeData->getMaxOctreePacketsPerSecond() / INTERVALS_PER_SECOND));
int maxPacketsPerInterval = std::min(clientMaxPacketsPerInterval, _myServer->getPacketsPerClientPerInterval());
int extraPackingAttempts = 0; int extraPackingAttempts = 0;
bool completedScene = false; bool completedScene = false;
while (somethingToSend && packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) { while (somethingToSend && packetsSentThisInterval < maxPacketsPerInterval && !nodeData->isShuttingDown()) {
@ -581,12 +561,26 @@ int OctreeSendThread::packetDistributor(OctreeQueryNode* nodeData, bool viewFrus
// send the environment packet // send the environment packet
// TODO: should we turn this into a while loop to better handle sending multiple special packets // TODO: should we turn this into a while loop to better handle sending multiple special packets
if (_myServer->hasSpecialPacketToSend(_node) && !nodeData->isShuttingDown()) { if (_myServer->hasSpecialPacketToSend(_node) && !nodeData->isShuttingDown()) {
trueBytesSent += _myServer->sendSpecialPacket(nodeData, _node); int specialPacketsSent;
trueBytesSent += _myServer->sendSpecialPacket(_node, nodeData, specialPacketsSent);
nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed nodeData->resetOctreePacket(); // because nodeData's _sequenceNumber has changed
truePacketsSent++; truePacketsSent += specialPacketsSent;
packetsSentThisInterval++; packetsSentThisInterval += specialPacketsSent;
} }
// Re-send packets that were nacked by the client
while (nodeData->hasNextNackedPacket() && packetsSentThisInterval < maxPacketsPerInterval) {
const QByteArray* packet = nodeData->getNextNackedPacket();
if (packet) {
NodeList::getInstance()->writeDatagram(*packet, _node);
truePacketsSent++;
packetsSentThisInterval++;
_totalBytes += packet->size();
_totalPackets++;
_totalWastedBytes += MAX_PACKET_SIZE - packet->size();
}
}
quint64 end = usecTimestampNow(); quint64 end = usecTimestampNow();
int elapsedmsec = (end - start)/USECS_PER_MSEC; int elapsedmsec = (end - start)/USECS_PER_MSEC;

View file

@ -55,9 +55,6 @@ private:
int _nodeMissingCount; int _nodeMissingCount;
bool _isShuttingDown; bool _isShuttingDown;
int resendNackedPackets(OctreeQueryNode* nodeData);
}; };
#endif // hifi_OctreeSendThread_h #endif // hifi_OctreeSendThread_h

View file

@ -72,7 +72,7 @@ public:
// subclass may implement these method // subclass may implement these method
virtual void beforeRun() { }; virtual void beforeRun() { };
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node) { return false; } virtual bool hasSpecialPacketToSend(const SharedNodePointer& node) { return false; }
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { return 0; } virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) { return 0; }
static void attachQueryNodeToNode(Node* newNode); static void attachQueryNodeToNode(Node* newNode);

View file

@ -86,7 +86,7 @@ bool ParticleServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendDeletedParticles; return shouldSendDeletedParticles;
} }
int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { int ParticleServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) {
unsigned char outputBuffer[MAX_PACKET_SIZE]; unsigned char outputBuffer[MAX_PACKET_SIZE];
size_t packetLength = 0; size_t packetLength = 0;
@ -99,6 +99,7 @@ int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNo
bool hasMoreToSend = true; bool hasMoreToSend = true;
// TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 particles? // TODO: is it possible to send too many of these packets? what if you deleted 1,000,000 particles?
packetsSent = 0;
while (hasMoreToSend) { while (hasMoreToSend) {
hasMoreToSend = tree->encodeParticlesDeletedSince(queryNode->getSequenceNumber(), deletedParticlesSentAt, hasMoreToSend = tree->encodeParticlesDeletedSince(queryNode->getSequenceNumber(), deletedParticlesSentAt,
outputBuffer, MAX_PACKET_SIZE, packetLength); outputBuffer, MAX_PACKET_SIZE, packetLength);
@ -107,6 +108,7 @@ int ParticleServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNo
NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node)); NodeList::getInstance()->writeDatagram((char*) outputBuffer, packetLength, SharedNodePointer(node));
queryNode->packetSent(outputBuffer, packetLength); queryNode->packetSent(outputBuffer, packetLength);
packetsSent++;
} }
nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt); nodeData->setLastDeletedParticlesSentAt(deletePacketSentAt);

View file

@ -37,7 +37,7 @@ public:
// subclass may implement these method // subclass may implement these method
virtual void beforeRun(); virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node); virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent);
virtual void particleCreated(const Particle& newParticle, const SharedNodePointer& senderNode); virtual void particleCreated(const Particle& newParticle, const SharedNodePointer& senderNode);

View file

@ -40,7 +40,7 @@ bool VoxelServer::hasSpecialPacketToSend(const SharedNodePointer& node) {
return shouldSendEnvironments; return shouldSendEnvironments;
} }
int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node) { int VoxelServer::sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent) {
unsigned char* copyAt = _tempOutputBuffer; unsigned char* copyAt = _tempOutputBuffer;
@ -76,6 +76,7 @@ int VoxelServer::sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodeP
NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node)); NodeList::getInstance()->writeDatagram((char*) _tempOutputBuffer, envPacketLength, SharedNodePointer(node));
queryNode->packetSent(_tempOutputBuffer, envPacketLength); queryNode->packetSent(_tempOutputBuffer, envPacketLength);
packetsSent = 1;
return envPacketLength; return envPacketLength;
} }

View file

@ -46,7 +46,7 @@ public:
// subclass may implement these method // subclass may implement these method
virtual void beforeRun(); virtual void beforeRun();
virtual bool hasSpecialPacketToSend(const SharedNodePointer& node); virtual bool hasSpecialPacketToSend(const SharedNodePointer& node);
virtual int sendSpecialPacket(OctreeQueryNode* queryNode, const SharedNodePointer& node); virtual int sendSpecialPacket(const SharedNodePointer& node, OctreeQueryNode* queryNode, int& packetsSent);
private: private:
bool _sendEnvironments; bool _sendEnvironments;

View file

@ -158,7 +158,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
_mousePressed(false), _mousePressed(false),
_audio(STARTUP_JITTER_SAMPLES), _audio(STARTUP_JITTER_SAMPLES),
_enableProcessVoxelsThread(true), _enableProcessVoxelsThread(true),
_voxelProcessor(), _octreeProcessor(),
_voxelHideShowThread(&_voxels), _voxelHideShowThread(&_voxels),
_packetsPerSecond(0), _packetsPerSecond(0),
_bytesPerSecond(0), _bytesPerSecond(0),
@ -244,6 +244,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer &startup_time) :
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer))); connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer)));
connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), &_voxels, SLOT(nodeAdded(SharedNodePointer))); connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), &_voxels, SLOT(nodeAdded(SharedNodePointer)));
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_voxels, SLOT(nodeKilled(SharedNodePointer))); connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_voxels, SLOT(nodeKilled(SharedNodePointer)));
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_octreeProcessor, SLOT(nodeKilled(SharedNodePointer)));
connect(nodeList, &NodeList::uuidChanged, this, &Application::updateWindowTitle); connect(nodeList, &NodeList::uuidChanged, this, &Application::updateWindowTitle);
connect(nodeList, SIGNAL(uuidChanged(const QUuid&)), _myAvatar, SLOT(setSessionUUID(const QUuid&))); connect(nodeList, SIGNAL(uuidChanged(const QUuid&)), _myAvatar, SLOT(setSessionUUID(const QUuid&)));
connect(nodeList, &NodeList::limitOfSilentDomainCheckInsReached, nodeList, &NodeList::reset); connect(nodeList, &NodeList::limitOfSilentDomainCheckInsReached, nodeList, &NodeList::reset);
@ -418,7 +419,7 @@ Application::~Application() {
_audio.thread()->quit(); _audio.thread()->quit();
_audio.thread()->wait(); _audio.thread()->wait();
_voxelProcessor.terminate(); _octreeProcessor.terminate();
_voxelHideShowThread.terminate(); _voxelHideShowThread.terminate();
_voxelEditSender.terminate(); _voxelEditSender.terminate();
_particleEditSender.terminate(); _particleEditSender.terminate();
@ -517,7 +518,7 @@ void Application::initializeGL() {
qDebug( "init() complete."); qDebug( "init() complete.");
// create thread for parsing of voxel data independent of the main network and rendering threads // create thread for parsing of voxel data independent of the main network and rendering threads
_voxelProcessor.initialize(_enableProcessVoxelsThread); _octreeProcessor.initialize(_enableProcessVoxelsThread);
_voxelEditSender.initialize(_enableProcessVoxelsThread); _voxelEditSender.initialize(_enableProcessVoxelsThread);
_voxelHideShowThread.initialize(_enableProcessVoxelsThread); _voxelHideShowThread.initialize(_enableProcessVoxelsThread);
_particleEditSender.initialize(_enableProcessVoxelsThread); _particleEditSender.initialize(_enableProcessVoxelsThread);
@ -1884,7 +1885,7 @@ void Application::updateThreads(float deltaTime) {
// parse voxel packets // parse voxel packets
if (!_enableProcessVoxelsThread) { if (!_enableProcessVoxelsThread) {
_voxelProcessor.threadRoutine(); _octreeProcessor.threadRoutine();
_voxelHideShowThread.threadRoutine(); _voxelHideShowThread.threadRoutine();
_voxelEditSender.threadRoutine(); _voxelEditSender.threadRoutine();
_particleEditSender.threadRoutine(); _particleEditSender.threadRoutine();
@ -2095,7 +2096,7 @@ void Application::updateMyAvatar(float deltaTime) {
} }
} }
// sent a nack packet containing missing sequence numbers of received packets // sent nack packets containing missing sequence numbers of received packets from nodes
{ {
quint64 now = usecTimestampNow(); quint64 now = usecTimestampNow();
quint64 sinceLastNack = now - _lastNackTime; quint64 sinceLastNack = now - _lastNackTime;
@ -2125,6 +2126,12 @@ void Application::sendNack() {
|| node->getType() == NodeType::ModelServer) || node->getType() == NodeType::ModelServer)
) { ) {
// if there are octree packets from this node that are waiting to be processed,
// don't send a NACK since the missing packets may be among those waiting packets.
if (_octreeProcessor.hasPacketsToProcessFrom(node)) {
continue;
}
QUuid nodeUUID = node->getUUID(); QUuid nodeUUID = node->getUUID();
_octreeSceneStatsLock.lockForRead(); _octreeSceneStatsLock.lockForRead();

View file

@ -88,7 +88,7 @@
#include "voxels/VoxelFade.h" #include "voxels/VoxelFade.h"
#include "voxels/VoxelHideShowThread.h" #include "voxels/VoxelHideShowThread.h"
#include "voxels/VoxelImporter.h" #include "voxels/VoxelImporter.h"
#include "voxels/VoxelPacketProcessor.h" #include "voxels/OctreePacketProcessor.h"
#include "voxels/VoxelSystem.h" #include "voxels/VoxelSystem.h"
@ -129,7 +129,7 @@ static const float MIRROR_FIELD_OF_VIEW = 30.0f;
class Application : public QApplication { class Application : public QApplication {
Q_OBJECT Q_OBJECT
friend class VoxelPacketProcessor; friend class OctreePacketProcessor;
friend class VoxelEditPacketSender; friend class VoxelEditPacketSender;
friend class DatagramProcessor; friend class DatagramProcessor;
@ -192,7 +192,7 @@ public:
ViewFrustum* getShadowViewFrustum() { return &_shadowViewFrustum; } ViewFrustum* getShadowViewFrustum() { return &_shadowViewFrustum; }
VoxelSystem* getVoxels() { return &_voxels; } VoxelSystem* getVoxels() { return &_voxels; }
VoxelTree* getVoxelTree() { return _voxels.getTree(); } VoxelTree* getVoxelTree() { return _voxels.getTree(); }
const VoxelPacketProcessor& getVoxelPacketProcessor() const { return _voxelProcessor; } const OctreePacketProcessor& getOctreePacketProcessor() const { return _octreeProcessor; }
ParticleTreeRenderer* getParticles() { return &_particles; } ParticleTreeRenderer* getParticles() { return &_particles; }
MetavoxelSystem* getMetavoxels() { return &_metavoxels; } MetavoxelSystem* getMetavoxels() { return &_metavoxels; }
ModelTreeRenderer* getModels() { return &_models; } ModelTreeRenderer* getModels() { return &_models; }
@ -533,7 +533,7 @@ private:
Audio _audio; Audio _audio;
bool _enableProcessVoxelsThread; bool _enableProcessVoxelsThread;
VoxelPacketProcessor _voxelProcessor; OctreePacketProcessor _octreeProcessor;
VoxelHideShowThread _voxelHideShowThread; VoxelHideShowThread _voxelHideShowThread;
VoxelEditPacketSender _voxelEditSender; VoxelEditPacketSender _voxelEditSender;
ParticleEditPacketSender _particleEditSender; ParticleEditPacketSender _particleEditSender;

View file

@ -71,7 +71,7 @@ void DatagramProcessor::processDatagrams() {
case PacketTypeOctreeStats: case PacketTypeOctreeStats:
case PacketTypeEnvironmentData: { case PacketTypeEnvironmentData: {
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
"Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); "Application::networkReceive()... _octreeProcessor.queueReceivedPacket()");
bool wantExtraDebugging = application->getLogger()->extraDebugging(); bool wantExtraDebugging = application->getLogger()->extraDebugging();
if (wantExtraDebugging && packetTypeForPacket(incomingPacket) == PacketTypeVoxelData) { if (wantExtraDebugging && packetTypeForPacket(incomingPacket) == PacketTypeVoxelData) {
@ -92,7 +92,7 @@ void DatagramProcessor::processDatagrams() {
if (matchedNode) { if (matchedNode) {
// add this packet to our list of voxel packets and process them on the voxel processing // add this packet to our list of voxel packets and process them on the voxel processing
application->_voxelProcessor.queueReceivedPacket(matchedNode, incomingPacket); application->_octreeProcessor.queueReceivedPacket(matchedNode, incomingPacket);
} }
break; break;

View file

@ -45,7 +45,7 @@ void ApplicationOverlay::renderOverlay(bool renderToTexture) {
QGLWidget* glWidget = application->getGLWidget(); QGLWidget* glWidget = application->getGLWidget();
MyAvatar* myAvatar = application->getAvatar(); MyAvatar* myAvatar = application->getAvatar();
Audio* audio = application->getAudio(); Audio* audio = application->getAudio();
const VoxelPacketProcessor& voxelPacketProcessor = application->getVoxelPacketProcessor(); const OctreePacketProcessor& octreePacketProcessor = application->getOctreePacketProcessor();
BandwidthMeter* bandwidthMeter = application->getBandwidthMeter(); BandwidthMeter* bandwidthMeter = application->getBandwidthMeter();
NodeBounds& nodeBoundsDisplay = application->getNodeBoundsDisplay(); NodeBounds& nodeBoundsDisplay = application->getNodeBoundsDisplay();
@ -200,7 +200,7 @@ void ApplicationOverlay::renderOverlay(bool renderToTexture) {
if (Menu::getInstance()->isOptionChecked(MenuOption::Stats)) { if (Menu::getInstance()->isOptionChecked(MenuOption::Stats)) {
// let's set horizontal offset to give stats some margin to mirror // let's set horizontal offset to give stats some margin to mirror
int horizontalOffset = MIRROR_VIEW_WIDTH + MIRROR_VIEW_LEFT_PADDING * 2; int horizontalOffset = MIRROR_VIEW_WIDTH + MIRROR_VIEW_LEFT_PADDING * 2;
int voxelPacketsToProcess = voxelPacketProcessor.packetsToProcessCount(); int voxelPacketsToProcess = octreePacketProcessor.packetsToProcessCount();
// Onscreen text about position, servers, etc // Onscreen text about position, servers, etc
Stats::getInstance()->display(WHITE_TEXT, horizontalOffset, application->getFps(), application->getPacketsPerSecond(), application->getBytesPerSecond(), voxelPacketsToProcess); Stats::getInstance()->display(WHITE_TEXT, horizontalOffset, application->getFps(), application->getPacketsPerSecond(), application->getBytesPerSecond(), voxelPacketsToProcess);
// Bandwidth meter // Bandwidth meter

View file

@ -1,5 +1,5 @@
// //
// VoxelPacketProcessor.cpp // OctreePacketProcessor.cpp
// interface/src/voxels // interface/src/voxels
// //
// Created by Brad Hefta-Gaub on 8/12/13. // Created by Brad Hefta-Gaub on 8/12/13.
@ -13,18 +13,18 @@
#include "Application.h" #include "Application.h"
#include "Menu.h" #include "Menu.h"
#include "VoxelPacketProcessor.h" #include "OctreePacketProcessor.h"
void VoxelPacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) { void OctreePacketProcessor::processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
"VoxelPacketProcessor::processPacket()"); "OctreePacketProcessor::processPacket()");
QByteArray mutablePacket = packet; QByteArray mutablePacket = packet;
const int WAY_BEHIND = 300; const int WAY_BEHIND = 300;
if (packetsToProcessCount() > WAY_BEHIND && Application::getInstance()->getLogger()->extraDebugging()) { if (packetsToProcessCount() > WAY_BEHIND && Application::getInstance()->getLogger()->extraDebugging()) {
qDebug("VoxelPacketProcessor::processPacket() packets to process=%d", packetsToProcessCount()); qDebug("OctreePacketProcessor::processPacket() packets to process=%d", packetsToProcessCount());
} }
ssize_t messageLength = mutablePacket.size(); ssize_t messageLength = mutablePacket.size();

View file

@ -1,5 +1,5 @@
// //
// VoxelPacketProcessor.h // OctreePacketProcessor.h
// interface/src/voxels // interface/src/voxels
// //
// Created by Brad Hefta-Gaub on 8/12/13. // Created by Brad Hefta-Gaub on 8/12/13.
@ -9,16 +9,16 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#ifndef hifi_VoxelPacketProcessor_h #ifndef hifi_OctreePacketProcessor_h
#define hifi_VoxelPacketProcessor_h #define hifi_OctreePacketProcessor_h
#include <ReceivedPacketProcessor.h> #include <ReceivedPacketProcessor.h>
/// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes /// Handles processing of incoming voxel packets for the interface application. As with other ReceivedPacketProcessor classes
/// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket() /// the user is responsible for reading inbound packets and adding them to the processing queue by calling queueReceivedPacket()
class VoxelPacketProcessor : public ReceivedPacketProcessor { class OctreePacketProcessor : public ReceivedPacketProcessor {
Q_OBJECT Q_OBJECT
protected: protected:
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet); virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
}; };
#endif // hifi_VoxelPacketProcessor_h #endif // hifi_OctreePacketProcessor_h

View file

@ -17,9 +17,9 @@
#include "NetworkPacket.h" #include "NetworkPacket.h"
void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const QByteArray& packet) { void NetworkPacket::copyContents(const SharedNodePointer& node, const QByteArray& packet) {
if (packet.size() && packet.size() <= MAX_PACKET_SIZE) { if (packet.size() && packet.size() <= MAX_PACKET_SIZE) {
_destinationNode = destinationNode; _node = node;
_byteArray = packet; _byteArray = packet;
} else { } else {
qDebug(">>> NetworkPacket::copyContents() unexpected length = %d", packet.size()); qDebug(">>> NetworkPacket::copyContents() unexpected length = %d", packet.size());
@ -27,28 +27,28 @@ void NetworkPacket::copyContents(const SharedNodePointer& destinationNode, const
} }
NetworkPacket::NetworkPacket(const NetworkPacket& packet) { NetworkPacket::NetworkPacket(const NetworkPacket& packet) {
copyContents(packet.getDestinationNode(), packet.getByteArray()); copyContents(packet.getNode(), packet.getByteArray());
} }
NetworkPacket::NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { NetworkPacket::NetworkPacket(const SharedNodePointer& node, const QByteArray& packet) {
copyContents(destinationNode, packet); copyContents(node, packet);
}; };
// copy assignment // copy assignment
NetworkPacket& NetworkPacket::operator=(NetworkPacket const& other) { NetworkPacket& NetworkPacket::operator=(NetworkPacket const& other) {
copyContents(other.getDestinationNode(), other.getByteArray()); copyContents(other.getNode(), other.getByteArray());
return *this; return *this;
} }
#ifdef HAS_MOVE_SEMANTICS #ifdef HAS_MOVE_SEMANTICS
// move, same as copy, but other packet won't be used further // move, same as copy, but other packet won't be used further
NetworkPacket::NetworkPacket(NetworkPacket && packet) { NetworkPacket::NetworkPacket(NetworkPacket && packet) {
copyContents(packet.getDestinationNode(), packet.getByteArray()); copyContents(packet.getNode(), packet.getByteArray());
} }
// move assignment // move assignment
NetworkPacket& NetworkPacket::operator=(NetworkPacket&& other) { NetworkPacket& NetworkPacket::operator=(NetworkPacket&& other) {
copyContents(other.getDestinationNode(), other.getByteArray()); copyContents(other.getNode(), other.getByteArray());
return *this; return *this;
} }
#endif #endif

View file

@ -26,6 +26,7 @@
/// Storage of not-yet processed inbound, or not yet sent outbound generic UDP network packet /// Storage of not-yet processed inbound, or not yet sent outbound generic UDP network packet
class NetworkPacket { class NetworkPacket {
public: public:
NetworkPacket() { }
NetworkPacket(const NetworkPacket& packet); // copy constructor NetworkPacket(const NetworkPacket& packet); // copy constructor
NetworkPacket& operator= (const NetworkPacket& other); // copy assignment NetworkPacket& operator= (const NetworkPacket& other); // copy assignment
@ -34,15 +35,15 @@ public:
NetworkPacket& operator= (NetworkPacket&& other); // move assignment NetworkPacket& operator= (NetworkPacket&& other); // move assignment
#endif #endif
NetworkPacket(const SharedNodePointer& destinationNode, const QByteArray& byteArray); NetworkPacket(const SharedNodePointer& node, const QByteArray& byteArray);
const SharedNodePointer& getDestinationNode() const { return _destinationNode; } const SharedNodePointer& getNode() const { return _node; }
const QByteArray& getByteArray() const { return _byteArray; } const QByteArray& getByteArray() const { return _byteArray; }
private: private:
void copyContents(const SharedNodePointer& destinationNode, const QByteArray& byteArray); void copyContents(const SharedNodePointer& node, const QByteArray& byteArray);
SharedNodePointer _destinationNode; SharedNodePointer _node;
QByteArray _byteArray; QByteArray _byteArray;
}; };

View file

@ -271,7 +271,7 @@ bool PacketSender::nonThreadedProcess() {
unlock(); unlock();
// send the packet through the NodeList... // send the packet through the NodeList...
NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getDestinationNode()); NodeList::getInstance()->writeDatagram(temporary.getByteArray(), temporary.getNode());
packetsSentThisCall++; packetsSentThisCall++;
_packetsOverCheckInterval++; _packetsOverCheckInterval++;
_totalPacketsSent++; _totalPacketsSent++;

View file

@ -17,13 +17,14 @@ void ReceivedPacketProcessor::terminating() {
_hasPackets.wakeAll(); _hasPackets.wakeAll();
} }
void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet) { void ReceivedPacketProcessor::queueReceivedPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) {
// Make sure our Node and NodeList knows we've heard from this node. // Make sure our Node and NodeList knows we've heard from this node.
destinationNode->setLastHeardMicrostamp(usecTimestampNow()); sendingNode->setLastHeardMicrostamp(usecTimestampNow());
NetworkPacket networkPacket(destinationNode, packet); NetworkPacket networkPacket(sendingNode, packet);
lock(); lock();
_packets.push_back(networkPacket); _packets.push_back(networkPacket);
_nodePacketCounts[sendingNode->getUUID()]++;
unlock(); unlock();
// Make sure to wake our actual processing thread because we now have packets for it to process. // Make sure to wake our actual processing thread because we now have packets for it to process.
@ -42,8 +43,13 @@ bool ReceivedPacketProcessor::process() {
NetworkPacket& packet = _packets.front(); // get the oldest packet NetworkPacket& packet = _packets.front(); // get the oldest packet
NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us NetworkPacket temporary = packet; // make a copy of the packet in case the vector is resized on us
_packets.erase(_packets.begin()); // remove the oldest packet _packets.erase(_packets.begin()); // remove the oldest packet
_nodePacketCounts[temporary.getNode()->getUUID()]--;
unlock(); // let others add to the packets unlock(); // let others add to the packets
processPacket(temporary.getDestinationNode(), temporary.getByteArray()); // process our temporary copy processPacket(temporary.getNode(), temporary.getByteArray()); // process our temporary copy
} }
return isStillRunning(); // keep running till they terminate us return isStillRunning(); // keep running till they terminate us
} }
void ReceivedPacketProcessor::killNode(const SharedNodePointer& node) {
_nodePacketCounts.remove(node->getUUID());
}

View file

@ -28,20 +28,26 @@ public:
/// \param packetData pointer to received data /// \param packetData pointer to received data
/// \param ssize_t packetLength size of received data /// \param ssize_t packetLength size of received data
/// \thread network receive thread /// \thread network receive thread
void queueReceivedPacket(const SharedNodePointer& destinationNode, const QByteArray& packet); void queueReceivedPacket(const SharedNodePointer& sendingNode, const QByteArray& packet);
/// Are there received packets waiting to be processed /// Are there received packets waiting to be processed
bool hasPacketsToProcess() const { return _packets.size() > 0; } bool hasPacketsToProcess() const { return _packets.size() > 0; }
/// Are there received packets waiting to be processed from a certain node
bool hasPacketsToProcessFrom(const SharedNodePointer& sendingNode) const {
return _nodePacketCounts[sendingNode->getUUID()] > 0;
}
/// How many received packets waiting are to be processed /// How many received packets waiting are to be processed
int packetsToProcessCount() const { return _packets.size(); } int packetsToProcessCount() const { return _packets.size(); }
public slots:
void killNode(const SharedNodePointer& node);
protected: protected:
/// Callback for processing of recieved packets. Implement this to process the incoming packets. /// Callback for processing of recieved packets. Implement this to process the incoming packets.
/// \param sockaddr& senderAddress the address of the sender /// \param SharedNodePointer& sendingNode the node that sent this packet
/// \param packetData pointer to received data /// \param QByteArray& the packet to be processed
/// \param ssize_t packetLength size of received data
/// \thread "this" individual processing thread
virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) = 0; virtual void processPacket(const SharedNodePointer& sendingNode, const QByteArray& packet) = 0;
/// Implements generic processing behavior for this thread. /// Implements generic processing behavior for this thread.
@ -51,7 +57,9 @@ protected:
private: private:
std::vector<NetworkPacket> _packets; QVector<NetworkPacket> _packets;
QHash<QUuid, int> _nodePacketCounts;
QWaitCondition _hasPackets; QWaitCondition _hasPackets;
QMutex _waitingOnPacketsMutex; QMutex _waitingOnPacketsMutex;
}; };