From b102c2d1d03bbfbe845cb7804c4f7af21e92ab2e Mon Sep 17 00:00:00 2001 From: Zach Pomerantz Date: Mon, 13 Feb 2017 22:42:35 -0500 Subject: [PATCH] generalize audio thread pool to processPackets --- assignment-client/src/audio/AudioMixer.cpp | 11 +++---- .../src/audio/AudioMixerSlave.cpp | 9 ++++- assignment-client/src/audio/AudioMixerSlave.h | 8 +++-- .../src/audio/AudioMixerSlavePool.cpp | 33 +++++++++++++++---- .../src/audio/AudioMixerSlavePool.h | 7 ++++ 5 files changed, 52 insertions(+), 16 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index e31929da37..0a7f4c464c 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -397,7 +397,7 @@ void AudioMixer::start() { ++frame; ++_numStatFrames; - // process queued events (networking, &c.) + // process queued events (networking, global audio packets, &c.) { auto eventsTimer = _eventsTiming.timer(); @@ -405,12 +405,11 @@ void AudioMixer::start() { QCoreApplication::processEvents(); } - // process audio packets + // process audio packets (node-isolated audio packets) across slave threads { - auto packetsTimer = _packetsTiming.timer(); - nodeList->eachNode([&](SharedNodePointer& node) { - AudioMixerClientData* data = getOrCreateClientData(node.data()); - data->processPackets(); + nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { + auto packetsTimer = _packetsTiming.timer(); + _slavePool.processPackets(cbegin, cend); }); } diff --git a/assignment-client/src/audio/AudioMixerSlave.cpp b/assignment-client/src/audio/AudioMixerSlave.cpp index 370df60ec5..6b53de89c2 100644 --- a/assignment-client/src/audio/AudioMixerSlave.cpp +++ b/assignment-client/src/audio/AudioMixerSlave.cpp @@ -53,7 +53,14 @@ inline float computeGain(const AvatarAudioStream& listeningNodeStream, const Pos inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const PositionalAudioStream& streamToAdd, const glm::vec3& relativePosition); -void AudioMixerSlave::configure(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) { +void AudioMixerSlave::processPackets(const SharedNodePointer& node) { + AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData(); + if (data) { + data->processPackets(); + } +} + +void AudioMixerSlave::configureMix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) { _begin = begin; _end = end; _frame = frame; diff --git a/assignment-client/src/audio/AudioMixerSlave.h b/assignment-client/src/audio/AudioMixerSlave.h index 7b59500629..074d10ff40 100644 --- a/assignment-client/src/audio/AudioMixerSlave.h +++ b/assignment-client/src/audio/AudioMixerSlave.h @@ -30,9 +30,13 @@ class AudioMixerSlave { public: using ConstIter = NodeList::const_iterator; - void configure(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio); + // process packets for a given node (requires no configuration) + void processPackets(const SharedNodePointer& node); - // mix and broadcast non-ignored streams to the node + // configure a round of mixing + void configureMix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio); + + // mix and broadcast non-ignored streams to the node (requires configuration using configureMix, above) // returns true if a mixed packet was sent to the node void mix(const SharedNodePointer& node); diff --git a/assignment-client/src/audio/AudioMixerSlavePool.cpp b/assignment-client/src/audio/AudioMixerSlavePool.cpp index 9b20572b84..643361ac5d 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.cpp +++ b/assignment-client/src/audio/AudioMixerSlavePool.cpp @@ -21,7 +21,7 @@ void AudioMixerSlaveThread::run() { // iterate over all available nodes SharedNodePointer node; while (try_pop(node)) { - mix(node); + (this->*_function)(node); } bool stopping = _stop; @@ -41,7 +41,11 @@ void AudioMixerSlaveThread::wait() { }); ++_pool._numStarted; } - configure(_pool._begin, _pool._end, _pool._frame, _pool._throttlingRatio); + + if (_pool._configure) { + _pool._configure(*this); + } + _function = _pool._function; } void AudioMixerSlaveThread::notify(bool stopping) { @@ -64,16 +68,31 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { static AudioMixerSlave slave; #endif +void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { + _function = &AudioMixerSlave::processPackets; + _configure = [](AudioMixerSlave& slave) {}; + run(begin, end); +} + void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio) { - _begin = begin; - _end = end; + _function = &AudioMixerSlave::mix; + _configure = [&](AudioMixerSlave& slave) { + slave.configureMix(_begin, _end, _frame, _throttlingRatio); + }; _frame = frame; _throttlingRatio = throttlingRatio; + run(begin, end); +} + +void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { + _begin = begin; + _end = end; + #ifdef AUDIO_SINGLE_THREADED - slave.configure(_begin, _end, frame, throttlingRatio); + _configure(slave); std::for_each(begin, end, [&](const SharedNodePointer& node) { - slave.mix(node); + _function(slave, node); }); #else // fill the queue @@ -84,7 +103,7 @@ void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame { Lock lock(_mutex); - // mix + // run _numStarted = _numFinished = 0; _slaveCondition.notify_all(); diff --git a/assignment-client/src/audio/AudioMixerSlavePool.h b/assignment-client/src/audio/AudioMixerSlavePool.h index 19d2315d12..4082ea18d9 100644 --- a/assignment-client/src/audio/AudioMixerSlavePool.h +++ b/assignment-client/src/audio/AudioMixerSlavePool.h @@ -43,6 +43,7 @@ private: bool try_pop(SharedNodePointer& node); AudioMixerSlavePool& _pool; + void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; @@ -60,6 +61,9 @@ public: AudioMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } ~AudioMixerSlavePool() { resize(0); } + // process packets on slave threads + void processPackets(ConstIter begin, ConstIter end); + // mix on slave threads void mix(ConstIter begin, ConstIter end, unsigned int frame, float throttlingRatio); @@ -70,6 +74,7 @@ public: int numThreads() { return _numThreads; } private: + void run(ConstIter begin, ConstIter end); void resize(int numThreads); std::vector> _slaves; @@ -82,6 +87,8 @@ private: Mutex _mutex; ConditionVariable _slaveCondition; ConditionVariable _poolCondition; + void (AudioMixerSlave::*_function)(const SharedNodePointer& node); + std::function _configure; int _numThreads { 0 }; int _numStarted { 0 }; // guarded by _mutex int _numFinished { 0 }; // guarded by _mutex