mirror of
https://thingvellir.net/git/overte
synced 2025-03-27 23:52:03 +01:00
decouple audio-mixer packet receipt from main thread
This commit is contained in:
parent
64c11ebf8f
commit
1e65a093f2
7 changed files with 164 additions and 39 deletions
|
@ -37,6 +37,7 @@
|
||||||
#include <QtCore/QJsonDocument>
|
#include <QtCore/QJsonDocument>
|
||||||
#include <QtCore/QJsonObject>
|
#include <QtCore/QJsonObject>
|
||||||
#include <QtCore/QJsonValue>
|
#include <QtCore/QJsonValue>
|
||||||
|
#include <QtCore/QThread>
|
||||||
#include <QtCore/QTimer>
|
#include <QtCore/QTimer>
|
||||||
#include <QtNetwork/QNetworkRequest>
|
#include <QtNetwork/QNetworkRequest>
|
||||||
#include <QtNetwork/QNetworkReply>
|
#include <QtNetwork/QNetworkReply>
|
||||||
|
@ -52,6 +53,7 @@
|
||||||
|
|
||||||
#include "AudioRingBuffer.h"
|
#include "AudioRingBuffer.h"
|
||||||
#include "AudioMixerClientData.h"
|
#include "AudioMixerClientData.h"
|
||||||
|
#include "AudioMixerDatagramProcessor.h"
|
||||||
#include "AvatarAudioStream.h"
|
#include "AvatarAudioStream.h"
|
||||||
#include "InjectedAudioStream.h"
|
#include "InjectedAudioStream.h"
|
||||||
|
|
||||||
|
@ -297,37 +299,31 @@ void AudioMixer::prepareMixForListeningNode(Node* node) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioMixer::readPendingDatagrams() {
|
void AudioMixer::readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
|
||||||
QByteArray receivedPacket;
|
|
||||||
HifiSockAddr senderSockAddr;
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
while (readAvailableDatagram(receivedPacket, senderSockAddr)) {
|
// pull any new audio data from nodes off of the network stack
|
||||||
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
|
PacketType mixerPacketType = packetTypeForPacket(receivedPacket);
|
||||||
// pull any new audio data from nodes off of the network stack
|
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|
||||||
PacketType mixerPacketType = packetTypeForPacket(receivedPacket);
|
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|
||||||
if (mixerPacketType == PacketTypeMicrophoneAudioNoEcho
|
|| mixerPacketType == PacketTypeInjectAudio
|
||||||
|| mixerPacketType == PacketTypeMicrophoneAudioWithEcho
|
|| mixerPacketType == PacketTypeSilentAudioFrame
|
||||||
|| mixerPacketType == PacketTypeInjectAudio
|
|| mixerPacketType == PacketTypeAudioStreamStats) {
|
||||||
|| mixerPacketType == PacketTypeSilentAudioFrame
|
|
||||||
|| mixerPacketType == PacketTypeAudioStreamStats) {
|
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
|
||||||
|
} else if (mixerPacketType == PacketTypeMuteEnvironment) {
|
||||||
nodeList->findNodeAndUpdateWithDataFromPacket(receivedPacket);
|
QByteArray packet = receivedPacket;
|
||||||
} else if (mixerPacketType == PacketTypeMuteEnvironment) {
|
populatePacketHeader(packet, PacketTypeMuteEnvironment);
|
||||||
QByteArray packet = receivedPacket;
|
|
||||||
populatePacketHeader(packet, PacketTypeMuteEnvironment);
|
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
|
||||||
|
if (node->getType() == NodeType::Agent && node->getActiveSocket() && node->getLinkedData() && node != nodeList->sendingNodeForPacket(receivedPacket)) {
|
||||||
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
|
nodeList->writeDatagram(packet, packet.size(), node);
|
||||||
if (node->getType() == NodeType::Agent && node->getActiveSocket() && node->getLinkedData() && node != nodeList->sendingNodeForPacket(receivedPacket)) {
|
|
||||||
nodeList->writeDatagram(packet, packet.size(), node);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
// let processNodeData handle it.
|
|
||||||
nodeList->processNodeData(senderSockAddr, receivedPacket);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// let processNodeData handle it.
|
||||||
|
nodeList->processNodeData(senderSockAddr, receivedPacket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,7 +388,35 @@ void AudioMixer::run() {
|
||||||
ThreadedAssignment::commonInit(AUDIO_MIXER_LOGGING_TARGET_NAME, NodeType::AudioMixer);
|
ThreadedAssignment::commonInit(AUDIO_MIXER_LOGGING_TARGET_NAME, NodeType::AudioMixer);
|
||||||
|
|
||||||
NodeList* nodeList = NodeList::getInstance();
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
|
// we do not want this event loop to be the handler for UDP datagrams, so disconnect
|
||||||
|
disconnect(&nodeList->getNodeSocket(), 0, this, 0);
|
||||||
|
|
||||||
|
// setup a QThread with us as parent that will house the AudioMixerDatagramProcessor
|
||||||
|
_datagramProcessingThread = new QThread(this);
|
||||||
|
|
||||||
|
// create an AudioMixerDatagramProcessor and move it to that thread
|
||||||
|
AudioMixerDatagramProcessor* datagramProcessor = new AudioMixerDatagramProcessor(nodeList->getNodeSocket(), thread());
|
||||||
|
datagramProcessor->moveToThread(_datagramProcessingThread);
|
||||||
|
|
||||||
|
// remove the NodeList as the parent of the node socket
|
||||||
|
nodeList->getNodeSocket().setParent(NULL);
|
||||||
|
nodeList->getNodeSocket().moveToThread(_datagramProcessingThread);
|
||||||
|
|
||||||
|
// let the datagram processor handle readyRead from node socket
|
||||||
|
connect(&nodeList->getNodeSocket(), &QUdpSocket::readyRead,
|
||||||
|
datagramProcessor, &AudioMixerDatagramProcessor::readPendingDatagrams);
|
||||||
|
|
||||||
|
// connect to the datagram processing thread signal that tells us we have to handle a packet
|
||||||
|
connect(datagramProcessor, &AudioMixerDatagramProcessor::packetRequiresProcessing, this, &AudioMixer::readPendingDatagram);
|
||||||
|
|
||||||
|
// delete the datagram processor and the associated thread when the QThread quits
|
||||||
|
connect(_datagramProcessingThread, &QThread::finished, datagramProcessor, &QObject::deleteLater);
|
||||||
|
connect(datagramProcessor, &QObject::destroyed, _datagramProcessingThread, &QThread::deleteLater);
|
||||||
|
|
||||||
|
// start the datagram processing thread
|
||||||
|
_datagramProcessingThread->start();
|
||||||
|
|
||||||
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
|
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
|
||||||
|
|
||||||
nodeList->linkedDataCreateCallback = attachNewNodeDataToNode;
|
nodeList->linkedDataCreateCallback = attachNewNodeDataToNode;
|
||||||
|
|
|
@ -33,7 +33,8 @@ public slots:
|
||||||
/// threaded run of assignment
|
/// threaded run of assignment
|
||||||
void run();
|
void run();
|
||||||
|
|
||||||
void readPendingDatagrams();
|
void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle
|
||||||
|
void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
||||||
|
|
||||||
void sendStatsPacket();
|
void sendStatsPacket();
|
||||||
|
|
||||||
|
|
51
assignment-client/src/audio/AudioMixerDatagramProcessor.cpp
Normal file
51
assignment-client/src/audio/AudioMixerDatagramProcessor.cpp
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
//
|
||||||
|
// AudioMixerDatagramProcessor.cpp
|
||||||
|
// assignment-client/src
|
||||||
|
//
|
||||||
|
// Created by Stephen Birarda on 2014-08-14.
|
||||||
|
// Copyright 2014 High Fidelity, Inc.
|
||||||
|
//
|
||||||
|
// Distributed under the Apache License, Version 2.0.
|
||||||
|
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
//
|
||||||
|
|
||||||
|
#include <QDebug>
|
||||||
|
|
||||||
|
#include <HifiSockAddr.h>
|
||||||
|
#include <NodeList.h>
|
||||||
|
|
||||||
|
#include "AudioMixerDatagramProcessor.h"
|
||||||
|
|
||||||
|
AudioMixerDatagramProcessor::AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread) :
|
||||||
|
_nodeSocket(nodeSocket),
|
||||||
|
_previousNodeSocketThread(previousNodeSocketThread)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
AudioMixerDatagramProcessor::~AudioMixerDatagramProcessor() {
|
||||||
|
// return the node socket to its previous thread
|
||||||
|
_nodeSocket.moveToThread(_previousNodeSocketThread);
|
||||||
|
}
|
||||||
|
|
||||||
|
void AudioMixerDatagramProcessor::readPendingDatagrams() {
|
||||||
|
|
||||||
|
HifiSockAddr senderSockAddr;
|
||||||
|
static QByteArray incomingPacket;
|
||||||
|
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
|
// read everything that is available
|
||||||
|
while (_nodeSocket.hasPendingDatagrams()) {
|
||||||
|
incomingPacket.resize(_nodeSocket.pendingDatagramSize());
|
||||||
|
|
||||||
|
// just get this packet off the stack
|
||||||
|
_nodeSocket.readDatagram(incomingPacket.data(), incomingPacket.size(),
|
||||||
|
senderSockAddr.getAddressPointer(), senderSockAddr.getPortPointer());
|
||||||
|
|
||||||
|
if (nodeList->packetVersionAndHashMatch(incomingPacket)) {
|
||||||
|
// emit the signal to tell AudioMixer it needs to process a packet
|
||||||
|
emit packetRequiresProcessing(incomingPacket, senderSockAddr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
32
assignment-client/src/audio/AudioMixerDatagramProcessor.h
Normal file
32
assignment-client/src/audio/AudioMixerDatagramProcessor.h
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
//
|
||||||
|
// AudioMixerDatagramProcessor.h
|
||||||
|
// assignment-client/src
|
||||||
|
//
|
||||||
|
// Created by Stephen Birarda on 2014-08-14.
|
||||||
|
// Copyright 2014 High Fidelity, Inc.
|
||||||
|
//
|
||||||
|
// Distributed under the Apache License, Version 2.0.
|
||||||
|
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
|
||||||
|
//
|
||||||
|
|
||||||
|
#ifndef hifi_AudioMixerDatagramProcessor_h
|
||||||
|
#define hifi_AudioMixerDatagramProcessor_h
|
||||||
|
|
||||||
|
#include <qobject.h>
|
||||||
|
#include <qudpsocket.h>
|
||||||
|
|
||||||
|
class AudioMixerDatagramProcessor : public QObject {
|
||||||
|
Q_OBJECT
|
||||||
|
public:
|
||||||
|
AudioMixerDatagramProcessor(QUdpSocket& nodeSocket, QThread* previousNodeSocketThread);
|
||||||
|
~AudioMixerDatagramProcessor();
|
||||||
|
public slots:
|
||||||
|
void readPendingDatagrams();
|
||||||
|
signals:
|
||||||
|
void packetRequiresProcessing(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
|
||||||
|
private:
|
||||||
|
QUdpSocket& _nodeSocket;
|
||||||
|
QThread* _previousNodeSocketThread;
|
||||||
|
};
|
||||||
|
|
||||||
|
#endif // hifi_AudioMixerDatagramProcessor_h
|
|
@ -160,11 +160,7 @@ void LimitedNodeList::changeSocketBufferSizes(int numBytes) {
|
||||||
qDebug() << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
|
qDebug() << "Did not change socket" << bufferTypeString << "buffer size from" << oldBufferSize
|
||||||
<< "since it is larger than desired size of" << numBytes;
|
<< "since it is larger than desired size of" << numBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) {
|
bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) {
|
||||||
|
|
|
@ -11,6 +11,7 @@
|
||||||
|
|
||||||
#include <QtCore/QCoreApplication>
|
#include <QtCore/QCoreApplication>
|
||||||
#include <QtCore/QJsonObject>
|
#include <QtCore/QJsonObject>
|
||||||
|
#include <QtCore/QThread>
|
||||||
#include <QtCore/QTimer>
|
#include <QtCore/QTimer>
|
||||||
|
|
||||||
#include "Logging.h"
|
#include "Logging.h"
|
||||||
|
@ -18,7 +19,8 @@
|
||||||
|
|
||||||
ThreadedAssignment::ThreadedAssignment(const QByteArray& packet) :
|
ThreadedAssignment::ThreadedAssignment(const QByteArray& packet) :
|
||||||
Assignment(packet),
|
Assignment(packet),
|
||||||
_isFinished(false)
|
_isFinished(false),
|
||||||
|
_datagramProcessingThread(NULL)
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -28,10 +30,25 @@ void ThreadedAssignment::setFinished(bool isFinished) {
|
||||||
|
|
||||||
if (_isFinished) {
|
if (_isFinished) {
|
||||||
aboutToFinish();
|
aboutToFinish();
|
||||||
emit finished();
|
|
||||||
|
NodeList* nodeList = NodeList::getInstance();
|
||||||
|
|
||||||
|
// if we have a datagram processing thread, quit it and wait on it to make sure that
|
||||||
|
// the node socket is back on the same thread as the NodeList
|
||||||
|
|
||||||
|
if (_datagramProcessingThread) {
|
||||||
|
// tell the datagram processing thread to quit and wait until it is done, then return the node socket to the NodeList
|
||||||
|
_datagramProcessingThread->quit();
|
||||||
|
_datagramProcessingThread->wait();
|
||||||
|
|
||||||
|
// set node socket parent back to NodeList
|
||||||
|
nodeList->getNodeSocket().setParent(nodeList);
|
||||||
|
}
|
||||||
|
|
||||||
// move the NodeList back to the QCoreApplication instance's thread
|
// move the NodeList back to the QCoreApplication instance's thread
|
||||||
NodeList::getInstance()->moveToThread(QCoreApplication::instance()->thread());
|
nodeList->moveToThread(QCoreApplication::instance()->thread());
|
||||||
|
|
||||||
|
emit finished();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ class ThreadedAssignment : public Assignment {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
public:
|
public:
|
||||||
ThreadedAssignment(const QByteArray& packet);
|
ThreadedAssignment(const QByteArray& packet);
|
||||||
|
|
||||||
void setFinished(bool isFinished);
|
void setFinished(bool isFinished);
|
||||||
virtual void aboutToFinish() { };
|
virtual void aboutToFinish() { };
|
||||||
void addPacketStatsAndSendStatsPacket(QJsonObject& statsObject);
|
void addPacketStatsAndSendStatsPacket(QJsonObject& statsObject);
|
||||||
|
@ -29,15 +30,18 @@ public slots:
|
||||||
virtual void run() = 0;
|
virtual void run() = 0;
|
||||||
virtual void readPendingDatagrams() = 0;
|
virtual void readPendingDatagrams() = 0;
|
||||||
virtual void sendStatsPacket();
|
virtual void sendStatsPacket();
|
||||||
|
signals:
|
||||||
|
void finished();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr);
|
bool readAvailableDatagram(QByteArray& destinationByteArray, HifiSockAddr& senderSockAddr);
|
||||||
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
|
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
|
||||||
bool _isFinished;
|
bool _isFinished;
|
||||||
|
QThread* _datagramProcessingThread;
|
||||||
|
|
||||||
private slots:
|
private slots:
|
||||||
void checkInWithDomainServerOrExit();
|
void checkInWithDomainServerOrExit();
|
||||||
signals:
|
|
||||||
void finished();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef QSharedPointer<ThreadedAssignment> SharedAssignmentPointer;
|
typedef QSharedPointer<ThreadedAssignment> SharedAssignmentPointer;
|
||||||
|
|
Loading…
Reference in a new issue