slave -> worker

This commit is contained in:
HifiExperiments 2024-08-13 00:06:55 -07:00
parent 493c9c9441
commit 2876a4d4ea
21 changed files with 279 additions and 279 deletions

View file

@ -277,14 +277,14 @@ void AudioMixer::sendStatsPacket() {
#ifdef DEBUG_EVENT_QUEUE #ifdef DEBUG_EVENT_QUEUE
QJsonObject qtStats; QJsonObject qtStats;
_slavePool.queueStats(qtStats); _workerPool.queueStats(qtStats);
statsObject["audio_thread_event_queue"] = qtStats; statsObject["audio_thread_event_queue"] = qtStats;
#endif #endif
// general stats // general stats
statsObject["useDynamicJitterBuffers"] = _numStaticJitterFrames == DISABLE_STATIC_JITTER_FRAMES; statsObject["useDynamicJitterBuffers"] = _numStaticJitterFrames == DISABLE_STATIC_JITTER_FRAMES;
statsObject["threads"] = _slavePool.numThreads(); statsObject["threads"] = _workerPool.numThreads();
statsObject["trailing_mix_ratio"] = _trailingMixRatio; statsObject["trailing_mix_ratio"] = _trailingMixRatio;
statsObject["throttling_ratio"] = _throttlingRatio; statsObject["throttling_ratio"] = _throttlingRatio;
@ -433,15 +433,15 @@ void AudioMixer::start() {
auto frameTimer = _frameTiming.timer(); auto frameTimer = _frameTiming.timer();
// process (node-isolated) audio packets across slave threads // process (node-isolated) audio packets across worker threads
{ {
auto packetsTimer = _packetsTiming.timer(); auto packetsTimer = _packetsTiming.timer();
// first clear the concurrent vector of added streams that the slaves will add to when they process packets // first clear the concurrent vector of added streams that the workers will add to when they process packets
_workerSharedData.addedStreams.clear(); _workerSharedData.addedStreams.clear();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
_slavePool.processPackets(cbegin, cend); _workerPool.processPackets(cbegin, cend);
}); });
} }
@ -463,15 +463,15 @@ void AudioMixer::start() {
numToRetain = nodeList->size() * (1.0f - _throttlingRatio); numToRetain = nodeList->size() * (1.0f - _throttlingRatio);
} }
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
// mix across slave threads // mix across worker threads
auto mixTimer = _mixTiming.timer(); auto mixTimer = _mixTiming.timer();
_slavePool.mix(cbegin, cend, frame, numToRetain); _workerPool.mix(cbegin, cend, frame, numToRetain);
}); });
// gather stats // gather stats
_slavePool.each([&](AudioMixerSlave& slave) { _workerPool.each([&](AudioMixerWorker& worker) {
_stats.accumulate(slave.stats); _stats.accumulate(worker.stats);
slave.stats.reset(); worker.stats.reset();
}); });
++frame; ++frame;
@ -573,7 +573,7 @@ void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) {
const QString NUM_THREADS = "num_threads"; const QString NUM_THREADS = "num_threads";
int numThreads = audioThreadingGroupObject[NUM_THREADS].toString().toInt(&ok); int numThreads = audioThreadingGroupObject[NUM_THREADS].toString().toInt(&ok);
if (ok) { if (ok) {
_slavePool.setNumThreads(numThreads); _workerPool.setNumThreads(numThreads);
} }
} }

View file

@ -23,7 +23,7 @@
#include <plugins/Forward.h> #include <plugins/Forward.h>
#include "AudioMixerStats.h" #include "AudioMixerStats.h"
#include "AudioMixerSlavePool.h" #include "AudioMixerWorkerPool.h"
class PositionalAudioStream; class PositionalAudioStream;
class AvatarAudioStream; class AvatarAudioStream;
@ -107,7 +107,7 @@ private:
int _numStatFrames { 0 }; int _numStatFrames { 0 };
AudioMixerStats _stats; AudioMixerStats _stats;
AudioMixerSlavePool _slavePool { _workerSharedData }; AudioMixerWorkerPool _workerPool { _workerSharedData };
class Timer { class Timer {
public: public:
@ -153,7 +153,7 @@ private:
float _throttleStartTarget = 0.9f; float _throttleStartTarget = 0.9f;
float _throttleBackoffTarget = 0.44f; float _throttleBackoffTarget = 0.44f;
AudioMixerSlave::SharedData _workerSharedData; AudioMixerWorker::SharedData _workerSharedData;
}; };
#endif // hifi_AudioMixer_h #endif // hifi_AudioMixer_h

View file

@ -140,11 +140,11 @@ public:
Streams& getStreams() { return _streams; } Streams& getStreams() { return _streams; }
// thread-safe, called from AudioMixerSlave(s) while processing ignore packets for other nodes // thread-safe, called from AudioMixerWorker(s) while processing ignore packets for other nodes
void ignoredByNode(QUuid nodeID); void ignoredByNode(QUuid nodeID);
void unignoredByNode(QUuid nodeID); void unignoredByNode(QUuid nodeID);
// start of methods called non-concurrently from single AudioMixerSlave mixing for the owning node // start of methods called non-concurrently from single AudioMixerWorker mixing for the owning node
const Node::IgnoredNodeIDs& getNewIgnoredNodeIDs() const { return _newIgnoredNodeIDs; } const Node::IgnoredNodeIDs& getNewIgnoredNodeIDs() const { return _newIgnoredNodeIDs; }
const Node::IgnoredNodeIDs& getNewUnignoredNodeIDs() const { return _newUnignoredNodeIDs; } const Node::IgnoredNodeIDs& getNewUnignoredNodeIDs() const { return _newUnignoredNodeIDs; }
@ -163,7 +163,7 @@ public:
bool getHasReceivedFirstMix() const { return _hasReceivedFirstMix; } bool getHasReceivedFirstMix() const { return _hasReceivedFirstMix; }
void setHasReceivedFirstMix(bool hasReceivedFirstMix) { _hasReceivedFirstMix = hasReceivedFirstMix; } void setHasReceivedFirstMix(bool hasReceivedFirstMix) { _hasReceivedFirstMix = hasReceivedFirstMix; }
// end of methods called non-concurrently from single AudioMixerSlave // end of methods called non-concurrently from single AudioMixerWorker
signals: signals:
void injectorStreamFinished(const QUuid& streamID); void injectorStreamFinished(const QUuid& streamID);

View file

