move network packet processing to main thread

This commit is contained in:
Stephen Birarda 2014-01-13 17:58:48 -08:00
parent 1faa2120aa
commit 3ab7a6d9d5
3 changed files with 85 additions and 113 deletions

View file

@ -128,7 +128,7 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
_perfStatsOn(false), _perfStatsOn(false),
_chatEntryOn(false), _chatEntryOn(false),
_audio(&_audioScope, STARTUP_JITTER_SAMPLES), _audio(&_audioScope, STARTUP_JITTER_SAMPLES),
_stopNetworkReceiveThread(false), _enableProcessVoxelsThread(true),
_voxelProcessor(), _voxelProcessor(),
_voxelHideShowThread(&_voxels), _voxelHideShowThread(&_voxels),
_voxelEditSender(this), _voxelEditSender(this),
@ -165,6 +165,9 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
} }
NodeList* nodeList = NodeList::createInstance(NODE_TYPE_AGENT, listenPort); NodeList* nodeList = NodeList::createInstance(NODE_TYPE_AGENT, listenPort);
// connect our processDatagrams slot to the QUDPSocket readyRead() signal
connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), SLOT(processDatagrams()));
// put the audio processing on a separate thread // put the audio processing on a separate thread
QThread* audioThread = new QThread(this); QThread* audioThread = new QThread(this);
@ -180,9 +183,6 @@ Application::Application(int& argc, char** argv, timeval &startup_time) :
nodeList->addDomainListener(this); nodeList->addDomainListener(this);
// network receive thread and voxel parsing thread are both controlled by the --nonblocking command line
_enableProcessVoxelsThread = _enableNetworkThread = !cmdOptionExists(argc, constArgv, "--nonblocking");
// read the ApplicationInfo.ini file for Name/Version/Domain information // read the ApplicationInfo.ini file for Name/Version/Domain information
QSettings applicationInfo("resources/info/ApplicationInfo.ini", QSettings::IniFormat); QSettings applicationInfo("resources/info/ApplicationInfo.ini", QSettings::IniFormat);
@ -330,12 +330,6 @@ void Application::initializeGL() {
init(); init();
qDebug( "Init() complete.\n" ); qDebug( "Init() complete.\n" );
// create thread for receipt of data via UDP
if (_enableNetworkThread) {
pthread_create(&_networkReceiveThread, NULL, networkReceive, NULL);
qDebug("Network receive thread created.\n");
}
// create thread for parsing of voxel data independent of the main network and rendering threads // create thread for parsing of voxel data independent of the main network and rendering threads
_voxelProcessor.initialize(_enableProcessVoxelsThread); _voxelProcessor.initialize(_enableProcessVoxelsThread);
_voxelEditSender.initialize(_enableProcessVoxelsThread); _voxelEditSender.initialize(_enableProcessVoxelsThread);
@ -1424,11 +1418,6 @@ void Application::terminate() {
// let the avatar mixer know we're out // let the avatar mixer know we're out
NodeList::getInstance()->sendKillNode(&NODE_TYPE_AVATAR_MIXER, 1); NodeList::getInstance()->sendKillNode(&NODE_TYPE_AVATAR_MIXER, 1);
if (_enableNetworkThread) {
_stopNetworkReceiveThread = true;
pthread_join(_networkReceiveThread, NULL);
}
printf(""); printf("");
_voxelProcessor.terminate(); _voxelProcessor.terminate();
_voxelHideShowThread.terminate(); _voxelHideShowThread.terminate();
@ -2322,11 +2311,6 @@ void Application::updateThreads(float deltaTime) {
bool showWarnings = Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings); bool showWarnings = Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings);
PerformanceWarning warn(showWarnings, "Application::updateThreads()"); PerformanceWarning warn(showWarnings, "Application::updateThreads()");
// read incoming packets from network
if (!_enableNetworkThread) {
networkReceive(0);
}
// parse voxel packets // parse voxel packets
if (!_enableProcessVoxelsThread) { if (!_enableProcessVoxelsThread) {
_voxelProcessor.threadRoutine(); _voxelProcessor.threadRoutine();
@ -4272,105 +4256,95 @@ int Application::parseOctreeStats(unsigned char* messageData, ssize_t messageLen
} }
// Receive packets from other nodes/servers and decide what to do with them! // Receive packets from other nodes/servers and decide what to do with them!
void* Application::networkReceive(void* args) { void Application::processDatagrams() {
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings), PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
"Application::networkReceive()"); "Application::networkReceive()");
HifiSockAddr senderSockAddr; HifiSockAddr senderSockAddr;
ssize_t bytesReceived; ssize_t bytesReceived;
Application* app = Application::getInstance(); if (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() &&
while (!app->_stopNetworkReceiveThread) { (bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) _incomingPacket,
if (NodeList::getInstance()->getNodeSocket().hasPendingDatagrams() && MAX_PACKET_SIZE,
(bytesReceived = NodeList::getInstance()->getNodeSocket().readDatagram((char*) app->_incomingPacket, senderSockAddr.getAddressPointer(),
MAX_PACKET_SIZE, senderSockAddr.getPortPointer()))) {
senderSockAddr.getAddressPointer(),
senderSockAddr.getPortPointer()))) { _packetCount++;
_bytesCount += bytesReceived;
app->_packetCount++;
app->_bytesCount += bytesReceived; if (packetVersionMatch(_incomingPacket)) {
// only process this packet if we have a match on the packet version
if (packetVersionMatch(app->_incomingPacket)) { switch (_incomingPacket[0]) {
// only process this packet if we have a match on the packet version case PACKET_TYPE_TRANSMITTER_DATA_V2:
switch (app->_incomingPacket[0]) { // V2 = IOS transmitter app
case PACKET_TYPE_TRANSMITTER_DATA_V2: _myTransmitter.processIncomingData(_incomingPacket, bytesReceived);
// V2 = IOS transmitter app
app->_myTransmitter.processIncomingData(app->_incomingPacket, bytesReceived); break;
case PACKET_TYPE_MIXED_AUDIO:
break; QMetaObject::invokeMethod(&_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection,
case PACKET_TYPE_MIXED_AUDIO: Q_ARG(QByteArray, QByteArray((char*) _incomingPacket, bytesReceived)));
QMetaObject::invokeMethod(&app->_audio, "addReceivedAudioToBuffer", Qt::QueuedConnection, break;
Q_ARG(QByteArray, QByteArray((char*) app->_incomingPacket, bytesReceived)));
break; case PACKET_TYPE_PARTICLE_ADD_RESPONSE:
// look up our ParticleEditHanders....
case PACKET_TYPE_PARTICLE_ADD_RESPONSE: ParticleEditHandle::handleAddResponse(_incomingPacket, bytesReceived);
// look up our ParticleEditHanders.... break;
ParticleEditHandle::handleAddResponse(app->_incomingPacket, bytesReceived);
break; case PACKET_TYPE_PARTICLE_DATA:
case PACKET_TYPE_VOXEL_DATA:
case PACKET_TYPE_PARTICLE_DATA: case PACKET_TYPE_VOXEL_ERASE:
case PACKET_TYPE_VOXEL_DATA: case PACKET_TYPE_OCTREE_STATS:
case PACKET_TYPE_VOXEL_ERASE: case PACKET_TYPE_ENVIRONMENT_DATA: {
case PACKET_TYPE_OCTREE_STATS: PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
case PACKET_TYPE_ENVIRONMENT_DATA: { "Application::networkReceive()... _voxelProcessor.queueReceivedPacket()");
PerformanceWarning warn(Menu::getInstance()->isOptionChecked(MenuOption::PipelineWarnings),
"Application::networkReceive()... _voxelProcessor.queueReceivedPacket()"); bool wantExtraDebugging = getLogger()->extraDebugging();
if (wantExtraDebugging && _incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) {
bool wantExtraDebugging = app->getLogger()->extraDebugging(); int numBytesPacketHeader = numBytesForPacketHeader(_incomingPacket);
if (wantExtraDebugging && app->_incomingPacket[0] == PACKET_TYPE_VOXEL_DATA) { unsigned char* dataAt = _incomingPacket + numBytesPacketHeader;
int numBytesPacketHeader = numBytesForPacketHeader(app->_incomingPacket); dataAt += sizeof(VOXEL_PACKET_FLAGS);
unsigned char* dataAt = app->_incomingPacket + numBytesPacketHeader; VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt);
dataAt += sizeof(VOXEL_PACKET_FLAGS); dataAt += sizeof(VOXEL_PACKET_SEQUENCE);
VOXEL_PACKET_SEQUENCE sequence = (*(VOXEL_PACKET_SEQUENCE*)dataAt); VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt);
dataAt += sizeof(VOXEL_PACKET_SEQUENCE); dataAt += sizeof(VOXEL_PACKET_SENT_TIME);
VOXEL_PACKET_SENT_TIME sentAt = (*(VOXEL_PACKET_SENT_TIME*)dataAt); VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow();
dataAt += sizeof(VOXEL_PACKET_SENT_TIME); int flightTime = arrivedAt - sentAt;
VOXEL_PACKET_SENT_TIME arrivedAt = usecTimestampNow();
int flightTime = arrivedAt - sentAt; printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime);
printf("got PACKET_TYPE_VOXEL_DATA, sequence:%d flightTime:%d\n", sequence, flightTime);
}
// add this packet to our list of voxel packets and process them on the voxel processing
app->_voxelProcessor.queueReceivedPacket(senderSockAddr, app->_incomingPacket, bytesReceived);
break;
} }
case PACKET_TYPE_METAVOXEL_DATA:
app->_metavoxels.processData(QByteArray((const char*)app->_incomingPacket, bytesReceived), // add this packet to our list of voxel packets and process them on the voxel processing
senderSockAddr); _voxelProcessor.queueReceivedPacket(senderSockAddr, _incomingPacket, bytesReceived);
break; break;
case PACKET_TYPE_BULK_AVATAR_DATA:
NodeList::getInstance()->processBulkNodeData(senderSockAddr,
app->_incomingPacket,
bytesReceived);
getInstance()->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived);
break;
case PACKET_TYPE_AVATAR_URLS:
processAvatarURLsMessage(app->_incomingPacket, bytesReceived);
break;
case PACKET_TYPE_AVATAR_FACE_VIDEO:
processAvatarFaceVideoMessage(app->_incomingPacket, bytesReceived);
break;
case PACKET_TYPE_DATA_SERVER_GET:
case PACKET_TYPE_DATA_SERVER_PUT:
case PACKET_TYPE_DATA_SERVER_SEND:
case PACKET_TYPE_DATA_SERVER_CONFIRM:
DataServerClient::processMessageFromDataServer(app->_incomingPacket, bytesReceived);
break;
default:
NodeList::getInstance()->processNodeData(senderSockAddr, app->_incomingPacket, bytesReceived);
break;
} }
case PACKET_TYPE_METAVOXEL_DATA:
_metavoxels.processData(QByteArray((const char*) _incomingPacket, bytesReceived),
senderSockAddr);
break;
case PACKET_TYPE_BULK_AVATAR_DATA:
NodeList::getInstance()->processBulkNodeData(senderSockAddr,
_incomingPacket,
bytesReceived);
getInstance()->_bandwidthMeter.inputStream(BandwidthMeter::AVATARS).updateValue(bytesReceived);
break;
case PACKET_TYPE_AVATAR_URLS:
processAvatarURLsMessage(_incomingPacket, bytesReceived);
break;
case PACKET_TYPE_AVATAR_FACE_VIDEO:
processAvatarFaceVideoMessage(_incomingPacket, bytesReceived);
break;
case PACKET_TYPE_DATA_SERVER_GET:
case PACKET_TYPE_DATA_SERVER_PUT:
case PACKET_TYPE_DATA_SERVER_SEND:
case PACKET_TYPE_DATA_SERVER_CONFIRM:
DataServerClient::processMessageFromDataServer(_incomingPacket, bytesReceived);
break;
default:
NodeList::getInstance()->processNodeData(senderSockAddr, _incomingPacket, bytesReceived);
break;
} }
} else if (!app->_enableNetworkThread) {
break;
} }
} }
if (app->_enableNetworkThread) {
pthread_exit(0);
}
return NULL;
} }
void Application::packetSentNotification(ssize_t length) { void Application::packetSentNotification(ssize_t length) {

View file

@ -215,6 +215,8 @@ public:
public slots: public slots:
void nodeKilled(SharedNodePointer node); void nodeKilled(SharedNodePointer node);
void processDatagrams();
void sendAvatarFaceVideoMessage(int frameCount, const QByteArray& data); void sendAvatarFaceVideoMessage(int frameCount, const QByteArray& data);
void exportVoxels(); void exportVoxels();
void importVoxels(); void importVoxels();
@ -467,10 +469,6 @@ private:
Audio _audio; Audio _audio;
#endif #endif
bool _enableNetworkThread;
pthread_t _networkReceiveThread;
bool _stopNetworkReceiveThread;
bool _enableProcessVoxelsThread; bool _enableProcessVoxelsThread;
VoxelPacketProcessor _voxelProcessor; VoxelPacketProcessor _voxelProcessor;
VoxelHideShowThread _voxelHideShowThread; VoxelHideShowThread _voxelHideShowThread;

View file

@ -786,7 +786,7 @@ SharedNodePointer NodeList::soloNodeOfType(char nodeType) {
} }
} }
return SharedNodePointer(NULL); return SharedNodePointer();
} }
void NodeList::removeSilentNodes() { void NodeList::removeSilentNodes() {