add support for multiple jobs types on the slave pool

This commit is contained in:
Brad Hefta-Gaub 2017-02-15 20:49:45 -08:00
parent 65dbc6365d
commit bd72216574
4 changed files with 34 additions and 15 deletions

View file

@ -55,3 +55,5 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) {
_processIncomingPacketsElapsedTime += (end - start);
}
void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
}

View file

@ -28,6 +28,7 @@ public:
void configure(ConstIter begin, ConstIter end);
void processIncomingPackets(const SharedNodePointer& node);
void anotherJob(const SharedNodePointer& node);
void harvestStats(int& nodesProcessed, int& packetsProcessed, quint64& processIncomingPacketsElapsedTime);

View file

@ -21,7 +21,7 @@ void AvatarMixerSlaveThread::run() {
// iterate over all available nodes
SharedNodePointer node;
while (try_pop(node)) {
processIncomingPackets(node);
(this->*_function)(node);
}
bool stopping = _stop;
@ -41,7 +41,10 @@ void AvatarMixerSlaveThread::wait() {
});
++_pool._numStarted;
}
configure(_pool._begin, _pool._end);
if (_pool._configure) {
_pool._configure(*this);
}
_function = _pool._function;
}
void AvatarMixerSlaveThread::notify(bool stopping) {
@ -65,18 +68,26 @@ static AvatarMixerSlave slave;
#endif
void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) {
_function = &AvatarMixerSlave::processIncomingPackets;
_configure = [](AvatarMixerSlave& slave) {};
run(begin, end);
}
void AvatarMixerSlavePool::anotherJob(ConstIter begin, ConstIter end) {
_function = &AvatarMixerSlave::anotherJob;
_configure = [](AvatarMixerSlave& slave) {};
run(begin, end);
}
void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) {
_begin = begin;
_end = end;
// ????
//_frame = frame;
//_throttlingRatio = throttlingRatio;
#ifdef AVATAR_SINGLE_THREADED
slave.configure(_begin, _end, frame, throttlingRatio);
#ifdef AUDIO_SINGLE_THREADED
_configure(slave);
std::for_each(begin, end, [&](const SharedNodePointer& node) {
slave.processIncomingPackets(node);
});
_function(slave, node);
});
#else
// fill the queue
std::for_each(_begin, _end, [&](const SharedNodePointer& node) {
@ -86,7 +97,7 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end
{
Lock lock(_mutex);
// start the job
// run
_numStarted = _numFinished = 0;
_slaveCondition.notify_all();
@ -103,6 +114,7 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end
#endif
}
void AvatarMixerSlavePool::each(std::function<void(AvatarMixerSlave& slave)> functor) {
#ifdef AVATAR_SINGLE_THREADED
functor(slave);

View file

@ -45,6 +45,7 @@ private:
bool try_pop(SharedNodePointer& node);
AvatarMixerSlavePool& _pool;
void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr };
bool _stop { false };
};
@ -62,17 +63,18 @@ public:
AvatarMixerSlavePool(int numThreads = QThread::idealThreadCount()) { setNumThreads(numThreads); }
~AvatarMixerSlavePool() { resize(0); }
// Jobs the slave pool can do...
void processIncomingPackets(ConstIter begin, ConstIter end);
void anotherJob(ConstIter begin, ConstIter end);
// iterate over all slaves
void each(std::function<void(AvatarMixerSlave& slave)> functor);
void setNumThreads(int numThreads);
int numThreads() { return _numThreads; }
// Jobs the slave pool can do...
void processIncomingPackets(ConstIter begin, ConstIter end);
private:
void run(ConstIter begin, ConstIter end);
void resize(int numThreads);
std::vector<std::unique_ptr<AvatarMixerSlaveThread>> _slaves;
@ -85,6 +87,8 @@ private:
Mutex _mutex;
ConditionVariable _slaveCondition;
ConditionVariable _poolCondition;
void (AvatarMixerSlave::*_function)(const SharedNodePointer& node);
std::function<void(AvatarMixerSlave&)> _configure;
int _numThreads { 0 };
int _numStarted { 0 }; // guarded by _mutex
int _numFinished { 0 }; // guarded by _mutex