simplify locks in AudioMixerSlavePool

This commit is contained in:
Zach Pomerantz 2016-12-07 01:17:34 +00:00
parent b2f995d3df
commit c4e435a166
2 changed files with 42 additions and 71 deletions

View file

@ -16,32 +16,34 @@
void AudioMixerSlaveThread::run() { void AudioMixerSlaveThread::run() {
while (!_stop) { while (!_stop) {
wait(); wait(); // for the audio pool to notify
// iterate over all available nodes
SharedNodePointer node; SharedNodePointer node;
while (try_pop(node)) { while (try_pop(node)) {
mix(node); mix(node);
} }
notify(); notify(); // the audio pool we are done
} }
} }
void AudioMixerSlaveThread::wait() { void AudioMixerSlaveThread::wait() {
Lock lock(_pool._mutex); {
Lock lock(_pool._mutex);
_pool._slaveCondition.wait(lock, [&] { _pool._slaveCondition.wait(lock, [&] {
return _pool._numStarted != _pool._numThreads; assert(_pool._numStarted <= _pool._numThreads);
}); return _pool._numStarted != _pool._numThreads;
});
// toggle running state ++_pool._numStarted;
++_pool._numStarted; }
configure(_pool._begin, _pool._end, _pool._frame); configure(_pool._begin, _pool._end, _pool._frame);
} }
void AudioMixerSlaveThread::notify() { void AudioMixerSlaveThread::notify() {
{ {
Lock lock(_pool._mutex); Lock lock(_pool._mutex);
assert(_pool._numFinished < _pool._numThreads);
++_pool._numFinished; ++_pool._numFinished;
} }
_pool._poolCondition.notify_one(); _pool._poolCondition.notify_one();
@ -51,34 +53,45 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) {
return _pool._queue.try_pop(node); return _pool._queue.try_pop(node);
} }
AudioMixerSlavePool::~AudioMixerSlavePool() {
resize(0);
}
#ifdef AUDIO_SINGLE_THREADED #ifdef AUDIO_SINGLE_THREADED
static AudioMixerSlave slave; static AudioMixerSlave slave;
#endif #endif
void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame) { void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame) {
Lock lock(_mutex); _begin = begin;
_end = end;
_frame = frame;
#ifdef AUDIO_SINGLE_THREADED #ifdef AUDIO_SINGLE_THREADED
slave.configure(_begin, _end, frame); slave.configure(_begin, _end, frame);
std::for_each(begin, end, [&](const SharedNodePointer& node) { std::for_each(begin, end, [&](const SharedNodePointer& node) {
slave.mix(node); slave.mix(node);
}); });
#else #else
start(lock, begin, end, frame); // fill the queue
wait(lock); std::for_each(_begin, _end, [&](const SharedNodePointer& node) {
#endif _queue.emplace(node);
});
{
Lock lock(_mutex);
// mix
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();
// wait
_poolCondition.wait(lock, [&] {
assert(_numFinished <= _numThreads);
return _numFinished == _numThreads;
});
}
assert(_numStarted == _numThreads);
assert(_queue.empty());
#endif
} }
void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) { void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) {
Lock lock(_mutex);
assert(!_running);
#ifdef AUDIO_SINGLE_THREADED #ifdef AUDIO_SINGLE_THREADED
functor(slave); functor(slave);
#else #else
@ -88,41 +101,6 @@ void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> funct
#endif #endif
} }
void AudioMixerSlavePool::start(Lock& lock, ConstIter begin, ConstIter end, unsigned int frame) {
assert(lock.owns_lock());
assert(!_running);
// fill the queue
std::for_each(begin, end, [&](const SharedNodePointer& node) {
_queue.emplace(node);
});
// toggle running state
_running = true;
_numStarted = 0;
_numFinished = 0;
_begin = begin;
_end = end;
_frame = frame;
_slaveCondition.notify_all();
}
void AudioMixerSlavePool::wait(Lock& lock) {
assert(lock.owns_lock());
if (_running) {
_poolCondition.wait(lock, [&] {
return _numFinished == _numThreads;
});
}
assert(_queue.empty());
// toggle running state
_running = false;
}
void AudioMixerSlavePool::setNumThreads(int numThreads) { void AudioMixerSlavePool::setNumThreads(int numThreads) {
// clamp to allowed size // clamp to allowed size
{ {
@ -144,10 +122,6 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) {
} }
void AudioMixerSlavePool::resize(int numThreads) { void AudioMixerSlavePool::resize(int numThreads) {
Lock lock(_mutex);
// ensure slave are not running
assert(!_running);
assert(_numThreads == _slaves.size()); assert(_numThreads == _slaves.size());
#ifdef AUDIO_SINGLE_THREADED #ifdef AUDIO_SINGLE_THREADED
@ -173,8 +147,8 @@ void AudioMixerSlavePool::resize(int numThreads) {
} }
// ...cycle slaves with empty queue... // ...cycle slaves with empty queue...
start(lock); _numStarted = _numFinished = 0;
lock.unlock(); _slaveCondition.notify_all();
// ...wait for them to finish... // ...wait for them to finish...
slave = extraBegin; slave = extraBegin;

View file

@ -50,6 +50,8 @@ private:
std::atomic<bool> _stop; std::atomic<bool> _stop;
}; };
// Slave pool for audio mixers
// AudioMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread.
class AudioMixerSlavePool { class AudioMixerSlavePool {
using Queue = tbb::concurrent_queue<SharedNodePointer>; using Queue = tbb::concurrent_queue<SharedNodePointer>;
using Mutex = std::mutex; using Mutex = std::mutex;
@ -60,7 +62,7 @@ public:
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); }
~AudioMixerSlavePool(); ~AudioMixerSlavePool() { resize(0); }
// mix on slave threads // mix on slave threads
void mix(ConstIter begin, ConstIter end, unsigned int frame); void mix(ConstIter begin, ConstIter end, unsigned int frame);
@ -72,10 +74,6 @@ public:
int numThreads() { return _numThreads; } int numThreads() { return _numThreads; }
private: private:
// these methods require access to guarded members, so require a lock as argument
void start(Lock& lock, ConstIter begin = ConstIter(), ConstIter end = ConstIter(), unsigned int frame = 0);
void wait(Lock& lock);
void resize(int numThreads); void resize(int numThreads);
std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves; std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves;
@ -89,9 +87,8 @@ private:
ConditionVariable _slaveCondition; ConditionVariable _slaveCondition;
ConditionVariable _poolCondition; ConditionVariable _poolCondition;
int _numThreads { 0 }; int _numThreads { 0 };
int _numStarted { 0 }; int _numStarted { 0 }; // guarded by _mutex
int _numFinished { 0 }; int _numFinished { 0 }; // guarded by _mutex
bool _running { false };
// frame state // frame state
Queue _queue; Queue _queue;