mix audio with single node list read lock

This commit is contained in:
Zach Pomerantz 2016-11-30 16:37:34 -05:00
parent b4638105e3
commit 8a6dcdeb68
6 changed files with 159 additions and 97 deletions

View file

@ -356,21 +356,18 @@ void AudioMixer::start() {
while (!_isFinished) { while (!_isFinished) {
manageLoad(frameTimestamp, framesSinceManagement); manageLoad(frameTimestamp, framesSinceManagement);
{ // aquire the read-lock in a single thread, to avoid canonical rwlock undefined behaviors
QReadLocker readLocker(&nodeList->getMutex()); // node removal will acquire a write lock;
std::vector<SharedNodePointer> nodes; // read locks (in slave threads) while a write lock is pending have undefined order in pthread
nodeList->algorithm([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
nodeList->eachNode([&](const SharedNodePointer& node) { // prepare frames; pop off any new audio from their streams
// collect the available nodes (to queue them for the slavePool) std::for_each(cbegin, cend, [&](const SharedNodePointer& node) {
nodes.emplace_back(node);
// prepare frames; pop off any new audio from their streams
_stats.sumStreams += prepareFrame(node, frame); _stats.sumStreams += prepareFrame(node, frame);
}); });
// mix across slave threads // mix across slave threads
_slavePool.mix(nodes, frame); _slavePool.mix(cbegin, cend, frame);
} });
// gather stats // gather stats
_slavePool.each([&](AudioMixerSlave& slave) { _slavePool.each([&](AudioMixerSlave& slave) {

View file

@ -9,6 +9,8 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#include <algorithm>
#include <glm/glm.hpp> #include <glm/glm.hpp>
#include <glm/gtx/norm.hpp> #include <glm/gtx/norm.hpp>
#include <glm/gtx/vector_angle.hpp> #include <glm/gtx/vector_angle.hpp>
@ -131,7 +133,13 @@ void sendEnvironmentPacket(const SharedNodePointer& node, AudioMixerClientData&
} }
} }
void AudioMixerSlave::mix(const SharedNodePointer& node, unsigned int frame) { void AudioMixerSlave::configure(ConstIter begin, ConstIter end, unsigned int frame) {
_begin = begin;
_end = end;
_frame = frame;
}
void AudioMixerSlave::mix(const SharedNodePointer& node) {
// check that the node is valid // check that the node is valid
AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData(); AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
if (data == nullptr) { if (data == nullptr) {
@ -181,7 +189,7 @@ void AudioMixerSlave::mix(const SharedNodePointer& node, unsigned int frame) {
// send stats packet (about every second) // send stats packet (about every second)
static const unsigned int NUM_FRAMES_PER_SEC = (int) ceil(AudioConstants::NETWORK_FRAMES_PER_SEC); static const unsigned int NUM_FRAMES_PER_SEC = (int) ceil(AudioConstants::NETWORK_FRAMES_PER_SEC);
if (data->shouldSendStats(frame % NUM_FRAMES_PER_SEC)) { if (data->shouldSendStats(_frame % NUM_FRAMES_PER_SEC)) {
data->sendAudioStreamStatsPackets(node); data->sendAudioStreamStatsPackets(node);
} }
} }
@ -195,7 +203,7 @@ bool AudioMixerSlave::prepareMix(const SharedNodePointer& node) {
memset(_mixedSamples, 0, sizeof(_mixedSamples)); memset(_mixedSamples, 0, sizeof(_mixedSamples));
// loop through all other nodes that have sufficient audio to mix // loop through all other nodes that have sufficient audio to mix
DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& otherNode){ std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode){
// make sure that we have audio data for this other node // make sure that we have audio data for this other node
// and that it isn't being ignored by our listening node // and that it isn't being ignored by our listening node
// and that it isn't ignoring our listening node // and that it isn't ignoring our listening node

View file

@ -17,6 +17,7 @@
#include <AudioRingBuffer.h> #include <AudioRingBuffer.h>
#include <ThreadedAssignment.h> #include <ThreadedAssignment.h>
#include <UUIDHasher.h> #include <UUIDHasher.h>
#include <NodeList.h>
#include "AudioMixerStats.h" #include "AudioMixerStats.h"
@ -27,9 +28,13 @@ class AudioMixerClientData;
class AudioMixerSlave { class AudioMixerSlave {
public: public:
using ConstIter = NodeList::const_iterator;
void configure(ConstIter begin, ConstIter end, unsigned int frame);
// mix and broadcast non-ignored streams to the node // mix and broadcast non-ignored streams to the node
// returns true if a mixed packet was sent to the node // returns true if a mixed packet was sent to the node
void mix(const SharedNodePointer& node, unsigned int frame); void mix(const SharedNodePointer& node);
AudioMixerStats stats; AudioMixerStats stats;
@ -48,6 +53,11 @@ private:
// mixing buffers // mixing buffers
float _mixedSamples[AudioConstants::NETWORK_FRAME_SAMPLES_STEREO]; float _mixedSamples[AudioConstants::NETWORK_FRAME_SAMPLES_STEREO];
int16_t _clampedSamples[AudioConstants::NETWORK_FRAME_SAMPLES_STEREO]; int16_t _clampedSamples[AudioConstants::NETWORK_FRAME_SAMPLES_STEREO];
// frame state
ConstIter _begin;
ConstIter _end;
unsigned int _frame { 0 };
}; };
#endif // hifi_AudioMixerSlave_h #endif // hifi_AudioMixerSlave_h

View file

@ -14,22 +14,59 @@
#include "AudioMixerSlavePool.h" #include "AudioMixerSlavePool.h"
void AudioMixerSlaveThread::run() {
while (!_stop) {
wait();
SharedNodePointer node;
while (try_pop(node)) {
mix(node);
}
notify();
}
}
void AudioMixerSlaveThread::wait() {
Lock lock(_pool._mutex);
_pool._slaveCondition.wait(lock, [&] {
return _pool._numStarted != _pool._numThreads;
});
// toggle running state
++_pool._numStarted;
configure(_pool._begin, _pool._end, _pool._frame);
}
void AudioMixerSlaveThread::notify() {
{
Lock lock(_pool._mutex);
++_pool._numFinished;
}
_pool._poolCondition.notify_one();
}
bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) {
return _pool._queue.try_pop(node);
}
AudioMixerSlavePool::~AudioMixerSlavePool() { AudioMixerSlavePool::~AudioMixerSlavePool() {
{ {
std::unique_lock<std::mutex> lock(_mutex); Lock lock(_mutex);
wait(lock); wait(lock);
} }
setNumThreads(0); setNumThreads(0);
} }
void AudioMixerSlavePool::mix(const std::vector<SharedNodePointer>& nodes, unsigned int frame) { void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame) {
std::unique_lock<std::mutex> lock(_mutex); Lock lock(_mutex);
start(lock, nodes, frame); start(lock, begin, end, frame);
wait(lock); wait(lock);
} }
void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) { void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) {
std::unique_lock<std::mutex> lock(_mutex); Lock lock(_mutex);
assert(!_running); assert(!_running);
for (auto& slave : _slaves) { for (auto& slave : _slaves) {
@ -37,24 +74,26 @@ void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> funct
} }
} }
void AudioMixerSlavePool::start(std::unique_lock<std::mutex>& lock, const std::vector<SharedNodePointer>& nodes, unsigned int frame) { void AudioMixerSlavePool::start(Lock& lock, ConstIter begin, ConstIter end, unsigned int frame) {
assert(!_running); assert(!_running);
// fill the queue // fill the queue
for (auto& node : nodes) { std::for_each(begin, end, [&](const SharedNodePointer& node) {
_queue.emplace(node); _queue.emplace(node);
} });
// toggle running state // toggle running state
_frame = frame; _frame = frame;
_running = true; _running = true;
_numStarted = 0; _numStarted = 0;
_numFinished = 0; _numFinished = 0;
_begin = begin;
_end = end;
_slaveCondition.notify_all(); _slaveCondition.notify_all();
} }
void AudioMixerSlavePool::wait(std::unique_lock<std::mutex>& lock) { void AudioMixerSlavePool::wait(Lock& lock) {
if (_running) { if (_running) {
_poolCondition.wait(lock, [&] { _poolCondition.wait(lock, [&] {
return _numFinished == _numThreads; return _numFinished == _numThreads;
@ -67,27 +106,8 @@ void AudioMixerSlavePool::wait(std::unique_lock<std::mutex>& lock) {
_running = false; _running = false;
} }
void AudioMixerSlavePool::slaveWait() {
std::unique_lock<std::mutex> lock(_mutex);
_slaveCondition.wait(lock, [&] {
return _numStarted != _numThreads;
});
// toggle running state
++_numStarted;
}
void AudioMixerSlavePool::slaveNotify() {
{
std::unique_lock<std::mutex> lock(_mutex);
++_numFinished;
}
_poolCondition.notify_one();
}
void AudioMixerSlavePool::setNumThreads(int numThreads) { void AudioMixerSlavePool::setNumThreads(int numThreads) {
std::unique_lock<std::mutex> lock(_mutex); Lock lock(_mutex);
// ensure slave are not running // ensure slave are not running
assert(!_running); assert(!_running);
@ -121,7 +141,7 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) {
} }
// ...cycle slaves with empty queue... // ...cycle slaves with empty queue...
start(lock, std::vector<SharedNodePointer>(), 0); start(lock);
wait(lock); wait(lock);
// ...wait for them to finish... // ...wait for them to finish...
@ -136,14 +156,3 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) {
_numThreads = _numStarted = _numFinished = numThreads; _numThreads = _numStarted = _numFinished = numThreads;
} }
void AudioMixerSlaveThread::run() {
while (!_stop) {
_pool.slaveWait();
SharedNodePointer node;
while (_pool._queue.try_pop(node)) {
mix(node, _pool._frame);
}
_pool.slaveNotify();
}
}

View file

@ -22,49 +22,14 @@
#include "AudioMixerSlave.h" #include "AudioMixerSlave.h"
class AudioMixerSlaveThread; class AudioMixerSlavePool;
class AudioMixerSlavePool {
using Queue = tbb::concurrent_queue<SharedNodePointer>;
public:
AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); }
~AudioMixerSlavePool();
// mix on slave threads
void mix(const std::vector<SharedNodePointer>& nodes, unsigned int frame);
void each(std::function<void(AudioMixerSlave& slave)> functor);
void setNumThreads(int numThreads);
int numThreads() { return _numThreads; }
private:
void start(std::unique_lock<std::mutex>& lock, const std::vector<SharedNodePointer>& nodes, unsigned int frame);
void wait(std::unique_lock<std::mutex>& lock);
friend class AudioMixerSlaveThread;
// wait for pool to start (called by slaves)
void slaveWait();
// notify that the slave has finished (called by slave)
void slaveNotify();
std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves;
Queue _queue;
unsigned int _frame { 0 };
std::mutex _mutex;
std::condition_variable _slaveCondition;
std::condition_variable _poolCondition;
int _numThreads { 0 };
int _numStarted { 0 };
int _numFinished { 0 };
bool _running { false };
};
class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { class AudioMixerSlaveThread : public QThread, public AudioMixerSlave {
Q_OBJECT Q_OBJECT
using ConstIter = NodeList::const_iterator;
using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>;
public: public:
AudioMixerSlaveThread(AudioMixerSlavePool& pool) : _pool(pool) {} AudioMixerSlaveThread(AudioMixerSlavePool& pool) : _pool(pool) {}
@ -72,8 +37,64 @@ public:
void stop() { _stop = true; } void stop() { _stop = true; }
private: private:
friend class AudioMixerSlavePool;
void wait();
void notify();
bool try_pop(SharedNodePointer& node);
// frame state
AudioMixerSlavePool& _pool; AudioMixerSlavePool& _pool;
// synchronization state
std::atomic<bool> _stop; std::atomic<bool> _stop;
}; };
class AudioMixerSlavePool {
using Queue = tbb::concurrent_queue<SharedNodePointer>;
using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>;
using ConditionVariable = std::condition_variable;
public:
using ConstIter = NodeList::const_iterator;
AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); }
~AudioMixerSlavePool();
// mix on slave threads
void mix(ConstIter begin, ConstIter end, unsigned int frame);
// iterate over all slaves
void each(std::function<void(AudioMixerSlave& slave)> functor);
void setNumThreads(int numThreads);
int numThreads() { return _numThreads; }
private:
void start(Lock& lock, ConstIter begin = ConstIter(), ConstIter end = ConstIter(), unsigned int frame = 0);
void wait(Lock& lock);
std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves;
friend void AudioMixerSlaveThread::wait();
friend void AudioMixerSlaveThread::notify();
friend bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node);
// synchronization state
Mutex _mutex;
ConditionVariable _slaveCondition;
ConditionVariable _poolCondition;
int _numThreads { 0 };
int _numStarted { 0 };
int _numFinished { 0 };
bool _running { false };
// frame state
Queue _queue;
unsigned int _frame { 0 };
ConstIter _begin;
ConstIter _end;
};
#endif // hifi_AudioMixerSlavePool_h #endif // hifi_AudioMixerSlavePool_h

View file

@ -175,7 +175,24 @@ public:
SharedNodePointer findNodeWithAddr(const HifiSockAddr& addr); SharedNodePointer findNodeWithAddr(const HifiSockAddr& addr);
QReadWriteLock& getMutex() { return _nodeMutex; } // for use with algorithm
using value_type = SharedNodePointer;
using const_iterator = std::vector<value_type>::const_iterator;
// Cede control of iteration under a single read lock (e.g. for use by thread pools)
// This allows multiple threads (i.e. a thread pool) to share a lock
// without deadlocking when a dying node attempts to acquire a write lock
template<typename NodeAlgorithmLambda>
void algorithm(NodeAlgorithmLambda functor) {
QReadLocker readLock(&_nodeMutex);
std::vector<SharedNodePointer> nodes(_nodeHash.size());
std::transform(_nodeHash.cbegin(), _nodeHash.cend(), nodes.begin(), [](const NodeHash::value_type& it) {
return it.second;
});
functor(nodes.cbegin(), nodes.cend());
}
template<typename NodeLambda> template<typename NodeLambda>
void eachNode(NodeLambda functor) { void eachNode(NodeLambda functor) {