Merge pull request #3286 from birarda/mixer-analytics

de-couple audio mixer packet receive thread from processing
This commit is contained in:
AndrewMeadows 2014-08-14 16:42:21 -07:00
commit 1a5230ef08
7 changed files with 175 additions and 41 deletions

View file

@ -37,6 +37,7 @@
#include <QtCore/QJsonDocument>
#include <QtCore/QJsonObject>
#include <QtCore/QJsonValue>
#include <QtCore/QThread>
#include <QtCore/QTimer>
#include <QtNetwork/QNetworkRequest>
#include <QtNetwork/QNetworkReply>
@ -52,6 +53,7 @@
#include "AudioRingBuffer.h"
#include "AudioMixerClientData.h"
#include "AudioMixerDatagramProcessor.h"
#include "AvatarAudioStream.h"
#include "InjectedAudioStream.h"
@ -297,37 +299,31 @@ void AudioMixer::prepareMixForListeningNode(Node* node) {
}
}
void AudioMixer::readPendingDatagrams() {
QByteArray receivedPacket;
HifiSockAddr senderSockAddr;
void AudioMixer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
NodeList* nodeList = NodeList::getInstance();
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
// pull any new audio data from nodes off of the network stack
PacketType mixerPacketType = packetTypeForPacket(receivedPacket);
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|| mixerPacketType == PacketTypeInjectAudio
|| mixerPacketType == PacketTypeSilentAudioFrame
|| mixerPacketType == PacketTypeAudioStreamStats) {
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
} else if (mixerPacketType == PacketTypeMuteEnvironment) {
QByteArray packet = receivedPacket;
populatePacketHeader(packet, PacketTypeMuteEnvironment);
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
if (node->getType() == NodeType::Agent && node->getActiveSocket() && node->getLinkedData() && node != nodeList->sendingNodeForPacket(receivedPacket)) {
nodeList->writeDatagram(packet, packet.size(), node);
}
}
} else {
// let processNodeData handle it.
nodeList->processNodeData(senderSockAddr, receivedPacket);
// pull any new audio data from nodes off of the network stack
PacketType mixerPacketType = packetTypeForPacket(receivedPacket);
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|| mixerPacketType == PacketTypeInjectAudio
|| mixerPacketType == PacketTypeSilentAudioFrame
|| mixerPacketType == PacketTypeAudioStreamStats) {
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
} else if (mixerPacketType == PacketTypeMuteEnvironment) {
QByteArray packet = receivedPacket;
populatePacketHeader(packet, PacketTypeMuteEnvironment);
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
if (node->getType() == NodeType::Agent && node->getActiveSocket() && node->getLinkedData() && node != nodeList->sendingNodeForPacket(receivedPacket)) {
nodeList->writeDatagram(packet, packet.size(), node);
}
}
} else {
// let processNodeData handle it.
nodeList->processNodeData(senderSockAddr, receivedPacket);
}
}
@ -392,7 +388,35 @@ void AudioMixer::run() {
ThreadedAssignment::commonInit(AUDIO_MIXER_LOGGING_TARGET_NAME, NodeType::AudioMixer);
NodeList* nodeList = NodeList::getInstance();
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
// setup a QThread with us as parent that will house the AudioMixerDatagramProcessor
_datagramProcessingThread = new QThread(this);
// create an AudioMixerDatagramProcessor and move it to that thread
AudioMixerDatagramProcessor* datagramProcessor = new AudioMixerDatagramProcessor(nodeList->getNodeSocket(), thread());
datagramProcessor->moveToThread(_datagramProcessingThread);
// remove the NodeList as the parent of the node socket
nodeList->getNodeSocket().setParent(NULL);
nodeList->getNodeSocket().moveToThread(_datagramProcessingThread);
// let the datagram processor handle readyRead from node socket
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead,
datagramProcessor, &AudioMixerDatagramProcessor::readPendingDatagrams);
// connect to the datagram processing thread signal that tells us we have to handle a packet
connect(datagramProcessor, &AudioMixerDatagramProcessor::packetRequiresProcessing, this, &AudioMixer::readPendingDatagram);
// delete the datagram processor and the associated thread when the QThread quits
connect(_datagramProcessingThread, &QThread::finished, datagramProcessor, &QObject::deleteLater);
connect(datagramProcessor, &QObject::destroyed, _datagramProcessingThread, &QThread::deleteLater);
// start the datagram processing thread
_datagramProcessingThread->start();
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
nodeList->linkedDataCreateCallback = attachNewNodeDataToNode;

View file

@ -33,7 +33,8 @@ public slots:
/// threaded run of assignment
void run();
void readPendingDatagrams();
void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle
void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
void sendStatsPacket();

View file

