From 7a440def876322a0978b669805c86eac47440140 Mon Sep 17 00:00:00 2001 From: Zach Pomerantz Date: Wed, 16 Nov 2016 20:03:08 -0500 Subject: [PATCH] add AudioMixerSlavePool --- assignment-client/src/audio/AudioMixer.cpp | 23 ++- assignment-client/src/audio/AudioMixer.h | 4 +- .../src/audio/AudioMixerSlavePool.cpp | 149 ++++++++++++++++++ .../src/audio/AudioMixerSlavePool.h | 79 ++++++++++ 4 files changed, 249 insertions(+), 6 deletions(-) create mode 100644 assignment-client/src/audio/AudioMixerSlavePool.cpp create mode 100644 assignment-client/src/audio/AudioMixerSlavePool.h diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 5a6014aee9..50ef901c2d 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -355,12 +355,27 @@ void AudioMixer::start() { while (!_isFinished) { manageLoad(frameTimestamp, framesSinceManagement); - slave.stats.reset(); + { + QReadLocker readLocker(&nodeList->getMutex()); + std::vector nodes; - nodeList->eachNode([&](const SharedNodePointer& node) { _stats.sumStreams += prepareFrame(node, frame); }); - nodeList->eachNode([&](const SharedNodePointer& node) { slave.mix(node, frame); }); + nodeList->eachNode([&](const SharedNodePointer& node) { + // collect the available nodes (to queue them for the slavePool) + nodes.emplace_back(node); - _stats.accumulate(slave.stats); + // prepare frames; pop off any new audio from their streams + _stats.sumStreams += prepareFrame(node, frame); + }); + + // mix across slave threads + slavePool.mix(nodes, frame); + } + + // gather stats + slavePool.each([&](AudioMixerSlave& slave) { + _stats.accumulate(slave.stats); + slave.stats.reset(); + }); ++frame; ++_numStatFrames; diff --git a/assignment-client/src/audio/AudioMixer.h b/assignment-client/src/audio/AudioMixer.h index 0bfcabd2ab..c1811b6655 100644 --- a/assignment-client/src/audio/AudioMixer.h +++ b/assignment-client/src/audio/AudioMixer.h @@ -19,7 +19,7 @@ #include #include "AudioMixerStats.h" -#include "AudioMixerSlave.h" +#include "AudioMixerSlavePool.h" class PositionalAudioStream; class AvatarAudioStream; @@ -88,7 +88,7 @@ private: QString _codecPreferenceOrder; - AudioMixerSlave slave; + AudioMixerSlavePool slavePool; static int _numStaticJitterFrames; // -1 denotes dynamic jitter buffering static float _noiseMutingThreshold; diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp new file mode 100644 index 0000000000..035233c48a --- /dev/null +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -0,0 +1,149 @@ +// +// AudioMixerSlavePool.cpp +// assignment-client/src/audio +// +// Created by Zach Pomerantz on 11/16/2016. +// Copyright 2016 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#include +#include + +#include "AudioMixerSlavePool.h" + +AudioMixerSlavePool::~AudioMixerSlavePool() { + { + std::unique_lock lock(_mutex); + wait(lock); + } + setNumThreads(0); +} + +void AudioMixerSlavePool::mix(const std::vector& nodes, unsigned int frame) { + std::unique_lock lock(_mutex); + start(lock, nodes, frame); + wait(lock); +} + +void AudioMixerSlavePool::each(std::function functor) { + std::unique_lock lock(_mutex); + assert(!_running); + + for (auto& slave : _slaves) { + functor(*slave.get()); + } +} + +void AudioMixerSlavePool::start(std::unique_lock& lock, const std::vector& nodes, unsigned int frame) { + assert(!_running); + + // fill the queue + for (auto& node : nodes) { + _queue.emplace(node); + } + + // toggle running state + _frame = frame; + _running = true; + _numStarted = 0; + _numFinished = 0; + + _slaveCondition.notify_all(); +} + +void AudioMixerSlavePool::wait(std::unique_lock& lock) { + if (_running) { + _poolCondition.wait(lock, [&] { + return _numFinished == _numThreads; + }); + } + + assert(_queue.empty()); + + // toggle running state + _running = false; +} + +void AudioMixerSlavePool::slaveWait() { + std::unique_lock lock(_mutex); + + _slaveCondition.wait(lock, [&] { + return _numStarted != _numThreads; + }); + + // toggle running state + ++_numStarted; +} + +void AudioMixerSlavePool::slaveNotify() { + { + std::unique_lock lock(_mutex); + ++_numFinished; + } + _poolCondition.notify_one(); +} + +void AudioMixerSlavePool::setNumThreads(int numThreads) { + std::unique_lock lock(_mutex); + + // ensure slave are not running + assert(!_running); + + // clamp to allowed size + { + // idealThreadCount returns -1 if cores cannot be detected - cast it to a large number + int maxThreads = (int)((unsigned int)QThread::idealThreadCount()); + int clampedThreads = std::min(std::max(1, numThreads), maxThreads); + if (clampedThreads != numThreads) { + qWarning("%s: clamped to %d (was %d)", __FUNCTION__, numThreads, clampedThreads); + numThreads = clampedThreads; + } + } + qDebug("%s: set %d threads", __FUNCTION__, numThreads); + + if (numThreads > _numThreads) { + // start new slaves + for (int i = 0; i < numThreads - _numThreads; ++i) { + auto slave = new AudioMixerSlaveThread(*this); + slave->start(); + _slaves.emplace_back(slave); + } + } else if (numThreads < _numThreads) { + auto extraBegin = _slaves.begin() + _numThreads; + + // stop extra slaves... + auto slave = extraBegin; + while (slave != _slaves.end()) { + (*slave)->stop(); + } + + // ...cycle slaves with empty queue... + start(lock, std::vector(), 0); + wait(lock); + + // ...wait for them to finish... + slave = extraBegin; + while (slave != _slaves.end()) { + (*slave)->wait(); + } + + // ...and delete them + _slaves.erase(extraBegin, _slaves.end()); + } + + _numThreads = _numStarted = _numFinished = numThreads; +} + +void AudioMixerSlaveThread::run() { + while (!_stop) { + _pool.slaveWait(); + SharedNodePointer node; + while (_pool._queue.try_pop(node)) { + mix(node, _pool._frame); + } + _pool.slaveNotify(); + } +} diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h new file mode 100644 index 0000000000..194df83c2b --- /dev/null +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -0,0 +1,79 @@ +// +// AudioMixerSlavePool.h +// assignment-client/src/audio +// +// Created by Zach Pomerantz on 11/16/2016. +// Copyright 2016 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_AudioMixerSlavePool_h +#define hifi_AudioMixerSlavePool_h + +#include +#include +#include + +#include + +#include + +#include "AudioMixerSlave.h" + +class AudioMixerSlaveThread; + +class AudioMixerSlavePool { + using Queue = tbb::concurrent_queue; + +public: + AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } + ~AudioMixerSlavePool(); + + // mix on slave threads + void mix(const std::vector& nodes, unsigned int frame); + + void each(std::function functor); + + void setNumThreads(int numThreads); + int numThreads() { return _numThreads; } + +private: + void start(std::unique_lock& lock, const std::vector& nodes, unsigned int frame); + void wait(std::unique_lock& lock); + + friend class AudioMixerSlaveThread; + + // wait for pool to start (called by slaves) + void slaveWait(); + // notify that the slave has finished (called by slave) + void slaveNotify(); + + std::vector> _slaves; + Queue _queue; + unsigned int _frame { 0 }; + + std::mutex _mutex; + std::condition_variable _slaveCondition; + std::condition_variable _poolCondition; + int _numThreads { 0 }; + int _numStarted { 0 }; + int _numFinished { 0 }; + bool _running { false }; +}; + +class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { + Q_OBJECT +public: + AudioMixerSlaveThread(AudioMixerSlavePool& pool) : _pool(pool) {} + + void run() override final; + void stop() { _stop = true; } + +private: + AudioMixerSlavePool& _pool; + std::atomic _stop; +}; + +#endif // hifi_AudioMixerSlavePool_h