diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 3d206e4e73..cf865ce299 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -108,6 +108,8 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : QApplication(argc, argv), _window(new QMainWindow(desktop())), _glWidget(new GLCanvas()), + _nodeThread(new QThread(this)), + _datagramProcessor(new DatagramProcessor()), _frameCount(0), _fps(120.0f), _justStarted(true), @@ -145,10 +147,8 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : _voxelHideShowThread(&_voxels), _voxelEditSender(this), _particleEditSender(this), - _packetCount(0), _packetsPerSecond(0), _bytesPerSecond(0), - _bytesCount(0), _recentMaxPackets(0), _resetRecentMaxPacketsSoon(true), _swatch(NULL), @@ -173,11 +173,21 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : if (portStr) { listenPort = atoi(portStr); } - + + // put the NodeList and datagram processing on the node thread NodeList* nodeList = NodeList::createInstance(NODE_TYPE_AGENT, listenPort); - - // connect our processDatagrams slot to the QUDPSocket readyRead() signal - connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), SLOT(processDatagrams())); + + nodeList->moveToThread(_nodeThread); + _datagramProcessor->moveToThread(_nodeThread); + + // connect the DataProcessor processDatagrams slot to the QUDPSocket readyRead() signal + connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), _datagramProcessor, SLOT(processDatagrams())); + + // make sure the node thread is given highest priority + _nodeThread->setPriority(QThread::TimeCriticalPriority); + + // start the nodeThread so its event loop is running + _nodeThread->start(); // put the audio processing on a separate thread QThread* audioThread = new QThread(this); @@ -192,7 +202,7 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer))); connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), &_voxels, SLOT(nodeAdded(SharedNodePointer))); connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_voxels, SLOT(nodeKilled(SharedNodePointer))); - + // read the ApplicationInfo.ini file for Name/Version/Domain information QSettings applicationInfo("resources/info/ApplicationInfo.ini", QSettings::IniFormat); @@ -225,8 +235,10 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : NODE_TYPE_PARTICLE_SERVER, NODE_TYPE_METAVOXEL_SERVER}; nodeList->setNodeTypesOfInterest(nodeTypesOfInterest, sizeof(nodeTypesOfInterest)); + // move the silentNodeTimer to the _nodeThread QTimer* silentNodeTimer = new QTimer(this); connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes())); + silentNodeTimer->moveToThread(_nodeThread); silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000); QString cachePath = QStandardPaths::writableLocation(QStandardPaths::DataLocation); @@ -273,6 +285,10 @@ Application::~Application() { // make sure we don't call the idle timer any more delete idleTimer; + + // ask the datagram processing thread to quit and wait until it is done + _nodeThread->thread()->quit(); + _nodeThread->thread()->wait(); // ask the audio thread to quit and wait until it is done _audio.thread()->quit(); @@ -1312,11 +1328,12 @@ void Application::timer() { } _fps = (float)_frameCount / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f); - _packetsPerSecond = (float)_packetCount / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f); - _bytesPerSecond = (float)_bytesCount / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f); + + _packetsPerSecond = (float) _datagramProcessor->getPacketCount() / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f); + _bytesPerSecond = (float) _datagramProcessor->getByteCount() / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f); _frameCount = 0; - _packetCount = 0; - _bytesCount = 0; + + _datagramProcessor->resetCounters(); gettimeofday(&_timerStart, NULL); @@ -4052,93 +4069,6 @@ int Application::parseOctreeStats(unsigned char* messageData, ssize_t messageLen return statsMessageLength; } -// Receive packets from other nodes/servers and decide what to do with them! -void Application::processDatagrams() { - PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), - "Application::networkReceive()"); - - HifiSockAddr senderSockAddr; - ssize_t bytesReceived; - - while (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() && - (bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) _incomingPacket, - MAX_PACKET_SIZE, - senderSockAddr.getAddressPointer(), - senderSockAddr.getPortPointer()))) { - - _packetCount++; - _bytesCount += bytesReceived; - - if (packetVersionMatch(_incomingPacket)) { - // only process this packet if we have a match on the packet version - switch (_incomingPacket[0]) { - case PACKET_TYPE_TRANSMITTER_DATA_V2: - // V2 = IOS transmitter app - _myTransmitter.processIncomingData(_incomingPacket, bytesReceived); - - break; - case PACKET_TYPE_MIXED_AUDIO: - QMetaObject::invokeMethod(&_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection, - Q_ARG(QByteArray, QByteArray((char*) _incomingPacket, bytesReceived))); - break; - - case PACKET_TYPE_PARTICLE_ADD_RESPONSE: - // this will keep creatorTokenIDs to IDs mapped correctly - Particle::handleAddParticleResponse(_incomingPacket, bytesReceived); - break; - - case PACKET_TYPE_PARTICLE_DATA: - case PACKET_TYPE_PARTICLE_ERASE: - case PACKET_TYPE_VOXEL_DATA: - case PACKET_TYPE_VOXEL_ERASE: - case PACKET_TYPE_OCTREE_STATS: - case PACKET_TYPE_ENVIRONMENT_DATA: { - PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), - "Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); - - bool wantExtraDebugging = getLogger()->extraDebugging(); - if (wantExtraDebugging && _incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) { - int numBytesPacketHeader = numBytesForPacketHeader(_incomingPacket); - unsigned char* dataAt = _incomingPacket + numBytesPacketHeader; - dataAt += sizeof(VOXEL_PACKET_FLAGS); - VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt); - dataAt += sizeof(VOXEL_PACKET_SEQUENCE); - VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt); - dataAt += sizeof(VOXEL_PACKET_SENT_TIME); - VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow(); - int flightTime = arrivedAt - sentAt; - - printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime); - } - - // add this packet to our list of voxel packets and process them on the voxel processing - _voxelProcessor.queueReceivedPacket(senderSockAddr, _incomingPacket, bytesReceived); - break; - } - case PACKET_TYPE_METAVOXEL_DATA: - _metavoxels.processData(QByteArray((const char*) _incomingPacket, bytesReceived), - senderSockAddr); - break; - case PACKET_TYPE_BULK_AVATAR_DATA: - NodeList::getInstance()->processBulkNodeData(senderSockAddr, - _incomingPacket, - bytesReceived); - getInstance()->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived); - break; - case PACKET_TYPE_DATA_SERVER_GET: - case PACKET_TYPE_DATA_SERVER_PUT: - case PACKET_TYPE_DATA_SERVER_SEND: - case PACKET_TYPE_DATA_SERVER_CONFIRM: - DataServerClient::processMessageFromDataServer(_incomingPacket, bytesReceived); - break; - default: - NodeList::getInstance()->processNodeData(senderSockAddr, _incomingPacket, bytesReceived); - break; - } - } - } -} - void Application::packetSentNotification(ssize_t length) { _bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(length); } diff --git a/interface/src/Application.h b/interface/src/Application.h index 6be01db39f..514591e1df 100644 --- a/interface/src/Application.h +++ b/interface/src/Application.h @@ -33,6 +33,7 @@ #include "BandwidthMeter.h" #include "Camera.h" #include "Cloud.h" +#include "DatagramProcessor.h" #include "Environment.h" #include "GLCanvas.h" #include "MetavoxelSystem.h" @@ -96,6 +97,7 @@ class Application : public QApplication, public PacketSenderNotify { friend class VoxelPacketProcessor; friend class VoxelEditPacketSender; + friend class DatagramProcessor; public: static Application* getInstance() { return static_cast<Application*>(QCoreApplication::instance()); } @@ -208,8 +210,6 @@ public slots: void domainChanged(const QString& domainHostname); void nodeKilled(SharedNodePointer node); - void processDatagrams(); - void exportVoxels(); void importVoxels(); void cutVoxels(); @@ -331,6 +331,9 @@ private: QGLWidget* _glWidget; BandwidthMeter _bandwidthMeter; + + QThread* _nodeThread; + DatagramProcessor* _datagramProcessor; QNetworkAccessManager* _networkAccessManager; QSettings* _settings; @@ -461,11 +464,8 @@ private: VoxelEditPacketSender _voxelEditSender; ParticleEditPacketSender _particleEditSender; - unsigned char _incomingPacket[MAX_PACKET_SIZE]; - int _packetCount; int _packetsPerSecond; int _bytesPerSecond; - int _bytesCount; int _recentMaxPackets; // recent max incoming voxel packets to process bool _resetRecentMaxPacketsSoon; diff --git a/interface/src/Audio.cpp b/interface/src/Audio.cpp index 0709b40624..f31ab9056f 100644 --- a/interface/src/Audio.cpp +++ b/interface/src/Audio.cpp @@ -415,7 +415,7 @@ void Audio::addReceivedAudioToBuffer(const QByteArray& audioByteArray) { _totalPacketsReceived++; double timeDiff = diffclock(&_lastReceiveTime, ¤tReceiveTime); - + // Discard first few received packets for computing jitter (often they pile up on start) if (_totalPacketsReceived > NUM_INITIAL_PACKETS_DISCARD) { _stdev.addValue(timeDiff); diff --git a/interface/src/DatagramProcessor.cpp b/interface/src/DatagramProcessor.cpp new file mode 100644 index 0000000000..9a6c1bea05 --- /dev/null +++ b/interface/src/DatagramProcessor.cpp @@ -0,0 +1,110 @@ +// +// DatagramProcessor.cpp +// hifi +// +// Created by Stephen Birarda on 1/23/2014. +// Copyright (c) 2014 HighFidelity, Inc. All rights reserved. +// + +#include <PerfStat.h> + +#include "Application.h" +#include "Menu.h" + +#include "DatagramProcessor.h" + +DatagramProcessor::DatagramProcessor(QObject* parent) : + QObject(parent) +{ + +} + +void DatagramProcessor::processDatagrams() { + PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), + "DatagramProcessor::processDatagrams()"); + + HifiSockAddr senderSockAddr; + ssize_t bytesReceived; + + static unsigned char incomingPacket[MAX_PACKET_SIZE]; + + Application* application = Application::getInstance(); + + while (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() && + (bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) incomingPacket, + MAX_PACKET_SIZE, + senderSockAddr.getAddressPointer(), + senderSockAddr.getPortPointer()))) { + + _packetCount++; + _byteCount += bytesReceived; + + if (packetVersionMatch(incomingPacket)) { + // only process this packet if we have a match on the packet version + switch (incomingPacket[0]) { + case PACKET_TYPE_TRANSMITTER_DATA_V2: + // V2 = IOS transmitter app + application->_myTransmitter.processIncomingData(incomingPacket, bytesReceived); + + break; + case PACKET_TYPE_MIXED_AUDIO: + QMetaObject::invokeMethod(&application->_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection, + Q_ARG(QByteArray, QByteArray((char*) incomingPacket, bytesReceived))); + break; + + case PACKET_TYPE_PARTICLE_ADD_RESPONSE: + // this will keep creatorTokenIDs to IDs mapped correctly + Particle::handleAddParticleResponse(incomingPacket, bytesReceived); + break; + + case PACKET_TYPE_PARTICLE_DATA: + case PACKET_TYPE_PARTICLE_ERASE: + case PACKET_TYPE_VOXEL_DATA: + case PACKET_TYPE_VOXEL_ERASE: + case PACKET_TYPE_OCTREE_STATS: + case PACKET_TYPE_ENVIRONMENT_DATA: { + PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), + "Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); + + bool wantExtraDebugging = application->getLogger()->extraDebugging(); + if (wantExtraDebugging && incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) { + int numBytesPacketHeader = numBytesForPacketHeader(incomingPacket); + unsigned char* dataAt = incomingPacket + numBytesPacketHeader; + dataAt += sizeof(VOXEL_PACKET_FLAGS); + VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt); + dataAt += sizeof(VOXEL_PACKET_SEQUENCE); + VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt); + dataAt += sizeof(VOXEL_PACKET_SENT_TIME); + VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow(); + int flightTime = arrivedAt - sentAt; + + printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime); + } + + // add this packet to our list of voxel packets and process them on the voxel processing + application->_voxelProcessor.queueReceivedPacket(senderSockAddr, incomingPacket, bytesReceived); + break; + } + case PACKET_TYPE_METAVOXEL_DATA: + application->_metavoxels.processData(QByteArray((const char*) incomingPacket, bytesReceived), + senderSockAddr); + break; + case PACKET_TYPE_BULK_AVATAR_DATA: + NodeList::getInstance()->processBulkNodeData(senderSockAddr, + incomingPacket, + bytesReceived); + application->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived); + break; + case PACKET_TYPE_DATA_SERVER_GET: + case PACKET_TYPE_DATA_SERVER_PUT: + case PACKET_TYPE_DATA_SERVER_SEND: + case PACKET_TYPE_DATA_SERVER_CONFIRM: + DataServerClient::processMessageFromDataServer(incomingPacket, bytesReceived); + break; + default: + NodeList::getInstance()->processNodeData(senderSockAddr, incomingPacket, bytesReceived); + break; + } + } + } +} \ No newline at end of file diff --git a/interface/src/DatagramProcessor.h b/interface/src/DatagramProcessor.h new file mode 100644 index 0000000000..722e5a9d41 --- /dev/null +++ b/interface/src/DatagramProcessor.h @@ -0,0 +1,31 @@ +// +// DatagramProcessor.h +// hifi +// +// Created by Stephen Birarda on 1/23/2014. +// Copyright (c) 2014 HighFidelity, Inc. All rights reserved. +// + +#ifndef __hifi__DatagramProcessor__ +#define __hifi__DatagramProcessor__ + +#include <QtCore/QObject> + +class DatagramProcessor : public QObject { + Q_OBJECT +public: + DatagramProcessor(QObject* parent = 0); + + int getPacketCount() const { return _packetCount; } + int getByteCount() const { return _byteCount; } + + void resetCounters() { _packetCount = 0; _byteCount = 0; } +public slots: + void processDatagrams(); + +private: + int _packetCount; + int _byteCount; +}; + +#endif /* defined(__hifi__DatagramProcessor__) */ diff --git a/libraries/shared/src/NodeList.cpp b/libraries/shared/src/NodeList.cpp index f37f2b48dc..034c272536 100644 --- a/libraries/shared/src/NodeList.cpp +++ b/libraries/shared/src/NodeList.cpp @@ -59,7 +59,7 @@ NodeList::NodeList(char newOwnerType, unsigned short int newSocketListenPort) : _nodeHashMutex(), _domainHostname(DEFAULT_DOMAIN_HOSTNAME), _domainSockAddr(HifiSockAddr(QHostAddress::Null, DEFAULT_DOMAIN_SERVER_PORT)), - _nodeSocket(), + _nodeSocket(this), _ownerType(newOwnerType), _nodeTypesOfInterest(NULL), _ownerUUID(QUuid::createUuid()),