Merge commit '135c7b667eb126bcc4ebab948fa7450327209ae8'

This commit is contained in:
Roxanne Skelly 2019-03-05 15:09:56 -08:00
commit cca36e4a89
5 changed files with 43 additions and 89 deletions

View file

@ -64,10 +64,6 @@ bool AudioMixerSlaveThread::try_pop(SharedNodePointer& node) {
return _pool._queue.try_pop(node); return _pool._queue.try_pop(node);
} }
#ifdef AUDIO_SINGLE_THREADED
static AudioMixerSlave slave;
#endif
void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) { void AudioMixerSlavePool::processPackets(ConstIter begin, ConstIter end) {
_function = &AudioMixerSlave::processPackets; _function = &AudioMixerSlave::processPackets;
_configure = [](AudioMixerSlave& slave) {}; _configure = [](AudioMixerSlave& slave) {};
@ -87,19 +83,9 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
_begin = begin; _begin = begin;
_end = end; _end = end;
#ifdef AUDIO_SINGLE_THREADED
_configure(slave);
std::for_each(begin, end, [&](const SharedNodePointer& node) {
_function(slave, node);
});
#else
// fill the queue // fill the queue
std::for_each(_begin, _end, [&](const SharedNodePointer& node) { std::for_each(_begin, _end, [&](const SharedNodePointer& node) {
#if defined(__clang__) && defined(Q_OS_LINUX)
_queue.push(node); _queue.push(node);
#else
_queue.emplace(node);
#endif
}); });
{ {
@ -119,17 +105,12 @@ void AudioMixerSlavePool::run(ConstIter begin, ConstIter end) {
} }
assert(_queue.empty()); assert(_queue.empty());
#endif
} }
void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) { void AudioMixerSlavePool::each(std::function<void(AudioMixerSlave& slave)> functor) {
#ifdef AUDIO_SINGLE_THREADED
functor(slave);
#else
for (auto& slave : _slaves) { for (auto& slave : _slaves) {
functor(*slave.get()); functor(*slave.get());
} }
#endif
} }
void AudioMixerSlavePool::setNumThreads(int numThreads) { void AudioMixerSlavePool::setNumThreads(int numThreads) {
@ -155,9 +136,6 @@ void AudioMixerSlavePool::setNumThreads(int numThreads) {
void AudioMixerSlavePool::resize(int numThreads) { void AudioMixerSlavePool::resize(int numThreads) {
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_slaves.size());
#ifdef AUDIO_SINGLE_THREADED
qDebug("%s: running single threaded", __FUNCTION__, numThreads);
#else
qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads);
Lock lock(_mutex); Lock lock(_mutex);
@ -205,5 +183,4 @@ void AudioMixerSlavePool::resize(int numThreads) {
_numThreads = _numStarted = _numFinished = numThreads; _numThreads = _numStarted = _numFinished = numThreads;
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_slaves.size());
#endif
} }

View file

@ -264,6 +264,10 @@ void AvatarMixer::start() {
}, &lockWait, &nodeTransform, &functor); }, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow(); auto end = usecTimestampNow();
_processQueuedAvatarDataPacketsElapsedTime += (end - start); _processQueuedAvatarDataPacketsElapsedTime += (end - start);
_broadcastAvatarDataLockWait += lockWait;
_broadcastAvatarDataNodeTransform += nodeTransform;
_broadcastAvatarDataNodeFunctor += functor;
} }
// process pending display names... this doesn't currently run on multiple threads, because it // process pending display names... this doesn't currently run on multiple threads, because it
@ -281,6 +285,10 @@ void AvatarMixer::start() {
}, &lockWait, &nodeTransform, &functor); }, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow(); auto end = usecTimestampNow();
_displayNameManagementElapsedTime += (end - start); _displayNameManagementElapsedTime += (end - start);
_broadcastAvatarDataLockWait += lockWait;
_broadcastAvatarDataNodeTransform += nodeTransform;
_broadcastAvatarDataNodeFunctor += functor;
} }
// this is where we need to put the real work... // this is where we need to put the real work...

View file

