Merge pull request #9673 from zzmp/audio/threaded-decode

Parallelize audio packet decoding in mixer
Chris Collins 2017-02-15 09:44:42 -08:00 committed by GitHub
commit d1a236954b
10 changed files with 221 additions and 142 deletions
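In short: packet types whose consequences are limited to a single node (audio frames, stream stats, codec negotiation, ignore requests, per-avatar gain) are no longer parsed on the main thread as they arrive. They are queued on the sending node's AudioMixerClientData and drained once per mixer frame by the slave thread pool, while packets with global consequences (mute environment, node mute, kill avatar) stay on the main thread. A minimal sketch of the queue-then-drain pattern, using only standard-library types rather than the hifi classes:

// Sketch only: a per-client FIFO filled from the receive path and drained by a
// worker thread. The hifi classes (AudioMixerClientData, AudioMixerSlavePool) add
// node tracking, per-type dispatch, and frame timing on top of this idea.
#include <memory>
#include <mutex>
#include <queue>

struct Packet { int type = 0; /* payload elided */ };

class ClientData {
public:
    void queuePacket(std::shared_ptr<Packet> packet) {
        std::lock_guard<std::mutex> guard(_mutex);      // enqueue from the receive path
        _packets.push(std::move(packet));
    }

    void processPackets() {                             // called once per frame on a worker
        std::queue<std::shared_ptr<Packet>> pending;
        {
            std::lock_guard<std::mutex> guard(_mutex);  // take the whole backlog at once
            std::swap(pending, _packets);
        }
        while (!pending.empty()) {
            parse(*pending.front());                    // node-local work only
            pending.pop();
        }
    }

private:
    void parse(const Packet&) { /* decode audio / update per-node state */ }

    std::mutex _mutex;
    std::queue<std::shared_ptr<Packet>> _packets;
};

In the commit itself the same split appears as AudioMixerClientData::queuePacket / AudioMixerClientData::processPackets, with AudioMixerSlavePool::processPackets fanning the drain out across worker threads once per frame.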

AudioMixer.cpp

@@ -47,40 +47,54 @@ static const QString AUDIO_THREADING_GROUP_KEY = "audio_threading";
 int AudioMixer::_numStaticJitterFrames{ -1 };
 float AudioMixer::_noiseMutingThreshold{ DEFAULT_NOISE_MUTING_THRESHOLD };
 float AudioMixer::_attenuationPerDoublingInDistance{ DEFAULT_ATTENUATION_PER_DOUBLING_IN_DISTANCE };
+std::map<QString, std::shared_ptr<CodecPlugin>> AudioMixer::_availableCodecs{ };
+QStringList AudioMixer::_codecPreferenceOrder{};
 QHash<QString, AABox> AudioMixer::_audioZones;
 QVector<AudioMixer::ZoneSettings> AudioMixer::_zoneSettings;
 QVector<AudioMixer::ReverbSettings> AudioMixer::_zoneReverbSettings;

 AudioMixer::AudioMixer(ReceivedMessage& message) :
     ThreadedAssignment(message) {
+    // hash the available codecs (on the mixer)
+    auto codecPlugins = PluginManager::getInstance()->getCodecPlugins();
+    std::for_each(codecPlugins.cbegin(), codecPlugins.cend(),
+        [&](const CodecPluginPointer& codec) {
+            _availableCodecs[codec->getName()] = codec;
+        });
+
     auto nodeList = DependencyManager::get<NodeList>();
     auto& packetReceiver = nodeList->getPacketReceiver();

-    packetReceiver.registerListenerForTypes({ PacketType::MicrophoneAudioNoEcho, PacketType::MicrophoneAudioWithEcho,
-                                              PacketType::InjectAudio, PacketType::AudioStreamStats },
-                                            this, "handleAudioPacket");
-    packetReceiver.registerListenerForTypes({ PacketType::SilentAudioFrame }, this, "handleSilentAudioPacket");
-    packetReceiver.registerListener(PacketType::NegotiateAudioFormat, this, "handleNegotiateAudioFormat");
+    // packets whose consequences are limited to their own node can be parallelized
+    packetReceiver.registerListenerForTypes({
+            PacketType::MicrophoneAudioNoEcho,
+            PacketType::MicrophoneAudioWithEcho,
+            PacketType::InjectAudio,
+            PacketType::AudioStreamStats,
+            PacketType::SilentAudioFrame,
+            PacketType::NegotiateAudioFormat,
+            PacketType::MuteEnvironment,
+            PacketType::NodeIgnoreRequest,
+            PacketType::RadiusIgnoreRequest,
+            PacketType::RequestsDomainListData,
+            PacketType::PerAvatarGainSet },
+            this, "queueAudioPacket");
+
+    // packets whose consequences are global should be processed on the main thread
     packetReceiver.registerListener(PacketType::MuteEnvironment, this, "handleMuteEnvironmentPacket");
-    packetReceiver.registerListener(PacketType::NodeIgnoreRequest, this, "handleNodeIgnoreRequestPacket");
-    packetReceiver.registerListener(PacketType::KillAvatar, this, "handleKillAvatarPacket");
     packetReceiver.registerListener(PacketType::NodeMuteRequest, this, "handleNodeMuteRequestPacket");
-    packetReceiver.registerListener(PacketType::RadiusIgnoreRequest, this, "handleRadiusIgnoreRequestPacket");
-    packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket");
-    packetReceiver.registerListener(PacketType::PerAvatarGainSet, this, "handlePerAvatarGainSetDataPacket");
+    packetReceiver.registerListener(PacketType::KillAvatar, this, "handleKillAvatarPacket");

     connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
 }

-void AudioMixer::handleAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
-    getOrCreateClientData(sendingNode.data());
-    DependencyManager::get<NodeList>()->updateNodeWithDataFromPacket(message, sendingNode);
-}
-
-void AudioMixer::handleSilentAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
-    _numSilentPackets++;
-    getOrCreateClientData(sendingNode.data());
-    DependencyManager::get<NodeList>()->updateNodeWithDataFromPacket(message, sendingNode);
-}
+void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
+    if (message->getType() == PacketType::SilentAudioFrame) {
+        _numSilentPackets++;
+    }
+
+    getOrCreateClientData(node.data())->queuePacket(message, node);
+}

 void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
@@ -119,69 +133,28 @@ InputPluginList getInputPlugins() {
     return result;
 }

-void saveInputPluginSettings(const InputPluginList& plugins) {
-}
+// must be here to satisfy a reference in PluginManager::saveSettings()
+void saveInputPluginSettings(const InputPluginList& plugins) {}

-void AudioMixer::handleNegotiateAudioFormat(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
-    QStringList availableCodecs;
-    auto codecPlugins = PluginManager::getInstance()->getCodecPlugins();
-    if (codecPlugins.size() > 0) {
-        for (auto& plugin : codecPlugins) {
-            auto codecName = plugin->getName();
-            qDebug() << "Codec available:" << codecName;
-            availableCodecs.append(codecName);
-        }
-    } else {
-        qDebug() << "No Codecs available...";
-    }
-
-    CodecPluginPointer selectedCodec;
-    QString selectedCodecName;
-
-    QStringList codecPreferenceList = _codecPreferenceOrder.split(",");
-
-    // read the codecs requested by the client
-    const int MAX_PREFERENCE = 99999;
-    int preferredCodecIndex = MAX_PREFERENCE;
-    QString preferredCodec;
-    quint8 numberOfCodecs = 0;
-    message->readPrimitive(&numberOfCodecs);
-    qDebug() << "numberOfCodecs:" << numberOfCodecs;
-    QStringList codecList;
-    for (quint16 i = 0; i < numberOfCodecs; i++) {
-        QString requestedCodec = message->readString();
-        int preferenceOfThisCodec = codecPreferenceList.indexOf(requestedCodec);
-        bool codecAvailable = availableCodecs.contains(requestedCodec);
-        qDebug() << "requestedCodec:" << requestedCodec << "preference:" << preferenceOfThisCodec << "available:" << codecAvailable;
-        if (codecAvailable) {
-            codecList.append(requestedCodec);
-            if (preferenceOfThisCodec >= 0 && preferenceOfThisCodec < preferredCodecIndex) {
-                qDebug() << "This codec is preferred...";
-                selectedCodecName = requestedCodec;
-                preferredCodecIndex = preferenceOfThisCodec;
-            }
-        }
-    }
-    qDebug() << "all requested and available codecs:" << codecList;
-
-    // choose first codec
-    if (!selectedCodecName.isEmpty()) {
-        if (codecPlugins.size() > 0) {
-            for (auto& plugin : codecPlugins) {
-                if (selectedCodecName == plugin->getName()) {
-                    qDebug() << "Selecting codec:" << selectedCodecName;
-                    selectedCodec = plugin;
-                    break;
-                }
-            }
-        }
-    }
-
-    auto clientData = getOrCreateClientData(sendingNode.data());
-    clientData->setupCodec(selectedCodec, selectedCodecName);
-    qDebug() << "selectedCodecName:" << selectedCodecName;
-    clientData->sendSelectAudioFormat(sendingNode, selectedCodecName);
-}
+const std::pair<QString, CodecPluginPointer> AudioMixer::negotiateCodec(std::vector<QString> codecs) {
+    QString selectedCodecName;
+    CodecPluginPointer selectedCodec;
+
+    // read the codecs requested (by the client)
+    int minPreference = std::numeric_limits<int>::max();
+    for (auto& codec : codecs) {
+        if (_availableCodecs.count(codec) > 0) {
+            int preference = _codecPreferenceOrder.indexOf(codec);
+
+            // choose the preferred, available codec
+            if (preference >= 0 && preference < minPreference) {
+                minPreference = preference;
+                selectedCodecName = codec;
+            }
+        }
+    }
+
+    return std::make_pair(selectedCodecName, _availableCodecs[selectedCodecName]);
+}

 void AudioMixer::handleNodeKilled(SharedNodePointer killedNode) {
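Codec negotiation is now a lookup against the statically cached plugin table (_availableCodecs) and the configured preference order, with no plugin rescans or packet reads, so it can run from a slave thread. For illustration only (the codec names below are hypothetical, not taken from the commit): if codec_preference_order is "opus,pcm,zlib" and a client advertises "zlib" and "pcm", both are available and "pcm" has the smaller preference index, so it wins.

// Hypothetical usage; assumes "pcm" and "zlib" are loaded codec plugins and the
// preference order was parsed as { "opus", "pcm", "zlib" }.
auto negotiated = AudioMixer::negotiateCodec({ "zlib", "pcm" });
// negotiated.first  == "pcm" (lowest preference index among requested, available codecs)
// negotiated.second == the matching CodecPluginPointer from _availableCodecs

If none of the requested codecs is available, the returned name is empty and the plugin pointer is null; note that the final _availableCodecs[selectedCodecName] lookup uses std::map::operator[], which then inserts a default (null) entry under the empty name.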
@@ -227,42 +200,6 @@ void AudioMixer::handleKillAvatarPacket(QSharedPointer<ReceivedMessage> packet,
     }
 }

-void AudioMixer::handleRequestsDomainListDataPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
-    auto nodeList = DependencyManager::get<NodeList>();
-    nodeList->getOrCreateLinkedData(senderNode);
-
-    if (senderNode->getLinkedData()) {
-        AudioMixerClientData* nodeData = dynamic_cast<AudioMixerClientData*>(senderNode->getLinkedData());
-        if (nodeData != nullptr) {
-            bool isRequesting;
-            message->readPrimitive(&isRequesting);
-            nodeData->setRequestsDomainListData(isRequesting);
-        }
-    }
-}
-
-void AudioMixer::handleNodeIgnoreRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode) {
-    sendingNode->parseIgnoreRequestMessage(packet);
-}
-
-void AudioMixer::handlePerAvatarGainSetDataPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode) {
-    auto clientData = dynamic_cast<AudioMixerClientData*>(sendingNode->getLinkedData());
-    if (clientData) {
-        QUuid listeningNodeUUID = sendingNode->getUUID();
-        // parse the UUID from the packet
-        QUuid audioSourceUUID = QUuid::fromRfc4122(packet->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
-        uint8_t packedGain;
-        packet->readPrimitive(&packedGain);
-        float gain = unpackFloatGainFromByte(packedGain);
-        clientData->hrtfForStream(audioSourceUUID, QUuid()).setGainAdjustment(gain);
-        qDebug() << "Setting gain adjustment for hrtf[" << listeningNodeUUID << "][" << audioSourceUUID << "] to " << gain;
-    }
-}
-
-void AudioMixer::handleRadiusIgnoreRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode) {
-    sendingNode->parseIgnoreRadiusRequestMessage(packet);
-}
-
 void AudioMixer::removeHRTFsForFinishedInjector(const QUuid& streamID) {
     auto injectorClientData = qobject_cast<AudioMixerClientData*>(sender());
     if (injectorClientData) {
@@ -323,6 +260,7 @@ void AudioMixer::sendStatsPacket() {
     addTiming(_prepareTiming, "prepare");
     addTiming(_mixTiming, "mix");
     addTiming(_eventsTiming, "events");
+    addTiming(_packetsTiming, "packets");

 #ifdef HIFI_AUDIO_MIXER_DEBUG
     timingStats["ns_per_mix"] = (_stats.totalMixes > 0) ? (float)(_stats.mixTime / _stats.totalMixes) : 0;
@@ -452,19 +390,27 @@ void AudioMixer::start() {
         ++frame;
         ++_numStatFrames;

-        // play nice with qt event-looping
+        // process queued events (networking, global audio packets, &c.)
         {
             auto eventsTimer = _eventsTiming.timer();

             // since we're a while loop we need to yield to qt's event processing
             QCoreApplication::processEvents();

-            if (_isFinished) {
-                // alert qt eventing that this is finished
-                QCoreApplication::sendPostedEvents(this, QEvent::DeferredDelete);
-                break;
+            // process (node-isolated) audio packets across slave threads
+            {
+                nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
+                    auto packetsTimer = _packetsTiming.timer();
+                    _slavePool.processPackets(cbegin, cend);
+                });
             }
         }
+
+        if (_isFinished) {
+            // alert qt eventing that this is finished
+            QCoreApplication::sendPostedEvents(this, QEvent::DeferredDelete);
+            break;
+        }
     }
 }
@@ -629,7 +575,8 @@ void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) {
         const QString CODEC_PREFERENCE_ORDER = "codec_preference_order";
         if (audioEnvGroupObject[CODEC_PREFERENCE_ORDER].isString()) {
-            _codecPreferenceOrder = audioEnvGroupObject[CODEC_PREFERENCE_ORDER].toString();
+            QString codecPreferenceOrder = audioEnvGroupObject[CODEC_PREFERENCE_ORDER].toString();
+            _codecPreferenceOrder = codecPreferenceOrder.split(",");
             qDebug() << "Codec preference order changed to" << _codecPreferenceOrder;
         }

AudioMixer.h

@@ -49,6 +49,7 @@
     static const QHash<QString, AABox>& getAudioZones() { return _audioZones; }
     static const QVector<ZoneSettings>& getZoneSettings() { return _zoneSettings; }
     static const QVector<ReverbSettings>& getReverbSettings() { return _zoneReverbSettings; }
+    static const std::pair<QString, CodecPluginPointer> negotiateCodec(std::vector<QString> codecs);

 public slots:
     void run() override;
@@ -56,20 +57,14 @@ public slots:

 private slots:
     // packet handlers
-    void handleAudioPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
-    void handleSilentAudioPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
     void handleMuteEnvironmentPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
-    void handleNegotiateAudioFormat(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode);
-    void handleNodeKilled(SharedNodePointer killedNode);
-    void handleRequestsDomainListDataPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
-    void handleNodeIgnoreRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
-    void handleRadiusIgnoreRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
-    void handleKillAvatarPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
     void handleNodeMuteRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
-    void handlePerAvatarGainSetDataPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
+    void handleNodeKilled(SharedNodePointer killedNode);
+    void handleKillAvatarPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);

-    void start();
+    void queueAudioPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
     void removeHRTFsForFinishedInjector(const QUuid& streamID);
+    void start();

 private:
     // mixing helpers
@@ -93,8 +88,6 @@ private:
     int _numStatFrames { 0 };
     AudioMixerStats _stats;

-    QString _codecPreferenceOrder;
-
     AudioMixerSlavePool _slavePool;

     class Timer {
@@ -124,13 +117,17 @@
     Timer _prepareTiming;
     Timer _mixTiming;
     Timer _eventsTiming;
+    Timer _packetsTiming;

     static int _numStaticJitterFrames; // -1 denotes dynamic jitter buffering
     static float _noiseMutingThreshold;
     static float _attenuationPerDoublingInDistance;
+
+    static std::map<QString, CodecPluginPointer> _availableCodecs;
+    static QStringList _codecPreferenceOrder;
     static QHash<QString, AABox> _audioZones;
     static QVector<ZoneSettings> _zoneSettings;
     static QVector<ReverbSettings> _zoneReverbSettings;
 };

 #endif // hifi_AudioMixer_h

AudioMixerClientData.cpp

@@ -19,6 +19,7 @@
 #include "InjectedAudioStream.h"

+#include "AudioHelpers.h"
 #include "AudioMixer.h"
 #include "AudioMixerClientData.h"
@@ -47,6 +48,92 @@ AudioMixerClientData::~AudioMixerClientData() {
     }
 }

+void AudioMixerClientData::queuePacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
+    if (!_packetQueue.node) {
+        _packetQueue.node = node;
+    }
+    _packetQueue.push(message);
+}
+
+void AudioMixerClientData::processPackets() {
+    SharedNodePointer node = _packetQueue.node;
+    assert(_packetQueue.empty() || node);
+    _packetQueue.node.clear();
+
+    while (!_packetQueue.empty()) {
+        auto& packet = _packetQueue.back();
+
+        switch (packet->getType()) {
+            case PacketType::MicrophoneAudioNoEcho:
+            case PacketType::MicrophoneAudioWithEcho:
+            case PacketType::InjectAudio:
+            case PacketType::AudioStreamStats:
+            case PacketType::SilentAudioFrame: {
+                QMutexLocker lock(&getMutex());
+                parseData(*packet);
+                break;
+            }
+            case PacketType::NegotiateAudioFormat:
+                negotiateAudioFormat(*packet, node);
+                break;
+            case PacketType::RequestsDomainListData:
+                parseRequestsDomainListData(*packet);
+                break;
+            case PacketType::PerAvatarGainSet:
+                parsePerAvatarGainSet(*packet, node);
+                break;
+            case PacketType::NodeIgnoreRequest:
+                parseNodeIgnoreRequest(packet, node);
+                break;
+            case PacketType::RadiusIgnoreRequest:
+                parseRadiusIgnoreRequest(packet, node);
+                break;
+            default:
+                Q_UNREACHABLE();
+        }
+
+        _packetQueue.pop();
+    }
+    assert(_packetQueue.empty());
+}
+
+void AudioMixerClientData::negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node) {
+    quint8 numberOfCodecs;
+    message.readPrimitive(&numberOfCodecs);
+    std::vector<QString> codecs;
+    for (auto i = 0; i < numberOfCodecs; i++) {
+        codecs.push_back(message.readString());
+    }
+    const std::pair<QString, CodecPluginPointer> codec = AudioMixer::negotiateCodec(codecs);
+
+    setupCodec(codec.second, codec.first);
+    sendSelectAudioFormat(node, codec.first);
+}
+
+void AudioMixerClientData::parseRequestsDomainListData(ReceivedMessage& message) {
+    bool isRequesting;
+    message.readPrimitive(&isRequesting);
+    setRequestsDomainListData(isRequesting);
+}
+
+void AudioMixerClientData::parsePerAvatarGainSet(ReceivedMessage& message, const SharedNodePointer& node) {
+    QUuid uuid = node->getUUID();
+    // parse the UUID from the packet
+    QUuid avatarUuid = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID));
+    uint8_t packedGain;
+    message.readPrimitive(&packedGain);
+    float gain = unpackFloatGainFromByte(packedGain);
+    hrtfForStream(avatarUuid, QUuid()).setGainAdjustment(gain);
+    qDebug() << "Setting gain adjustment for hrtf[" << uuid << "][" << avatarUuid << "] to " << gain;
+}
+
+void AudioMixerClientData::parseNodeIgnoreRequest(QSharedPointer<ReceivedMessage> message, const SharedNodePointer& node) {
+    node->parseIgnoreRequestMessage(message);
+}
+
+void AudioMixerClientData::parseRadiusIgnoreRequest(QSharedPointer<ReceivedMessage> message, const SharedNodePointer& node) {
+    node->parseIgnoreRadiusRequestMessage(message);
+}
+
 AvatarAudioStream* AudioMixerClientData::getAvatarAudioStream() {
     QReadLocker readLocker { &_streamsLock };
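One detail worth flagging in processPackets() above: std::queue::back() returns the most recently queued element while pop() removes the oldest, so with more than one packet waiting the loop re-reads the newest packet as older ones are discarded. A strict FIFO drain reads front() instead; a minimal sketch of that shape (not part of this commit):

while (!_packetQueue.empty()) {
    auto& packet = _packetQueue.front();   // oldest queued packet
    // ... dispatch on packet->getType() exactly as in the switch above ...
    _packetQueue.pop();                    // drop the packet that was just handled
}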

AudioMixerClientData.h

@@ -12,6 +12,8 @@
 #ifndef hifi_AudioMixerClientData_h
 #define hifi_AudioMixerClientData_h

+#include <queue>
+
 #include <QtCore/QJsonObject>

 #include <AABox.h>
@@ -34,6 +36,9 @@ public:
     using SharedStreamPointer = std::shared_ptr<PositionalAudioStream>;
     using AudioStreamMap = std::unordered_map<QUuid, SharedStreamPointer>;

+    void queuePacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer node);
+    void processPackets();
+
     // locks the mutex to make a copy
     AudioStreamMap getAudioStreams() { QReadLocker readLock { &_streamsLock }; return _audioStreams; }
     AvatarAudioStream* getAvatarAudioStream();
@@ -56,7 +61,13 @@ public:
     void removeAgentAvatarAudioStream();

+    // packet parsers
     int parseData(ReceivedMessage& message) override;
+    void negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node);
+    void parseRequestsDomainListData(ReceivedMessage& message);
+    void parsePerAvatarGainSet(ReceivedMessage& message, const SharedNodePointer& node);
+    void parseNodeIgnoreRequest(QSharedPointer<ReceivedMessage> message, const SharedNodePointer& node);
+    void parseRadiusIgnoreRequest(QSharedPointer<ReceivedMessage> message, const SharedNodePointer& node);

     // attempt to pop a frame from each audio stream, and return the number of streams from this client
     int checkBuffersBeforeFrameSend();
@@ -105,11 +116,15 @@ public slots:
     void sendSelectAudioFormat(SharedNodePointer node, const QString& selectedCodecName);

 private:
-    using IgnoreZone = AABox;
+    struct PacketQueue : public std::queue<QSharedPointer<ReceivedMessage>> {
+        QWeakPointer<Node> node;
+    };
+    PacketQueue _packetQueue;

     QReadWriteLock _streamsLock;
     AudioStreamMap _audioStreams; // microphone stream from avatar is stored under key of null UUID

+    using IgnoreZone = AABox;
     class IgnoreZoneMemo {
     public:
         IgnoreZoneMemo(AudioMixerClientData& data) : _data(data) {}
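The PacketQueue above stores a QWeakPointer<Node> next to the queued messages, so a backlog of packets does not by itself keep a disconnected node alive; processPackets() promotes it back to a SharedNodePointer before dispatching and asserts if packets remain queued for a node that no longer exists. A small sketch of that promotion (Qt smart pointers; not code from the commit):

QWeakPointer<Node> weakNode = node;        // stored with the queue; does not extend the node's lifetime
SharedNodePointer strongNode = weakNode;   // promoted when draining; null if the node is already gone
if (!strongNode) {
    // the node disconnected; its queued packets can simply be dropped
}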

AudioMixerSlave.cpp

@@ -53,7 +53,14 @@ inline float computeGain(const AvatarAudioStream& listeningNodeStream, const Pos
 inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const PositionalAudioStream& streamToAdd,
         const glm::vec3& relativePosition);

-void AudioMixerSlave::configure(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) {
+void AudioMixerSlave::processPackets(const SharedNodePointer& node) {
+    AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
+    if (data) {
+        data->processPackets();
+    }
+}
+
+void AudioMixerSlave::configureMix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) {
     _begin = begin;
     _end = end;
     _frame = frame;

AudioMixerSlave.h

@@ -30,9 +30,13 @@ class AudioMixerSlave {
 public:
     using ConstIter = NodeList::const_iterator;

-    void configure(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio);
+    // process packets for a given node (requires no configuration)
+    void processPackets(const SharedNodePointer& node);

-    // mix and broadcast non-ignored streams to the node
+    // configure a round of mixing
+    void configureMix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio);
+
+    // mix and broadcast non-ignored streams to the node (requires configuration using configureMix, above)
     // returns true if a mixed packet was sent to the node
     void mix(const SharedNodePointer& node);

AudioMixerSlavePool.cpp

@@ -21,7 +21,7 @@ void AudioMixerSlaveThread::run() {
         // iterate over all available nodes
         SharedNodePointer node;
         while (try_pop(node)) {
-            mix(node);
+            (this->*_function)(node);
         }

         bool stopping = _stop;
@@ -41,7 +41,11 @@ void AudioMixerSlaveThread::wait() {
         });
         ++_pool._numStarted;
     }
-    configure(_pool._begin, _pool._end, _pool._frame, _pool._throttlingRatio);
+
+    if (_pool._configure) {
+        _pool._configure(*this);
+    }
+    _function = _pool._function;
 }

 void AudioMixerSlaveThread::notify(bool stopping) {
@@ -64,16 +68,31 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) {
 static AudioMixerSlave slave;
 #endif

+void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) {
+    _function = &AudioMixerSlave::processPackets;
+    _configure = [](AudioMixerSlave& slave) {};
+    run(begin, end);
+}
+
 void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) {
-    _begin = begin;
-    _end = end;
+    _function = &AudioMixerSlave::mix;
+    _configure = [&](AudioMixerSlave& slave) {
+        slave.configureMix(_begin, _end, _frame, _throttlingRatio);
+    };
     _frame = frame;
     _throttlingRatio = throttlingRatio;

+    run(begin, end);
+}
+
+void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
+    _begin = begin;
+    _end = end;
+
 #ifdef AUDIO_SINGLE_THREADED
-    slave.configure(_begin, _end, frame, throttlingRatio);
+    _configure(slave);
     std::for_each(begin, end, [&](const SharedNodePointer& node) {
-        slave.mix(node);
+        _function(slave, node);
     });
 #else
     // fill the queue
@@ -84,7 +103,7 @@ void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame
     {
         Lock lock(_mutex);

-        // mix
+        // run
         _numStarted = _numFinished = 0;
         _slaveCondition.notify_all();
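The pool now stores a pointer-to-member (_function) naming the work each round should perform and a _configure callback each thread runs once before draining the node queue, so the same worker loop serves both the packet round and the mix round. A self-contained sketch of the dispatch mechanics (simplified types, not the hifi classes):

#include <vector>

struct Slave {
    void processPackets(int /*node*/) { /* drain this node's packet queue */ }
    void mix(int /*node*/)            { /* mix and send one frame to this node */ }
};

using SlaveFunction = void (Slave::*)(int node);

// Each worker runs whatever member function the pool selected for this round.
void runRound(Slave& slave, SlaveFunction function, const std::vector<int>& nodes) {
    for (int node : nodes) {
        (slave.*function)(node);   // same shape as (this->*_function)(node) in AudioMixerSlaveThread::run()
    }
}

// Usage:
//   runRound(slave, &Slave::processPackets, nodes);  // packet round, no configuration needed
//   runRound(slave, &Slave::mix, nodes);             // mix round, after configureMix(...)

Storing the round's work as data rather than adding a second thread loop keeps the synchronization (start/finish counting, condition variables) in one place.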

AudioMixerSlavePool.h

@@ -43,6 +43,7 @@
     bool try_pop(SharedNodePointer& node);

     AudioMixerSlavePool& _pool;
+    void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr };
     bool _stop { false };
 };
@@ -60,6 +61,9 @@
     AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); }
     ~AudioMixerSlavePool() { resize(0); }

+    // process packets on slave threads
+    void processPackets(ConstIter begin, ConstIter end);
+
     // mix on slave threads
     void mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio);
@@ -70,6 +74,7 @@
     int numThreads() { return _numThreads; }

 private:
+    void run(ConstIter begin, ConstIter end);
     void resize(int numThreads);

     std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves;
@@ -82,6 +87,8 @@
     Mutex _mutex;
     ConditionVariable _slaveCondition;
     ConditionVariable _poolCondition;
+    void (AudioMixerSlave::*_function)(const SharedNodePointer& node);
+    std::function<void(AudioMixerSlave&)> _configure;
     int _numThreads { 0 };
     int _numStarted { 0 };  // guarded by _mutex
     int _numFinished { 0 }; // guarded by _mutex

PluginManager.cpp

@@ -123,8 +123,6 @@ const CodecPluginList& PluginManager::getCodecPlugins() {
     static CodecPluginList codecPlugins;
     static std::once_flag once;
     std::call_once(once, [&] {
-        //codecPlugins = ::getCodecPlugins();
-
         // Now grab the dynamic plugins
         for (auto loader : getLoadedPlugins()) {
             CodecProvider* codecProvider = qobject_cast<CodecProvider*>(loader->instance());

Deck.cpp

@@ -145,10 +145,8 @@ void Deck::processFrames() {
     }

     if (!nextClip) {
-        qCDebug(recordingLog) << "No more frames available";
         // No more frames available, so handle the end of playback
         if (_loop) {
-            qCDebug(recordingLog) << "Looping enabled, seeking back to beginning";
             // If we have looping enabled, start the playback over
             seek(0);
             // FIXME configure the recording scripting interface to reset the avatar basis on a loop