@ -1,5 +1,5 @@
// //
// AudioMixerSlave.cpp // AudioMixerWorker.cpp
// assignment-client/src/audio // assignment-client/src/audio
// //
// Created by Zach Pomerantz on 11/22/16. // Created by Zach Pomerantz on 11/22/16.
@ -9,7 +9,7 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#include "AudioMixerSlave.h" #include "AudioMixerWorker.h"
#include <algorithm> #include <algorithm>
@ -55,7 +55,7 @@ inline float computeGain(float masterAvatarGain, float masterInjectorGain, const
inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const PositionalAudioStream& streamToAdd, inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const PositionalAudioStream& streamToAdd,
const glm::vec3& relativePosition); const glm::vec3& relativePosition);
void AudioMixerSlave::processPackets(const SharedNodePointer& node) { void AudioMixerWorker::processPackets(const SharedNodePointer& node) {
AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData(); AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
if (data) { if (data) {
// process packets and collect the number of streams available for this frame // process packets and collect the number of streams available for this frame
@ -63,14 +63,14 @@ void AudioMixerSlave::processPackets(const SharedNodePointer& node) {
} }
} }
void AudioMixerSlave::configureMix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) { void AudioMixerWorker::configureMix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) {
_begin = begin; _begin = begin;
_end = end; _end = end;
_frame = frame; _frame = frame;
_numToRetain = numToRetain; _numToRetain = numToRetain;
} }
void AudioMixerSlave::mix(const SharedNodePointer& node) { void AudioMixerWorker::mix(const SharedNodePointer& node) {
// check that the node is valid // check that the node is valid
AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData(); AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
if (data == nullptr) { if (data == nullptr) {
@ -178,7 +178,7 @@ private:
}; };
void AudioMixerSlave::addStreams(Node& listener, AudioMixerClientData& listenerData) { void AudioMixerWorker::addStreams(Node& listener, AudioMixerClientData& listenerData) {
auto& ignoredNodeIDs = listener.getIgnoredNodeIDs(); auto& ignoredNodeIDs = listener.getIgnoredNodeIDs();
auto& ignoringNodeIDs = listenerData.getIgnoringNodeIDs(); auto& ignoringNodeIDs = listenerData.getIgnoringNodeIDs();
@ -229,7 +229,7 @@ void AudioMixerSlave::addStreams(Node& listener, AudioMixerClientData& listenerD
} }
} }
bool shouldBeRemoved(const MixableStream& stream, const AudioMixerSlave::SharedData& sharedData) { bool shouldBeRemoved(const MixableStream& stream, const AudioMixerWorker::SharedData& sharedData) {
return (contains(sharedData.removedNodes, stream.nodeStreamID.nodeLocalID) || return (contains(sharedData.removedNodes, stream.nodeStreamID.nodeLocalID) ||
contains(sharedData.removedStreams, stream.nodeStreamID)); contains(sharedData.removedStreams, stream.nodeStreamID));
}; };
@ -306,7 +306,7 @@ float approximateVolume(const MixableStream& stream, const AvatarAudioStream* li
return stream.positionalStream->getLastPopOutputTrailingLoudness() * gain; return stream.positionalStream->getLastPopOutputTrailingLoudness() * gain;
}; };
bool AudioMixerSlave::prepareMix(const SharedNodePointer& listener) { bool AudioMixerWorker::prepareMix(const SharedNodePointer& listener) {
AvatarAudioStream* listenerAudioStream = static_cast<AudioMixerClientData*>(listener->getLinkedData())->getAvatarAudioStream(); AvatarAudioStream* listenerAudioStream = static_cast<AudioMixerClientData*>(listener->getLinkedData())->getAvatarAudioStream();
AudioMixerClientData* listenerData = static_cast<AudioMixerClientData*>(listener->getLinkedData()); AudioMixerClientData* listenerData = static_cast<AudioMixerClientData*>(listener->getLinkedData());
@ -489,7 +489,7 @@ bool AudioMixerSlave::prepareMix(const SharedNodePointer& listener) {
return hasAudio; return hasAudio;
} }
void AudioMixerSlave::addStream(AudioMixerClientData::MixableStream& mixableStream, void AudioMixerWorker::addStream(AudioMixerClientData::MixableStream& mixableStream,
AvatarAudioStream& listeningNodeStream, AvatarAudioStream& listeningNodeStream,
float masterAvatarGain, float masterAvatarGain,
float masterInjectorGain, float masterInjectorGain,
@ -575,7 +575,7 @@ void AudioMixerSlave::addStream(AudioMixerClientData::MixableStream& mixableStre
} }
} }
void AudioMixerSlave::updateHRTFParameters(AudioMixerClientData::MixableStream& mixableStream, void AudioMixerWorker::updateHRTFParameters(AudioMixerClientData::MixableStream& mixableStream,
AvatarAudioStream& listeningNodeStream, AvatarAudioStream& listeningNodeStream,
float masterAvatarGain, float masterAvatarGain,
float masterInjectorGain) { float masterInjectorGain) {
@ -596,7 +596,7 @@ void AudioMixerSlave::updateHRTFParameters(AudioMixerClientData::MixableStream&
++stats.hrtfUpdates; ++stats.hrtfUpdates;
} }
void AudioMixerSlave::resetHRTFState(AudioMixerClientData::MixableStream& mixableStream) { void AudioMixerWorker::resetHRTFState(AudioMixerClientData::MixableStream& mixableStream) {
mixableStream.hrtf->reset(); mixableStream.hrtf->reset();
++stats.hrtfResets; ++stats.hrtfResets;
} }

View file

@ -1,5 +1,5 @@
// //
// AudioMixerSlave.h // AudioMixerWorker.h
// assignment-client/src/audio // assignment-client/src/audio
// //
// Created by Zach Pomerantz on 11/22/16. // Created by Zach Pomerantz on 11/22/16.
@ -9,8 +9,8 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#ifndef hifi_AudioMixerSlave_h #ifndef hifi_AudioMixerWorker_h
#define hifi_AudioMixerSlave_h #define hifi_AudioMixerWorker_h
#if !defined(Q_MOC_RUN) #if !defined(Q_MOC_RUN)
// Work around https://bugreports.qt.io/browse/QTBUG-80990 // Work around https://bugreports.qt.io/browse/QTBUG-80990
@ -31,7 +31,7 @@
class AvatarAudioStream; class AvatarAudioStream;
class AudioHRTF; class AudioHRTF;
class AudioMixerSlave { class AudioMixerWorker {
public: public:
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
@ -41,7 +41,7 @@ public:
std::vector<NodeIDStreamID> removedStreams; std::vector<NodeIDStreamID> removedStreams;
}; };
AudioMixerSlave(SharedData& sharedData) : _sharedData(sharedData) {}; AudioMixerWorker(SharedData& sharedData) : _sharedData(sharedData) {};
// process packets for a given node (requires no configuration) // process packets for a given node (requires no configuration)
void processPackets(const SharedNodePointer& node); void processPackets(const SharedNodePointer& node);
@ -84,4 +84,4 @@ private:
SharedData& _sharedData; SharedData& _sharedData;
}; };
#endif // hifi_AudioMixerSlave_h #endif // hifi_AudioMixerWorker_h

View file

