From 3ab7a6d9d5ffbda3413b8d27e45b95628f5b2af8 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 13 Jan 2014 17:58:48 -0800 Subject: [PATCH] move network packet processing to main thread --- interface/src/Application.cpp | 190 +++++++++++++----------------- interface/src/Application.h | 6 +- libraries/shared/src/NodeList.cpp | 2 +- 3 files changed, 85 insertions(+), 113 deletions(-) diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index e28114034b..7b82bc8d25 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -128,7 +128,7 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : _perfStatsOn(false), _chatEntryOn(false), _audio(&_audioScope, STARTUP_JITTER_SAMPLES), - _stopNetworkReceiveThread(false), + _enableProcessVoxelsThread(true), _voxelProcessor(), _voxelHideShowThread(&_voxels), _voxelEditSender(this), @@ -165,6 +165,9 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : } NodeList* nodeList = NodeList::createInstance(NODE_TYPE_AGENT, listenPort); + + // connect our processDatagrams slot to the QUDPSocket readyRead() signal + connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), SLOT(processDatagrams())); // put the audio processing on a separate thread QThread* audioThread = new QThread(this); @@ -180,9 +183,6 @@ Application::Application(int& argc, char** argv, timeval &startup_time) : nodeList->addDomainListener(this); - // network receive thread and voxel parsing thread are both controlled by the --nonblocking command line - _enableProcessVoxelsThread = _enableNetworkThread = !cmdOptionExists(argc, constArgv, "--nonblocking"); - // read the ApplicationInfo.ini file for Name/Version/Domain information QSettings applicationInfo("resources/info/ApplicationInfo.ini", QSettings::IniFormat); @@ -330,12 +330,6 @@ void Application::initializeGL() { init(); qDebug( "Init() complete.\n" ); - // create thread for receipt of data via UDP - if (_enableNetworkThread) { - pthread_create(&_networkReceiveThread, NULL, networkReceive, NULL); - qDebug("Network receive thread created.\n"); - } - // create thread for parsing of voxel data independent of the main network and rendering threads _voxelProcessor.initialize(_enableProcessVoxelsThread); _voxelEditSender.initialize(_enableProcessVoxelsThread); @@ -1424,11 +1418,6 @@ void Application::terminate() { // let the avatar mixer know we're out NodeList::getInstance()->sendKillNode(&NODE_TYPE_AVATAR_MIXER, 1); - if (_enableNetworkThread) { - _stopNetworkReceiveThread = true; - pthread_join(_networkReceiveThread, NULL); - } - printf(""); _voxelProcessor.terminate(); _voxelHideShowThread.terminate(); @@ -2322,11 +2311,6 @@ void Application::updateThreads(float deltaTime) { bool showWarnings = Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings); PerformanceWarning warn(showWarnings, "Application::updateThreads()"); - // read incoming packets from network - if (!_enableNetworkThread) { - networkReceive(0); - } - // parse voxel packets if (!_enableProcessVoxelsThread) { _voxelProcessor.threadRoutine(); @@ -4272,105 +4256,95 @@ int Application::parseOctreeStats(unsigned char* messageData, ssize_t messageLen } // Receive packets from other nodes/servers and decide what to do with them! -void* Application::networkReceive(void* args) { +void Application::processDatagrams() { PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), "Application::networkReceive()"); HifiSockAddr senderSockAddr; ssize_t bytesReceived; - Application* app = Application::getInstance(); - while (!app->_stopNetworkReceiveThread) { - if (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() && - (bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) app->_incomingPacket, - MAX_PACKET_SIZE, - senderSockAddr.getAddressPointer(), - senderSockAddr.getPortPointer()))) { - - app->_packetCount++; - app->_bytesCount += bytesReceived; - - if (packetVersionMatch(app->_incomingPacket)) { - // only process this packet if we have a match on the packet version - switch (app->_incomingPacket[0]) { - case PACKET_TYPE_TRANSMITTER_DATA_V2: - // V2 = IOS transmitter app - app->_myTransmitter.processIncomingData(app->_incomingPacket, bytesReceived); - - break; - case PACKET_TYPE_MIXED_AUDIO: - QMetaObject::invokeMethod(&app->_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection, - Q_ARG(QByteArray, QByteArray((char*) app->_incomingPacket, bytesReceived))); - break; - - case PACKET_TYPE_PARTICLE_ADD_RESPONSE: - // look up our ParticleEditHanders.... - ParticleEditHandle::handleAddResponse(app->_incomingPacket, bytesReceived); - break; - - case PACKET_TYPE_PARTICLE_DATA: - case PACKET_TYPE_VOXEL_DATA: - case PACKET_TYPE_VOXEL_ERASE: - case PACKET_TYPE_OCTREE_STATS: - case PACKET_TYPE_ENVIRONMENT_DATA: { - PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), - "Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); - - bool wantExtraDebugging = app->getLogger()->extraDebugging(); - if (wantExtraDebugging && app->_incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) { - int numBytesPacketHeader = numBytesForPacketHeader(app->_incomingPacket); - unsigned char* dataAt = app->_incomingPacket + numBytesPacketHeader; - dataAt += sizeof(VOXEL_PACKET_FLAGS); - VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt); - dataAt += sizeof(VOXEL_PACKET_SEQUENCE); - VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt); - dataAt += sizeof(VOXEL_PACKET_SENT_TIME); - VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow(); - int flightTime = arrivedAt - sentAt; - - printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime); - } - - // add this packet to our list of voxel packets and process them on the voxel processing - app->_voxelProcessor.queueReceivedPacket(senderSockAddr, app->_incomingPacket, bytesReceived); - break; + if (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() && + (bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) _incomingPacket, + MAX_PACKET_SIZE, + senderSockAddr.getAddressPointer(), + senderSockAddr.getPortPointer()))) { + + _packetCount++; + _bytesCount += bytesReceived; + + if (packetVersionMatch(_incomingPacket)) { + // only process this packet if we have a match on the packet version + switch (_incomingPacket[0]) { + case PACKET_TYPE_TRANSMITTER_DATA_V2: + // V2 = IOS transmitter app + _myTransmitter.processIncomingData(_incomingPacket, bytesReceived); + + break; + case PACKET_TYPE_MIXED_AUDIO: + QMetaObject::invokeMethod(&_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection, + Q_ARG(QByteArray, QByteArray((char*) _incomingPacket, bytesReceived))); + break; + + case PACKET_TYPE_PARTICLE_ADD_RESPONSE: + // look up our ParticleEditHanders.... + ParticleEditHandle::handleAddResponse(_incomingPacket, bytesReceived); + break; + + case PACKET_TYPE_PARTICLE_DATA: + case PACKET_TYPE_VOXEL_DATA: + case PACKET_TYPE_VOXEL_ERASE: + case PACKET_TYPE_OCTREE_STATS: + case PACKET_TYPE_ENVIRONMENT_DATA: { + PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), + "Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); + + bool wantExtraDebugging = getLogger()->extraDebugging(); + if (wantExtraDebugging && _incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) { + int numBytesPacketHeader = numBytesForPacketHeader(_incomingPacket); + unsigned char* dataAt = _incomingPacket + numBytesPacketHeader; + dataAt += sizeof(VOXEL_PACKET_FLAGS); + VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt); + dataAt += sizeof(VOXEL_PACKET_SEQUENCE); + VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt); + dataAt += sizeof(VOXEL_PACKET_SENT_TIME); + VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow(); + int flightTime = arrivedAt - sentAt; + + printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime); } - case PACKET_TYPE_METAVOXEL_DATA: - app->_metavoxels.processData(QByteArray((const char*)app->_incomingPacket, bytesReceived), - senderSockAddr); - break; - case PACKET_TYPE_BULK_AVATAR_DATA: - NodeList::getInstance()->processBulkNodeData(senderSockAddr, - app->_incomingPacket, - bytesReceived); - getInstance()->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived); - break; - case PACKET_TYPE_AVATAR_URLS: - processAvatarURLsMessage(app->_incomingPacket, bytesReceived); - break; - case PACKET_TYPE_AVATAR_FACE_VIDEO: - processAvatarFaceVideoMessage(app->_incomingPacket, bytesReceived); - break; - case PACKET_TYPE_DATA_SERVER_GET: - case PACKET_TYPE_DATA_SERVER_PUT: - case PACKET_TYPE_DATA_SERVER_SEND: - case PACKET_TYPE_DATA_SERVER_CONFIRM: - DataServerClient::processMessageFromDataServer(app->_incomingPacket, bytesReceived); - break; - default: - NodeList::getInstance()->processNodeData(senderSockAddr, app->_incomingPacket, bytesReceived); - break; + + // add this packet to our list of voxel packets and process them on the voxel processing + _voxelProcessor.queueReceivedPacket(senderSockAddr, _incomingPacket, bytesReceived); + break; } + case PACKET_TYPE_METAVOXEL_DATA: + _metavoxels.processData(QByteArray((const char*) _incomingPacket, bytesReceived), + senderSockAddr); + break; + case PACKET_TYPE_BULK_AVATAR_DATA: + NodeList::getInstance()->processBulkNodeData(senderSockAddr, + _incomingPacket, + bytesReceived); + getInstance()->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived); + break; + case PACKET_TYPE_AVATAR_URLS: + processAvatarURLsMessage(_incomingPacket, bytesReceived); + break; + case PACKET_TYPE_AVATAR_FACE_VIDEO: + processAvatarFaceVideoMessage(_incomingPacket, bytesReceived); + break; + case PACKET_TYPE_DATA_SERVER_GET: + case PACKET_TYPE_DATA_SERVER_PUT: + case PACKET_TYPE_DATA_SERVER_SEND: + case PACKET_TYPE_DATA_SERVER_CONFIRM: + DataServerClient::processMessageFromDataServer(_incomingPacket, bytesReceived); + break; + default: + NodeList::getInstance()->processNodeData(senderSockAddr, _incomingPacket, bytesReceived); + break; } - } else if (!app->_enableNetworkThread) { - break; } } - - if (app->_enableNetworkThread) { - pthread_exit(0); - } - return NULL; } void Application::packetSentNotification(ssize_t length) { diff --git a/interface/src/Application.h b/interface/src/Application.h index a0bf608e98..8aa4f40f7f 100644 --- a/interface/src/Application.h +++ b/interface/src/Application.h @@ -215,6 +215,8 @@ public: public slots: void nodeKilled(SharedNodePointer node); + void processDatagrams(); + void sendAvatarFaceVideoMessage(int frameCount, const QByteArray& data); void exportVoxels(); void importVoxels(); @@ -467,10 +469,6 @@ private: Audio _audio; #endif - bool _enableNetworkThread; - pthread_t _networkReceiveThread; - bool _stopNetworkReceiveThread; - bool _enableProcessVoxelsThread; VoxelPacketProcessor _voxelProcessor; VoxelHideShowThread _voxelHideShowThread; diff --git a/libraries/shared/src/NodeList.cpp b/libraries/shared/src/NodeList.cpp index 9d47806f18..ef295da7a8 100644 --- a/libraries/shared/src/NodeList.cpp +++ b/libraries/shared/src/NodeList.cpp @@ -786,7 +786,7 @@ SharedNodePointer NodeList::soloNodeOfType(char nodeType) { } } - return SharedNodePointer(NULL); + return SharedNodePointer(); } void NodeList::removeSilentNodes() {