@ -0,0 +1,51 @@
//
// AudioMixerDatagramProcessor.cpp
// assignment-client/src
//
// Created by Stephen Birarda on 2014-08-14.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include <QDebug>
#include <HifiSockAddr.h>
#include <NodeList.h>
#include "AudioMixerDatagramProcessor.h"
AudioMixerDatagramProcessor::AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) :
_nodeSocket(nodeSocket),
_previousNodeSocketThread(previousNodeSocketThread)
{
}
AudioMixerDatagramProcessor::~AudioMixerDatagramProcessor() {
// return the node socket to its previous thread
_nodeSocket.moveToThread(_previousNodeSocketThread);
}
void AudioMixerDatagramProcessor::readPendingDatagrams() {
HifiSockAddr senderSockAddr;
static QByteArray incomingPacket;
NodeList* nodeList = NodeList::getInstance();
// read everything that is available
while (_nodeSocket.hasPendingDatagrams()) {
incomingPacket.resize(_nodeSocket.pendingDatagramSize());
// just get this packet off the stack
_nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(),
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
if (nodeList->packetVersionAndHashMatch(incomingPacket)) {
// emit the signal to tell AudioMixer it needs to process a packet
emit packetRequiresProcessing(incomingPacket, senderSockAddr);
}
}
}

View file

@ -0,0 +1,32 @@
//
// AudioMixerDatagramProcessor.h
// assignment-client/src
//
// Created by Stephen Birarda on 2014-08-14.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_AudioMixerDatagramProcessor_h
#define hifi_AudioMixerDatagramProcessor_h
#include <qobject.h>
#include <qudpsocket.h>
class AudioMixerDatagramProcessor : public QObject {
Q_OBJECT
public:
AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread);
~AudioMixerDatagramProcessor();
public slots:
void readPendingDatagrams();
signals:
void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
private:
QUdpSocket& _nodeSocket;
QThread* _previousNodeSocketThread;
};
#endif // hifi_AudioMixerDatagramProcessor_h

View file

@ -147,15 +147,20 @@ void LimitedNodeList::changeSocketBufferSizes(int numBytes) {
setsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, bufferOpt, reinterpret_cast<const char*>(&numBytes),
sizeof(numBytes));
int newBufferSize = 0;
getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, bufferOpt, reinterpret_cast<char*>(&newBufferSize), &sizeOfInt);
QString bufferTypeString = (i == 0) ? "send" : "receive";
qDebug() << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to" << newBufferSize << "bytes";
if (oldBufferSize < numBytes) {
int newBufferSize = 0;
getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, bufferOpt, reinterpret_cast<char*>(&newBufferSize), &sizeOfInt);
qDebug() << "Changed socket" << bufferTypeString << "buffer size from" << oldBufferSize << "to"
<< newBufferSize << "bytes";
} else {
// don't make the buffer smaller
qDebug() << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
<< "since it is larger than desired size of" << numBytes;
}
}
}
bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) {

View file

@ -11,6 +11,7 @@
#include <QtCore/QCoreApplication>
#include <QtCore/QJsonObject>
#include <QtCore/QThread>
#include <QtCore/QTimer>
#include "Logging.h"
@ -18,7 +19,8 @@
ThreadedAssignment::ThreadedAssignment(const QByteArray& packet) :
Assignment(packet),
_isFinished(false)
_isFinished(false),
_datagramProcessingThread(NULL)
{
}
@ -28,10 +30,25 @@ void ThreadedAssignment::setFinished(bool isFinished) {
if (_isFinished) {
aboutToFinish();
emit finished();
NodeList* nodeList = NodeList::getInstance();
// if we have a datagram processing thread, quit it and wait on it to make sure that
// the node socket is back on the same thread as the NodeList
if (_datagramProcessingThread) {
// tell the datagram processing thread to quit and wait until it is done, then return the node socket to the NodeList
_datagramProcessingThread->quit();
_datagramProcessingThread->wait();
// set node socket parent back to NodeList
nodeList->getNodeSocket().setParent(nodeList);
}
// move the NodeList back to the QCoreApplication instance's thread
NodeList::getInstance()->moveToThread(QCoreApplication::instance()->thread());
nodeList->moveToThread(QCoreApplication::instance()->thread());
emit finished();
}
}

View file

@ -20,6 +20,7 @@ class ThreadedAssignment : public Assignment {
Q_OBJECT
public:
ThreadedAssignment(const QByteArray& packet);
void setFinished(bool isFinished);
virtual void aboutToFinish() { };
void addPacketStatsAndSendStatsPacket(QJsonObject& statsObject);
@ -29,15 +30,18 @@ public slots:
virtual void run() = 0;
virtual void readPendingDatagrams() = 0;
virtual void sendStatsPacket();
signals:
void finished();
protected:
bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr);
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
bool _isFinished;
QThread* _datagramProcessingThread;
private slots:
void checkInWithDomainServerOrExit();
signals:
void finished();
};
typedef QSharedPointer<ThreadedAssignment> SharedAssignmentPointer;