@ -1,5 +1,5 @@
// //
// AudioMixerSlavePool.cpp // AudioMixerWorkerPool.cpp
// assignment-client/src/audio // assignment-client/src/audio
// //
// Created by Zach Pomerantz on 11/16/2016. // Created by Zach Pomerantz on 11/16/2016.
@ -9,7 +9,7 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#include "AudioMixerSlavePool.h" #include "AudioMixerWorkerPool.h"
#include <QObject> #include <QObject>
@ -18,7 +18,7 @@
#include <ThreadHelpers.h> #include <ThreadHelpers.h>
void AudioMixerSlaveThread::run() { void AudioMixerWorkerThread::run() {
while (true) { while (true) {
wait(); wait();
@ -36,10 +36,10 @@ void AudioMixerSlaveThread::run() {
} }
} }
void AudioMixerSlaveThread::wait() { void AudioMixerWorkerThread::wait() {
{ {
Lock lock(_pool._mutex); Lock lock(_pool._mutex);
_pool._slaveCondition.wait(lock, [&] { _pool._workerCondition.wait(lock, [&] {
assert(_pool._numStarted <= _pool._numThreads); assert(_pool._numStarted <= _pool._numThreads);
return _pool._numStarted != _pool._numThreads; return _pool._numStarted != _pool._numThreads;
}); });
@ -52,7 +52,7 @@ void AudioMixerSlaveThread::wait() {
_function = _pool._function; _function = _pool._function;
} }
void AudioMixerSlaveThread::notify(bool stopping) { void AudioMixerWorkerThread::notify(bool stopping) {
{ {
Lock lock(_pool._mutex); Lock lock(_pool._mutex);
assert(_pool._numFinished < _pool._numThreads); assert(_pool._numFinished < _pool._numThreads);
@ -64,26 +64,26 @@ void AudioMixerSlaveThread::notify(bool stopping) {
_pool._poolCondition.notify_one(); _pool._poolCondition.notify_one();
} }
bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) { bool AudioMixerWorkerThread::try_pop(SharedNodePointer& node) {
return _pool._queue.try_pop(node); return _pool._queue.try_pop(node);
} }
void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { void AudioMixerWorkerPool::processPackets(ConstIter begin, ConstIter end) {
_function = &AudioMixerSlave::processPackets; _function = &AudioMixerWorker::processPackets;
_configure = [](AudioMixerSlave& slave) {}; _configure = [](AudioMixerWorker& worker) {};
run(begin, end); run(begin, end);
} }
void AudioMixerSlavePool::mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) { void AudioMixerWorkerPool::mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain) {
_function = &AudioMixerSlave::mix; _function = &AudioMixerWorker::mix;
_configure = [=](AudioMixerSlave& slave) { _configure = [=](AudioMixerWorker& worker) {
slave.configureMix(_begin, _end, frame, numToRetain); worker.configureMix(_begin, _end, frame, numToRetain);
}; };
run(begin, end); run(begin, end);
} }
void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) { void AudioMixerWorkerPool::run(ConstIter begin, ConstIter end) {
_begin = begin; _begin = begin;
_end = end; _end = end;
@ -97,7 +97,7 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
// run // run
_numStarted = _numFinished = 0; _numStarted = _numFinished = 0;
_slaveCondition.notify_all(); _workerCondition.notify_all();
// wait // wait
_poolCondition.wait(lock, [&] { _poolCondition.wait(lock, [&] {
@ -111,17 +111,17 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
assert(_queue.empty()); assert(_queue.empty());
} }
void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) { void AudioMixerWorkerPool::each(std::function<void(AudioMixerWorker& worker)> functor) {
for (auto& slave : _slaves) { for (auto& worker : _workers) {
functor(*slave.get()); functor(*worker.get());
} }
} }
#ifdef DEBUG_EVENT_QUEUE #ifdef DEBUG_EVENT_QUEUE
void AudioMixerSlavePool::queueStats(QJsonObject& stats) { void AudioMixerWorkerPool::queueStats(QJsonObject& stats) {
unsigned i = 0; unsigned i = 0;
for (auto& slave : _slaves) { for (auto& worker : _workers) {
int queueSize = ::hifi::qt::getEventQueueSize(slave.get()); int queueSize = ::hifi::qt::getEventQueueSize(worker.get());
QString queueName = QString("audio_thread_event_queue_%1").arg(i); QString queueName = QString("audio_thread_event_queue_%1").arg(i);
stats[queueName] = queueSize; stats[queueName] = queueSize;
@ -130,7 +130,7 @@ void AudioMixerSlavePool::queueStats(QJsonObject& stats) {
} }
#endif // DEBUG_EVENT_QUEUE #endif // DEBUG_EVENT_QUEUE
void AudioMixerSlavePool::setNumThreads(int numThreads) { void AudioMixerWorkerPool::setNumThreads(int numThreads) {
// clamp to allowed size // clamp to allowed size
{ {
int maxThreads = QThread::idealThreadCount(); int maxThreads = QThread::idealThreadCount();
@ -150,36 +150,36 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) {
resize(numThreads); resize(numThreads);
} }
void AudioMixerSlavePool::resize(int numThreads) { void AudioMixerWorkerPool::resize(int numThreads) {
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_workers.size());
qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads);
Lock lock(_mutex); Lock lock(_mutex);
if (numThreads > _numThreads) { if (numThreads > _numThreads) {
// start new slaves // start new workers
for (int i = 0; i < numThreads - _numThreads; ++i) { for (int i = 0; i < numThreads - _numThreads; ++i) {
auto slave = new AudioMixerSlaveThread(*this, _workerSharedData); auto worker = new AudioMixerWorkerThread(*this, _workerSharedData);
QObject::connect(slave, &QThread::started, [] { setThreadName("AudioMixerSlaveThread"); }); QObject::connect(worker, &QThread::started, [] { setThreadName("AudioMixerWorkerThread"); });
slave->start(); worker->start();
_slaves.emplace_back(slave); _workers.emplace_back(worker);
} }
} else if (numThreads < _numThreads) { } else if (numThreads < _numThreads) {
auto extraBegin = _slaves.begin() + numThreads; auto extraBegin = _workers.begin() + numThreads;
// mark slaves to stop... // mark workers to stop...
auto slave = extraBegin; auto worker = extraBegin;
while (slave != _slaves.end()) { while (worker != _workers.end()) {
(*slave)->_stop = true; (*worker)->_stop = true;
++slave; ++worker;
} }
// ...cycle them until they do stop... // ...cycle them until they do stop...
_numStopped = 0; _numStopped = 0;
while (_numStopped != (_numThreads - numThreads)) { while (_numStopped != (_numThreads - numThreads)) {
_numStarted = _numFinished = _numStopped; _numStarted = _numFinished = _numStopped;
_slaveCondition.notify_all(); _workerCondition.notify_all();
_poolCondition.wait(lock, [&] { _poolCondition.wait(lock, [&] {
assert(_numFinished <= _numThreads); assert(_numFinished <= _numThreads);
return _numFinished == _numThreads; return _numFinished == _numThreads;
@ -187,18 +187,18 @@ void AudioMixerSlavePool::resize(int numThreads) {
} }
// ...wait for threads to finish... // ...wait for threads to finish...
slave = extraBegin; worker = extraBegin;
while (slave != _slaves.end()) { while (worker != _workers.end()) {
QThread* thread = reinterpret_cast<QThread*>(slave->get()); QThread* thread = reinterpret_cast<QThread*>(worker->get());
static const int MAX_THREAD_WAIT_TIME = 10; static const int MAX_THREAD_WAIT_TIME = 10;
thread->wait(MAX_THREAD_WAIT_TIME); thread->wait(MAX_THREAD_WAIT_TIME);
++slave; ++worker;
} }
// ...and erase them // ...and erase them
_slaves.erase(extraBegin, _slaves.end()); _workers.erase(extraBegin, _workers.end());
} }
_numThreads = _numStarted = _numFinished = numThreads; _numThreads = _numStarted = _numFinished = numThreads;
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_workers.size());
} }

View file

@ -1,5 +1,5 @@
// //
// AudioMixerSlavePool.h // AudioMixerWorkerPool.h
// assignment-client/src/audio // assignment-client/src/audio
// //
// Created by Zach Pomerantz on 11/16/2016. // Created by Zach Pomerantz on 11/16/2016.
@ -9,8 +9,8 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#ifndef hifi_AudioMixerSlavePool_h #ifndef hifi_AudioMixerWorkerPool_h
#define hifi_AudioMixerSlavePool_h #define hifi_AudioMixerWorkerPool_h
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
@ -20,37 +20,37 @@
#include <shared/QtHelpers.h> #include <shared/QtHelpers.h>
#include <TBBHelpers.h> #include <TBBHelpers.h>
#include "AudioMixerSlave.h" #include "AudioMixerWorker.h"
class AudioMixerSlavePool; class AudioMixerWorkerPool;
class AudioMixerSlaveThread : public QThread, public AudioMixerSlave { class AudioMixerWorkerThread : public QThread, public AudioMixerWorker {
Q_OBJECT Q_OBJECT
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
using Mutex = std::mutex; using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>; using Lock = std::unique_lock<Mutex>;
public: public:
AudioMixerSlaveThread(AudioMixerSlavePool& pool, AudioMixerSlave::SharedData& sharedData) AudioMixerWorkerThread(AudioMixerWorkerPool& pool, AudioMixerWorker::SharedData& sharedData)
: AudioMixerSlave(sharedData), _pool(pool) {} : AudioMixerWorker(sharedData), _pool(pool) {}
void run() override final; void run() override final;
private: private:
friend class AudioMixerSlavePool; friend class AudioMixerWorkerPool;
void wait(); void wait();
void notify(bool stopping); void notify(bool stopping);
bool try_pop(SharedNodePointer& node); bool try_pop(SharedNodePointer& node);
AudioMixerSlavePool& _pool; AudioMixerWorkerPool& _pool;
void (AudioMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; void (AudioMixerWorker::*_function)(const SharedNodePointer& node) { nullptr };
bool _stop { false }; bool _stop { false };
}; };
// Slave pool for audio mixers // Worker pool for audio mixers
// AudioMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. // AudioMixerWorkerPool is not thread-safe! It should be instantiated and used from a single thread.
class AudioMixerSlavePool { class AudioMixerWorkerPool {
using Queue = tbb::concurrent_queue<SharedNodePointer>; using Queue = tbb::concurrent_queue<SharedNodePointer>;
using Mutex = std::mutex; using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>; using Lock = std::unique_lock<Mutex>;
@ -59,18 +59,18 @@ class AudioMixerSlavePool {
public: public:
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
AudioMixerSlavePool(AudioMixerSlave::SharedData& sharedData, int numThreads = QThread::idealThreadCount()) AudioMixerWorkerPool(AudioMixerWorker::SharedData& sharedData, int numThreads = QThread::idealThreadCount())
: _workerSharedData(sharedData) { setNumThreads(numThreads); } : _workerSharedData(sharedData) { setNumThreads(numThreads); }
~AudioMixerSlavePool() { resize(0); } ~AudioMixerWorkerPool() { resize(0); }
// process packets on slave threads // process packets on worker threads
void processPackets(ConstIter begin, ConstIter end); void processPackets(ConstIter begin, ConstIter end);
// mix on slave threads // mix on worker threads
void mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain); void mix(ConstIter begin, ConstIter end, unsigned int frame, int numToRetain);
// iterate over all slaves // iterate over all workers
void each(std::function<void(AudioMixerSlave& slave)> functor); void each(std::function<void(AudioMixerWorker& worker)> functor);
#ifdef DEBUG_EVENT_QUEUE #ifdef DEBUG_EVENT_QUEUE
void queueStats(QJsonObject& stats); void queueStats(QJsonObject& stats);
@ -83,18 +83,18 @@ private:
void run(ConstIter begin, ConstIter end); void run(ConstIter begin, ConstIter end);
void resize(int numThreads); void resize(int numThreads);
std::vector<std::unique_ptr<AudioMixerSlaveThread>> _slaves; std::vector<std::unique_ptr<AudioMixerWorkerThread>> _workers;
friend void AudioMixerSlaveThread::wait(); friend void AudioMixerWorkerThread::wait();
friend void AudioMixerSlaveThread::notify(bool stopping); friend void AudioMixerWorkerThread::notify(bool stopping);
friend bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node); friend bool AudioMixerWorkerThread::try_pop(SharedNodePointer& node);
// synchronization state // synchronization state
Mutex _mutex; Mutex _mutex;
ConditionVariable _slaveCondition; ConditionVariable _workerCondition;
ConditionVariable _poolCondition; ConditionVariable _poolCondition;
void (AudioMixerSlave::*_function)(const SharedNodePointer& node); void (AudioMixerWorker::*_function)(const SharedNodePointer& node);
std::function<void(AudioMixerSlave&)> _configure; std::function<void(AudioMixerWorker&)> _configure;
int _numThreads { 0 }; int _numThreads { 0 };
int _numStarted { 0 }; // guarded by _mutex int _numStarted { 0 }; // guarded by _mutex
int _numFinished { 0 }; // guarded by _mutex int _numFinished { 0 }; // guarded by _mutex
@ -105,7 +105,7 @@ private:
ConstIter _begin; ConstIter _begin;
ConstIter _end; ConstIter _end;
AudioMixerSlave::SharedData& _workerSharedData; AudioMixerWorker::SharedData& _workerSharedData;
}; };
#endif // hifi_AudioMixerSlavePool_h #endif // hifi_AudioMixerWorkerPool_h

View file

@ -60,7 +60,7 @@ bool AvatarMixer::SessionDisplayName::operator<(const SessionDisplayName& rhs) c
AvatarMixer::AvatarMixer(ReceivedMessage& message) : AvatarMixer::AvatarMixer(ReceivedMessage& message) :
ThreadedAssignment(message), ThreadedAssignment(message),
_slavePool(&_slaveSharedData) _workerPool(&_workerSharedData)
{ {
DependencyManager::registerInheritance<EntityDynamicFactoryInterface, AssignmentDynamicFactory>(); DependencyManager::registerInheritance<EntityDynamicFactoryInterface, AssignmentDynamicFactory>();
DependencyManager::set<AssignmentDynamicFactory>(); DependencyManager::set<AssignmentDynamicFactory>();
@ -297,7 +297,7 @@ void AvatarMixer::start() {
auto end = usecTimestampNow(); auto end = usecTimestampNow();
_processQueuedAvatarDataPacketsLockWaitElapsedTime += (end - start); _processQueuedAvatarDataPacketsLockWaitElapsedTime += (end - start);
_slavePool.processIncomingPackets(cbegin, cend); _workerPool.processIncomingPackets(cbegin, cend);
}, &lockWait, &nodeTransform, &functor); }, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow(); auto end = usecTimestampNow();
_processQueuedAvatarDataPacketsElapsedTime += (end - start); _processQueuedAvatarDataPacketsElapsedTime += (end - start);
@ -333,7 +333,7 @@ void AvatarMixer::start() {
auto start = usecTimestampNow(); auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
auto start = usecTimestampNow(); auto start = usecTimestampNow();
_slavePool.broadcastAvatarData(cbegin, cend, _lastFrameTimestamp, _maxKbpsPerNode, _throttlingRatio); _workerPool.broadcastAvatarData(cbegin, cend, _lastFrameTimestamp, _maxKbpsPerNode, _throttlingRatio);
auto end = usecTimestampNow(); auto end = usecTimestampNow();
_broadcastAvatarDataInner += (end - start); _broadcastAvatarDataInner += (end - start);
}, &lockWait, &nodeTransform, &functor); }, &lockWait, &nodeTransform, &functor);
@ -761,14 +761,14 @@ void AvatarMixer::sendStatsPacket() {
QJsonObject statsObject; QJsonObject statsObject;
statsObject["broadcast_loop_rate"] = _loopRate.rate(); statsObject["broadcast_loop_rate"] = _loopRate.rate();
statsObject["threads"] = _slavePool.numThreads(); statsObject["threads"] = _workerPool.numThreads();
statsObject["trailing_mix_ratio"] = _trailingMixRatio; statsObject["trailing_mix_ratio"] = _trailingMixRatio;
statsObject["throttling_ratio"] = _throttlingRatio; statsObject["throttling_ratio"] = _throttlingRatio;
#ifdef DEBUG_EVENT_QUEUE #ifdef DEBUG_EVENT_QUEUE
QJsonObject qtStats; QJsonObject qtStats;
_slavePool.queueStats(qtStats); _workerPool.queueStats(qtStats);
statsObject["avatar_thread_event_queue"] = qtStats; statsObject["avatar_thread_event_queue"] = qtStats;
#endif #endif
@ -821,41 +821,41 @@ void AvatarMixer::sendStatsPacket() {
statsObject["parallelTasks"] = parallelTasks; statsObject["parallelTasks"] = parallelTasks;
AvatarMixerSlaveStats aggregateStats; AvatarMixerWorkerStats aggregateStats;
// gather stats // gather stats
_slavePool.each([&](AvatarMixerSlave& slave) { _workerPool.each([&](AvatarMixerWorker& worker) {
AvatarMixerSlaveStats stats; AvatarMixerWorkerStats stats;
slave.harvestStats(stats); worker.harvestStats(stats);
aggregateStats += stats; aggregateStats += stats;
}); });
QJsonObject slavesAggregatObject; QJsonObject workersAggregatObject;
slavesAggregatObject["received_1_nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed); workersAggregatObject["received_1_nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed);
slavesAggregatObject["sent_1_nodesBroadcastedTo"] = TIGHT_LOOP_STAT(aggregateStats.nodesBroadcastedTo); workersAggregatObject["sent_1_nodesBroadcastedTo"] = TIGHT_LOOP_STAT(aggregateStats.nodesBroadcastedTo);
float averageNodes = ((float)aggregateStats.nodesBroadcastedTo / (float)tightLoopFrames); float averageNodes = ((float)aggregateStats.nodesBroadcastedTo / (float)tightLoopFrames);
float averageOthersIncluded = averageNodes ? aggregateStats.numOthersIncluded / averageNodes : 0.0f; float averageOthersIncluded = averageNodes ? aggregateStats.numOthersIncluded / averageNodes : 0.0f;
slavesAggregatObject["sent_2_averageOthersIncluded"] = TIGHT_LOOP_STAT(averageOthersIncluded); workersAggregatObject["sent_2_averageOthersIncluded"] = TIGHT_LOOP_STAT(averageOthersIncluded);
float averageOverBudgetAvatars = averageNodes ? aggregateStats.overBudgetAvatars / averageNodes : 0.0f; float averageOverBudgetAvatars = averageNodes ? aggregateStats.overBudgetAvatars / averageNodes : 0.0f;
slavesAggregatObject["sent_3_averageOverBudgetAvatars"] = TIGHT_LOOP_STAT(averageOverBudgetAvatars); workersAggregatObject["sent_3_averageOverBudgetAvatars"] = TIGHT_LOOP_STAT(averageOverBudgetAvatars);
slavesAggregatObject["sent_4_averageDataBytes"] = TIGHT_LOOP_STAT(aggregateStats.numDataBytesSent); workersAggregatObject["sent_4_averageDataBytes"] = TIGHT_LOOP_STAT(aggregateStats.numDataBytesSent);
slavesAggregatObject["sent_5_averageTraitsBytes"] = TIGHT_LOOP_STAT(aggregateStats.numTraitsBytesSent); workersAggregatObject["sent_5_averageTraitsBytes"] = TIGHT_LOOP_STAT(aggregateStats.numTraitsBytesSent);
slavesAggregatObject["sent_6_averageIdentityBytes"] = TIGHT_LOOP_STAT(aggregateStats.numIdentityBytesSent); workersAggregatObject["sent_6_averageIdentityBytes"] = TIGHT_LOOP_STAT(aggregateStats.numIdentityBytesSent);
slavesAggregatObject["sent_7_averageHeroAvatars"] = TIGHT_LOOP_STAT(aggregateStats.numHeroesIncluded); workersAggregatObject["sent_7_averageHeroAvatars"] = TIGHT_LOOP_STAT(aggregateStats.numHeroesIncluded);
slavesAggregatObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.processIncomingPacketsElapsedTime); workersAggregatObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.processIncomingPacketsElapsedTime);
slavesAggregatObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.ignoreCalculationElapsedTime); workersAggregatObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.ignoreCalculationElapsedTime);
slavesAggregatObject["timing_3_toByteArray"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.toByteArrayElapsedTime); workersAggregatObject["timing_3_toByteArray"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.toByteArrayElapsedTime);
slavesAggregatObject["timing_4_avatarDataPacking"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.avatarDataPackingElapsedTime); workersAggregatObject["timing_4_avatarDataPacking"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.avatarDataPackingElapsedTime);
slavesAggregatObject["timing_5_packetSending"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.packetSendingElapsedTime); workersAggregatObject["timing_5_packetSending"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.packetSendingElapsedTime);
slavesAggregatObject["timing_6_jobElapsedTime"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.jobElapsedTime); workersAggregatObject["timing_6_jobElapsedTime"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.jobElapsedTime);
statsObject["slaves_aggregate (per frame)"] = slavesAggregatObject; statsObject["workers_aggregate (per frame)"] = workersAggregatObject;
_handleViewFrustumPacketElapsedTime = 0; _handleViewFrustumPacketElapsedTime = 0;
_handleAvatarIdentityPacketElapsedTime = 0; _handleAvatarIdentityPacketElapsedTime = 0;
@ -1016,9 +1016,9 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
numThreads = 1; numThreads = 1;
} }
qCDebug(avatars) << "Avatar mixer will use specified number of threads:" << numThreads; qCDebug(avatars) << "Avatar mixer will use specified number of threads:" << numThreads;
_slavePool.setNumThreads(numThreads); _workerPool.setNumThreads(numThreads);
} else { } else {
qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads."; qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _workerPool.numThreads() << "threads.";
} }
{ {
@ -1035,7 +1035,7 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
static const QString PRIORITY_FRACTION_KEY = "priority_fraction"; static const QString PRIORITY_FRACTION_KEY = "priority_fraction";
if (avatarMixerGroupObject.contains(PRIORITY_FRACTION_KEY)) { if (avatarMixerGroupObject.contains(PRIORITY_FRACTION_KEY)) {
float priorityFraction = float(avatarMixerGroupObject[PRIORITY_FRACTION_KEY].toDouble()); float priorityFraction = float(avatarMixerGroupObject[PRIORITY_FRACTION_KEY].toDouble());
_slavePool.setPriorityReservedFraction(std::min(std::max(0.0f, priorityFraction), 1.0f)); _workerPool.setPriorityReservedFraction(std::min(std::max(0.0f, priorityFraction), 1.0f));
qCDebug(avatars) << "Avatar mixer reserving" << priorityFraction << "of bandwidth for priority avatars"; qCDebug(avatars) << "Avatar mixer reserving" << priorityFraction << "of bandwidth for priority avatars";
} }
} }
@ -1059,22 +1059,22 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
<< "and a maximum avatar height of" << _domainMaximumHeight; << "and a maximum avatar height of" << _domainMaximumHeight;
static const QString AVATAR_WHITELIST_OPTION = "avatar_whitelist"; static const QString AVATAR_WHITELIST_OPTION = "avatar_whitelist";
_slaveSharedData.skeletonURLWhitelist = avatarMixerGroupObject[AVATAR_WHITELIST_OPTION] _workerSharedData.skeletonURLWhitelist = avatarMixerGroupObject[AVATAR_WHITELIST_OPTION]
.toString().split(',', Qt::KeepEmptyParts); .toString().split(',', Qt::KeepEmptyParts);
static const QString REPLACEMENT_AVATAR_OPTION = "replacement_avatar"; static const QString REPLACEMENT_AVATAR_OPTION = "replacement_avatar";
_slaveSharedData.skeletonReplacementURL = avatarMixerGroupObject[REPLACEMENT_AVATAR_OPTION] _workerSharedData.skeletonReplacementURL = avatarMixerGroupObject[REPLACEMENT_AVATAR_OPTION]
.toString(); .toString();
if (_slaveSharedData.skeletonURLWhitelist.count() == 1 && _slaveSharedData.skeletonURLWhitelist[0].isEmpty()) { if (_workerSharedData.skeletonURLWhitelist.count() == 1 && _workerSharedData.skeletonURLWhitelist[0].isEmpty()) {
// KeepEmptyParts above will parse "," as ["", ""] (which is ok), but "" as [""] (which is not ok). // KeepEmptyParts above will parse "," as ["", ""] (which is ok), but "" as [""] (which is not ok).
_slaveSharedData.skeletonURLWhitelist.clear(); _workerSharedData.skeletonURLWhitelist.clear();
} }
if (_slaveSharedData.skeletonURLWhitelist.isEmpty()) { if (_workerSharedData.skeletonURLWhitelist.isEmpty()) {
qCDebug(avatars) << "All avatars are allowed."; qCDebug(avatars) << "All avatars are allowed.";
} else { } else {
qCDebug(avatars) << "Avatars other than" << _slaveSharedData.skeletonURLWhitelist << "will be replaced by" << (_slaveSharedData.skeletonReplacementURL.isEmpty() ? "default" : _slaveSharedData.skeletonReplacementURL.toString()); qCDebug(avatars) << "Avatars other than" << _workerSharedData.skeletonURLWhitelist << "will be replaced by" << (_workerSharedData.skeletonReplacementURL.isEmpty() ? "default" : _workerSharedData.skeletonReplacementURL.toString());
} }
} }
@ -1099,7 +1099,7 @@ void AvatarMixer::setupEntityQuery() {
priorityZoneQuery["name"] = true; // Handy for debugging. priorityZoneQuery["name"] = true; // Handy for debugging.
_entityViewer.getOctreeQuery().setJSONParameters(priorityZoneQuery); _entityViewer.getOctreeQuery().setJSONParameters(priorityZoneQuery);
_slaveSharedData.entityTree = entityTree; _workerSharedData.entityTree = entityTree;
} }
void AvatarMixer::handleOctreePacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) { void AvatarMixer::handleOctreePacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {

View file

@ -26,7 +26,7 @@
#include "../entities/EntityTreeHeadlessViewer.h" #include "../entities/EntityTreeHeadlessViewer.h"
#include "AvatarMixerClientData.h" #include "AvatarMixerClientData.h"
#include "AvatarMixerSlavePool.h" #include "AvatarMixerWorkerPool.h"
/// Handles assignments of type AvatarMixer - distribution of avatar data to various clients /// Handles assignments of type AvatarMixer - distribution of avatar data to various clients
class AvatarMixer : public ThreadedAssignment { class AvatarMixer : public ThreadedAssignment {
@ -153,8 +153,8 @@ private:
RateCounter<> _loopRate; // this is the rate that the main thread tight loop runs RateCounter<> _loopRate; // this is the rate that the main thread tight loop runs
AvatarMixerSlavePool _slavePool; AvatarMixerWorkerPool _workerPool;
SlaveSharedData _slaveSharedData; WorkerSharedData _workerSharedData;
}; };
#endif // hifi_AvatarMixer_h #endif // hifi_AvatarMixer_h

View file

@ -21,7 +21,7 @@
#include "AvatarLogging.h" #include "AvatarLogging.h"
#include "AvatarMixerSlave.h" #include "AvatarMixerWorker.h"
AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : NodeData(nodeID, nodeLocalID) { AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : NodeData(nodeID, nodeLocalID) {
// in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID // in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID
@ -52,7 +52,7 @@ void AvatarMixerClientData::queuePacket(QSharedPointer<ReceivedMessage> message,
_packetQueue.push(message); _packetQueue.push(message);
} }
int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData) { int AvatarMixerClientData::processPackets(const WorkerSharedData& workerSharedData) {
int packetsProcessed = 0; int packetsProcessed = 0;
SharedNodePointer node = _packetQueue.node; SharedNodePointer node = _packetQueue.node;
assert(_packetQueue.empty() || node); assert(_packetQueue.empty() || node);
@ -65,10 +65,10 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData
switch (packet->getType()) { switch (packet->getType()) {
case PacketType::AvatarData: case PacketType::AvatarData:
parseData(*packet, slaveSharedData); parseData(*packet, workerSharedData);
break; break;
case PacketType::SetAvatarTraits: case PacketType::SetAvatarTraits:
processSetTraitsMessage(*packet, slaveSharedData, *node); processSetTraitsMessage(*packet, workerSharedData, *node);
break; break;
case PacketType::BulkAvatarTraitsAck: case PacketType::BulkAvatarTraitsAck:
processBulkAvatarTraitsAckMessage(*packet); processBulkAvatarTraitsAckMessage(*packet);
@ -127,7 +127,7 @@ struct FindContainingZone {
} // namespace } // namespace
int AvatarMixerClientData::parseData(ReceivedMessage& message, const SlaveSharedData& slaveSharedData) { int AvatarMixerClientData::parseData(ReceivedMessage& message, const WorkerSharedData& workerSharedData) {
// pull the sequence number from the data first // pull the sequence number from the data first
uint16_t sequenceNumber; uint16_t sequenceNumber;
@ -150,7 +150,7 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message, const SlaveShared
auto newPosition = _avatar->getClientGlobalPosition(); auto newPosition = _avatar->getClientGlobalPosition();
if (newPosition != oldPosition || _avatar->getNeedsHeroCheck()) { if (newPosition != oldPosition || _avatar->getNeedsHeroCheck()) {
EntityTree& entityTree = *slaveSharedData.entityTree; EntityTree& entityTree = *workerSharedData.entityTree;
FindContainingZone findContainingZone{ newPosition }; FindContainingZone findContainingZone{ newPosition };
entityTree.recurseTreeWithOperation(&FindContainingZone::operation, &findContainingZone); entityTree.recurseTreeWithOperation(&FindContainingZone::operation, &findContainingZone);
bool currentlyHasPriority = findContainingZone.isInPriorityZone; bool currentlyHasPriority = findContainingZone.isInPriorityZone;
@ -176,7 +176,7 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message, const SlaveShared
} }
void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
const SlaveSharedData& slaveSharedData, const WorkerSharedData& workerSharedData,
Node& sendingNode) { Node& sendingNode) {
// Trying to read more bytes than available, bail // Trying to read more bytes than available, bail
if (message.getBytesLeftToRead() < qint64(sizeof(AvatarTraits::TraitVersion))) { if (message.getBytesLeftToRead() < qint64(sizeof(AvatarTraits::TraitVersion))) {
@ -222,7 +222,7 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
_lastReceivedTraitVersions[traitType] = packetTraitVersion; _lastReceivedTraitVersions[traitType] = packetTraitVersion;
if (traitType == AvatarTraits::SkeletonModelURL) { if (traitType == AvatarTraits::SkeletonModelURL) {
// special handling for skeleton model URL, since we need to make sure it is in the whitelist // special handling for skeleton model URL, since we need to make sure it is in the whitelist
checkSkeletonURLAgainstWhitelist(slaveSharedData, sendingNode, packetTraitVersion); checkSkeletonURLAgainstWhitelist(workerSharedData, sendingNode, packetTraitVersion);
} }
anyTraitsChanged = true; anyTraitsChanged = true;
@ -366,10 +366,10 @@ void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& m
} }
} }
void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const WorkerSharedData& workerSharedData,
Node& sendingNode, Node& sendingNode,
AvatarTraits::TraitVersion traitVersion) { AvatarTraits::TraitVersion traitVersion) {
const auto& whitelist = slaveSharedData.skeletonURLWhitelist; const auto& whitelist = workerSharedData.skeletonURLWhitelist;
if (!whitelist.isEmpty()) { if (!whitelist.isEmpty()) {
bool inWhitelist = false; bool inWhitelist = false;
@ -391,11 +391,11 @@ void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedDa
if (!inWhitelist) { if (!inWhitelist) {
// make sure we're not unecessarily overriding the default avatar with the default avatar // make sure we're not unecessarily overriding the default avatar with the default avatar
if (_avatar->getWireSafeSkeletonModelURL() != slaveSharedData.skeletonReplacementURL) { if (_avatar->getWireSafeSkeletonModelURL() != workerSharedData.skeletonReplacementURL) {
// we need to change this avatar's skeleton URL, and send them a traits packet informing them of the change // we need to change this avatar's skeleton URL, and send them a traits packet informing them of the change
qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() << "to replacement" qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() << "to replacement"
<< slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); << workerSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID();
_avatar->setSkeletonModelURL(slaveSharedData.skeletonReplacementURL); _avatar->setSkeletonModelURL(workerSharedData.skeletonReplacementURL);
auto packet = NLPacket::create(PacketType::SetAvatarTraits, -1, true); auto packet = NLPacket::create(PacketType::SetAvatarTraits, -1, true);

View file

@ -36,7 +36,7 @@ const QString OUTBOUND_AVATAR_DATA_STATS_KEY = "outbound_av_data_kbps";
const QString OUTBOUND_AVATAR_TRAITS_STATS_KEY = "outbound_av_traits_kbps"; const QString OUTBOUND_AVATAR_TRAITS_STATS_KEY = "outbound_av_traits_kbps";
const QString INBOUND_AVATAR_DATA_STATS_KEY = "inbound_av_data_kbps"; const QString INBOUND_AVATAR_DATA_STATS_KEY = "inbound_av_data_kbps";
struct SlaveSharedData; struct WorkerSharedData;
class AvatarMixerClientData : public NodeData { class AvatarMixerClientData : public NodeData {
Q_OBJECT Q_OBJECT
@ -47,7 +47,7 @@ public:
using PerNodeTraitVersions = std::unordered_map<Node::LocalID, AvatarTraits::TraitVersions>; using PerNodeTraitVersions = std::unordered_map<Node::LocalID, AvatarTraits::TraitVersions>;
using NodeData::parseData; // Avoid clang warning about hiding. using NodeData::parseData; // Avoid clang warning about hiding.
int parseData(ReceivedMessage& message, const SlaveSharedData& SlaveSharedData); int parseData(ReceivedMessage& message, const WorkerSharedData& WorkerSharedData);
MixerAvatar& getAvatar() { return *_avatar; } MixerAvatar& getAvatar() { return *_avatar; }
const MixerAvatar& getAvatar() const { return *_avatar; } const MixerAvatar& getAvatar() const { return *_avatar; }
const MixerAvatar* getConstAvatarData() const { return _avatar.get(); } const MixerAvatar* getConstAvatarData() const { return _avatar.get(); }
@ -130,12 +130,12 @@ public:
QVector<JointData>& getLastOtherAvatarSentJoints(NLPacket::LocalID otherAvatar) { return _lastOtherAvatarSentJoints[otherAvatar]; } QVector<JointData>& getLastOtherAvatarSentJoints(NLPacket::LocalID otherAvatar) { return _lastOtherAvatarSentJoints[otherAvatar]; }
void queuePacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node); void queuePacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node);
int processPackets(const SlaveSharedData& slaveSharedData); // returns number of packets processed int processPackets(const WorkerSharedData& workerSharedData); // returns number of packets processed
void processSetTraitsMessage(ReceivedMessage& message, const SlaveSharedData& slaveSharedData, Node& sendingNode); void processSetTraitsMessage(ReceivedMessage& message, const WorkerSharedData& workerSharedData, Node& sendingNode);
void emulateDeleteEntitiesTraitsMessage(const QList<QUuid>& avatarEntityIDs); void emulateDeleteEntitiesTraitsMessage(const QList<QUuid>& avatarEntityIDs);
void processBulkAvatarTraitsAckMessage(ReceivedMessage& message); void processBulkAvatarTraitsAckMessage(ReceivedMessage& message);
void checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, Node& sendingNode, void checkSkeletonURLAgainstWhitelist(const WorkerSharedData& workerSharedData, Node& sendingNode,
AvatarTraits::TraitVersion traitVersion); AvatarTraits::TraitVersion traitVersion);
using TraitsCheckTimestamp = std::chrono::steady_clock::time_point; using TraitsCheckTimestamp = std::chrono::steady_clock::time_point;

View file

@ -1,5 +1,5 @@
// //
// AvatarMixerSlave.cpp // AvatarMixerWorker.cpp
// assignment-client/src/avatar // assignment-client/src/avatar
// //
// Created by Brad Hefta-Gaub on 2/14/2017. // Created by Brad Hefta-Gaub on 2/14/2017.
@ -9,7 +9,7 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#include "AvatarMixerSlave.h" #include "AvatarMixerWorker.h"
#include <algorithm> #include <algorithm>
#include <random> #include <random>
@ -36,12 +36,12 @@
namespace chrono = std::chrono; namespace chrono = std::chrono;
void AvatarMixerSlave::configure(ConstIter begin, ConstIter end) { void AvatarMixerWorker::configure(ConstIter begin, ConstIter end) {
_begin = begin; _begin = begin;
_end = end; _end = end;
} }
void AvatarMixerSlave::configureBroadcast(ConstIter begin, ConstIter end, void AvatarMixerWorker::configureBroadcast(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp, p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio, float maxKbpsPerNode, float throttlingRatio,
float priorityReservedFraction) { float priorityReservedFraction) {
@ -53,13 +53,13 @@ void AvatarMixerSlave::configureBroadcast(ConstIter begin, ConstIter end,
_avatarHeroFraction = priorityReservedFraction; _avatarHeroFraction = priorityReservedFraction;
} }
void AvatarMixerSlave::harvestStats(AvatarMixerSlaveStats& stats) { void AvatarMixerWorker::harvestStats(AvatarMixerWorkerStats& stats) {
stats = _stats; stats = _stats;
_stats.reset(); _stats.reset();
} }
void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) { void AvatarMixerWorker::processIncomingPackets(const SharedNodePointer& node) {
auto start = usecTimestampNow(); auto start = usecTimestampNow();
auto nodeData = dynamic_cast<AvatarMixerClientData*>(node->getLinkedData()); auto nodeData = dynamic_cast<AvatarMixerClientData*>(node->getLinkedData());
if (nodeData) { if (nodeData) {
@ -70,7 +70,7 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) {
_stats.processIncomingPacketsElapsedTime += (end - start); _stats.processIncomingPacketsElapsedTime += (end - start);
} }
int AvatarMixerSlave::sendIdentityPacket(NLPacketList& packetList, const AvatarMixerClientData* nodeData, const Node& destinationNode) { int AvatarMixerWorker::sendIdentityPacket(NLPacketList& packetList, const AvatarMixerClientData* nodeData, const Node& destinationNode) {
if (destinationNode.getType() == NodeType::Agent && !destinationNode.isUpstream()) { if (destinationNode.getType() == NodeType::Agent && !destinationNode.isUpstream()) {
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(); QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
@ -83,7 +83,7 @@ int AvatarMixerSlave::sendIdentityPacket(NLPacketList& packetList, const AvatarM
} }
} }
qint64 AvatarMixerSlave::addTraitsNodeHeader(AvatarMixerClientData* listeningNodeData, qint64 AvatarMixerWorker::addTraitsNodeHeader(AvatarMixerClientData* listeningNodeData,
const AvatarMixerClientData* sendingNodeData, const AvatarMixerClientData* sendingNodeData,
NLPacketList& traitsPacketList, NLPacketList& traitsPacketList,
qint64 bytesWritten) { qint64 bytesWritten) {
@ -98,7 +98,7 @@ qint64 AvatarMixerSlave::addTraitsNodeHeader(AvatarMixerClientData* listeningNod
return bytesWritten; return bytesWritten;
} }
qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* listeningNodeData, qint64 AvatarMixerWorker::addChangedTraitsToBulkPacket(AvatarMixerClientData* listeningNodeData,
const AvatarMixerClientData* sendingNodeData, const AvatarMixerClientData* sendingNodeData,
NLPacketList& traitsPacketList) { NLPacketList& traitsPacketList) {
@ -245,7 +245,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
return bytesWritten; return bytesWritten;
} }
int AvatarMixerSlave::sendReplicatedIdentityPacket(const Node& agentNode, const AvatarMixerClientData* nodeData, const Node& destinationNode) { int AvatarMixerWorker::sendReplicatedIdentityPacket(const Node& agentNode, const AvatarMixerClientData* nodeData, const Node& destinationNode) {
if (AvatarMixer::shouldReplicateTo(agentNode, destinationNode)) { if (AvatarMixer::shouldReplicateTo(agentNode, destinationNode)) {
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(true); QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(true);
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
@ -262,7 +262,7 @@ int AvatarMixerSlave::sendReplicatedIdentityPacket(const Node& agentNode, const
static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45; static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45;
void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) { void AvatarMixerWorker::broadcastAvatarData(const SharedNodePointer& node) {
quint64 start = usecTimestampNow(); quint64 start = usecTimestampNow();
if ((node->getType() == NodeType::Agent || node->getType() == NodeType::EntityScriptServer) && node->getLinkedData() && node->getActiveSocket() && !node->isUpstream()) { if ((node->getType() == NodeType::Agent || node->getType() == NodeType::EntityScriptServer) && node->getLinkedData() && node->getActiveSocket() && !node->isUpstream()) {
@ -311,7 +311,7 @@ namespace {
} // Close anonymous namespace. } // Close anonymous namespace.
void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) { void AvatarMixerWorker::broadcastAvatarDataToAgent(const SharedNodePointer& node) {
const Node* destinationNode = node.data(); const Node* destinationNode = node.data();
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
@ -680,7 +680,7 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
uint64_t REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US = 5 * 1000 * 1000; uint64_t REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US = 5 * 1000 * 1000;
void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node) { void AvatarMixerWorker::broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node) {
_stats.downstreamMixersBroadcastedTo++; _stats.downstreamMixersBroadcastedTo++;
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData()); AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());

View file

@ -1,5 +1,5 @@
// //
// AvatarMixerSlave.h // AvatarMixerWorker.h
// assignment-client/src/avatar // assignment-client/src/avatar
// //
// Created by Brad Hefta-Gaub on 2/14/2017. // Created by Brad Hefta-Gaub on 2/14/2017.
@ -9,14 +9,14 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#ifndef hifi_AvatarMixerSlave_h #ifndef hifi_AvatarMixerWorker_h
#define hifi_AvatarMixerSlave_h #define hifi_AvatarMixerWorker_h
#include <NodeList.h> #include <NodeList.h>
class AvatarMixerClientData; class AvatarMixerClientData;
class AvatarMixerSlaveStats { class AvatarMixerWorkerStats {
public: public:
int nodesProcessed { 0 }; int nodesProcessed { 0 };
int packetsProcessed { 0 }; int packetsProcessed { 0 };
@ -67,7 +67,7 @@ public:
jobElapsedTime = 0; jobElapsedTime = 0;
} }
AvatarMixerSlaveStats& operator+=(const AvatarMixerSlaveStats& rhs) { AvatarMixerWorkerStats& operator+=(const AvatarMixerWorkerStats& rhs) {
nodesProcessed += rhs.nodesProcessed; nodesProcessed += rhs.nodesProcessed;
packetsProcessed += rhs.packetsProcessed; packetsProcessed += rhs.packetsProcessed;
processIncomingPacketsElapsedTime += rhs.processIncomingPacketsElapsedTime; processIncomingPacketsElapsedTime += rhs.processIncomingPacketsElapsedTime;
@ -96,15 +96,15 @@ public:
class EntityTree; class EntityTree;
using EntityTreePointer = std::shared_ptr<EntityTree>; using EntityTreePointer = std::shared_ptr<EntityTree>;
struct SlaveSharedData { struct WorkerSharedData {
QStringList skeletonURLWhitelist; QStringList skeletonURLWhitelist;
QUrl skeletonReplacementURL; QUrl skeletonReplacementURL;
EntityTreePointer entityTree; EntityTreePointer entityTree;
}; };
class AvatarMixerSlave { class AvatarMixerWorker {
public: public:
AvatarMixerSlave(SlaveSharedData* sharedData) : _sharedData(sharedData) {}; AvatarMixerWorker(WorkerSharedData* sharedData) : _sharedData(sharedData) {};
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
void configure(ConstIter begin, ConstIter end); void configure(ConstIter begin, ConstIter end);
@ -116,7 +116,7 @@ public:
void processIncomingPackets(const SharedNodePointer& node); void processIncomingPackets(const SharedNodePointer& node);
void broadcastAvatarData(const SharedNodePointer& node); void broadcastAvatarData(const SharedNodePointer& node);
void harvestStats(AvatarMixerSlaveStats& stats); void harvestStats(AvatarMixerWorkerStats& stats);
private: private:
int sendIdentityPacket(NLPacketList& packet, const AvatarMixerClientData* nodeData, const Node& destinationNode); int sendIdentityPacket(NLPacketList& packet, const AvatarMixerClientData* nodeData, const Node& destinationNode);
@ -143,8 +143,8 @@ private:
float _throttlingRatio { 0.0f }; float _throttlingRatio { 0.0f };
float _avatarHeroFraction { 0.4f }; float _avatarHeroFraction { 0.4f };
AvatarMixerSlaveStats _stats; AvatarMixerWorkerStats _stats;
SlaveSharedData* _sharedData; WorkerSharedData* _sharedData;
}; };
#endif // hifi_AvatarMixerSlave_h #endif // hifi_AvatarMixerWorker_h

View file

@ -1,5 +1,5 @@
// //
// AvatarMixerSlavePool.cpp // AvatarMixerWorkerPool.cpp
// assignment-client/src/avatar // assignment-client/src/avatar
// //
// Created by Brad Hefta-Gaub on 2/14/2017. // Created by Brad Hefta-Gaub on 2/14/2017.
@ -9,12 +9,12 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#include "AvatarMixerSlavePool.h" #include "AvatarMixerWorkerPool.h"
#include <assert.h> #include <assert.h>
#include <algorithm> #include <algorithm>
void AvatarMixerSlaveThread::run() { void AvatarMixerWorkerThread::run() {
while (true) { while (true) {
wait(); wait();
@ -32,10 +32,10 @@ void AvatarMixerSlaveThread::run() {
} }
} }
void AvatarMixerSlaveThread::wait() { void AvatarMixerWorkerThread::wait() {
{ {
Lock lock(_pool._mutex); Lock lock(_pool._mutex);
_pool._slaveCondition.wait(lock, [&] { _pool._workerCondition.wait(lock, [&] {
assert(_pool._numStarted <= _pool._numThreads); assert(_pool._numStarted <= _pool._numThreads);
return _pool._numStarted != _pool._numThreads; return _pool._numStarted != _pool._numThreads;
}); });
@ -47,7 +47,7 @@ void AvatarMixerSlaveThread::wait() {
_function = _pool._function; _function = _pool._function;
} }
void AvatarMixerSlaveThread::notify(bool stopping) { void AvatarMixerWorkerThread::notify(bool stopping) {
{ {
Lock lock(_pool._mutex); Lock lock(_pool._mutex);
assert(_pool._numFinished < _pool._numThreads); assert(_pool._numFinished < _pool._numThreads);
@ -59,30 +59,30 @@ void AvatarMixerSlaveThread::notify(bool stopping) {
_pool._poolCondition.notify_one(); _pool._poolCondition.notify_one();
} }
bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) { bool AvatarMixerWorkerThread::try_pop(SharedNodePointer& node) {
return _pool._queue.try_pop(node); return _pool._queue.try_pop(node);
} }
void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { void AvatarMixerWorkerPool::processIncomingPackets(ConstIter begin, ConstIter end) {
_function = &AvatarMixerSlave::processIncomingPackets; _function = &AvatarMixerWorker::processIncomingPackets;
_configure = [=](AvatarMixerSlave& slave) { _configure = [=](AvatarMixerWorker& worker) {
slave.configure(begin, end); worker.configure(begin, end);
}; };
run(begin, end); run(begin, end);
} }
void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, void AvatarMixerWorkerPool::broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp, p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio) { float maxKbpsPerNode, float throttlingRatio) {
_function = &AvatarMixerSlave::broadcastAvatarData; _function = &AvatarMixerWorker::broadcastAvatarData;
_configure = [=](AvatarMixerSlave& slave) { _configure = [=](AvatarMixerWorker& worker) {
slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio, worker.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio,
_priorityReservedFraction); _priorityReservedFraction);
}; };
run(begin, end); run(begin, end);
} }
void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) { void AvatarMixerWorkerPool::run(ConstIter begin, ConstIter end) {
_begin = begin; _begin = begin;
_end = end; _end = end;
@ -96,7 +96,7 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) {
// run // run
_numStarted = _numFinished = 0; _numStarted = _numFinished = 0;
_slaveCondition.notify_all(); _workerCondition.notify_all();
// wait // wait
_poolCondition.wait(lock, [&] { _poolCondition.wait(lock, [&] {
@ -111,17 +111,17 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) {
} }
void AvatarMixerSlavePool::each(std::function<void(AvatarMixerSlave& slave)> functor) { void AvatarMixerWorkerPool::each(std::function<void(AvatarMixerWorker& worker)> functor) {
for (auto& slave : _slaves) { for (auto& worker : _workers) {
functor(*slave.get()); functor(*worker.get());
} }
} }
#ifdef DEBUG_EVENT_QUEUE #ifdef DEBUG_EVENT_QUEUE
void AvatarMixerSlavePool::queueStats(QJsonObject& stats) { void AvatarMixerWorkerPool::queueStats(QJsonObject& stats) {
unsigned i = 0; unsigned i = 0;
for (auto& slave : _slaves) { for (auto& worker : _workers) {
int queueSize = ::hifi::qt::getEventQueueSize(slave.get()); int queueSize = ::hifi::qt::getEventQueueSize(worker.get());
QString queueName = QString("avatar_thread_event_queue_%1").arg(i); QString queueName = QString("avatar_thread_event_queue_%1").arg(i);
stats[queueName] = queueSize; stats[queueName] = queueSize;
@ -130,7 +130,7 @@ void AvatarMixerSlavePool::queueStats(QJsonObject& stats) {
} }
#endif // DEBUG_EVENT_QUEUE #endif // DEBUG_EVENT_QUEUE
void AvatarMixerSlavePool::setNumThreads(int numThreads) { void AvatarMixerWorkerPool::setNumThreads(int numThreads) {
// clamp to allowed size // clamp to allowed size
{ {
int maxThreads = QThread::idealThreadCount(); int maxThreads = QThread::idealThreadCount();
@ -150,35 +150,35 @@ void AvatarMixerSlavePool::setNumThreads(int numThreads) {
resize(numThreads); resize(numThreads);
} }
void AvatarMixerSlavePool::resize(int numThreads) { void AvatarMixerWorkerPool::resize(int numThreads) {
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_workers.size());
qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads);
Lock lock(_mutex); Lock lock(_mutex);
if (numThreads > _numThreads) { if (numThreads > _numThreads) {
// start new slaves // start new workers
for (int i = 0; i < numThreads - _numThreads; ++i) { for (int i = 0; i < numThreads - _numThreads; ++i) {
auto slave = new AvatarMixerSlaveThread(*this, _slaveSharedData); auto worker = new AvatarMixerWorkerThread(*this, _workerSharedData);
slave->start(); worker->start();
_slaves.emplace_back(slave); _workers.emplace_back(worker);
} }
} else if (numThreads < _numThreads) { } else if (numThreads < _numThreads) {
auto extraBegin = _slaves.begin() + numThreads; auto extraBegin = _workers.begin() + numThreads;
// mark slaves to stop... // mark workers to stop...
auto slave = extraBegin; auto worker = extraBegin;
while (slave != _slaves.end()) { while (worker != _workers.end()) {
(*slave)->_stop = true; (*worker)->_stop = true;
++slave; ++worker;
} }
// ...cycle them until they do stop... // ...cycle them until they do stop...
_numStopped = 0; _numStopped = 0;
while (_numStopped != (_numThreads - numThreads)) { while (_numStopped != (_numThreads - numThreads)) {
_numStarted = _numFinished = _numStopped; _numStarted = _numFinished = _numStopped;
_slaveCondition.notify_all(); _workerCondition.notify_all();
_poolCondition.wait(lock, [&] { _poolCondition.wait(lock, [&] {
assert(_numFinished <= _numThreads); assert(_numFinished <= _numThreads);
return _numFinished == _numThreads; return _numFinished == _numThreads;
@ -186,18 +186,18 @@ void AvatarMixerSlavePool::resize(int numThreads) {
} }
// ...wait for threads to finish... // ...wait for threads to finish...
slave = extraBegin; worker = extraBegin;
while (slave != _slaves.end()) { while (worker != _workers.end()) {
QThread* thread = reinterpret_cast<QThread*>(slave->get()); QThread* thread = reinterpret_cast<QThread*>(worker->get());
static const int MAX_THREAD_WAIT_TIME = 10; static const int MAX_THREAD_WAIT_TIME = 10;
thread->wait(MAX_THREAD_WAIT_TIME); thread->wait(MAX_THREAD_WAIT_TIME);
++slave; ++worker;
} }
// ...and erase them // ...and erase them
_slaves.erase(extraBegin, _slaves.end()); _workers.erase(extraBegin, _workers.end());
} }
_numThreads = _numStarted = _numFinished = numThreads; _numThreads = _numStarted = _numFinished = numThreads;
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_workers.size());
} }

View file

@ -1,5 +1,5 @@
// //
// AvatarMixerSlavePool.h // AvatarMixerWorkerPool.h
// assignment-client/src/avatar // assignment-client/src/avatar
// //
// Created by Brad Hefta-Gaub on 2/14/2017. // Created by Brad Hefta-Gaub on 2/14/2017.
@ -9,8 +9,8 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
// //
#ifndef hifi_AvatarMixerSlavePool_h #ifndef hifi_AvatarMixerWorkerPool_h
#define hifi_AvatarMixerSlavePool_h #define hifi_AvatarMixerWorkerPool_h
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
@ -22,38 +22,38 @@
#include <NodeList.h> #include <NodeList.h>
#include <shared/QtHelpers.h> #include <shared/QtHelpers.h>
#include "AvatarMixerSlave.h" #include "AvatarMixerWorker.h"
class AvatarMixerSlavePool; class AvatarMixerWorkerPool;
class AvatarMixerSlaveThread : public QThread, public AvatarMixerSlave { class AvatarMixerWorkerThread : public QThread, public AvatarMixerWorker {
Q_OBJECT Q_OBJECT
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
using Mutex = std::mutex; using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>; using Lock = std::unique_lock<Mutex>;
public: public:
AvatarMixerSlaveThread(AvatarMixerSlavePool& pool, SlaveSharedData* slaveSharedData) : AvatarMixerWorkerThread(AvatarMixerWorkerPool& pool, WorkerSharedData* workerSharedData) :
AvatarMixerSlave(slaveSharedData), _pool(pool) {}; AvatarMixerWorker(workerSharedData), _pool(pool) {};
void run() override final; void run() override final;
private: private:
friend class AvatarMixerSlavePool; friend class AvatarMixerWorkerPool;
void wait(); void wait();
void notify(bool stopping); void notify(bool stopping);
bool try_pop(SharedNodePointer& node); bool try_pop(SharedNodePointer& node);
AvatarMixerSlavePool& _pool; AvatarMixerWorkerPool& _pool;
void (AvatarMixerSlave::*_function)(const SharedNodePointer& node) { nullptr }; void (AvatarMixerWorker::*_function)(const SharedNodePointer& node) { nullptr };
bool _stop { false }; bool _stop { false };
}; };
// Slave pool for avatar mixers // Worker pool for avatar mixers
// AvatarMixerSlavePool is not thread-safe! It should be instantiated and used from a single thread. // AvatarMixerWorkerPool is not thread-safe! It should be instantiated and used from a single thread.
class AvatarMixerSlavePool { class AvatarMixerWorkerPool {
using Queue = tbb::concurrent_queue<SharedNodePointer>; using Queue = tbb::concurrent_queue<SharedNodePointer>;
using Mutex = std::mutex; using Mutex = std::mutex;
using Lock = std::unique_lock<Mutex>; using Lock = std::unique_lock<Mutex>;
@ -62,17 +62,17 @@ class AvatarMixerSlavePool {
public: public:
using ConstIter = NodeList::const_iterator; using ConstIter = NodeList::const_iterator;
AvatarMixerSlavePool(SlaveSharedData* slaveSharedData, int numThreads = QThread::idealThreadCount()) : AvatarMixerWorkerPool(WorkerSharedData* workerSharedData, int numThreads = QThread::idealThreadCount()) :
_slaveSharedData(slaveSharedData) { setNumThreads(numThreads); } _workerSharedData(workerSharedData) { setNumThreads(numThreads); }
~AvatarMixerSlavePool() { resize(0); } ~AvatarMixerWorkerPool() { resize(0); }
// Jobs the slave pool can do... // Jobs the worker pool can do...
void processIncomingPackets(ConstIter begin, ConstIter end); void processIncomingPackets(ConstIter begin, ConstIter end);
void broadcastAvatarData(ConstIter begin, ConstIter end, void broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio); p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio);
// iterate over all slaves // iterate over all workers
void each(std::function<void(AvatarMixerSlave& slave)> functor); void each(std::function<void(AvatarMixerWorker& worker)> functor);
#ifdef DEBUG_EVENT_QUEUE #ifdef DEBUG_EVENT_QUEUE
void queueStats(QJsonObject& stats); void queueStats(QJsonObject& stats);
@ -88,18 +88,18 @@ private:
void run(ConstIter begin, ConstIter end); void run(ConstIter begin, ConstIter end);
void resize(int numThreads); void resize(int numThreads);
std::vector<std::unique_ptr<AvatarMixerSlaveThread>> _slaves; std::vector<std::unique_ptr<AvatarMixerWorkerThread>> _workers;
friend void AvatarMixerSlaveThread::wait(); friend void AvatarMixerWorkerThread::wait();
friend void AvatarMixerSlaveThread::notify(bool stopping); friend void AvatarMixerWorkerThread::notify(bool stopping);
friend bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node); friend bool AvatarMixerWorkerThread::try_pop(SharedNodePointer& node);
// synchronization state // synchronization state
Mutex _mutex; Mutex _mutex;
ConditionVariable _slaveCondition; ConditionVariable _workerCondition;
ConditionVariable _poolCondition; ConditionVariable _poolCondition;
void (AvatarMixerSlave::*_function)(const SharedNodePointer& node); void (AvatarMixerWorker::*_function)(const SharedNodePointer& node);
std::function<void(AvatarMixerSlave&)> _configure; std::function<void(AvatarMixerWorker&)> _configure;
// Set from Domain Settings: // Set from Domain Settings:
float _priorityReservedFraction { 0.4f }; float _priorityReservedFraction { 0.4f };
@ -114,7 +114,7 @@ private:
ConstIter _begin; ConstIter _begin;
ConstIter _end; ConstIter _end;
SlaveSharedData* _slaveSharedData; WorkerSharedData* _workerSharedData;
}; };
#endif // hifi_AvatarMixerSlavePool_h #endif // hifi_AvatarMixerWorkerPool_h

View file

@ -172,20 +172,20 @@ static const QMap<QString, DomainServerExporter::MetricType> TYPE_MAP {
{ "avatar_mixer_single_core_tasks_process_events" , DomainServerExporter::MetricType::Counter }, { "avatar_mixer_single_core_tasks_process_events" , DomainServerExporter::MetricType::Counter },
{ "avatar_mixer_single_core_tasks_queue_incoming_packet" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_single_core_tasks_queue_incoming_packet" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_single_core_tasks_send_stats" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_single_core_tasks_send_stats" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_received_1_nodes_processed" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_received_1_nodes_processed" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_1_nodes_broadcasted_to" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_1_nodes_broadcasted_to" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_2_average_others_included" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_2_average_others_included" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_3_average_over_budget_avatars" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_3_average_over_budget_avatars" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_4_average_data_bytes" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_4_average_data_bytes" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_5_average_traits_bytes" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_5_average_traits_bytes" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_6_average_identity_bytes" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_6_average_identity_bytes" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_sent_7_average_hero_avatars" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_sent_7_average_hero_avatars" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_timing_1_process_incoming_packets" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_timing_1_process_incoming_packets" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_timing_2_ignore_calculation" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_timing_2_ignore_calculation" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_timing_3_to_byte_array" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_timing_3_to_byte_array" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_timing_4_avatar_data_packing" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_timing_4_avatar_data_packing" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_timing_5_packet_sending" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_timing_5_packet_sending" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_slaves_aggregate_per_frame_timing_6_job_elapsed_time" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_workers_aggregate_per_frame_timing_6_job_elapsed_time" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_threads" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_threads" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_throttling_ratio" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_throttling_ratio" , DomainServerExporter::MetricType::Gauge },
{ "avatar_mixer_trailing_mix_ratio" , DomainServerExporter::MetricType::Gauge }, { "avatar_mixer_trailing_mix_ratio" , DomainServerExporter::MetricType::Gauge },

View file

@ -538,7 +538,7 @@ void AnimInverseKinematics::solveTargetWithCCD(const AnimContext& context, const
} }
} }
} else if (targetType == IKTarget::Type::HmdHead) { } else if (targetType == IKTarget::Type::HmdHead) {
// An HmdHead target slaves the orientation of the end-effector by distributing rotation // An HmdHead target workers the orientation of the end-effector by distributing rotation
// deltas up the hierarchy. Its target position is enforced later (by shifting the hips). // deltas up the hierarchy. Its target position is enforced later (by shifting the hips).
deltaRotation = target.getRotation() * glm::inverse(tipOrientation); deltaRotation = target.getRotation() * glm::inverse(tipOrientation);
const float ANGLE_DISTRIBUTION_FACTOR = 0.45f; const float ANGLE_DISTRIBUTION_FACTOR = 0.45f;

View file

@ -71,11 +71,11 @@ public:
using IgnoreBox = AABox; using IgnoreBox = AABox;
// called from single AudioMixerSlave while processing packets for node // called from single AudioMixerWorker while processing packets for node
void enableIgnoreBox(); void enableIgnoreBox();
void disableIgnoreBox() { _isIgnoreBoxEnabled = false; } void disableIgnoreBox() { _isIgnoreBoxEnabled = false; }
// thread-safe, called from AudioMixerSlave(s) while preparing mixes // thread-safe, called from AudioMixerWorker(s) while preparing mixes
bool isIgnoreBoxEnabled() const { return _isIgnoreBoxEnabled; } bool isIgnoreBoxEnabled() const { return _isIgnoreBoxEnabled; }
const IgnoreBox& getIgnoreBox() const { return _ignoreBox; } const IgnoreBox& getIgnoreBox() const { return _ignoreBox; }

View file

@ -655,7 +655,7 @@ uint8_t EntityMotionState::getSimulationPriority() const {
return _entity->getSimulationPriority(); return _entity->getSimulationPriority();
} }
void EntityMotionState::slaveBidPriority() { void EntityMotionState::workerBidPriority() {
_bumpedPriority = glm::max(_bumpedPriority, _entity->getSimulationPriority()); _bumpedPriority = glm::max(_bumpedPriority, _entity->getSimulationPriority());
} }

View file

@ -111,7 +111,7 @@ protected:
void updateServerPhysicsVariables(); void updateServerPhysicsVariables();
bool remoteSimulationOutOfSync(uint32_t simulationStep); bool remoteSimulationOutOfSync(uint32_t simulationStep);
void slaveBidPriority(); // computeNewBidPriority() with value stored in _entity void workerBidPriority(); // computeNewBidPriority() with value stored in _entity
void clearObjectVelocities() const; void clearObjectVelocities() const;

View file

@ -602,7 +602,7 @@ void PhysicalEntitySimulation::sendOwnershipBids(uint32_t numSubsteps) {
// in the EntityMotionState::_serverFoo variables (please see comments in EntityMotionState.h) // in the EntityMotionState::_serverFoo variables (please see comments in EntityMotionState.h)
// therefore we need to immediately send an update so that the values stored are what we're // therefore we need to immediately send an update so that the values stored are what we're
// "telling" the server rather than what we've been "hearing" from the server. // "telling" the server rather than what we've been "hearing" from the server.
_bids[i]->slaveBidPriority(); _bids[i]->workerBidPriority();
_bids[i]->sendUpdate(_entityPacketSender, numSubsteps); _bids[i]->sendUpdate(_entityPacketSender, numSubsteps);
addOwnership(_bids[i]); addOwnership(_bids[i]);