From bd722165743e0fd6ed65adf2b675f3874b420d90 Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Wed, 15 Feb 2017 20:49:45 -0800 Subject: [PATCH] add support for multiple jobs types on the slave pool --- .../src/avatars/AvatarMixerSlave.cpp | 2 ++ .../src/avatars/AvatarMixerSlave.h | 1 + .../src/avatars/AvatarMixerSlavePool.cpp | 34 +++++++++++++------ .../src/avatars/AvatarMixerSlavePool.h | 12 ++++--- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 06d21b51b9..784b10ebfe 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -55,3 +55,5 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) { _processIncomingPacketsElapsedTime += (end - start); } +void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) { +} diff --git a/assignment-client/src/avatars/AvatarMixerSlave.h b/assignment-client/src/avatars/AvatarMixerSlave.h index 1abba8d46c..d75c6ae396 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.h +++ b/assignment-client/src/avatars/AvatarMixerSlave.h @@ -28,6 +28,7 @@ public: void configure(ConstIter begin, ConstIter end); void processIncomingPackets(const SharedNodePointer& node); + void anotherJob(const SharedNodePointer& node); void harvestStats(int& nodesProcessed, int& packetsProcessed, quint64& processIncomingPacketsElapsedTime); diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index e50a8475a0..1b335f8383 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -21,7 +21,7 @@ void AvatarMixerSlaveThread::run() { // iterate over all available nodes SharedNodePointer node; while (try_pop(node)) { - processIncomingPackets(node); + (this->*_function)(node); } bool stopping = _stop; @@ -41,7 +41,10 @@ void AvatarMixerSlaveThread::wait() { }); ++_pool._numStarted; } - configure(_pool._begin, _pool._end); + if (_pool._configure) { + _pool._configure(*this); + } + _function = _pool._function; } void AvatarMixerSlaveThread::notify(bool stopping) { @@ -65,18 +68,26 @@ static AvatarMixerSlave slave; #endif void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { + _function = &AvatarMixerSlave::processIncomingPackets; + _configure = [](AvatarMixerSlave& slave) {}; + run(begin, end); +} + +void AvatarMixerSlavePool::anotherJob(ConstIter begin, ConstIter end) { + _function = &AvatarMixerSlave::anotherJob; + _configure = [](AvatarMixerSlave& slave) {}; + run(begin, end); +} + +void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { _begin = begin; _end = end; - // ???? - //_frame = frame; - //_throttlingRatio = throttlingRatio; - -#ifdef AVATAR_SINGLE_THREADED - slave.configure(_begin, _end, frame, throttlingRatio); +#ifdef AUDIO_SINGLE_THREADED + _configure(slave); std::for_each(begin, end, [&](const SharedNodePointer& node) { - slave.processIncomingPackets(node); - }); + _function(slave, node); +}); #else // fill the queue std::for_each(_begin, _end, [&](const SharedNodePointer& node) { @@ -86,7 +97,7 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end { Lock lock(_mutex); - // start the job + // run _numStarted = _numFinished = 0; _slaveCondition.notify_all(); @@ -103,6 +114,7 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end #endif } + void AvatarMixerSlavePool::each(std::function functor) { #ifdef AVATAR_SINGLE_THREADED functor(slave); diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.h b/assignment-client/src/avatars/AvatarMixerSlavePool.h index 9865f897b9..8f13477a93 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.h +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.h @@ -45,6 +45,7 @@ private: bool try_pop(SharedNodePointer& node); AvatarMixerSlavePool& _pool; + void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; bool _stop { false }; }; @@ -62,17 +63,18 @@ public: AvatarMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); } ~AvatarMixerSlavePool() { resize(0); } + // Jobs the slave pool can do... + void processIncomingPackets(ConstIter begin, ConstIter end); + void anotherJob(ConstIter begin, ConstIter end); + // iterate over all slaves void each(std::function functor); void setNumThreads(int numThreads); int numThreads() { return _numThreads; } - // Jobs the slave pool can do... - void processIncomingPackets(ConstIter begin, ConstIter end); - - private: + void run(ConstIter begin, ConstIter end); void resize(int numThreads); std::vector> _slaves; @@ -85,6 +87,8 @@ private: Mutex _mutex; ConditionVariable _slaveCondition; ConditionVariable _poolCondition; + void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); + std::function _configure; int _numThreads { 0 }; int _numStarted { 0 }; // guarded by _mutex int _numFinished { 0 }; // guarded by _mutex