mirror of
https://github.com/overte-org/overte.git
synced 2025-04-15 20:58:30 +02:00
move datagram processing back to separate thread
This commit is contained in:
parent
0e6faec5bb
commit
7efc9e2e78
6 changed files with 176 additions and 105 deletions
|
@ -108,6 +108,8 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
|
|||
QApplication(argc, argv),
|
||||
_window(new QMainWindow(desktop())),
|
||||
_glWidget(new GLCanvas()),
|
||||
_nodeThread(new QThread(this)),
|
||||
_datagramProcessor(new DatagramProcessor()),
|
||||
_frameCount(0),
|
||||
_fps(120.0f),
|
||||
_justStarted(true),
|
||||
|
@ -145,10 +147,8 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
|
|||
_voxelHideShowThread(&_voxels),
|
||||
_voxelEditSender(this),
|
||||
_particleEditSender(this),
|
||||
_packetCount(0),
|
||||
_packetsPerSecond(0),
|
||||
_bytesPerSecond(0),
|
||||
_bytesCount(0),
|
||||
_recentMaxPackets(0),
|
||||
_resetRecentMaxPacketsSoon(true),
|
||||
_swatch(NULL),
|
||||
|
@ -173,11 +173,21 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
|
|||
if (portStr) {
|
||||
listenPort = atoi(portStr);
|
||||
}
|
||||
|
||||
|
||||
// put the NodeList and datagram processing on the node thread
|
||||
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_AGENT, listenPort);
|
||||
|
||||
// connect our processDatagrams slot to the QUDPSocket readyRead() signal
|
||||
connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), SLOT(processDatagrams()));
|
||||
|
||||
nodeList->moveToThread(_nodeThread);
|
||||
_datagramProcessor->moveToThread(_nodeThread);
|
||||
|
||||
// connect the DataProcessor processDatagrams slot to the QUDPSocket readyRead() signal
|
||||
connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), _datagramProcessor, SLOT(processDatagrams()));
|
||||
|
||||
// make sure the node thread is given highest priority
|
||||
_nodeThread->setPriority(QThread::TimeCriticalPriority);
|
||||
|
||||
// start the nodeThread so its event loop is running
|
||||
_nodeThread->start();
|
||||
|
||||
// put the audio processing on a separate thread
|
||||
QThread* audioThread = new QThread(this);
|
||||
|
@ -192,7 +202,7 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
|
|||
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), SLOT(nodeKilled(SharedNodePointer)));
|
||||
connect(nodeList, SIGNAL(nodeAdded(SharedNodePointer)), &_voxels, SLOT(nodeAdded(SharedNodePointer)));
|
||||
connect(nodeList, SIGNAL(nodeKilled(SharedNodePointer)), &_voxels, SLOT(nodeKilled(SharedNodePointer)));
|
||||
|
||||
|
||||
// read the ApplicationInfo.ini file for Name/Version/Domain information
|
||||
QSettings applicationInfo("resources/info/ApplicationInfo.ini", QSettings::IniFormat);
|
||||
|
||||
|
@ -225,8 +235,10 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
|
|||
NODE_TYPE_PARTICLE_SERVER, NODE_TYPE_METAVOXEL_SERVER};
|
||||
nodeList->setNodeTypesOfInterest(nodeTypesOfInterest, sizeof(nodeTypesOfInterest));
|
||||
|
||||
// move the silentNodeTimer to the _nodeThread
|
||||
QTimer* silentNodeTimer = new QTimer(this);
|
||||
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
|
||||
silentNodeTimer->moveToThread(_nodeThread);
|
||||
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
|
||||
|
||||
QString cachePath = QStandardPaths::writableLocation(QStandardPaths::DataLocation);
|
||||
|
@ -273,6 +285,10 @@ Application::~Application() {
|
|||
|
||||
// make sure we don't call the idle timer any more
|
||||
delete idleTimer;
|
||||
|
||||
// ask the datagram processing thread to quit and wait until it is done
|
||||
_nodeThread->thread()->quit();
|
||||
_nodeThread->thread()->wait();
|
||||
|
||||
// ask the audio thread to quit and wait until it is done
|
||||
_audio.thread()->quit();
|
||||
|
@ -1312,11 +1328,12 @@ void Application::timer() {
|
|||
}
|
||||
|
||||
_fps = (float)_frameCount / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f);
|
||||
_packetsPerSecond = (float)_packetCount / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f);
|
||||
_bytesPerSecond = (float)_bytesCount / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f);
|
||||
|
||||
_packetsPerSecond = (float) _datagramProcessor->getPacketCount() / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f);
|
||||
_bytesPerSecond = (float) _datagramProcessor->getByteCount() / ((float)diffclock(&_timerStart, &_timerEnd) / 1000.f);
|
||||
_frameCount = 0;
|
||||
_packetCount = 0;
|
||||
_bytesCount = 0;
|
||||
|
||||
_datagramProcessor->resetCounters();
|
||||
|
||||
gettimeofday(&_timerStart, NULL);
|
||||
|
||||
|
@ -4052,93 +4069,6 @@ int Application::parseOctreeStats(unsigned char* messageData, ssize_t messageLen
|
|||
return statsMessageLength;
|
||||
}
|
||||
|
||||
// Receive packets from other nodes/servers and decide what to do with them!
|
||||
void Application::processDatagrams() {
|
||||
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||
"Application::networkReceive()");
|
||||
|
||||
HifiSockAddr senderSockAddr;
|
||||
ssize_t bytesReceived;
|
||||
|
||||
while (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() &&
|
||||
(bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) _incomingPacket,
|
||||
MAX_PACKET_SIZE,
|
||||
senderSockAddr.getAddressPointer(),
|
||||
senderSockAddr.getPortPointer()))) {
|
||||
|
||||
_packetCount++;
|
||||
_bytesCount += bytesReceived;
|
||||
|
||||
if (packetVersionMatch(_incomingPacket)) {
|
||||
// only process this packet if we have a match on the packet version
|
||||
switch (_incomingPacket[0]) {
|
||||
case PACKET_TYPE_TRANSMITTER_DATA_V2:
|
||||
// V2 = IOS transmitter app
|
||||
_myTransmitter.processIncomingData(_incomingPacket, bytesReceived);
|
||||
|
||||
break;
|
||||
case PACKET_TYPE_MIXED_AUDIO:
|
||||
QMetaObject::invokeMethod(&_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection,
|
||||
Q_ARG(QByteArray, QByteArray((char*) _incomingPacket, bytesReceived)));
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_PARTICLE_ADD_RESPONSE:
|
||||
// this will keep creatorTokenIDs to IDs mapped correctly
|
||||
Particle::handleAddParticleResponse(_incomingPacket, bytesReceived);
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_PARTICLE_DATA:
|
||||
case PACKET_TYPE_PARTICLE_ERASE:
|
||||
case PACKET_TYPE_VOXEL_DATA:
|
||||
case PACKET_TYPE_VOXEL_ERASE:
|
||||
case PACKET_TYPE_OCTREE_STATS:
|
||||
case PACKET_TYPE_ENVIRONMENT_DATA: {
|
||||
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||
"Application::networkReceive()... _voxelProcessor.queueReceivedPacket()");
|
||||
|
||||
bool wantExtraDebugging = getLogger()->extraDebugging();
|
||||
if (wantExtraDebugging && _incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) {
|
||||
int numBytesPacketHeader = numBytesForPacketHeader(_incomingPacket);
|
||||
unsigned char* dataAt = _incomingPacket + numBytesPacketHeader;
|
||||
dataAt += sizeof(VOXEL_PACKET_FLAGS);
|
||||
VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt);
|
||||
dataAt += sizeof(VOXEL_PACKET_SEQUENCE);
|
||||
VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt);
|
||||
dataAt += sizeof(VOXEL_PACKET_SENT_TIME);
|
||||
VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow();
|
||||
int flightTime = arrivedAt - sentAt;
|
||||
|
||||
printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime);
|
||||
}
|
||||
|
||||
// add this packet to our list of voxel packets and process them on the voxel processing
|
||||
_voxelProcessor.queueReceivedPacket(senderSockAddr, _incomingPacket, bytesReceived);
|
||||
break;
|
||||
}
|
||||
case PACKET_TYPE_METAVOXEL_DATA:
|
||||
_metavoxels.processData(QByteArray((const char*) _incomingPacket, bytesReceived),
|
||||
senderSockAddr);
|
||||
break;
|
||||
case PACKET_TYPE_BULK_AVATAR_DATA:
|
||||
NodeList::getInstance()->processBulkNodeData(senderSockAddr,
|
||||
_incomingPacket,
|
||||
bytesReceived);
|
||||
getInstance()->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived);
|
||||
break;
|
||||
case PACKET_TYPE_DATA_SERVER_GET:
|
||||
case PACKET_TYPE_DATA_SERVER_PUT:
|
||||
case PACKET_TYPE_DATA_SERVER_SEND:
|
||||
case PACKET_TYPE_DATA_SERVER_CONFIRM:
|
||||
DataServerClient::processMessageFromDataServer(_incomingPacket, bytesReceived);
|
||||
break;
|
||||
default:
|
||||
NodeList::getInstance()->processNodeData(senderSockAddr, _incomingPacket, bytesReceived);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Application::packetSentNotification(ssize_t length) {
|
||||
_bandwidthMeter.outputStream(BandwidthMeter::VOXELS).updateValue(length);
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
#include "BandwidthMeter.h"
|
||||
#include "Camera.h"
|
||||
#include "Cloud.h"
|
||||
#include "DatagramProcessor.h"
|
||||
#include "Environment.h"
|
||||
#include "GLCanvas.h"
|
||||
#include "MetavoxelSystem.h"
|
||||
|
@ -96,6 +97,7 @@ class Application : public QApplication, public PacketSenderNotify {
|
|||
|
||||
friend class VoxelPacketProcessor;
|
||||
friend class VoxelEditPacketSender;
|
||||
friend class DatagramProcessor;
|
||||
|
||||
public:
|
||||
static Application* getInstance() { return static_cast<Application*>(QCoreApplication::instance()); }
|
||||
|
@ -208,8 +210,6 @@ public slots:
|
|||
void domainChanged(const QString& domainHostname);
|
||||
void nodeKilled(SharedNodePointer node);
|
||||
|
||||
void processDatagrams();
|
||||
|
||||
void exportVoxels();
|
||||
void importVoxels();
|
||||
void cutVoxels();
|
||||
|
@ -331,6 +331,9 @@ private:
|
|||
QGLWidget* _glWidget;
|
||||
|
||||
BandwidthMeter _bandwidthMeter;
|
||||
|
||||
QThread* _nodeThread;
|
||||
DatagramProcessor* _datagramProcessor;
|
||||
|
||||
QNetworkAccessManager* _networkAccessManager;
|
||||
QSettings* _settings;
|
||||
|
@ -461,11 +464,8 @@ private:
|
|||
VoxelEditPacketSender _voxelEditSender;
|
||||
ParticleEditPacketSender _particleEditSender;
|
||||
|
||||
unsigned char _incomingPacket[MAX_PACKET_SIZE];
|
||||
int _packetCount;
|
||||
int _packetsPerSecond;
|
||||
int _bytesPerSecond;
|
||||
int _bytesCount;
|
||||
|
||||
int _recentMaxPackets; // recent max incoming voxel packets to process
|
||||
bool _resetRecentMaxPacketsSoon;
|
||||
|
|
|
@ -415,7 +415,7 @@ void Audio::addReceivedAudioToBuffer(const QByteArray& audioByteArray) {
|
|||
_totalPacketsReceived++;
|
||||
|
||||
double timeDiff = diffclock(&_lastReceiveTime, ¤tReceiveTime);
|
||||
|
||||
|
||||
// Discard first few received packets for computing jitter (often they pile up on start)
|
||||
if (_totalPacketsReceived > NUM_INITIAL_PACKETS_DISCARD) {
|
||||
_stdev.addValue(timeDiff);
|
||||
|
|
110
interface/src/DatagramProcessor.cpp
Normal file
110
interface/src/DatagramProcessor.cpp
Normal file
|
@ -0,0 +1,110 @@
|
|||
//
|
||||
// DatagramProcessor.cpp
|
||||
// hifi
|
||||
//
|
||||
// Created by Stephen Birarda on 1/23/2014.
|
||||
// Copyright (c) 2014 HighFidelity, Inc. All rights reserved.
|
||||
//
|
||||
|
||||
#include <PerfStat.h>
|
||||
|
||||
#include "Application.h"
|
||||
#include "Menu.h"
|
||||
|
||||
#include "DatagramProcessor.h"
|
||||
|
||||
DatagramProcessor::DatagramProcessor(QObject* parent) :
|
||||
QObject(parent)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void DatagramProcessor::processDatagrams() {
|
||||
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||
"DatagramProcessor::processDatagrams()");
|
||||
|
||||
HifiSockAddr senderSockAddr;
|
||||
ssize_t bytesReceived;
|
||||
|
||||
static unsigned char incomingPacket[MAX_PACKET_SIZE];
|
||||
|
||||
Application* application = Application::getInstance();
|
||||
|
||||
while (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() &&
|
||||
(bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) incomingPacket,
|
||||
MAX_PACKET_SIZE,
|
||||
senderSockAddr.getAddressPointer(),
|
||||
senderSockAddr.getPortPointer()))) {
|
||||
|
||||
_packetCount++;
|
||||
_byteCount += bytesReceived;
|
||||
|
||||
if (packetVersionMatch(incomingPacket)) {
|
||||
// only process this packet if we have a match on the packet version
|
||||
switch (incomingPacket[0]) {
|
||||
case PACKET_TYPE_TRANSMITTER_DATA_V2:
|
||||
// V2 = IOS transmitter app
|
||||
application->_myTransmitter.processIncomingData(incomingPacket, bytesReceived);
|
||||
|
||||
break;
|
||||
case PACKET_TYPE_MIXED_AUDIO:
|
||||
QMetaObject::invokeMethod(&application->_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection,
|
||||
Q_ARG(QByteArray, QByteArray((char*) incomingPacket, bytesReceived)));
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_PARTICLE_ADD_RESPONSE:
|
||||
// this will keep creatorTokenIDs to IDs mapped correctly
|
||||
Particle::handleAddParticleResponse(incomingPacket, bytesReceived);
|
||||
break;
|
||||
|
||||
case PACKET_TYPE_PARTICLE_DATA:
|
||||
case PACKET_TYPE_PARTICLE_ERASE:
|
||||
case PACKET_TYPE_VOXEL_DATA:
|
||||
case PACKET_TYPE_VOXEL_ERASE:
|
||||
case PACKET_TYPE_OCTREE_STATS:
|
||||
case PACKET_TYPE_ENVIRONMENT_DATA: {
|
||||
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
|
||||
"Application::networkReceive()... _voxelProcessor.queueReceivedPacket()");
|
||||
|
||||
bool wantExtraDebugging = application->getLogger()->extraDebugging();
|
||||
if (wantExtraDebugging && incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) {
|
||||
int numBytesPacketHeader = numBytesForPacketHeader(incomingPacket);
|
||||
unsigned char* dataAt = incomingPacket + numBytesPacketHeader;
|
||||
dataAt += sizeof(VOXEL_PACKET_FLAGS);
|
||||
VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt);
|
||||
dataAt += sizeof(VOXEL_PACKET_SEQUENCE);
|
||||
VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt);
|
||||
dataAt += sizeof(VOXEL_PACKET_SENT_TIME);
|
||||
VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow();
|
||||
int flightTime = arrivedAt - sentAt;
|
||||
|
||||
printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime);
|
||||
}
|
||||
|
||||
// add this packet to our list of voxel packets and process them on the voxel processing
|
||||
application->_voxelProcessor.queueReceivedPacket(senderSockAddr, incomingPacket, bytesReceived);
|
||||
break;
|
||||
}
|
||||
case PACKET_TYPE_METAVOXEL_DATA:
|
||||
application->_metavoxels.processData(QByteArray((const char*) incomingPacket, bytesReceived),
|
||||
senderSockAddr);
|
||||
break;
|
||||
case PACKET_TYPE_BULK_AVATAR_DATA:
|
||||
NodeList::getInstance()->processBulkNodeData(senderSockAddr,
|
||||
incomingPacket,
|
||||
bytesReceived);
|
||||
application->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived);
|
||||
break;
|
||||
case PACKET_TYPE_DATA_SERVER_GET:
|
||||
case PACKET_TYPE_DATA_SERVER_PUT:
|
||||
case PACKET_TYPE_DATA_SERVER_SEND:
|
||||
case PACKET_TYPE_DATA_SERVER_CONFIRM:
|
||||
DataServerClient::processMessageFromDataServer(incomingPacket, bytesReceived);
|
||||
break;
|
||||
default:
|
||||
NodeList::getInstance()->processNodeData(senderSockAddr, incomingPacket, bytesReceived);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
31
interface/src/DatagramProcessor.h
Normal file
31
interface/src/DatagramProcessor.h
Normal file
|
@ -0,0 +1,31 @@
|
|||
//
|
||||
// DatagramProcessor.h
|
||||
// hifi
|
||||
//
|
||||
// Created by Stephen Birarda on 1/23/2014.
|
||||
// Copyright (c) 2014 HighFidelity, Inc. All rights reserved.
|
||||
//
|
||||
|
||||
#ifndef __hifi__DatagramProcessor__
|
||||
#define __hifi__DatagramProcessor__
|
||||
|
||||
#include <QtCore/QObject>
|
||||
|
||||
class DatagramProcessor : public QObject {
|
||||
Q_OBJECT
|
||||
public:
|
||||
DatagramProcessor(QObject* parent = 0);
|
||||
|
||||
int getPacketCount() const { return _packetCount; }
|
||||
int getByteCount() const { return _byteCount; }
|
||||
|
||||
void resetCounters() { _packetCount = 0; _byteCount = 0; }
|
||||
public slots:
|
||||
void processDatagrams();
|
||||
|
||||
private:
|
||||
int _packetCount;
|
||||
int _byteCount;
|
||||
};
|
||||
|
||||
#endif /* defined(__hifi__DatagramProcessor__) */
|
|
@ -59,7 +59,7 @@ NodeList::NodeList(char newOwnerType, unsigned short int newSocketListenPort) :
|
|||
_nodeHashMutex(),
|
||||
_domainHostname(DEFAULT_DOMAIN_HOSTNAME),
|
||||
_domainSockAddr(HifiSockAddr(QHostAddress::Null, DEFAULT_DOMAIN_SERVER_PORT)),
|
||||
_nodeSocket(),
|
||||
_nodeSocket(this),
|
||||
_ownerType(newOwnerType),
|
||||
_nodeTypesOfInterest(NULL),
|
||||
_ownerUUID(QUuid::createUuid()),
|
||||
|
|
Loading…
Reference in a new issue