// // AvatarMixer.cpp // assignment-client/src/avatars // // Created by Stephen Birarda on 9/5/13. // Copyright 2013 High Fidelity, Inc. // // Distributed under the Apache License, Version 2.0. // See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html // #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "AvatarMixer.h" const QString AVATAR_MIXER_LOGGING_NAME = "avatar-mixer"; // FIXME - what we'd actually like to do is send to users at ~50% of their present rate down to 30hz. Assume 90 for now. const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45; AvatarMixer::AvatarMixer(ReceivedMessage& message) : ThreadedAssignment(message) { // make sure we hear about node kills so we can tell the other nodes connect(DependencyManager::get().data(), &NodeList::nodeKilled, this, &AvatarMixer::nodeKilled); auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); packetReceiver.registerListener(PacketType::AvatarData, this, "queueIncomingPacket"); packetReceiver.registerListener(PacketType::AdjustAvatarSorting, this, "handleAdjustAvatarSorting"); packetReceiver.registerListener(PacketType::ViewFrustum, this, "handleViewFrustumPacket"); packetReceiver.registerListener(PacketType::AvatarIdentity, this, "handleAvatarIdentityPacket"); packetReceiver.registerListener(PacketType::KillAvatar, this, "handleKillAvatarPacket"); packetReceiver.registerListener(PacketType::NodeIgnoreRequest, this, "handleNodeIgnoreRequestPacket"); packetReceiver.registerListener(PacketType::RadiusIgnoreRequest, this, "handleRadiusIgnoreRequestPacket"); packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket"); auto nodeList = DependencyManager::get(); connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch); } void AvatarMixer::queueIncomingPacket(QSharedPointer message, SharedNodePointer node) { auto start = usecTimestampNow(); getOrCreateClientData(node)->queuePacket(message, node); auto end = usecTimestampNow(); _queueIncomingPacketElapsedTime += (end - start); } AvatarMixer::~AvatarMixer() { } void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { QByteArray individualData = nodeData->getAvatar().identityByteArray(); auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size()); individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); identityPacket->write(individualData); DependencyManager::get()->sendPacket(std::move(identityPacket), *destinationNode); ++_sumIdentityPackets; } std::chrono::microseconds AvatarMixer::timeFrame(p_high_resolution_clock::time_point& timestamp) { // advance the next frame auto nextTimestamp = timestamp + std::chrono::microseconds((int)((float)USECS_PER_SECOND / (float)AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND)); auto now = p_high_resolution_clock::now(); // compute how long the last frame took auto duration = std::chrono::duration_cast(now - timestamp); // set the new frame timestamp timestamp = std::max(now, nextTimestamp); // sleep until the next frame should start // WIN32 sleep_until is broken until VS2015 Update 2 // instead, std::max (above) guarantees that timestamp >= now, so we can sleep_for std::this_thread::sleep_for(timestamp - now); return duration; } void AvatarMixer::start() { auto nodeList = DependencyManager::get(); unsigned int frame = 1; auto frameTimestamp = p_high_resolution_clock::now(); while (!_isFinished) { auto frameDuration = timeFrame(frameTimestamp); // calculates last frame duration and sleeps remainder of target amount throttle(frameDuration, frame); // determines _throttlingRatio for upcoming mix frame int lockWait, nodeTransform, functor; // Allow nodes to process any pending/queued packets across our worker threads { auto start = usecTimestampNow(); nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { auto end = usecTimestampNow(); _processQueuedAvatarDataPacketsLockWaitElapsedTime += (end - start); _slavePool.processIncomingPackets(cbegin, cend); }, &lockWait, &nodeTransform, &functor); auto end = usecTimestampNow(); _processQueuedAvatarDataPacketsElapsedTime += (end - start); } // process pending display names... this doesn't currently run on multiple threads, because it // side-effects the mixer's data, which is fine because it's a very low cost operation { auto start = usecTimestampNow(); nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { std::for_each(cbegin, cend, [&](const SharedNodePointer& node) { manageDisplayName(node); ++_sumListeners; }); }, &lockWait, &nodeTransform, &functor); auto end = usecTimestampNow(); _displayNameManagementElapsedTime += (end - start); } // this is where we need to put the real work... { auto start = usecTimestampNow(); nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { auto start = usecTimestampNow(); _slavePool.broadcastAvatarData(cbegin, cend, _lastFrameTimestamp, _maxKbpsPerNode, _throttlingRatio); auto end = usecTimestampNow(); _broadcastAvatarDataInner += (end - start); }, &lockWait, &nodeTransform, &functor); auto end = usecTimestampNow(); _broadcastAvatarDataElapsedTime += (end - start); _broadcastAvatarDataLockWait += lockWait; _broadcastAvatarDataNodeTransform += nodeTransform; _broadcastAvatarDataNodeFunctor += functor; } ++frame; ++_numTightLoopFrames; _loopRate.increment(); // play nice with qt event-looping { // since we're a while loop we need to yield to qt's event processing auto start = usecTimestampNow(); QCoreApplication::processEvents(); if (_isFinished) { // alert qt eventing that this is finished QCoreApplication::sendPostedEvents(this, QEvent::DeferredDelete); break; } auto end = usecTimestampNow(); _processEventsElapsedTime += (end - start); } _lastFrameTimestamp = frameTimestamp; } } // NOTE: nodeData->getAvatar() might be side effected, must be called when access to node/nodeData // is guarenteed to not be accessed by other thread void AvatarMixer::manageDisplayName(const SharedNodePointer& node) { AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); if (nodeData && nodeData->getAvatarSessionDisplayNameMustChange()) { AvatarData& avatar = nodeData->getAvatar(); const QString& existingBaseDisplayName = nodeData->getBaseDisplayName(); if (--_sessionDisplayNames[existingBaseDisplayName].second <= 0) { _sessionDisplayNames.remove(existingBaseDisplayName); } QString baseName = avatar.getDisplayName().trimmed(); const QRegularExpression curses { "fuck|shit|damn|cock|cunt" }; // POC. We may eventually want something much more elaborate (subscription?). baseName = baseName.replace(curses, "*"); // Replace rather than remove, so that people have a clue that the person's a jerk. const QRegularExpression trailingDigits { "\\s*_\\d+$" }; // whitespace "_123" baseName = baseName.remove(trailingDigits); if (baseName.isEmpty()) { baseName = "anonymous"; } QPair& soFar = _sessionDisplayNames[baseName]; // Inserts and answers 0, 0 if not already present, which is what we want. int& highWater = soFar.first; nodeData->setBaseDisplayName(baseName); QString sessionDisplayName = (highWater > 0) ? baseName + "_" + QString::number(highWater) : baseName; avatar.setSessionDisplayName(sessionDisplayName); highWater++; soFar.second++; // refcount nodeData->flagIdentityChange(); nodeData->setAvatarSessionDisplayNameMustChange(false); sendIdentityPacket(nodeData, node); // Tell node whose name changed about its new session display name. qCDebug(avatars) << "Giving session display name" << sessionDisplayName << "to node with ID" << node->getUUID(); } } void AvatarMixer::throttle(std::chrono::microseconds duration, int frame) { // throttle using a modified proportional-integral controller const float FRAME_TIME = USECS_PER_SECOND / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND; float mixRatio = duration.count() / FRAME_TIME; // constants are determined based on a "regular" 16-CPU EC2 server // target different mix and backoff ratios (they also have different backoff rates) // this is to prevent oscillation, and encourage throttling to find a steady state const float TARGET = 0.9f; // on a "regular" machine with 100 avatars, this is the largest value where // - overthrottling can be recovered // - oscillations will not occur after the recovery const float BACKOFF_TARGET = 0.44f; // the mixer is known to struggle at about 150 on a "regular" machine // so throttle 2/150 the streams to ensure smooth mixing (throttling is linear) const float STRUGGLES_AT = 150.0f; const float THROTTLE_RATE = 2 / STRUGGLES_AT; const float BACKOFF_RATE = THROTTLE_RATE / 4; // recovery should be bounded so that large changes in user count is a tolerable experience // throttling is linear, so most cases will not need a full recovery const int RECOVERY_TIME = 180; // weight more recent frames to determine if throttling is necessary, const int TRAILING_FRAMES = (int)(100 * RECOVERY_TIME * BACKOFF_RATE); const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_FRAMES; const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO; _trailingMixRatio = PREVIOUS_FRAMES_RATIO * _trailingMixRatio + CURRENT_FRAME_RATIO * mixRatio; if (frame % TRAILING_FRAMES == 0) { if (_trailingMixRatio > TARGET) { int proportionalTerm = 1 + (_trailingMixRatio - TARGET) / 0.1f; _throttlingRatio += THROTTLE_RATE * proportionalTerm; _throttlingRatio = std::min(_throttlingRatio, 1.0f); qDebug("avatar-mixer is struggling (%f mix/sleep) - throttling %f of streams", (double)_trailingMixRatio, (double)_throttlingRatio); } else if (_throttlingRatio > 0.0f && _trailingMixRatio <= BACKOFF_TARGET) { int proportionalTerm = 1 + (TARGET - _trailingMixRatio) / 0.2f; _throttlingRatio -= BACKOFF_RATE * proportionalTerm; _throttlingRatio = std::max(_throttlingRatio, 0.0f); qDebug("avatar-mixer is recovering (%f mix/sleep) - throttling %f of streams", (double)_trailingMixRatio, (double)_throttlingRatio); } } } void AvatarMixer::nodeKilled(SharedNodePointer killedNode) { if (killedNode->getType() == NodeType::Agent && killedNode->getLinkedData()) { auto nodeList = DependencyManager::get(); { // decrement sessionDisplayNames table and possibly remove QMutexLocker nodeDataLocker(&killedNode->getLinkedData()->getMutex()); AvatarMixerClientData* nodeData = dynamic_cast(killedNode->getLinkedData()); const QString& baseDisplayName = nodeData->getBaseDisplayName(); // No sense guarding against very rare case of a node with no entry, as this will work without the guard and do one less lookup in the common case. if (--_sessionDisplayNames[baseDisplayName].second <= 0) { _sessionDisplayNames.remove(baseDisplayName); } } // this was an avatar we were sending to other people // send a kill packet for it to our other nodes auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); killPacket->write(killedNode->getUUID().toRfc4122()); killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); // we also want to remove sequence number data for this avatar on our other avatars // so invoke the appropriate method on the AvatarMixerClientData for other avatars nodeList->eachMatchingNode( [&](const SharedNodePointer& node)->bool { if (!node->getLinkedData()) { return false; } if (node->getUUID() == killedNode->getUUID()) { return false; } return true; }, [&](const SharedNodePointer& node) { QMetaObject::invokeMethod(node->getLinkedData(), "cleanupKilledNode", Qt::AutoConnection, Q_ARG(const QUuid&, QUuid(killedNode->getUUID()))); } ); } } void AvatarMixer::handleAdjustAvatarSorting(QSharedPointer message, SharedNodePointer senderNode) { auto start = usecTimestampNow(); // only allow admins with kick rights to change this value... if (senderNode->getCanKick()) { message->readPrimitive(&AvatarData::_avatarSortCoefficientSize); message->readPrimitive(&AvatarData::_avatarSortCoefficientCenter); message->readPrimitive(&AvatarData::_avatarSortCoefficientAge); qCDebug(avatars) << "New avatar sorting... " << "size:" << AvatarData::_avatarSortCoefficientSize << "center:" << AvatarData::_avatarSortCoefficientCenter << "age:" << AvatarData::_avatarSortCoefficientAge; } auto end = usecTimestampNow(); _handleAdjustAvatarSortingElapsedTime += (end - start); } void AvatarMixer::handleViewFrustumPacket(QSharedPointer message, SharedNodePointer senderNode) { auto start = usecTimestampNow(); getOrCreateClientData(senderNode); if (senderNode->getLinkedData()) { AvatarMixerClientData* nodeData = dynamic_cast(senderNode->getLinkedData()); if (nodeData != nullptr) { nodeData->readViewFrustumPacket(message->getMessage()); } } auto end = usecTimestampNow(); _handleViewFrustumPacketElapsedTime += (end - start); } void AvatarMixer::handleRequestsDomainListDataPacket(QSharedPointer message, SharedNodePointer senderNode) { auto start = usecTimestampNow(); getOrCreateClientData(senderNode); if (senderNode->getLinkedData()) { AvatarMixerClientData* nodeData = dynamic_cast(senderNode->getLinkedData()); if (nodeData != nullptr) { bool isRequesting; message->readPrimitive(&isRequesting); nodeData->setRequestsDomainListData(isRequesting); qCDebug(avatars) << "node" << nodeData->getNodeID() << "requestsDomainListData" << isRequesting; // If we just opened the PAL... if (isRequesting) { // For each node in the NodeList... auto nodeList = DependencyManager::get(); nodeList->eachMatchingNode( // Discover the valid nodes we're ignoring... [&](const SharedNodePointer& node)->bool { if (node->getUUID() != senderNode->getUUID() && (nodeData->isRadiusIgnoring(node->getUUID()) || senderNode->isIgnoringNodeWithID(node->getUUID()))) { return true; } return false; }, // ...For those nodes, reset the lastBroadcastTime to 0 // so that the AvatarMixer will send Identity data to us [&](const SharedNodePointer& node) { nodeData->setLastBroadcastTime(node->getUUID(), 0); } ); } } } auto end = usecTimestampNow(); _handleRequestsDomainListDataPacketElapsedTime += (end - start); } void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer message, SharedNodePointer senderNode) { auto start = usecTimestampNow(); auto nodeList = DependencyManager::get(); getOrCreateClientData(senderNode); if (senderNode->getLinkedData()) { AvatarMixerClientData* nodeData = dynamic_cast(senderNode->getLinkedData()); if (nodeData != nullptr) { AvatarData& avatar = nodeData->getAvatar(); // parse the identity packet and update the change timestamp if appropriate AvatarData::Identity identity; AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity); bool identityChanged = false; bool displayNameChanged = false; avatar.processAvatarIdentity(identity, identityChanged, displayNameChanged); if (identityChanged) { QMutexLocker nodeDataLocker(&nodeData->getMutex()); nodeData->flagIdentityChange(); if (displayNameChanged) { nodeData->setAvatarSessionDisplayNameMustChange(true); } } } } auto end = usecTimestampNow(); _handleAvatarIdentityPacketElapsedTime += (end - start); } void AvatarMixer::handleKillAvatarPacket(QSharedPointer message) { auto start = usecTimestampNow(); DependencyManager::get()->processKillNode(*message); auto end = usecTimestampNow(); _handleKillAvatarPacketElapsedTime += (end - start); } void AvatarMixer::handleNodeIgnoreRequestPacket(QSharedPointer message, SharedNodePointer senderNode) { auto start = usecTimestampNow(); auto nodeList = DependencyManager::get(); AvatarMixerClientData* nodeData = reinterpret_cast(senderNode->getLinkedData()); bool addToIgnore; message->readPrimitive(&addToIgnore); while (message->getBytesLeftToRead()) { // parse out the UUID being ignored from the packet QUuid ignoredUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); // Reset the lastBroadcastTime for the ignored avatar to 0 // so the AvatarMixer knows it'll have to send identity data about the ignored avatar // to the ignorer if the ignorer unignores. nodeData->setLastBroadcastTime(ignoredUUID, 0); // Reset the lastBroadcastTime for the ignorer (FROM THE PERSPECTIVE OF THE IGNORED) to 0 // so the AvatarMixer knows it'll have to send identity data about the ignorer // to the ignored if the ignorer unignores. auto ignoredNode = nodeList->nodeWithUUID(ignoredUUID); AvatarMixerClientData* ignoredNodeData = reinterpret_cast(ignoredNode->getLinkedData()); ignoredNodeData->setLastBroadcastTime(senderNode->getUUID(), 0); if (addToIgnore) { senderNode->addIgnoredNode(ignoredUUID); } else { senderNode->removeIgnoredNode(ignoredUUID); } } auto end = usecTimestampNow(); _handleNodeIgnoreRequestPacketElapsedTime += (end - start); } void AvatarMixer::handleRadiusIgnoreRequestPacket(QSharedPointer packet, SharedNodePointer sendingNode) { auto start = usecTimestampNow(); sendingNode->parseIgnoreRadiusRequestMessage(packet); auto end = usecTimestampNow(); _handleRadiusIgnoreRequestPacketElapsedTime += (end - start); } void AvatarMixer::sendStatsPacket() { auto start = usecTimestampNow(); QJsonObject statsObject; statsObject["broadcast_loop_rate"] = _loopRate.rate(); statsObject["threads"] = _slavePool.numThreads(); statsObject["trailing_mix_ratio"] = _trailingMixRatio; statsObject["throttling_ratio"] = _throttlingRatio; // this things all occur on the frequency of the tight loop int tightLoopFrames = _numTightLoopFrames; int tenTimesPerFrame = tightLoopFrames * 10; #define TIGHT_LOOP_STAT(x) (x > tenTimesPerFrame) ? x / tightLoopFrames : ((float)x / (float)tightLoopFrames); #define TIGHT_LOOP_STAT_UINT64(x) (x > (quint64)tenTimesPerFrame) ? x / tightLoopFrames : ((float)x / (float)tightLoopFrames); statsObject["average_listeners_last_second"] = TIGHT_LOOP_STAT(_sumListeners); QJsonObject singleCoreTasks; singleCoreTasks["processEvents"] = TIGHT_LOOP_STAT_UINT64(_processEventsElapsedTime); singleCoreTasks["queueIncomingPacket"] = TIGHT_LOOP_STAT_UINT64(_queueIncomingPacketElapsedTime); QJsonObject incomingPacketStats; incomingPacketStats["handleAvatarIdentityPacket"] = TIGHT_LOOP_STAT_UINT64(_handleAvatarIdentityPacketElapsedTime); incomingPacketStats["handleKillAvatarPacket"] = TIGHT_LOOP_STAT_UINT64(_handleKillAvatarPacketElapsedTime); incomingPacketStats["handleNodeIgnoreRequestPacket"] = TIGHT_LOOP_STAT_UINT64(_handleNodeIgnoreRequestPacketElapsedTime); incomingPacketStats["handleRadiusIgnoreRequestPacket"] = TIGHT_LOOP_STAT_UINT64(_handleRadiusIgnoreRequestPacketElapsedTime); incomingPacketStats["handleRequestsDomainListDataPacket"] = TIGHT_LOOP_STAT_UINT64(_handleRequestsDomainListDataPacketElapsedTime); incomingPacketStats["handleViewFrustumPacket"] = TIGHT_LOOP_STAT_UINT64(_handleViewFrustumPacketElapsedTime); singleCoreTasks["incoming_packets"] = incomingPacketStats; singleCoreTasks["sendStats"] = (float)_sendStatsElapsedTime; statsObject["singleCoreTasks"] = singleCoreTasks; QJsonObject parallelTasks; QJsonObject processQueuedAvatarDataPacketsStats; processQueuedAvatarDataPacketsStats["1_total"] = TIGHT_LOOP_STAT_UINT64(_processQueuedAvatarDataPacketsElapsedTime); processQueuedAvatarDataPacketsStats["2_lockWait"] = TIGHT_LOOP_STAT_UINT64(_processQueuedAvatarDataPacketsLockWaitElapsedTime); parallelTasks["processQueuedAvatarDataPackets"] = processQueuedAvatarDataPacketsStats; QJsonObject broadcastAvatarDataStats; broadcastAvatarDataStats["1_total"] = TIGHT_LOOP_STAT_UINT64(_broadcastAvatarDataElapsedTime); broadcastAvatarDataStats["2_innner"] = TIGHT_LOOP_STAT_UINT64(_broadcastAvatarDataInner); broadcastAvatarDataStats["3_lockWait"] = TIGHT_LOOP_STAT_UINT64(_broadcastAvatarDataLockWait); broadcastAvatarDataStats["4_NodeTransform"] = TIGHT_LOOP_STAT_UINT64(_broadcastAvatarDataNodeTransform); broadcastAvatarDataStats["5_Functor"] = TIGHT_LOOP_STAT_UINT64(_broadcastAvatarDataNodeFunctor); parallelTasks["broadcastAvatarData"] = broadcastAvatarDataStats; QJsonObject displayNameManagementStats; displayNameManagementStats["1_total"] = TIGHT_LOOP_STAT_UINT64(_displayNameManagementElapsedTime); parallelTasks["displayNameManagement"] = displayNameManagementStats; statsObject["parallelTasks"] = parallelTasks; AvatarMixerSlaveStats aggregateStats; QJsonObject slavesObject; float secondsSinceLastStats = (float)(start - _lastStatsTime) / (float)USECS_PER_SECOND; // gather stats int slaveNumber = 1; _slavePool.each([&](AvatarMixerSlave& slave) { QJsonObject slaveObject; AvatarMixerSlaveStats stats; slave.harvestStats(stats); slaveObject["recevied_1_nodesProcessed"] = TIGHT_LOOP_STAT(stats.nodesProcessed); slaveObject["received_2_numPacketsReceived"] = TIGHT_LOOP_STAT(stats.packetsProcessed); slaveObject["sent_1_nodesBroadcastedTo"] = TIGHT_LOOP_STAT(stats.nodesBroadcastedTo); slaveObject["sent_2_numBytesSent"] = TIGHT_LOOP_STAT(stats.numBytesSent); slaveObject["sent_3_numPacketsSent"] = TIGHT_LOOP_STAT(stats.numPacketsSent); slaveObject["sent_4_numIdentityPackets"] = TIGHT_LOOP_STAT(stats.numIdentityPackets); float averageNodes = ((float)stats.nodesBroadcastedTo / (float)tightLoopFrames); float averageOutboundAvatarKbps = averageNodes ? ((stats.numBytesSent / secondsSinceLastStats) / BYTES_PER_KILOBIT) / averageNodes : 0.0f; slaveObject["sent_5_averageOutboundAvatarKbps"] = averageOutboundAvatarKbps; float averageOthersIncluded = averageNodes ? stats.numOthersIncluded / averageNodes : 0.0f; slaveObject["sent_6_averageOthersIncluded"] = TIGHT_LOOP_STAT(averageOthersIncluded); float averageOverBudgetAvatars = averageNodes ? stats.overBudgetAvatars / averageNodes : 0.0f; slaveObject["sent_7_averageOverBudgetAvatars"] = TIGHT_LOOP_STAT(averageOverBudgetAvatars); slaveObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT_UINT64(stats.processIncomingPacketsElapsedTime); slaveObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT_UINT64(stats.ignoreCalculationElapsedTime); slaveObject["timing_3_toByteArray"] = TIGHT_LOOP_STAT_UINT64(stats.toByteArrayElapsedTime); slaveObject["timing_4_avatarDataPacking"] = TIGHT_LOOP_STAT_UINT64(stats.avatarDataPackingElapsedTime); slaveObject["timing_5_packetSending"] = TIGHT_LOOP_STAT_UINT64(stats.packetSendingElapsedTime); slaveObject["timing_6_jobElapsedTime"] = TIGHT_LOOP_STAT_UINT64(stats.jobElapsedTime); slavesObject[QString::number(slaveNumber)] = slaveObject; slaveNumber++; aggregateStats += stats; }); QJsonObject slavesAggregatObject; slavesAggregatObject["recevied_1_nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed); slavesAggregatObject["received_2_numPacketsReceived"] = TIGHT_LOOP_STAT(aggregateStats.packetsProcessed); slavesAggregatObject["sent_1_nodesBroadcastedTo"] = TIGHT_LOOP_STAT(aggregateStats.nodesBroadcastedTo); slavesAggregatObject["sent_2_numBytesSent"] = TIGHT_LOOP_STAT(aggregateStats.numBytesSent); slavesAggregatObject["sent_3_numPacketsSent"] = TIGHT_LOOP_STAT(aggregateStats.numPacketsSent); slavesAggregatObject["sent_4_numIdentityPackets"] = TIGHT_LOOP_STAT(aggregateStats.numIdentityPackets); float averageNodes = ((float)aggregateStats.nodesBroadcastedTo / (float)tightLoopFrames); float averageOutboundAvatarKbps = averageNodes ? ((aggregateStats.numBytesSent / secondsSinceLastStats) / BYTES_PER_KILOBIT) / averageNodes : 0.0f; slavesAggregatObject["sent_5_averageOutboundAvatarKbps"] = averageOutboundAvatarKbps; float averageOthersIncluded = averageNodes ? aggregateStats.numOthersIncluded / averageNodes : 0.0f; slavesAggregatObject["sent_6_averageOthersIncluded"] = TIGHT_LOOP_STAT(averageOthersIncluded); float averageOverBudgetAvatars = averageNodes ? aggregateStats.overBudgetAvatars / averageNodes : 0.0f; slavesAggregatObject["sent_7_averageOverBudgetAvatars"] = TIGHT_LOOP_STAT(averageOverBudgetAvatars); slavesAggregatObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.processIncomingPacketsElapsedTime); slavesAggregatObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.ignoreCalculationElapsedTime); slavesAggregatObject["timing_3_toByteArray"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.toByteArrayElapsedTime); slavesAggregatObject["timing_4_avatarDataPacking"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.avatarDataPackingElapsedTime); slavesAggregatObject["timing_5_packetSending"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.packetSendingElapsedTime); slavesAggregatObject["timing_6_jobElapsedTime"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.jobElapsedTime); statsObject["slaves_aggregate"] = slavesAggregatObject; statsObject["slaves_individual"] = slavesObject; _handleViewFrustumPacketElapsedTime = 0; _handleAvatarIdentityPacketElapsedTime = 0; _handleKillAvatarPacketElapsedTime = 0; _handleNodeIgnoreRequestPacketElapsedTime = 0; _handleRadiusIgnoreRequestPacketElapsedTime = 0; _handleRequestsDomainListDataPacketElapsedTime = 0; _processEventsElapsedTime = 0; _queueIncomingPacketElapsedTime = 0; _processQueuedAvatarDataPacketsElapsedTime = 0; _processQueuedAvatarDataPacketsLockWaitElapsedTime = 0; QJsonObject avatarsObject; auto nodeList = DependencyManager::get(); // add stats for each listerner nodeList->eachNode([&](const SharedNodePointer& node) { QJsonObject avatarStats; const QString NODE_OUTBOUND_KBPS_STAT_KEY = "outbound_kbps"; const QString NODE_INBOUND_KBPS_STAT_KEY = "inbound_kbps"; // add the key to ask the domain-server for a username replacement, if it has it avatarStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); avatarStats[NODE_OUTBOUND_KBPS_STAT_KEY] = node->getOutboundBandwidth(); avatarStats[NODE_INBOUND_KBPS_STAT_KEY] = node->getInboundBandwidth(); AvatarMixerClientData* clientData = static_cast(node->getLinkedData()); if (clientData) { MutexTryLocker lock(clientData->getMutex()); if (lock.isLocked()) { clientData->loadJSONStats(avatarStats); // add the diff between the full outbound bandwidth and the measured bandwidth for AvatarData send only avatarStats["delta_full_vs_avatar_data_kbps"] = avatarStats[NODE_OUTBOUND_KBPS_STAT_KEY].toDouble() - avatarStats[OUTBOUND_AVATAR_DATA_STATS_KEY].toDouble(); } } avatarsObject[uuidStringWithoutCurlyBraces(node->getUUID())] = avatarStats; }); statsObject["z_avatars"] = avatarsObject; ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject); _sumListeners = 0; _sumIdentityPackets = 0; _numTightLoopFrames = 0; _broadcastAvatarDataElapsedTime = 0; _broadcastAvatarDataInner = 0; _broadcastAvatarDataLockWait = 0; _broadcastAvatarDataNodeTransform = 0; _broadcastAvatarDataNodeFunctor = 0; _displayNameManagementElapsedTime = 0; _ignoreCalculationElapsedTime = 0; _avatarDataPackingElapsedTime = 0; _packetSendingElapsedTime = 0; auto end = usecTimestampNow(); _sendStatsElapsedTime = (end - start); _lastStatsTime = start; } void AvatarMixer::run() { qCDebug(avatars) << "Waiting for connection to domain to request settings from domain-server."; // wait until we have the domain-server settings, otherwise we bail DomainHandler& domainHandler = DependencyManager::get()->getDomainHandler(); connect(&domainHandler, &DomainHandler::settingsReceived, this, &AvatarMixer::domainSettingsRequestComplete); connect(&domainHandler, &DomainHandler::settingsReceiveFail, this, &AvatarMixer::domainSettingsRequestFailed); ThreadedAssignment::commonInit(AVATAR_MIXER_LOGGING_NAME, NodeType::AvatarMixer); } AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node) { auto clientData = dynamic_cast(node->getLinkedData()); if (!clientData) { node->setLinkedData(std::unique_ptr { new AvatarMixerClientData(node->getUUID()) }); clientData = dynamic_cast(node->getLinkedData()); auto& avatar = clientData->getAvatar(); avatar.setDomainMinimumScale(_domainMinimumScale); avatar.setDomainMaximumScale(_domainMaximumScale); } return clientData; } void AvatarMixer::domainSettingsRequestComplete() { auto nodeList = DependencyManager::get(); nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); // parse the settings to pull out the values we need parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject()); // start our tight loop... start(); } void AvatarMixer::handlePacketVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID) { // if this client is using packet versions we don't expect. if ((type == PacketTypeEnum::Value::AvatarIdentity || type == PacketTypeEnum::Value::AvatarData) && !senderUUID.isNull()) { // Echo an empty AvatarData packet back to that client. // This should trigger a version mismatch dialog on their side. auto nodeList = DependencyManager::get(); auto node = nodeList->nodeWithUUID(senderUUID); if (node) { auto emptyPacket = NLPacket::create(PacketType::AvatarData, 0); nodeList->sendPacket(std::move(emptyPacket), *node); } } } void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { const QString AVATAR_MIXER_SETTINGS_KEY = "avatar_mixer"; QJsonObject avatarMixerGroupObject = domainSettings[AVATAR_MIXER_SETTINGS_KEY].toObject(); const QString NODE_SEND_BANDWIDTH_KEY = "max_node_send_bandwidth"; const float DEFAULT_NODE_SEND_BANDWIDTH = 5.0f; QJsonValue nodeBandwidthValue = avatarMixerGroupObject[NODE_SEND_BANDWIDTH_KEY]; if (!nodeBandwidthValue.isDouble()) { qCDebug(avatars) << NODE_SEND_BANDWIDTH_KEY << "is not a double - will continue with default value"; } _maxKbpsPerNode = nodeBandwidthValue.toDouble(DEFAULT_NODE_SEND_BANDWIDTH) * KILO_PER_MEGA; qCDebug(avatars) << "The maximum send bandwidth per node is" << _maxKbpsPerNode << "kbps."; const QString AUTO_THREADS = "auto_threads"; bool autoThreads = avatarMixerGroupObject[AUTO_THREADS].toBool(); if (!autoThreads) { bool ok; const QString NUM_THREADS = "num_threads"; int numThreads = avatarMixerGroupObject[NUM_THREADS].toString().toInt(&ok); if (!ok) { qCWarning(avatars) << "Avatar mixer: Error reading thread count. Using 1 thread."; numThreads = 1; } qCDebug(avatars) << "Avatar mixer will use specified number of threads:" << numThreads; _slavePool.setNumThreads(numThreads); } else { qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads."; } const QString AVATARS_SETTINGS_KEY = "avatars"; static const QString MIN_SCALE_OPTION = "min_avatar_scale"; float settingMinScale = domainSettings[AVATARS_SETTINGS_KEY].toObject()[MIN_SCALE_OPTION].toDouble(MIN_AVATAR_SCALE); _domainMinimumScale = glm::clamp(settingMinScale, MIN_AVATAR_SCALE, MAX_AVATAR_SCALE); static const QString MAX_SCALE_OPTION = "max_avatar_scale"; float settingMaxScale = domainSettings[AVATARS_SETTINGS_KEY].toObject()[MAX_SCALE_OPTION].toDouble(MAX_AVATAR_SCALE); _domainMaximumScale = glm::clamp(settingMaxScale, MIN_AVATAR_SCALE, MAX_AVATAR_SCALE); // make sure that the domain owner didn't flip min and max if (_domainMinimumScale > _domainMaximumScale) { std::swap(_domainMinimumScale, _domainMaximumScale); } qCDebug(avatars) << "This domain requires a minimum avatar scale of" << _domainMinimumScale << "and a maximum avatar scale of" << _domainMaximumScale; }