From 61c4e6d72e58bdc828f79c18031438cfe7d5b680 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 6 Jul 2015 17:37:41 -0700 Subject: [PATCH] some network API reworking in AvatarMixer --- assignment-client/src/avatars/AvatarMixer.cpp | 223 +++++++++--------- 1 file changed, 107 insertions(+), 116 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 7ef9c42524..af9328cd12 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -30,7 +30,7 @@ const QString AVATAR_MIXER_LOGGING_NAME = "avatar-mixer"; -const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 60; +const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 60; const unsigned int AVATAR_DATA_SEND_INTERVAL_MSECS = (1.0f / (float) AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND) * 1000; AvatarMixer::AvatarMixer(const QByteArray& packet) : @@ -63,73 +63,70 @@ const float BILLBOARD_AND_IDENTITY_SEND_PROBABILITY = 1.0f / 300.0f; // 1) use the view frustum to cull those avatars that are out of view. Since avatar data doesn't need to be present // if the avatar is not in view or in the keyhole. void AvatarMixer::broadcastAvatarData() { - + int idleTime = QDateTime::currentMSecsSinceEpoch() - _lastFrameTimestamp; - + ++_numStatFrames; - + const float STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.10f; const float BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.20f; - + const float RATIO_BACK_OFF = 0.02f; - + const int TRAILING_AVERAGE_FRAMES = 100; int framesSinceCutoffEvent = TRAILING_AVERAGE_FRAMES; - + const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES; const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO; - + // NOTE: The following code calculates the _performanceThrottlingRatio based on how much the avatar-mixer was // able to sleep. This will eventually be used to ask for an additional avatar-mixer to help out. Currently the value // is unused as it is assumed this should not be hit before the avatar-mixer hits the desired bandwidth limit per client. // It is reported in the domain-server stats for the avatar-mixer. - + _trailingSleepRatio = (PREVIOUS_FRAMES_RATIO * _trailingSleepRatio) + (idleTime * CURRENT_FRAME_RATIO / (float) AVATAR_DATA_SEND_INTERVAL_MSECS); - + float lastCutoffRatio = _performanceThrottlingRatio; bool hasRatioChanged = false; - + if (framesSinceCutoffEvent >= TRAILING_AVERAGE_FRAMES) { if (_trailingSleepRatio <= STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD) { // we're struggling - change our performance throttling ratio _performanceThrottlingRatio = _performanceThrottlingRatio + (0.5f * (1.0f - _performanceThrottlingRatio)); - + qDebug() << "Mixer is struggling, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was" << lastCutoffRatio << "and is now" << _performanceThrottlingRatio; hasRatioChanged = true; } else if (_trailingSleepRatio >= BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD && _performanceThrottlingRatio != 0) { // we've recovered and can back off the performance throttling _performanceThrottlingRatio = _performanceThrottlingRatio - RATIO_BACK_OFF; - + if (_performanceThrottlingRatio < 0) { _performanceThrottlingRatio = 0; } - + qDebug() << "Mixer is recovering, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was" << lastCutoffRatio << "and is now" << _performanceThrottlingRatio; hasRatioChanged = true; } - + if (hasRatioChanged) { framesSinceCutoffEvent = 0; } } - + if (!hasRatioChanged) { ++framesSinceCutoffEvent; } - - static QByteArray mixedAvatarByteArray; - + auto nodeList = DependencyManager::get(); - int numPacketHeaderBytes = nodeList->populatePacketHeader(mixedAvatarByteArray, PacketTypeBulkAvatarData); - - // setup for distributed random floating point values + + // setup for distributed random floating point values std::random_device randomDevice; std::mt19937 generator(randomDevice()); std::uniform_real_distribution distribution; - + nodeList->eachMatchingNode( [&](const SharedNodePointer& node)->bool { if (!node->getLinkedData()) { @@ -150,25 +147,22 @@ void AvatarMixer::broadcastAvatarData() { return; } ++_sumListeners; - - // reset packet pointers for this node - mixedAvatarByteArray.resize(numPacketHeaderBytes); AvatarData& avatar = nodeData->getAvatar(); glm::vec3 myPosition = avatar.getPosition(); // reset the internal state for correct random number distribution distribution.reset(); - + // reset the max distance for this frame float maxAvatarDistanceThisFrame = 0.0f; - + // reset the number of sent avatars nodeData->resetNumAvatarsSentLastFrame(); // keep a counter of the number of considered avatars int numOtherAvatars = 0; - + // keep track of outbound data rate specifically for avatar data int numAvatarDataBytes = 0; @@ -185,7 +179,7 @@ void AvatarMixer::broadcastAvatarData() { // bandwidth to this node. We do this once a second, which is also the window for // the bandwidth reported by node->getOutboundBandwidth(); if (nodeData->getNumFramesSinceFRDAdjustment() > AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND) { - + const float FRD_ADJUSTMENT_ACCEPTABLE_RATIO = 0.8f; const float HYSTERISIS_GAP = (1 - FRD_ADJUSTMENT_ACCEPTABLE_RATIO); const float HYSTERISIS_MIDDLE_PERCENTAGE = (1 - (HYSTERISIS_GAP * 0.5f)); @@ -195,22 +189,22 @@ void AvatarMixer::broadcastAvatarData() { if (avatarDataRateLastSecond > _maxKbpsPerNode) { - // is the FRD greater than the farthest avatar? + // is the FRD greater than the farthest avatar? // if so, before we calculate anything, set it to that distance currentFullRateDistance = std::min(currentFullRateDistance, nodeData->getMaxAvatarDistance()); // we're adjusting the full rate distance to target a bandwidth in the middle // of the hysterisis gap - currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond; - + currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond; + nodeData->setFullRateDistance(currentFullRateDistance); nodeData->resetNumFramesSinceFRDAdjustment(); - } else if (currentFullRateDistance < nodeData->getMaxAvatarDistance() + } else if (currentFullRateDistance < nodeData->getMaxAvatarDistance() && avatarDataRateLastSecond < _maxKbpsPerNode * FRD_ADJUSTMENT_ACCEPTABLE_RATIO) { - // we are constrained AND we've recovered to below the acceptable ratio + // we are constrained AND we've recovered to below the acceptable ratio // lets adjust the full rate distance to target a bandwidth in the middle of the hyterisis gap currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond; - + nodeData->setFullRateDistance(currentFullRateDistance); nodeData->resetNumFramesSinceFRDAdjustment(); } @@ -218,6 +212,9 @@ void AvatarMixer::broadcastAvatarData() { nodeData->incrementNumFramesSinceFRDAdjustment(); } + // setup a PacketList for the avatarPackets + PacketList avatarPacketList(PacketType::AvatarData); + // this is an AGENT we have received head data from // send back a packet with other active node data to this node nodeList->eachMatchingNode( @@ -233,16 +230,16 @@ void AvatarMixer::broadcastAvatarData() { }, [&](const SharedNodePointer& otherNode) { ++numOtherAvatars; - + AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); MutexTryLocker lock(otherNodeData->getMutex()); if (!lock.isLocked()) { return; } - + AvatarData& otherAvatar = otherNodeData->getAvatar(); // Decide whether to send this avatar's data based on it's distance from us - + // The full rate distance is the distance at which EVERY update will be sent for this avatar // at twice the full rate distance, there will be a 50% chance of sending this avatar's update glm::vec3 otherPosition = otherAvatar.getPosition(); @@ -251,24 +248,24 @@ void AvatarMixer::broadcastAvatarData() { // potentially update the max full rate distance for this frame maxAvatarDistanceThisFrame = std::max(maxAvatarDistanceThisFrame, distanceToAvatar); - if (distanceToAvatar != 0.0f + if (distanceToAvatar != 0.0f && distribution(generator) > (nodeData->getFullRateDistance() / distanceToAvatar)) { return; } PacketSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(otherNode->getUUID()); - PacketSequenceNumber lastSeqFromSender = otherNode->getLastSequenceNumberForPacketType(PacketTypeAvatarData); + PacketSequenceNumber lastSeqFromSender = otherNode->getLastSequenceNumberForPacketType(PacketType::AvatarData); if (lastSeqToReceiver > lastSeqFromSender) { // Did we somehow get out of order packets from the sender? // We don't expect this to happen - in RELEASE we add this to a trackable stat // and in DEBUG we crash on the assert - + otherNodeData->incrementNumOutOfOrderSends(); assert(false); } - + // make sure we haven't already sent this data from this sender to this receiver // or that somehow we haven't sent if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) { @@ -277,76 +274,70 @@ void AvatarMixer::broadcastAvatarData() { } else if (lastSeqFromSender - lastSeqToReceiver > 1) { // this is a skip - we still send the packet but capture the presence of the skip so we see it happening ++numAvatarsWithSkippedFrames; - } - + } + // we're going to send this avatar - + // increment the number of avatars sent to this reciever nodeData->incrementNumAvatarsSentLastFrame(); - + // set the last sent sequence number for this sender on the receiver - nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), - otherNode->getLastSequenceNumberForPacketType(PacketTypeAvatarData)); + nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), + otherNode->getLastSequenceNumberForPacketType(PacketType::AvatarData)); - QByteArray avatarByteArray; - avatarByteArray.append(otherNode->getUUID().toRfc4122()); - avatarByteArray.append(otherAvatar.toByteArray()); - - if (avatarByteArray.size() + mixedAvatarByteArray.size() > MAX_PACKET_SIZE) { - nodeList->writeDatagram(mixedAvatarByteArray, node); + // start a new segment in the PacketList for this avatar + avatarPacketList.startSegment(); + + numAvatarDataBytes += avatarPacketList.write(otherNode->getUUID().toRfc4122()); + numAvatarDataBytes += avatarPacketList.write(otherAvatar.toByteArray()); + + avatarPacketList.endSegment(); - numAvatarDataBytes += mixedAvatarByteArray.size(); - - // reset the packet - mixedAvatarByteArray.resize(numPacketHeaderBytes); - } - - // copy the avatar into the mixedAvatarByteArray packet - mixedAvatarByteArray.append(avatarByteArray); - // if the receiving avatar has just connected make sure we send out the mesh and billboard // for this avatar (assuming they exist) bool forceSend = !nodeData->checkAndSetHasReceivedFirstPackets(); - + // we will also force a send of billboard or identity packet // if either has changed in the last frame - + if (otherNodeData->getBillboardChangeTimestamp() > 0 && (forceSend || otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { - QByteArray billboardPacket = nodeList->byteArrayWithPopulatedHeader(PacketTypeAvatarBillboard); - billboardPacket.append(otherNode->getUUID().toRfc4122()); - billboardPacket.append(otherNodeData->getAvatar().getBillboard()); - nodeList->writeDatagram(billboardPacket, node); - + auto billboardPacket { NLPacket::create(PacketType::AvatarBillboard); } + billboardPacket.write(otherNode->getUUID().toRfc4122()); + billboardPacket.write(otherNodeData->getAvatar().getBillboard()); + + nodeList->sendPacket(billboardPacket, node); + ++_sumBillboardPackets; } - + if (otherNodeData->getIdentityChangeTimestamp() > 0 && (forceSend || otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { - - QByteArray identityPacket = nodeList->byteArrayWithPopulatedHeader(PacketTypeAvatarIdentity); - + + auto identityPacket { NLPacket::create(PacketType::AvatarIdentity); } + QByteArray individualData = otherNodeData->getAvatar().identityByteArray(); individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122()); - identityPacket.append(individualData); - - nodeList->writeDatagram(identityPacket, node); - + + identityPacket.write(individualData); + + nodeList->sendPacket(identityPacket, node); + ++_sumIdentityPackets; } }); - - // send the last packet - nodeList->writeDatagram(mixedAvatarByteArray, node); - + + // send the avatar data PacketList + nodeList->sendPacketList(avatarPacketList, node); + // record the bytes sent for other avatar data in the AvatarMixerClientData - nodeData->recordSentAvatarData(numAvatarDataBytes + mixedAvatarByteArray.size()); - + nodeData->recordSentAvatarData(numAvatarDataBytes); + // record the number of avatars held back this frame nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack); nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames); @@ -359,7 +350,7 @@ void AvatarMixer::broadcastAvatarData() { } } ); - + _lastFrameTimestamp = QDateTime::currentMSecsSinceEpoch(); } @@ -370,9 +361,9 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) { // this was an avatar we were sending to other people // send a kill packet for it to our other nodes - QByteArray killPacket = nodeList->byteArrayWithPopulatedHeader(PacketTypeKillAvatar); - killPacket += killedNode->getUUID().toRfc4122(); - + auto killPacket { NLPacket::create(PacketType::KillAvatar); } + killPacket.write(killedNode->getUUID().toRfc4122()); + nodeList->broadcastToNodes(killPacket, NodeSet() << NodeType::Agent); // we also want to remove sequence number data for this avatar on our other avatars @@ -402,9 +393,9 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) { void AvatarMixer::readPendingDatagrams() { QByteArray receivedPacket; HifiSockAddr senderSockAddr; - + auto nodeList = DependencyManager::get(); - + while (readAvailableDatagram(receivedPacket, senderSockAddr)) { if (nodeList->packetVersionAndHashMatch(receivedPacket)) { switch (packetTypeForPacket(receivedPacket)) { @@ -413,14 +404,14 @@ void AvatarMixer::readPendingDatagrams() { break; } case PacketTypeAvatarIdentity: { - + // check if we have a matching node in our list SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(receivedPacket); - + if (avatarNode && avatarNode->getLinkedData()) { AvatarMixerClientData* nodeData = reinterpret_cast(avatarNode->getLinkedData()); AvatarData& avatar = nodeData->getAvatar(); - + // parse the identity packet and update the change timestamp if appropriate if (avatar.hasIdentityChangedAfterParsing(receivedPacket)) { QMutexLocker nodeDataLocker(&nodeData->getMutex()); @@ -430,20 +421,20 @@ void AvatarMixer::readPendingDatagrams() { break; } case PacketTypeAvatarBillboard: { - + // check if we have a matching node in our list SharedNodePointer avatarNode = nodeList->sendingNodeForPacket(receivedPacket); - + if (avatarNode && avatarNode->getLinkedData()) { AvatarMixerClientData* nodeData = static_cast(avatarNode->getLinkedData()); AvatarData& avatar = nodeData->getAvatar(); - + // parse the billboard packet and update the change timestamp if appropriate if (avatar.hasBillboardChangedAfterParsing(receivedPacket)) { QMutexLocker nodeDataLocker(&nodeData->getMutex()); nodeData->setBillboardChangeTimestamp(QDateTime::currentMSecsSinceEpoch()); } - + } break; } @@ -463,15 +454,15 @@ void AvatarMixer::readPendingDatagrams() { void AvatarMixer::sendStatsPacket() { QJsonObject statsObject; statsObject["average_listeners_last_second"] = (float) _sumListeners / (float) _numStatFrames; - + statsObject["average_billboard_packets_per_frame"] = (float) _sumBillboardPackets / (float) _numStatFrames; statsObject["average_identity_packets_per_frame"] = (float) _sumIdentityPackets / (float) _numStatFrames; - + statsObject["trailing_sleep_percentage"] = _trailingSleepRatio * 100; statsObject["performance_throttling_ratio"] = _performanceThrottlingRatio; QJsonObject avatarsObject; - + auto nodeList = DependencyManager::get(); // add stats for each listerner nodeList->eachNode([&](const SharedNodePointer& node) { @@ -482,7 +473,7 @@ void AvatarMixer::sendStatsPacket() { // add the key to ask the domain-server for a username replacement, if it has it avatarStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); avatarStats[NODE_OUTBOUND_KBPS_STAT_KEY] = node->getOutboundBandwidth(); - + AvatarMixerClientData* clientData = static_cast(node->getLinkedData()); if (clientData) { MutexTryLocker lock(clientData->getMutex()); @@ -490,9 +481,9 @@ void AvatarMixer::sendStatsPacket() { clientData->loadJSONStats(avatarStats); // add the diff between the full outbound bandwidth and the measured bandwidth for AvatarData send only - avatarStats["delta_full_vs_avatar_data_kbps"] = + avatarStats["delta_full_vs_avatar_data_kbps"] = avatarStats[NODE_OUTBOUND_KBPS_STAT_KEY].toDouble() - avatarStats[OUTBOUND_AVATAR_DATA_STATS_KEY].toDouble(); - } + } } avatarsObject[uuidStringWithoutCurlyBraces(node->getUUID())] = avatarStats; @@ -500,7 +491,7 @@ void AvatarMixer::sendStatsPacket() { statsObject["avatars"] = avatarsObject; ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject); - + _sumListeners = 0; _sumBillboardPackets = 0; _sumIdentityPackets = 0; @@ -509,41 +500,41 @@ void AvatarMixer::sendStatsPacket() { void AvatarMixer::run() { ThreadedAssignment::commonInit(AVATAR_MIXER_LOGGING_NAME, NodeType::AvatarMixer); - + auto nodeList = DependencyManager::get(); nodeList->addNodeTypeToInterestSet(NodeType::Agent); - + nodeList->linkedDataCreateCallback = [] (Node* node) { node->setLinkedData(new AvatarMixerClientData()); }; - + // setup the timer that will be fired on the broadcast thread _broadcastTimer = new QTimer; _broadcastTimer->setInterval(AVATAR_DATA_SEND_INTERVAL_MSECS); _broadcastTimer->moveToThread(&_broadcastThread); - + // connect appropriate signals and slots connect(_broadcastTimer, &QTimer::timeout, this, &AvatarMixer::broadcastAvatarData, Qt::DirectConnection); connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start())); // wait until we have the domain-server settings, otherwise we bail DomainHandler& domainHandler = nodeList->getDomainHandler(); - + qDebug() << "Waiting for domain settings from domain-server."; - + // block until we get the settingsRequestComplete signal QEventLoop loop; connect(&domainHandler, &DomainHandler::settingsReceived, &loop, &QEventLoop::quit); connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit); domainHandler.requestDomainSettings(); loop.exec(); - + if (domainHandler.getSettingsObject().isEmpty()) { qDebug() << "Failed to retreive settings object from domain-server. Bailing on assignment."; setFinished(true); return; } - + // parse the settings to pull out the values we need parseDomainServerSettings(domainHandler.getSettingsObject()); @@ -554,13 +545,13 @@ void AvatarMixer::run() { void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { const QString AVATAR_MIXER_SETTINGS_KEY = "avatar_mixer"; const QString NODE_SEND_BANDWIDTH_KEY = "max_node_send_bandwidth"; - + const float DEFAULT_NODE_SEND_BANDWIDTH = 1.0f; QJsonValue nodeBandwidthValue = domainSettings[AVATAR_MIXER_SETTINGS_KEY].toObject()[NODE_SEND_BANDWIDTH_KEY]; if (!nodeBandwidthValue.isDouble()) { qDebug() << NODE_SEND_BANDWIDTH_KEY << "is not a double - will continue with default value"; - } + } _maxKbpsPerNode = nodeBandwidthValue.toDouble(DEFAULT_NODE_SEND_BANDWIDTH) * KILO_PER_MEGA; - qDebug() << "The maximum send bandwidth per node is" << _maxKbpsPerNode << "kbps."; + qDebug() << "The maximum send bandwidth per node is" << _maxKbpsPerNode << "kbps."; }