generalize audio thread pool to processPackets

This commit is contained in:
Zach Pomerantz 2017-02-13 22:42:35 -05:00
parent 7d8a947e6d
commit b102c2d1d0
5 changed files with 52 additions and 16 deletions

View file

@ -397,7 +397,7 @@ void AudioMixer::start() {
++frame; ++frame;
++_numStatFrames; ++_numStatFrames;
// process queued events (networking, &c.) // process queued events (networking, global audio packets, &c.)
{ {
auto eventsTimer = _eventsTiming.timer(); auto eventsTimer = _eventsTiming.timer();
@ -405,12 +405,11 @@ void AudioMixer::start() {
QCoreApplication::processEvents(); QCoreApplication::processEvents();
} }
// process audio packets // process audio packets (node-isolated audio packets) across slave threads
{ {
auto packetsTimer = _packetsTiming.timer(); nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
nodeList->eachNode([&](SharedNodePointer& node) { auto packetsTimer = _packetsTiming.timer();
AudioMixerClientData* data = getOrCreateClientData(node.data()); _slavePool.processPackets(cbegin, cend);
data->processPackets();
}); });
} }

View file

@ -53,7 +53,14 @@ inline float computeGain(const AvatarAudioStream& listeningNodeStream, const Pos
inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const PositionalAudioStream& streamToAdd, inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const PositionalAudioStream& streamToAdd,
const glm::vec3& relativePosition); const glm::vec3& relativePosition);
void AudioMixerSlave::configure(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) { void AudioMixerSlave::processPackets(const SharedNodePointer& node) {
AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
if (data) {
data->processPackets();
}
}
void AudioMixerSlave::configureMix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) {
_begin = begin; _begin = begin;
_end = end; _end = end;
_frame = frame; _frame = frame;

View file

@ -30,9 +30,13 @@ class AudioMixerSlave {
public: public:
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
void configure(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio); // process packets for a given node (requires no configuration)
void processPackets(const SharedNodePointer& node);
// mix and broadcast non-ignored streams to the node // configure a round of mixing
void configureMix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio);
// mix and broadcast non-ignored streams to the node (requires configuration using configureMix, above)
// returns true if a mixed packet was sent to the node // returns true if a mixed packet was sent to the node
void mix(const SharedNodePointer& node); void mix(const SharedNodePointer& node);

View file

@ -21,7 +21,7 @@ void AudioMixerSlaveThread::run() {
// iterate over all available nodes // iterate over all available nodes
SharedNodePointer node; SharedNodePointer node;
while (try_pop(node)) { while (try_pop(node)) {
mix(node); (this->*_function)(node);
} }
bool stopping = _stop; bool stopping = _stop;
@ -41,7 +41,11 @@ void AudioMixerSlaveThread::wait() {
}); });
++_pool._numStarted; ++_pool._numStarted;
} }
configure(_pool._begin, _pool._end, _pool._frame, _pool._throttlingRatio);
if (_pool._configure) {
_pool._configure(*this);
}
_function = _pool._function;
} }
void AudioMixerSlaveThread::notify(bool stopping) { void AudioMixerSlaveThread::notify(bool stopping) {
@ -64,16 +68,31 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) {
static AudioMixerSlave slave; static AudioMixerSlave slave;
#endif #endif
void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) {
_function = &AudioMixerSlave::processPackets;
_configure = [](AudioMixerSlave& slave) {};
run(begin, end);
}
void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) { void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) {
_begin = begin; _function = &AudioMixerSlave::mix;
_end = end; _configure = [&](AudioMixerSlave& slave) {
slave.configureMix(_begin, _end, _frame, _throttlingRatio);
};
_frame = frame; _frame = frame;
_throttlingRatio = throttlingRatio; _throttlingRatio = throttlingRatio;
run(begin, end);
}
void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
_begin = begin;
_end = end;
#ifdef AUDIO_SINGLE_THREADED #ifdef AUDIO_SINGLE_THREADED
slave.configure(_begin, _end, frame, throttlingRatio); _configure(slave);
std::for_each(begin, end, [&](const SharedNodePointer& node) { std::for_each(begin, end, [&](const SharedNodePointer& node) {
slave.mix(node); _function(slave, node);
}); });
#else #else
// fill the queue // fill the queue
@ -84,7 +103,7 @@ void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame
{ {
Lock lock(_mutex); Lock lock(_mutex);
// mix // run
_numStarted = _numFinished = 0; _numStarted = _numFinished = 0;
_slaveCondition.notify_all(); _slaveCondition.notify_all();

View file

@ -43,6 +43,7 @@ private:
bool try_pop(SharedNodePointer& node); bool try_pop(SharedNodePointer& node);
AudioMixerSlavePool& _pool; AudioMixerSlavePool& _pool;
void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr };
bool _stop { false }; bool _stop { false };
}; };
@ -60,6 +61,9 @@ public:
AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); }
~AudioMixerSlavePool() { resize(0); } ~AudioMixerSlavePool() { resize(0); }
// process packets on slave threads
void processPackets(ConstIter begin, ConstIter end);
// mix on slave threads // mix on slave threads
void mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio); void mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio);
@ -70,6 +74,7 @@ public:
int numThreads() { return _numThreads; } int numThreads() { return _numThreads; }
private: private:
void run(ConstIter begin, ConstIter end);
void resize(int numThreads); void resize(int numThreads);
std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves; std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves;
@ -82,6 +87,8 @@ private:
Mutex _mutex; Mutex _mutex;
ConditionVariable _slaveCondition; ConditionVariable _slaveCondition;
ConditionVariable _poolCondition; ConditionVariable _poolCondition;
void (AudioMixerSlave::*_function)(const SharedNodePointer& node);
std::function<void(AudioMixerSlave&)> _configure;
int _numThreads { 0 }; int _numThreads { 0 };
int _numStarted { 0 }; // guarded by _mutex int _numStarted { 0 }; // guarded by _mutex
int _numFinished { 0 }; // guarded by _mutex int _numFinished { 0 }; // guarded by _mutex