From c4e435a1666d09c8191f71ef8dd66201dddc8ee1 Mon Sep 17 00:00:00 2001 From: Zach Pomerantz Date: Wed, 7 Dec 2016 01:17:34 +0000 Subject: [PATCH] simplify locks in AudioMixerSlavePool --- .../src/audio/AudioMixerSlavePool.cpp | 100 +++++++----------- .../src/audio/AudioMixerSlavePool.h | 13 +-- 2 files changed, 42 insertions(+), 71 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 7668553662..6d17028de3 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -16,32 +16,34 @@ void AudioMixerSlaveThread::run() { while (!_stop) { - wait(); + wait(); // for the audio pool to notify + // iterate over all available nodes SharedNodePointer node; while (try_pop(node)) { mix(node); } - notify(); + notify(); // the audio pool we are done } } void AudioMixerSlaveThread::wait() { - Lock lock(_pool._mutex); - - _pool._slaveCondition.wait(lock, [&] { - return _pool._numStarted != _pool._numThreads; - }); - - // toggle running state - ++_pool._numStarted; + { + Lock lock(_pool._mutex); + _pool._slaveCondition.wait(lock, [&] { + assert(_pool._numStarted <= _pool._numThreads); + return _pool._numStarted != _pool._numThreads; + }); + ++_pool._numStarted; + } configure(_pool._begin, _pool._end, _pool._frame); } void AudioMixerSlaveThread::notify() { { Lock lock(_pool._mutex); + assert(_pool._numFinished < _pool._numThreads); ++_pool._numFinished; } _pool._poolCondition.notify_one(); @@ -51,34 +53,45 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { return _pool._queue.try_pop(node); } -AudioMixerSlavePool::~AudioMixerSlavePool() { - resize(0); -} - #ifdef AUDIO_SINGLE_THREADED static AudioMixerSlave slave; #endif void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame) { - Lock lock(_mutex); + _begin = begin; + _end = end; + _frame = frame; #ifdef AUDIO_SINGLE_THREADED slave.configure(_begin, _end, frame); std::for_each(begin, end, [&](const SharedNodePointer& node) { slave.mix(node); }); - #else - start(lock, begin, end, frame); - wait(lock); -#endif + // fill the queue + std::for_each(_begin, _end, [&](const SharedNodePointer& node) { + _queue.emplace(node); + }); + { + Lock lock(_mutex); + + // mix + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); + + // wait + _poolCondition.wait(lock, [&] { + assert(_numFinished <= _numThreads); + return _numFinished == _numThreads; + }); + } + assert(_numStarted == _numThreads); + assert(_queue.empty()); +#endif } void AudioMixerSlavePool::each(std::function functor) { - Lock lock(_mutex); - assert(!_running); - #ifdef AUDIO_SINGLE_THREADED functor(slave); #else @@ -88,41 +101,6 @@ void AudioMixerSlavePool::each(std::function funct #endif } -void AudioMixerSlavePool::start(Lock& lock, ConstIter begin, ConstIter end, unsigned int frame) { - assert(lock.owns_lock()); - assert(!_running); - - // fill the queue - std::for_each(begin, end, [&](const SharedNodePointer& node) { - _queue.emplace(node); - }); - - // toggle running state - _running = true; - _numStarted = 0; - _numFinished = 0; - _begin = begin; - _end = end; - _frame = frame; - - _slaveCondition.notify_all(); -} - -void AudioMixerSlavePool::wait(Lock& lock) { - assert(lock.owns_lock()); - - if (_running) { - _poolCondition.wait(lock, [&] { - return _numFinished == _numThreads; - }); - } - - assert(_queue.empty()); - - // toggle running state - _running = false; -} - void AudioMixerSlavePool::setNumThreads(int numThreads) { // clamp to allowed size { @@ -144,10 +122,6 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) { } void AudioMixerSlavePool::resize(int numThreads) { - Lock lock(_mutex); - - // ensure slave are not running - assert(!_running); assert(_numThreads == _slaves.size()); #ifdef AUDIO_SINGLE_THREADED @@ -173,8 +147,8 @@ void AudioMixerSlavePool::resize(int numThreads) { } // ...cycle slaves with empty queue... - start(lock); - lock.unlock(); + _numStarted = _numFinished = 0; + _slaveCondition.notify_all(); // ...wait for them to finish... slave = extraBegin; diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 7043d2c50e..91f60f5094 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -50,6 +50,8 @@ private: std::atomic _stop; }; +// Slave pool for audio mixers +// AudioMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. class AudioMixerSlavePool { using Queue = tbb::concurrent_queue; using Mutex = std::mutex; @@ -60,7 +62,7 @@ public: using ConstIter = NodeList::const_iterator; AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } - ~AudioMixerSlavePool(); + ~AudioMixerSlavePool() { resize(0); } // mix on slave threads void mix(ConstIter begin, ConstIter end, unsigned int frame); @@ -72,10 +74,6 @@ public: int numThreads() { return _numThreads; } private: - // these methods require access to guarded members, so require a lock as argument - void start(Lock& lock, ConstIter begin = ConstIter(), ConstIter end = ConstIter(), unsigned int frame = 0); - void wait(Lock& lock); - void resize(int numThreads); std::vector> _slaves; @@ -89,9 +87,8 @@ private: ConditionVariable _slaveCondition; ConditionVariable _poolCondition; int _numThreads { 0 }; - int _numStarted { 0 }; - int _numFinished { 0 }; - bool _running { false }; + int _numStarted { 0 }; // guarded by _mutex + int _numFinished { 0 }; // guarded by _mutex // frame state Queue _queue;