@ -63,10 +63,6 @@ bool AvatarMixerSlaveThread::try_pop(SharedNodePointer& node) {
return _pool._queue.try_pop(node); return _pool._queue.try_pop(node);
} }
#ifdef AVATAR_SINGLE_THREADED
static AvatarMixerSlave slave;
#endif
void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) { void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) {
_function = &AvatarMixerSlave::processIncomingPackets; _function = &AvatarMixerSlave::processIncomingPackets;
_configure = [=](AvatarMixerSlave& slave) { _configure = [=](AvatarMixerSlave& slave) {
@ -89,19 +85,9 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) {
_begin = begin; _begin = begin;
_end = end; _end = end;
#ifdef AUDIO_SINGLE_THREADED
_configure(slave);
std::for_each(begin, end, [&](const SharedNodePointer& node) {
_function(slave, node);
});
#else
// fill the queue // fill the queue
std::for_each(_begin, _end, [&](const SharedNodePointer& node) { std::for_each(_begin, _end, [&](const SharedNodePointer& node) {
#if defined(__clang__) && defined(Q_OS_LINUX)
_queue.push(node); _queue.push(node);
#else
_queue.emplace(node);
#endif
}); });
{ {
@ -121,18 +107,13 @@ void AvatarMixerSlavePool::run(ConstIter begin, ConstIter end) {
} }
assert(_queue.empty()); assert(_queue.empty());
#endif
} }
void AvatarMixerSlavePool::each(std::function<void(AvatarMixerSlave& slave)> functor) { void AvatarMixerSlavePool::each(std::function<void(AvatarMixerSlave& slave)> functor) {
#ifdef AVATAR_SINGLE_THREADED
functor(slave);
#else
for (auto& slave : _slaves) { for (auto& slave : _slaves) {
functor(*slave.get()); functor(*slave.get());
} }
#endif
} }
void AvatarMixerSlavePool::setNumThreads(int numThreads) { void AvatarMixerSlavePool::setNumThreads(int numThreads) {
@ -158,9 +139,6 @@ void AvatarMixerSlavePool::setNumThreads(int numThreads) {
void AvatarMixerSlavePool::resize(int numThreads) { void AvatarMixerSlavePool::resize(int numThreads) {
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_slaves.size());
#ifdef AVATAR_SINGLE_THREADED
qDebug("%s: running single threaded", __FUNCTION__, numThreads);
#else
qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads); qDebug("%s: set %d threads (was %d)", __FUNCTION__, numThreads, _numThreads);
Lock lock(_mutex); Lock lock(_mutex);
@ -208,5 +186,4 @@ void AvatarMixerSlavePool::resize(int numThreads) {
_numThreads = _numStarted = _numFinished = numThreads; _numThreads = _numStarted = _numFinished = numThreads;
assert(_numThreads == (int)_slaves.size()); assert(_numThreads == (int)_slaves.size());
#endif
} }

View file

@ -566,25 +566,23 @@ SharedNodePointer LimitedNodeList::nodeWithLocalID(Node::LocalID localID) const
} }
void LimitedNodeList::eraseAllNodes() { void LimitedNodeList::eraseAllNodes() {
QSet<SharedNodePointer> killedNodes; std::vector<SharedNodePointer> killedNodes;
{ {
// iterate the current nodes - grab them so we can emit that they are dying // iterate the current nodes - grab them so we can emit that they are dying
// and then remove them from the hash // and then remove them from the hash
QWriteLocker writeLocker(&_nodeMutex); QWriteLocker writeLocker(&_nodeMutex);
_localIDMap.clear();
if (_nodeHash.size() > 0) { if (_nodeHash.size() > 0) {
qCDebug(networking) << "LimitedNodeList::eraseAllNodes() removing all nodes from NodeList."; qCDebug(networking) << "LimitedNodeList::eraseAllNodes() removing all nodes from NodeList.";
auto it = _nodeHash.begin(); killedNodes.reserve(_nodeHash.size());
for (auto& pair : _nodeHash) {
while (it != _nodeHash.end()) { killedNodes.push_back(pair.second);
killedNodes.insert(it->second);
it = _nodeHash.unsafe_erase(it);
} }
} }
_localIDMap.clear();
_nodeHash.clear();
} }
foreach(const SharedNodePointer& killedNode, killedNodes) { foreach(const SharedNodePointer& killedNode, killedNodes) {
@ -601,18 +599,13 @@ void LimitedNodeList::reset() {
} }
bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID) { bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID) {
QReadLocker readLocker(&_nodeMutex); auto matchingNode = nodeWithUUID(nodeUUID);
NodeHash::iterator it = _nodeHash.find(nodeUUID);
if (it != _nodeHash.end()) {
SharedNodePointer matchingNode = it->second;
readLocker.unlock();
if (matchingNode) {
{ {
QWriteLocker writeLocker(&_nodeMutex); QWriteLocker writeLocker(&_nodeMutex);
_localIDMap.unsafe_erase(matchingNode->getLocalID()); _localIDMap.unsafe_erase(matchingNode->getLocalID());
_nodeHash.unsafe_erase(it); _nodeHash.unsafe_erase(matchingNode->getUUID());
} }
handleNodeKill(matchingNode, newConnectionID); handleNodeKill(matchingNode, newConnectionID);
@ -653,30 +646,26 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
Node::LocalID localID, bool isReplicated, bool isUpstream, Node::LocalID localID, bool isReplicated, bool isUpstream,
const QUuid& connectionSecret, const NodePermissions& permissions) { const QUuid& connectionSecret, const NodePermissions& permissions) {
{ auto matchingNode = nodeWithUUID(uuid);
QReadLocker readLocker(&_nodeMutex); if (matchingNode) {
NodeHash::const_iterator it = _nodeHash.find(uuid); matchingNode->setPublicSocket(publicSocket);
matchingNode->setLocalSocket(localSocket);
matchingNode->setPermissions(permissions);
matchingNode->setConnectionSecret(connectionSecret);
matchingNode->setIsReplicated(isReplicated);
matchingNode->setIsUpstream(isUpstream || NodeType::isUpstream(nodeType));
matchingNode->setLocalID(localID);
if (it != _nodeHash.end()) { return matchingNode;
SharedNodePointer& matchingNode = it->second;
matchingNode->setPublicSocket(publicSocket);
matchingNode->setLocalSocket(localSocket);
matchingNode->setPermissions(permissions);
matchingNode->setConnectionSecret(connectionSecret);
matchingNode->setIsReplicated(isReplicated);
matchingNode->setIsUpstream(isUpstream || NodeType::isUpstream(nodeType));
matchingNode->setLocalID(localID);
return matchingNode;
}
} }
auto removeOldNode = [&](auto node) { auto removeOldNode = [&](auto node) {
if (node) { if (node) {
QWriteLocker writeLocker(&_nodeMutex); {
_localIDMap.unsafe_erase(node->getLocalID()); QWriteLocker writeLocker(&_nodeMutex);
_nodeHash.unsafe_erase(node->getUUID()); _localIDMap.unsafe_erase(node->getLocalID());
_nodeHash.unsafe_erase(node->getUUID());
}
handleNodeKill(node); handleNodeKill(node);
} }
}; };

View file

@ -207,7 +207,10 @@ public:
int* lockWaitOut = nullptr, int* lockWaitOut = nullptr,
int* nodeTransformOut = nullptr, int* nodeTransformOut = nullptr,
int* functorOut = nullptr) { int* functorOut = nullptr) {
auto start = usecTimestampNow(); quint64 start, endTransform, endFunctor;
start = usecTimestampNow();
std::vector<SharedNodePointer> nodes;
{ {
QReadLocker readLock(&_nodeMutex); QReadLocker readLock(&_nodeMutex);
auto endLock = usecTimestampNow(); auto endLock = usecTimestampNow();
@ -218,21 +221,21 @@ public:
// Size of _nodeHash could change at any time, // Size of _nodeHash could change at any time,
// so reserve enough memory for the current size // so reserve enough memory for the current size
// and then back insert all the nodes found // and then back insert all the nodes found
std::vector<SharedNodePointer> nodes;
nodes.reserve(_nodeHash.size()); nodes.reserve(_nodeHash.size());
std::transform(_nodeHash.cbegin(), _nodeHash.cend(), std::back_inserter(nodes), [&](const NodeHash::value_type& it) { std::transform(_nodeHash.cbegin(), _nodeHash.cend(), std::back_inserter(nodes), [&](const NodeHash::value_type& it) {
return it.second; return it.second;
}); });
auto endTransform = usecTimestampNow();
endTransform = usecTimestampNow();
if (nodeTransformOut) { if (nodeTransformOut) {
*nodeTransformOut = (endTransform - endLock); *nodeTransformOut = (endTransform - endLock);
} }
}
functor(nodes.cbegin(), nodes.cend()); functor(nodes.cbegin(), nodes.cend());
auto endFunctor = usecTimestampNow(); endFunctor = usecTimestampNow();
if (functorOut) { if (functorOut) {
*functorOut = (endFunctor - endTransform); *functorOut = (endFunctor - endTransform);
}
} }
} }