From 8a6dcdeb6898e61fb7096e05021e38de93dc2788 Mon Sep 17 00:00:00 2001 From: Zach Pomerantz Date: Wed, 30 Nov 2016 16:37:34 -0500 Subject: [PATCH] mix audio with single node list read lock --- assignment-client/src/audio/AudioMixer.cpp | 19 ++-- .../src/audio/AudioMixerSlave.cpp | 14 ++- assignment-client/src/audio/AudioMixerSlave.h | 12 ++- .../src/audio/AudioMixerSlavePool.cpp | 91 +++++++++------- .../src/audio/AudioMixerSlavePool.h | 101 +++++++++++------- libraries/networking/src/LimitedNodeList.h | 19 +++- 6 files changed, 159 insertions(+), 97 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index a114f836b2..a2c4402cb0 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -356,21 +356,18 @@ void AudioMixer::start() { while (!_isFinished) { manageLoad(frameTimestamp, framesSinceManagement); - { - QReadLocker readLocker(&nodeList->getMutex()); - std::vector nodes; - - nodeList->eachNode([&](const SharedNodePointer& node) { - // collect the available nodes (to queue them for the slavePool) - nodes.emplace_back(node); - - // prepare frames; pop off any new audio from their streams + // aquire the read-lock in a single thread, to avoid canonical rwlock undefined behaviors + // node removal will acquire a write lock; + // read locks (in slave threads) while a write lock is pending have undefined order in pthread + nodeList->algorithm([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { + // prepare frames; pop off any new audio from their streams + std::for_each(cbegin, cend, [&](const SharedNodePointer& node) { _stats.sumStreams += prepareFrame(node, frame); }); // mix across slave threads - _slavePool.mix(nodes, frame); - } + _slavePool.mix(cbegin, cend, frame); + }); // gather stats _slavePool.each([&](AudioMixerSlave& slave) { diff --git a/assignment-client/src/audio/AudioMixerSlave.cpp b/assignment-client/src/audio/AudioMixerSlave.cpp index 6c302f4cbc..24a5e9306a 100644 --- a/assignment-client/src/audio/AudioMixerSlave.cpp +++ b/assignment-client/src/audio/AudioMixerSlave.cpp @@ -9,6 +9,8 @@ // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // +#include + #include #include #include @@ -131,7 +133,13 @@ void sendEnvironmentPacket(const SharedNodePointer& node, AudioMixerClientData& } } -void AudioMixerSlave::mix(const SharedNodePointer& node, unsigned int frame) { +void AudioMixerSlave::configure(ConstIter begin, ConstIter end, unsigned int frame) { + _begin = begin; + _end = end; + _frame = frame; +} + +void AudioMixerSlave::mix(const SharedNodePointer& node) { // check that the node is valid AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData(); if (data == nullptr) { @@ -181,7 +189,7 @@ void AudioMixerSlave::mix(const SharedNodePointer& node, unsigned int frame) { // send stats packet (about every second) static const unsigned int NUM_FRAMES_PER_SEC = (int) ceil(AudioConstants::NETWORK_FRAMES_PER_SEC); - if (data->shouldSendStats(frame % NUM_FRAMES_PER_SEC)) { + if (data->shouldSendStats(_frame % NUM_FRAMES_PER_SEC)) { data->sendAudioStreamStatsPackets(node); } } @@ -195,7 +203,7 @@ bool AudioMixerSlave::prepareMix(const SharedNodePointer& node) { memset(_mixedSamples, 0, sizeof(_mixedSamples)); // loop through all other nodes that have sufficient audio to mix - DependencyManager::get()->eachNode([&](const SharedNodePointer& otherNode){ + std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode){ // make sure that we have audio data for this other node // and that it isn't being ignored by our listening node // and that it isn't ignoring our listening node diff --git a/assignment-client/src/audio/AudioMixerSlave.h b/assignment-client/src/audio/AudioMixerSlave.h index 83906b6f84..8be1b697de 100644 --- a/assignment-client/src/audio/AudioMixerSlave.h +++ b/assignment-client/src/audio/AudioMixerSlave.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "AudioMixerStats.h" @@ -27,9 +28,13 @@ class AudioMixerClientData; class AudioMixerSlave { public: + using ConstIter = NodeList::const_iterator; + + void configure(ConstIter begin, ConstIter end, unsigned int frame); + // mix and broadcast non-ignored streams to the node // returns true if a mixed packet was sent to the node - void mix(const SharedNodePointer& node, unsigned int frame); + void mix(const SharedNodePointer& node); AudioMixerStats stats; @@ -48,6 +53,11 @@ private: // mixing buffers float _mixedSamples[AudioConstants::NETWORK_FRAME_SAMPLES_STEREO]; int16_t _clampedSamples[AudioConstants::NETWORK_FRAME_SAMPLES_STEREO]; + + // frame state + ConstIter _begin; + ConstIter _end; + unsigned int _frame { 0 }; }; #endif // hifi_AudioMixerSlave_h diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 035233c48a..e0b3ac03ac 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -14,22 +14,59 @@ #include "AudioMixerSlavePool.h" +void AudioMixerSlaveThread::run() { + while (!_stop) { + wait(); + + SharedNodePointer node; + while (try_pop(node)) { + mix(node); + } + + notify(); + } +} + +void AudioMixerSlaveThread::wait() { + Lock lock(_pool._mutex); + + _pool._slaveCondition.wait(lock, [&] { + return _pool._numStarted != _pool._numThreads; + }); + + // toggle running state + ++_pool._numStarted; + configure(_pool._begin, _pool._end, _pool._frame); +} + +void AudioMixerSlaveThread::notify() { + { + Lock lock(_pool._mutex); + ++_pool._numFinished; + } + _pool._poolCondition.notify_one(); +} + +bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { + return _pool._queue.try_pop(node); +} + AudioMixerSlavePool::~AudioMixerSlavePool() { { - std::unique_lock lock(_mutex); + Lock lock(_mutex); wait(lock); } setNumThreads(0); } -void AudioMixerSlavePool::mix(const std::vector& nodes, unsigned int frame) { - std::unique_lock lock(_mutex); - start(lock, nodes, frame); +void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame) { + Lock lock(_mutex); + start(lock, begin, end, frame); wait(lock); } void AudioMixerSlavePool::each(std::function functor) { - std::unique_lock lock(_mutex); + Lock lock(_mutex); assert(!_running); for (auto& slave : _slaves) { @@ -37,24 +74,26 @@ void AudioMixerSlavePool::each(std::function funct } } -void AudioMixerSlavePool::start(std::unique_lock& lock, const std::vector& nodes, unsigned int frame) { +void AudioMixerSlavePool::start(Lock& lock, ConstIter begin, ConstIter end, unsigned int frame) { assert(!_running); // fill the queue - for (auto& node : nodes) { + std::for_each(begin, end, [&](const SharedNodePointer& node) { _queue.emplace(node); - } + }); // toggle running state _frame = frame; _running = true; _numStarted = 0; _numFinished = 0; + _begin = begin; + _end = end; _slaveCondition.notify_all(); } -void AudioMixerSlavePool::wait(std::unique_lock& lock) { +void AudioMixerSlavePool::wait(Lock& lock) { if (_running) { _poolCondition.wait(lock, [&] { return _numFinished == _numThreads; @@ -67,27 +106,8 @@ void AudioMixerSlavePool::wait(std::unique_lock& lock) { _running = false; } -void AudioMixerSlavePool::slaveWait() { - std::unique_lock lock(_mutex); - - _slaveCondition.wait(lock, [&] { - return _numStarted != _numThreads; - }); - - // toggle running state - ++_numStarted; -} - -void AudioMixerSlavePool::slaveNotify() { - { - std::unique_lock lock(_mutex); - ++_numFinished; - } - _poolCondition.notify_one(); -} - void AudioMixerSlavePool::setNumThreads(int numThreads) { - std::unique_lock lock(_mutex); + Lock lock(_mutex); // ensure slave are not running assert(!_running); @@ -121,7 +141,7 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) { } // ...cycle slaves with empty queue... - start(lock, std::vector(), 0); + start(lock); wait(lock); // ...wait for them to finish... @@ -136,14 +156,3 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) { _numThreads = _numStarted = _numFinished = numThreads; } - -void AudioMixerSlaveThread::run() { - while (!_stop) { - _pool.slaveWait(); - SharedNodePointer node; - while (_pool._queue.try_pop(node)) { - mix(node, _pool._frame); - } - _pool.slaveNotify(); - } -} diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 194df83c2b..c8eb775f87 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -22,49 +22,14 @@ #include "AudioMixerSlave.h" -class AudioMixerSlaveThread; - -class AudioMixerSlavePool { - using Queue = tbb::concurrent_queue; - -public: - AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } - ~AudioMixerSlavePool(); - - // mix on slave threads - void mix(const std::vector& nodes, unsigned int frame); - - void each(std::function functor); - - void setNumThreads(int numThreads); - int numThreads() { return _numThreads; } - -private: - void start(std::unique_lock& lock, const std::vector& nodes, unsigned int frame); - void wait(std::unique_lock& lock); - - friend class AudioMixerSlaveThread; - - // wait for pool to start (called by slaves) - void slaveWait(); - // notify that the slave has finished (called by slave) - void slaveNotify(); - - std::vector> _slaves; - Queue _queue; - unsigned int _frame { 0 }; - - std::mutex _mutex; - std::condition_variable _slaveCondition; - std::condition_variable _poolCondition; - int _numThreads { 0 }; - int _numStarted { 0 }; - int _numFinished { 0 }; - bool _running { false }; -}; +class AudioMixerSlavePool; class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { Q_OBJECT + using ConstIter = NodeList::const_iterator; + using Mutex = std::mutex; + using Lock = std::unique_lock; + public: AudioMixerSlaveThread(AudioMixerSlavePool& pool) : _pool(pool) {} @@ -72,8 +37,64 @@ public: void stop() { _stop = true; } private: + friend class AudioMixerSlavePool; + + void wait(); + void notify(); + bool try_pop(SharedNodePointer& node); + + // frame state AudioMixerSlavePool& _pool; + + // synchronization state std::atomic _stop; }; +class AudioMixerSlavePool { + using Queue = tbb::concurrent_queue; + using Mutex = std::mutex; + using Lock = std::unique_lock; + using ConditionVariable = std::condition_variable; + +public: + using ConstIter = NodeList::const_iterator; + + AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } + ~AudioMixerSlavePool(); + + // mix on slave threads + void mix(ConstIter begin, ConstIter end, unsigned int frame); + + // iterate over all slaves + void each(std::function functor); + + void setNumThreads(int numThreads); + int numThreads() { return _numThreads; } + +private: + void start(Lock& lock, ConstIter begin = ConstIter(), ConstIter end = ConstIter(), unsigned int frame = 0); + void wait(Lock& lock); + + std::vector> _slaves; + + friend void AudioMixerSlaveThread::wait(); + friend void AudioMixerSlaveThread::notify(); + friend bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node); + + // synchronization state + Mutex _mutex; + ConditionVariable _slaveCondition; + ConditionVariable _poolCondition; + int _numThreads { 0 }; + int _numStarted { 0 }; + int _numFinished { 0 }; + bool _running { false }; + + // frame state + Queue _queue; + unsigned int _frame { 0 }; + ConstIter _begin; + ConstIter _end; +}; + #endif // hifi_AudioMixerSlavePool_h diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 00635e2c4b..ef26d87ec7 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -175,7 +175,24 @@ public: SharedNodePointer findNodeWithAddr(const HifiSockAddr& addr); - QReadWriteLock& getMutex() { return _nodeMutex; } + // for use with algorithm + using value_type = SharedNodePointer; + using const_iterator = std::vector::const_iterator; + + // Cede control of iteration under a single read lock (e.g. for use by thread pools) + // This allows multiple threads (i.e. a thread pool) to share a lock + // without deadlocking when a dying node attempts to acquire a write lock + template + void algorithm(NodeAlgorithmLambda functor) { + QReadLocker readLock(&_nodeMutex); + + std::vector nodes(_nodeHash.size()); + std::transform(_nodeHash.cbegin(), _nodeHash.cend(), nodes.begin(), [](const NodeHash::value_type& it) { + return it.second; + }); + + functor(nodes.cbegin(), nodes.cend()); + } template void eachNode(NodeLambda functor) {