From 64c11ebf8f74103146f0f0d430db6690d3c0f8cd Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Aug 2014 10:36:54 -0700 Subject: [PATCH 1/3] don't change the socket buffer sizes if it is already larger --- libraries/networking/src/LimitedNodeList.cpp | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 44e45c359a..0548e95c80 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -82,7 +82,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short qDebug() << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort(); } - const int LARGER_BUFFER_SIZE = 1048576; + const int LARGER_BUFFER_SIZE = 2097152; changeSocketBufferSizes(LARGER_BUFFER_SIZE); _packetStatTimer.start(); @@ -147,12 +147,21 @@ void LimitedNodeList::changeSocketBufferSizes(int numBytes) { setsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, bufferOpt, reinterpret_cast(&numBytes), sizeof(numBytes)); - int newBufferSize = 0; - getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, bufferOpt, reinterpret_cast(&newBufferSize), &sizeOfInt); - QString bufferTypeString = (i == 0) ? "send" : "receive"; - qDebug() << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to" << newBufferSize << "bytes"; + if (oldBufferSize < numBytes) { + int newBufferSize = 0; + getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, bufferOpt, reinterpret_cast(&newBufferSize), &sizeOfInt); + + qDebug() << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to" + << newBufferSize << "bytes"; + } else { + // don't make the buffer smaller + qDebug() << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize + << "since it is larger than desired size of" << numBytes; + } + + } From 1e65a093f23606f9afa0e76f43753e1b39dac1fd Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Aug 2014 12:37:57 -0700 Subject: [PATCH 2/3] decouple audio-mixer packet receipt from main thread --- assignment-client/src/audio/AudioMixer.cpp | 80 ++++++++++++------- assignment-client/src/audio/AudioMixer.h | 3 +- .../src/audio/AudioMixerDatagramProcessor.cpp | 51 ++++++++++++ .../src/audio/AudioMixerDatagramProcessor.h | 32 ++++++++ libraries/networking/src/LimitedNodeList.cpp | 4 - .../networking/src/ThreadedAssignment.cpp | 23 +++++- libraries/networking/src/ThreadedAssignment.h | 10 ++- 7 files changed, 164 insertions(+), 39 deletions(-) create mode 100644 assignment-client/src/audio/AudioMixerDatagramProcessor.cpp create mode 100644 assignment-client/src/audio/AudioMixerDatagramProcessor.h diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 914d5ae63b..c315f695cc 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -52,6 +53,7 @@ #include "AudioRingBuffer.h" #include "AudioMixerClientData.h" +#include "AudioMixerDatagramProcessor.h" #include "AvatarAudioStream.h" #include "InjectedAudioStream.h" @@ -297,37 +299,31 @@ void AudioMixer::prepareMixForListeningNode(Node* node) { } } -void AudioMixer::readPendingDatagrams() { - QByteArray receivedPacket; - HifiSockAddr senderSockAddr; +void AudioMixer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) { NodeList* nodeList = NodeList::getInstance(); - while (readAvailableDatagram(receivedPacket, senderSockAddr)) { - if (nodeList->packetVersionAndHashMatch(receivedPacket)) { - // pull any new audio data from nodes off of the network stack - PacketType mixerPacketType = packetTypeForPacket(receivedPacket); - if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho - || mixerPacketType == PacketTypeMicrophoneAudioWithEcho - || mixerPacketType == PacketTypeInjectAudio - || mixerPacketType == PacketTypeSilentAudioFrame - || mixerPacketType == PacketTypeAudioStreamStats) { - - nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket); - } else if (mixerPacketType == PacketTypeMuteEnvironment) { - QByteArray packet = receivedPacket; - populatePacketHeader(packet, PacketTypeMuteEnvironment); - - foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { - if (node->getType() == NodeType::Agent && node->getActiveSocket() && node->getLinkedData() && node != nodeList->sendingNodeForPacket(receivedPacket)) { - nodeList->writeDatagram(packet, packet.size(), node); - } - } - - } else { - // let processNodeData handle it. - nodeList->processNodeData(senderSockAddr, receivedPacket); + // pull any new audio data from nodes off of the network stack + PacketType mixerPacketType = packetTypeForPacket(receivedPacket); + if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho + || mixerPacketType == PacketTypeMicrophoneAudioWithEcho + || mixerPacketType == PacketTypeInjectAudio + || mixerPacketType == PacketTypeSilentAudioFrame + || mixerPacketType == PacketTypeAudioStreamStats) { + + nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket); + } else if (mixerPacketType == PacketTypeMuteEnvironment) { + QByteArray packet = receivedPacket; + populatePacketHeader(packet, PacketTypeMuteEnvironment); + + foreach (const SharedNodePointer& node, nodeList->getNodeHash()) { + if (node->getType() == NodeType::Agent && node->getActiveSocket() && node->getLinkedData() && node != nodeList->sendingNodeForPacket(receivedPacket)) { + nodeList->writeDatagram(packet, packet.size(), node); } } + + } else { + // let processNodeData handle it. + nodeList->processNodeData(senderSockAddr, receivedPacket); } } @@ -392,7 +388,35 @@ void AudioMixer::run() { ThreadedAssignment::commonInit(AUDIO_MIXER_LOGGING_TARGET_NAME, NodeType::AudioMixer); NodeList* nodeList = NodeList::getInstance(); - + + // we do not want this event loop to be the handler for UDP datagrams, so disconnect + disconnect(&nodeList->getNodeSocket(), 0, this, 0); + + // setup a QThread with us as parent that will house the AudioMixerDatagramProcessor + _datagramProcessingThread = new QThread(this); + + // create an AudioMixerDatagramProcessor and move it to that thread + AudioMixerDatagramProcessor* datagramProcessor = new AudioMixerDatagramProcessor(nodeList->getNodeSocket(), thread()); + datagramProcessor->moveToThread(_datagramProcessingThread); + + // remove the NodeList as the parent of the node socket + nodeList->getNodeSocket().setParent(NULL); + nodeList->getNodeSocket().moveToThread(_datagramProcessingThread); + + // let the datagram processor handle readyRead from node socket + connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead, + datagramProcessor, &AudioMixerDatagramProcessor::readPendingDatagrams); + + // connect to the datagram processing thread signal that tells us we have to handle a packet + connect(datagramProcessor, &AudioMixerDatagramProcessor::packetRequiresProcessing, this, &AudioMixer::readPendingDatagram); + + // delete the datagram processor and the associated thread when the QThread quits + connect(_datagramProcessingThread, &QThread::finished, datagramProcessor, &QObject::deleteLater); + connect(datagramProcessor, &QObject::destroyed, _datagramProcessingThread, &QThread::deleteLater); + + // start the datagram processing thread + _datagramProcessingThread->start(); + nodeList->addNodeTypeToInterestSet(NodeType::Agent); nodeList->linkedDataCreateCallback = attachNewNodeDataToNode; diff --git a/assignment-client/src/audio/AudioMixer.h b/assignment-client/src/audio/AudioMixer.h index d11539e22e..2a4b93149c 100644 --- a/assignment-client/src/audio/AudioMixer.h +++ b/assignment-client/src/audio/AudioMixer.h @@ -33,7 +33,8 @@ public slots: /// threaded run of assignment void run(); - void readPendingDatagrams(); + void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle + void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr); void sendStatsPacket(); diff --git a/assignment-client/src/audio/AudioMixerDatagramProcessor.cpp b/assignment-client/src/audio/AudioMixerDatagramProcessor.cpp new file mode 100644 index 0000000000..61f42e6f08 --- /dev/null +++ b/assignment-client/src/audio/AudioMixerDatagramProcessor.cpp @@ -0,0 +1,51 @@ +// +// AudioMixerDatagramProcessor.cpp +// assignment-client/src +// +// Created by Stephen Birarda on 2014-08-14. +// Copyright 2014 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include + +#include +#include + +#include "AudioMixerDatagramProcessor.h" + +AudioMixerDatagramProcessor::AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) : + _nodeSocket(nodeSocket), + _previousNodeSocketThread(previousNodeSocketThread) +{ + +} + +AudioMixerDatagramProcessor::~AudioMixerDatagramProcessor() { + // return the node socket to its previous thread + _nodeSocket.moveToThread(_previousNodeSocketThread); +} + +void AudioMixerDatagramProcessor::readPendingDatagrams() { + + HifiSockAddr senderSockAddr; + static QByteArray incomingPacket; + + NodeList* nodeList = NodeList::getInstance(); + + // read everything that is available + while (_nodeSocket.hasPendingDatagrams()) { + incomingPacket.resize(_nodeSocket.pendingDatagramSize()); + + // just get this packet off the stack + _nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(), + senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer()); + + if (nodeList->packetVersionAndHashMatch(incomingPacket)) { + // emit the signal to tell AudioMixer it needs to process a packet + emit packetRequiresProcessing(incomingPacket, senderSockAddr); + } + } +} diff --git a/assignment-client/src/audio/AudioMixerDatagramProcessor.h b/assignment-client/src/audio/AudioMixerDatagramProcessor.h new file mode 100644 index 0000000000..94233a1373 --- /dev/null +++ b/assignment-client/src/audio/AudioMixerDatagramProcessor.h @@ -0,0 +1,32 @@ +// +// AudioMixerDatagramProcessor.h +// assignment-client/src +// +// Created by Stephen Birarda on 2014-08-14. +// Copyright 2014 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_AudioMixerDatagramProcessor_h +#define hifi_AudioMixerDatagramProcessor_h + +#include +#include + +class AudioMixerDatagramProcessor : public QObject { + Q_OBJECT +public: + AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread); + ~AudioMixerDatagramProcessor(); +public slots: + void readPendingDatagrams(); +signals: + void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr); +private: + QUdpSocket& _nodeSocket; + QThread* _previousNodeSocketThread; +}; + +#endif // hifi_AudioMixerDatagramProcessor_h \ No newline at end of file diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 0548e95c80..99c8a80bf5 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -160,11 +160,7 @@ void LimitedNodeList::changeSocketBufferSizes(int numBytes) { qDebug() << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize << "since it is larger than desired size of" << numBytes; } - - } - - } bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) { diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp index 4b92f8ba38..cd80c441c1 100644 --- a/libraries/networking/src/ThreadedAssignment.cpp +++ b/libraries/networking/src/ThreadedAssignment.cpp @@ -11,6 +11,7 @@ #include #include +#include #include #include "Logging.h" @@ -18,7 +19,8 @@ ThreadedAssignment::ThreadedAssignment(const QByteArray& packet) : Assignment(packet), - _isFinished(false) + _isFinished(false), + _datagramProcessingThread(NULL) { } @@ -28,10 +30,25 @@ void ThreadedAssignment::setFinished(bool isFinished) { if (_isFinished) { aboutToFinish(); - emit finished(); + + NodeList* nodeList = NodeList::getInstance(); + + // if we have a datagram processing thread, quit it and wait on it to make sure that + // the node socket is back on the same thread as the NodeList + + if (_datagramProcessingThread) { + // tell the datagram processing thread to quit and wait until it is done, then return the node socket to the NodeList + _datagramProcessingThread->quit(); + _datagramProcessingThread->wait(); + + // set node socket parent back to NodeList + nodeList->getNodeSocket().setParent(nodeList); + } // move the NodeList back to the QCoreApplication instance's thread - NodeList::getInstance()->moveToThread(QCoreApplication::instance()->thread()); + nodeList->moveToThread(QCoreApplication::instance()->thread()); + + emit finished(); } } diff --git a/libraries/networking/src/ThreadedAssignment.h b/libraries/networking/src/ThreadedAssignment.h index e9241d0272..454baa85f2 100644 --- a/libraries/networking/src/ThreadedAssignment.h +++ b/libraries/networking/src/ThreadedAssignment.h @@ -20,6 +20,7 @@ class ThreadedAssignment : public Assignment { Q_OBJECT public: ThreadedAssignment(const QByteArray& packet); + void setFinished(bool isFinished); virtual void aboutToFinish() { }; void addPacketStatsAndSendStatsPacket(QJsonObject& statsObject); @@ -29,15 +30,18 @@ public slots: virtual void run() = 0; virtual void readPendingDatagrams() = 0; virtual void sendStatsPacket(); - +signals: + void finished(); + protected: bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr); void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true); bool _isFinished; + QThread* _datagramProcessingThread; + private slots: void checkInWithDomainServerOrExit(); -signals: - void finished(); + }; typedef QSharedPointer SharedAssignmentPointer; From c58bae9021ff028e836dfd63f8d0a544531a184f Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 14 Aug 2014 14:27:09 -0700 Subject: [PATCH 3/3] attempt to change buffers to 1MB only --- libraries/networking/src/LimitedNodeList.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 99c8a80bf5..f50f7493fb 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -82,7 +82,7 @@ LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short qDebug() << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort(); } - const int LARGER_BUFFER_SIZE = 2097152; + const int LARGER_BUFFER_SIZE = 1048576; changeSocketBufferSizes(LARGER_BUFFER_SIZE); _packetStatTimer.start();