diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index f8c2cf86e8..2bbd6a1950 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -125,7 +125,28 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer mess replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); replicatedNode->setIsUpstream(true); - getOrCreateClientData(replicatedNode.data())->queuePacket(message, replicatedNode); + // construct a "fake" audio received message from the byte array and packet list information + auto audioData = message->getMessage().mid(NUM_BYTES_RFC4122_UUID); + + PacketType rewrittenType; + + if (message->getType() == PacketType::ReplicatedMicrophoneAudioNoEcho) { + rewrittenType = PacketType::MicrophoneAudioNoEcho; + } else if (message->getType() == PacketType::ReplicatedMicrophoneAudioWithEcho) { + rewrittenType = PacketType::MicrophoneAudioWithEcho; + } else if (message->getType() == PacketType::ReplicatedInjectAudio) { + rewrittenType = PacketType::InjectAudio; + } else if (message->getType() == PacketType::ReplicatedSilentAudioFrame) { + rewrittenType = PacketType::SilentAudioFrame; + } else { + return; + } + + auto replicatedMessage = QSharedPointer::create(audioData, rewrittenType, + versionForPacketType(rewrittenType), + message->getSenderSockAddr(), nodeID); + + getOrCreateClientData(replicatedNode.data())->queuePacket(replicatedMessage, replicatedNode); } void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer message, SharedNodePointer sendingNode) { diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 848dc969d8..7641000a7a 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -160,16 +160,12 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c if (!isReplicatedPacket(message.getType())) { // since this packet will be non-sourced, we add the replicated node's ID here packet->write(node.getUUID().toRfc4122()); - - // we won't negotiate an audio format with the replicant, because we aren't a listener - // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs - packet->writeString(_selectedCodecName); } packet->write(message.getMessage()); } - nodeList->sendUnreliablePacket(*packet, downstreamNode->getPublicSocket()); + nodeList->sendUnreliablePacket(*packet, *downstreamNode); } }); } @@ -312,6 +308,7 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { // this is injected audio // grab the stream identifier for this injected audio message.seek(sizeof(quint16)); + QUuid streamIdentifier = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID)); bool isStereo; @@ -346,18 +343,6 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { // seek to the beginning of the packet so that the next reader is in the right spot message.seek(0); - if (packetType == PacketType::ReplicatedMicrophoneAudioWithEcho - || packetType == PacketType::ReplicatedMicrophoneAudioNoEcho - || packetType == PacketType::ReplicatedSilentAudioFrame - || packetType == PacketType::ReplicatedInjectAudio) { - - // skip past source ID for the replicated packet - message.seek(NUM_BYTES_RFC4122_UUID); - - // skip past the codec string - message.readString(); - } - // check the overflow count before we parse data auto overflowBefore = matchingStream->getOverflowCount(); auto parseResult = matchingStream->parseData(message); @@ -706,9 +691,10 @@ bool AudioMixerClientData::shouldIgnore(const SharedNodePointer self, const Shar } void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointer message) { - // first pull the codec string from the packet + // hop past the sequence number that leads the packet + message->seek(sizeof(quint16)); - // read the string for the codec + // pull the codec string from the packet auto codecString = message->readString(); qDebug() << "Manually setting codec for replicated agent" << uuidStringWithoutCurlyBraces(getNodeID()) @@ -718,4 +704,7 @@ void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointerseek(0); } diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 870149f1bc..32222a6406 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -54,10 +54,95 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) : packetReceiver.registerListener(PacketType::RadiusIgnoreRequest, this, "handleRadiusIgnoreRequestPacket"); packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket"); + packetReceiver.registerListenerForTypes({ + PacketType::ReplicatedAvatarIdentity, + PacketType::ReplicatedKillAvatar + }, this, "handleReplicatedPackets"); + + packetReceiver.registerListener(PacketType::ReplicatedBulkAvatarData, this, "handleReplicatedBulkAvatarPacket"); + auto nodeList = DependencyManager::get(); connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch); } +SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) { + auto replicatedNode = DependencyManager::get()->addOrUpdateNode(nodeID, NodeType::Agent, + senderSockAddr, + senderSockAddr, + DEFAULT_AGENT_PERMISSIONS, true); + + replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); + replicatedNode->setIsUpstream(true); + + return replicatedNode; +} + +void AvatarMixer::handleReplicatedPackets(QSharedPointer message) { + auto nodeList = DependencyManager::get(); + auto nodeID = QUuid::fromRfc4122(message->peek(NUM_BYTES_RFC4122_UUID)); + + auto replicatedNode = addOrUpdateReplicatedNode(nodeID, message->getSenderSockAddr()); + + if (message->getType() == PacketType::ReplicatedAvatarIdentity) { + handleAvatarIdentityPacket(message, replicatedNode); + } else if (message->getType() == PacketType::ReplicatedKillAvatar) { + handleKillAvatarPacket(message, replicatedNode); + } +} + +void AvatarMixer::handleReplicatedBulkAvatarPacket(QSharedPointer message) { + auto nodeList = DependencyManager::get(); + + while (message->getBytesLeftToRead()) { + // first, grab the node ID for this replicated avatar + auto nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); + + // make sure we have an upstream replicated node that matches + auto replicatedNode = addOrUpdateReplicatedNode(nodeID, message->getSenderSockAddr()); + + // grab the size of the avatar byte array so we know how much to read + quint16 avatarByteArraySize; + message->readPrimitive(&avatarByteArraySize); + + // read the avatar byte array + auto avatarByteArray = message->read(avatarByteArraySize); + + // construct a "fake" avatar data received message from the byte array and packet list information + auto replicatedMessage = QSharedPointer::create(avatarByteArray, PacketType::AvatarData, + versionForPacketType(PacketType::AvatarData), + message->getSenderSockAddr(), nodeID); + + // queue up the replicated avatar data with the client data for the replicated node + auto start = usecTimestampNow(); + getOrCreateClientData(replicatedNode)->queuePacket(replicatedMessage, replicatedNode); + auto end = usecTimestampNow(); + _queueIncomingPacketElapsedTime += (end - start); + } +} + +void AvatarMixer::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) { + // first, make sure that this is a packet from a node we are supposed to replicate + if (node.isReplicated() + && (message.getType() == PacketType::KillAvatar || message.getType() == PacketType::ReplicatedKillAvatar)) { + PacketType replicatedType = PacketType::ReplicatedKillAvatar; + + std::unique_ptr packet; + + auto nodeList = DependencyManager::get(); + nodeList->eachMatchingNode([&](const SharedNodePointer& node) { + return node->getType() == NodeType::DownstreamAvatarMixer; + }, [&](const SharedNodePointer& node) { + if (!packet) { + // construct an NLPacket to send to the replicant that has the contents of the received packet + packet = NLPacket::create(replicatedType, message.getSize()); + packet->write(message.getMessage()); + } + + nodeList->sendUnreliablePacket(*packet, *node); + }); + } +} + void AvatarMixer::queueIncomingPacket(QSharedPointer message, SharedNodePointer node) { auto start = usecTimestampNow(); getOrCreateClientData(node)->queuePacket(message, node); @@ -70,12 +155,14 @@ AvatarMixer::~AvatarMixer() { } void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { - QByteArray individualData = nodeData->getAvatar().identityByteArray(); - individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); - auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); - identityPackets->write(individualData); - DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); - ++_sumIdentityPackets; + if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) { + QByteArray individualData = nodeData->getAvatar().identityByteArray(); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); + auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); + identityPackets->write(individualData); + DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); + ++_sumIdentityPackets; + } } std::chrono::microseconds AvatarMixer::timeFrame(p_high_resolution_clock::time_point& timestamp) { @@ -279,13 +366,38 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) { } } + std::unique_ptr killPacket; + std::unique_ptr replicatedKillPacket; + // this was an avatar we were sending to other people // send a kill packet for it to our other nodes - auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); - killPacket->write(killedNode->getUUID().toRfc4122()); - killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); + nodeList->eachMatchingNode([&](const SharedNodePointer& node) { + // we relay avatar kill packets to agents that are not upstream + // and downstream avatar mixers, if the node that was just killed was being replicated + return (node->getType() == NodeType::Agent && !node->isUpstream()) + || (killedNode->isReplicated() && node->getType() == NodeType::DownstreamAvatarMixer); + }, [&](const SharedNodePointer& node) { + if (node->getType() == NodeType::Agent) { + if (!killPacket) { + killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); + killPacket->write(killedNode->getUUID().toRfc4122()); + killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); + } + + nodeList->sendUnreliablePacket(*killPacket, *node); + } else { + // send a replicated kill packet to the downstream avatar mixer + if (!replicatedKillPacket) { + replicatedKillPacket = NLPacket::create(PacketType::ReplicatedKillAvatar, + NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); + replicatedKillPacket->write(killedNode->getUUID().toRfc4122()); + replicatedKillPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); + } + + nodeList->sendUnreliablePacket(*replicatedKillPacket, *node); + } + }); - nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); // we also want to remove sequence number data for this avatar on our other avatars // so invoke the appropriate method on the AvatarMixerClientData for other avatars @@ -398,11 +510,9 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer mes AvatarData& avatar = nodeData->getAvatar(); // parse the identity packet and update the change timestamp if appropriate - AvatarData::Identity identity; - AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity); bool identityChanged = false; bool displayNameChanged = false; - avatar.processAvatarIdentity(identity, identityChanged, displayNameChanged); + avatar.processAvatarIdentity(message->getMessage(), identityChanged, displayNameChanged); if (identityChanged) { QMutexLocker nodeDataLocker(&nodeData->getMutex()); nodeData->flagIdentityChange(); @@ -416,11 +526,13 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer mes _handleAvatarIdentityPacketElapsedTime += (end - start); } -void AvatarMixer::handleKillAvatarPacket(QSharedPointer message) { +void AvatarMixer::handleKillAvatarPacket(QSharedPointer message, SharedNodePointer node) { auto start = usecTimestampNow(); DependencyManager::get()->processKillNode(*message); auto end = usecTimestampNow(); _handleKillAvatarPacketElapsedTime += (end - start); + + optionallyReplicatePacket(*message, *node); } void AvatarMixer::handleNodeIgnoreRequestPacket(QSharedPointer message, SharedNodePointer senderNode) { @@ -672,7 +784,6 @@ void AvatarMixer::run() { connect(&domainHandler, &DomainHandler::settingsReceiveFail, this, &AvatarMixer::domainSettingsRequestFailed); ThreadedAssignment::commonInit(AVATAR_MIXER_LOGGING_NAME, NodeType::AvatarMixer); - } AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node) { @@ -764,4 +875,10 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { qCDebug(avatars) << "This domain requires a minimum avatar scale of" << _domainMinimumScale << "and a maximum avatar scale of" << _domainMaximumScale; + + parseDownstreamServers(domainSettings, NodeType::AvatarMixer, [](Node& node) { + if (!node.getLinkedData()) { + node.setLinkedData(std::unique_ptr { new AvatarMixerClientData(node.getUUID()) }); + } + }); } diff --git a/assignment-client/src/avatars/AvatarMixer.h b/assignment-client/src/avatars/AvatarMixer.h index 1925ec1ebd..e0d073a281 100644 --- a/assignment-client/src/avatars/AvatarMixer.h +++ b/assignment-client/src/avatars/AvatarMixer.h @@ -42,10 +42,12 @@ private slots: void handleAdjustAvatarSorting(QSharedPointer message, SharedNodePointer senderNode); void handleViewFrustumPacket(QSharedPointer message, SharedNodePointer senderNode); void handleAvatarIdentityPacket(QSharedPointer message, SharedNodePointer senderNode); - void handleKillAvatarPacket(QSharedPointer message); + void handleKillAvatarPacket(QSharedPointer message, SharedNodePointer senderNode); void handleNodeIgnoreRequestPacket(QSharedPointer message, SharedNodePointer senderNode); void handleRadiusIgnoreRequestPacket(QSharedPointer packet, SharedNodePointer sendingNode); void handleRequestsDomainListDataPacket(QSharedPointer message, SharedNodePointer senderNode); + void handleReplicatedPackets(QSharedPointer message); + void handleReplicatedBulkAvatarPacket(QSharedPointer message); void domainSettingsRequestComplete(); void handlePacketVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID); void start(); @@ -61,6 +63,8 @@ private: void manageDisplayName(const SharedNodePointer& node); + void optionallyReplicatePacket(ReceivedMessage& message, const Node& node); + p_high_resolution_clock::time_point _lastFrameTimestamp; // FIXME - new throttling - use these values somehow diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 2ad8bb58ed..e5a0a1f4fd 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -65,15 +65,32 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) { _stats.processIncomingPacketsElapsedTime += (end - start); } - int AvatarMixerSlave::sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { - QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(); - individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious - auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); - identityPackets->write(individualData); - DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); - _stats.numIdentityPackets++; - return individualData.size(); + if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) { + QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(true); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious + auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); + identityPackets->write(individualData); + DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); + _stats.numIdentityPackets++; + return individualData.size(); + } else { + return 0; + } +} + +int AvatarMixerSlave::sendReplicatedIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { + if (destinationNode->getType() == NodeType::DownstreamAvatarMixer) { + QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(true); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious + auto identityPacket = NLPacket::create(PacketType::ReplicatedAvatarIdentity); + identityPacket->write(individualData); + DependencyManager::get()->sendUnreliablePacket(*identityPacket, *destinationNode); + _stats.numIdentityPackets++; + return individualData.size(); + } else { + return 0; + } } static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45; @@ -81,6 +98,18 @@ static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45; void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) { quint64 start = usecTimestampNow(); + if (node->getType() == NodeType::Agent && node->getLinkedData() && node->getActiveSocket() && !node->isUpstream()) { + broadcastAvatarDataToAgent(node); + } else if (node->getType() == NodeType::DownstreamAvatarMixer) { + broadcastAvatarDataToDownstreamMixer(node); + } + + quint64 end = usecTimestampNow(); + _stats.jobElapsedTime += (end - start); +} + +void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) { + auto nodeList = DependencyManager::get(); // setup for distributed random floating point values @@ -88,331 +117,445 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) { std::mt19937 generator(randomDevice()); std::uniform_real_distribution distribution; - if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) { - _stats.nodesBroadcastedTo++; + _stats.nodesBroadcastedTo++; - AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); + AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); - nodeData->resetInViewStats(); + nodeData->resetInViewStats(); - const AvatarData& avatar = nodeData->getAvatar(); - glm::vec3 myPosition = avatar.getClientGlobalPosition(); + const AvatarData& avatar = nodeData->getAvatar(); + glm::vec3 myPosition = avatar.getClientGlobalPosition(); - // reset the internal state for correct random number distribution - distribution.reset(); + // reset the internal state for correct random number distribution + distribution.reset(); - // reset the number of sent avatars - nodeData->resetNumAvatarsSentLastFrame(); + // reset the number of sent avatars + nodeData->resetNumAvatarsSentLastFrame(); - // keep a counter of the number of considered avatars - int numOtherAvatars = 0; + // keep a counter of the number of considered avatars + int numOtherAvatars = 0; - // keep track of outbound data rate specifically for avatar data - int numAvatarDataBytes = 0; - int identityBytesSent = 0; + // keep track of outbound data rate specifically for avatar data + int numAvatarDataBytes = 0; + int identityBytesSent = 0; - // max number of avatarBytes per frame - auto maxAvatarBytesPerFrame = (_maxKbpsPerNode * BYTES_PER_KILOBIT) / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND; + // max number of avatarBytes per frame + auto maxAvatarBytesPerFrame = (_maxKbpsPerNode * BYTES_PER_KILOBIT) / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND; - // FIXME - find a way to not send the sessionID for every avatar - int minimumBytesPerAvatar = AvatarDataPacket::AVATAR_HAS_FLAGS_SIZE + NUM_BYTES_RFC4122_UUID; + // FIXME - find a way to not send the sessionID for every avatar + int minimumBytesPerAvatar = AvatarDataPacket::AVATAR_HAS_FLAGS_SIZE + NUM_BYTES_RFC4122_UUID; - int overBudgetAvatars = 0; + int overBudgetAvatars = 0; - // keep track of the number of other avatars held back in this frame - int numAvatarsHeldBack = 0; + // keep track of the number of other avatars held back in this frame + int numAvatarsHeldBack = 0; - // keep track of the number of other avatar frames skipped - int numAvatarsWithSkippedFrames = 0; + // keep track of the number of other avatar frames skipped + int numAvatarsWithSkippedFrames = 0; - // When this is true, the AvatarMixer will send Avatar data to a client - // about avatars they've ignored or that are out of view - bool PALIsOpen = nodeData->getRequestsDomainListData(); + // When this is true, the AvatarMixer will send Avatar data to a client + // about avatars they've ignored or that are out of view + bool PALIsOpen = nodeData->getRequestsDomainListData(); - // When this is true, the AvatarMixer will send Avatar data to a client about avatars that have ignored them - bool getsAnyIgnored = PALIsOpen && node->getCanKick(); + // When this is true, the AvatarMixer will send Avatar data to a client about avatars that have ignored them + bool getsAnyIgnored = PALIsOpen && node->getCanKick(); - if (PALIsOpen) { - // Increase minimumBytesPerAvatar if the PAL is open - minimumBytesPerAvatar += sizeof(AvatarDataPacket::AvatarGlobalPosition) + - sizeof(AvatarDataPacket::AudioLoudness); - } + if (PALIsOpen) { + // Increase minimumBytesPerAvatar if the PAL is open + minimumBytesPerAvatar += sizeof(AvatarDataPacket::AvatarGlobalPosition) + + sizeof(AvatarDataPacket::AudioLoudness); + } - // setup a PacketList for the avatarPackets - auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData); + // setup a PacketList for the avatarPackets + auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData); - // Define the minimum bubble size - static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f); - // Define the scale of the box for the current node - glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f; - // Set up the bounding box for the current node - AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale); - // Clamp the size of the bounding box to a minimum scale - if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) { - nodeBox.setScaleStayCentered(minBubbleSize); - } - // Quadruple the scale of both bounding boxes - nodeBox.embiggen(4.0f); + // Define the minimum bubble size + static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f); + // Define the scale of the box for the current node + glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f; + // Set up the bounding box for the current node + AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale); + // Clamp the size of the bounding box to a minimum scale + if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) { + nodeBox.setScaleStayCentered(minBubbleSize); + } + // Quadruple the scale of both bounding boxes + nodeBox.embiggen(4.0f); - // setup list of AvatarData as well as maps to map betweeen the AvatarData and the original nodes - // for calling the AvatarData::sortAvatars() function and getting our sorted list of client nodes - QList avatarList; - std::unordered_map avatarDataToNodes; + // setup list of AvatarData as well as maps to map betweeen the AvatarData and the original nodes + // for calling the AvatarData::sortAvatars() function and getting our sorted list of client nodes + QList avatarList; + std::unordered_map avatarDataToNodes; - std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) { + std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) { + // make sure this is an agent that we have avatar data for before considering it for inclusion + if (otherNode->getType() == NodeType::Agent + && otherNode->getLinkedData()) { const AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - // theoretically it's possible for a Node to be in the NodeList (and therefore end up here), - // but not have yet sent data that's linked to the node. Check for that case and don't - // consider those nodes. - if (otherNodeData) { - AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer(); - avatarList << otherAvatar; - avatarDataToNodes[otherAvatar] = otherNode; + AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer(); + avatarList << otherAvatar; + avatarDataToNodes[otherAvatar] = otherNode; + } + }); + + AvatarSharedPointer thisAvatar = nodeData->getAvatarSharedPointer(); + ViewFrustum cameraView = nodeData->getViewFrustom(); + std::priority_queue sortedAvatars; + AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars, + [&](AvatarSharedPointer avatar)->uint64_t { + auto avatarNode = avatarDataToNodes[avatar]; + assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map + return nodeData->getLastBroadcastTime(avatarNode->getUUID()); + }, [&](AvatarSharedPointer avatar)->float{ + glm::vec3 nodeBoxHalfScale = (avatar->getPosition() - avatar->getGlobalBoundingBoxCorner()); + return glm::max(nodeBoxHalfScale.x, glm::max(nodeBoxHalfScale.y, nodeBoxHalfScale.z)); + }, [&](AvatarSharedPointer avatar)->bool { + if (avatar == thisAvatar) { + return true; // ignore ourselves... + } + + bool shouldIgnore = false; + + // We will also ignore other nodes for a couple of different reasons: + // 1) ignore bubbles and ignore specific node + // 2) the node hasn't really updated it's frame data recently, this can + // happen if for example the avatar is connected on a desktop and sending + // updates at ~30hz. So every 3 frames we skip a frame. + auto avatarNode = avatarDataToNodes[avatar]; + + assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map + + const AvatarMixerClientData* avatarNodeData = reinterpret_cast(avatarNode->getLinkedData()); + assert(avatarNodeData); // we can't have gotten here without avatarNode having valid data + quint64 startIgnoreCalculation = usecTimestampNow(); + + // make sure we have data for this avatar, that it isn't the same node, + // and isn't an avatar that the viewing node has ignored + // or that has ignored the viewing node + if (!avatarNode->getLinkedData() + || avatarNode->getUUID() == node->getUUID() + || (node->isIgnoringNodeWithID(avatarNode->getUUID()) && !PALIsOpen) + || (avatarNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) { + shouldIgnore = true; + } else { + + // Check to see if the space bubble is enabled + // Don't bother with these checks if the other avatar has their bubble enabled and we're gettingAnyIgnored + if (node->isIgnoreRadiusEnabled() || (avatarNode->isIgnoreRadiusEnabled() && !getsAnyIgnored)) { + + // Define the scale of the box for the current other node + glm::vec3 otherNodeBoxScale = (avatarNodeData->getPosition() - avatarNodeData->getGlobalBoundingBoxCorner()) * 2.0f; + // Set up the bounding box for the current other node + AABox otherNodeBox(avatarNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); + // Clamp the size of the bounding box to a minimum scale + if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) { + otherNodeBox.setScaleStayCentered(minBubbleSize); + } + // Quadruple the scale of both bounding boxes + otherNodeBox.embiggen(4.0f); + + // Perform the collision check between the two bounding boxes + if (nodeBox.touches(otherNodeBox)) { + nodeData->ignoreOther(node, avatarNode); + shouldIgnore = !getsAnyIgnored; + } } - }); + // Not close enough to ignore + if (!shouldIgnore) { + nodeData->removeFromRadiusIgnoringSet(node, avatarNode->getUUID()); + } + } + quint64 endIgnoreCalculation = usecTimestampNow(); + _stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation); - AvatarSharedPointer thisAvatar = nodeData->getAvatarSharedPointer(); - ViewFrustum cameraView = nodeData->getViewFrustom(); - std::priority_queue sortedAvatars; - AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars, + if (!shouldIgnore) { + AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(avatarNode->getUUID()); + AvatarDataSequenceNumber lastSeqFromSender = avatarNodeData->getLastReceivedSequenceNumber(); - [&](AvatarSharedPointer avatar)->uint64_t{ - auto avatarNode = avatarDataToNodes[avatar]; - assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map - return nodeData->getLastBroadcastTime(avatarNode->getUUID()); - }, + // FIXME - This code does appear to be working. But it seems brittle. + // It supports determining if the frame of data for this "other" + // avatar has already been sent to the reciever. This has been + // verified to work on a desktop display that renders at 60hz and + // therefore sends to mixer at 30hz. Each second you'd expect to + // have 15 (45hz-30hz) duplicate frames. In this case, the stat + // avg_other_av_skips_per_second does report 15. + // + // make sure we haven't already sent this data from this sender to this receiver + // or that somehow we haven't sent + if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) { + ++numAvatarsHeldBack; + shouldIgnore = true; + } else if (lastSeqFromSender - lastSeqToReceiver > 1) { + // this is a skip - we still send the packet but capture the presence of the skip so we see it happening + ++numAvatarsWithSkippedFrames; + } + } + return shouldIgnore; + }); - [&](AvatarSharedPointer avatar)->float{ - glm::vec3 nodeBoxHalfScale = (avatar->getPosition() - avatar->getGlobalBoundingBoxCorner()); - return glm::max(nodeBoxHalfScale.x, glm::max(nodeBoxHalfScale.y, nodeBoxHalfScale.z)); - }, + // loop through our sorted avatars and allocate our bandwidth to them accordingly + int avatarRank = 0; - [&](AvatarSharedPointer avatar)->bool{ - if (avatar == thisAvatar) { - return true; // ignore ourselves... - } + // this is overly conservative, because it includes some avatars we might not consider + int remainingAvatars = (int)sortedAvatars.size(); - bool shouldIgnore = false; + while (!sortedAvatars.empty()) { + AvatarPriority sortData = sortedAvatars.top(); + sortedAvatars.pop(); + const auto& avatarData = sortData.avatar; + avatarRank++; + remainingAvatars--; - // We will also ignore other nodes for a couple of different reasons: - // 1) ignore bubbles and ignore specific node - // 2) the node hasn't really updated it's frame data recently, this can - // happen if for example the avatar is connected on a desktop and sending - // updates at ~30hz. So every 3 frames we skip a frame. - auto avatarNode = avatarDataToNodes[avatar]; + auto otherNode = avatarDataToNodes[avatarData]; + assert(otherNode); // we can't have gotten here without the avatarData being a valid key in the map - assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map + // NOTE: Here's where we determine if we are over budget and drop to bare minimum data + int minimRemainingAvatarBytes = minimumBytesPerAvatar * remainingAvatars; + bool overBudget = (identityBytesSent + numAvatarDataBytes + minimRemainingAvatarBytes) > maxAvatarBytesPerFrame; - const AvatarMixerClientData* avatarNodeData = reinterpret_cast(avatarNode->getLinkedData()); - assert(avatarNodeData); // we can't have gotten here without avatarNode having valid data - quint64 startIgnoreCalculation = usecTimestampNow(); + quint64 startAvatarDataPacking = usecTimestampNow(); - // make sure we have data for this avatar, that it isn't the same node, - // and isn't an avatar that the viewing node has ignored - // or that has ignored the viewing node - if (!avatarNode->getLinkedData() - || avatarNode->getUUID() == node->getUUID() - || (node->isIgnoringNodeWithID(avatarNode->getUUID()) && !PALIsOpen) - || (avatarNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) { - shouldIgnore = true; - } else { + ++numOtherAvatars; - // Check to see if the space bubble is enabled - // Don't bother with these checks if the other avatar has their bubble enabled and we're gettingAnyIgnored - if (node->isIgnoreRadiusEnabled() || (avatarNode->isIgnoreRadiusEnabled() && !getsAnyIgnored)) { + const AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - // Define the scale of the box for the current other node - glm::vec3 otherNodeBoxScale = (avatarNodeData->getPosition() - avatarNodeData->getGlobalBoundingBoxCorner()) * 2.0f; - // Set up the bounding box for the current other node - AABox otherNodeBox(avatarNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); - // Clamp the size of the bounding box to a minimum scale - if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) { - otherNodeBox.setScaleStayCentered(minBubbleSize); - } - // Quadruple the scale of both bounding boxes - otherNodeBox.embiggen(4.0f); + // If the time that the mixer sent AVATAR DATA about Avatar B to Avatar A is BEFORE OR EQUAL TO + // the time that Avatar B flagged an IDENTITY DATA change, send IDENTITY DATA about Avatar B to Avatar A. + if (nodeData->getLastBroadcastTime(otherNode->getUUID()) <= otherNodeData->getIdentityChangeTimestamp()) { + identityBytesSent += sendIdentityPacket(otherNodeData, node); + } - // Perform the collision check between the two bounding boxes - if (nodeBox.touches(otherNodeBox)) { - nodeData->ignoreOther(node, avatarNode); - shouldIgnore = !getsAnyIgnored; - } - } - // Not close enough to ignore - if (!shouldIgnore) { - nodeData->removeFromRadiusIgnoringSet(node, avatarNode->getUUID()); - } - } - quint64 endIgnoreCalculation = usecTimestampNow(); - _stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation); + const AvatarData* otherAvatar = otherNodeData->getConstAvatarData(); + glm::vec3 otherPosition = otherAvatar->getClientGlobalPosition(); - if (!shouldIgnore) { - AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(avatarNode->getUUID()); - AvatarDataSequenceNumber lastSeqFromSender = avatarNodeData->getLastReceivedSequenceNumber(); + // determine if avatar is in view, to determine how much data to include... + glm::vec3 otherNodeBoxScale = (otherPosition - otherNodeData->getGlobalBoundingBoxCorner()) * 2.0f; + AABox otherNodeBox(otherNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); + bool isInView = nodeData->otherAvatarInView(otherNodeBox); - // FIXME - This code does appear to be working. But it seems brittle. - // It supports determining if the frame of data for this "other" - // avatar has already been sent to the reciever. This has been - // verified to work on a desktop display that renders at 60hz and - // therefore sends to mixer at 30hz. Each second you'd expect to - // have 15 (45hz-30hz) duplicate frames. In this case, the stat - // avg_other_av_skips_per_second does report 15. - // - // make sure we haven't already sent this data from this sender to this receiver - // or that somehow we haven't sent - if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) { - ++numAvatarsHeldBack; - shouldIgnore = true; - } else if (lastSeqFromSender - lastSeqToReceiver > 1) { - // this is a skip - we still send the packet but capture the presence of the skip so we see it happening - ++numAvatarsWithSkippedFrames; - } - } - return shouldIgnore; - }); + // start a new segment in the PacketList for this avatar + avatarPacketList->startSegment(); - // loop through our sorted avatars and allocate our bandwidth to them accordingly - int avatarRank = 0; + AvatarData::AvatarDataDetail detail; - // this is overly conservative, because it includes some avatars we might not consider - int remainingAvatars = (int)sortedAvatars.size(); + if (overBudget) { + overBudgetAvatars++; + _stats.overBudgetAvatars++; + detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::NoData; + } else if (!isInView) { + detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::MinimumData; + nodeData->incrementAvatarOutOfView(); + } else { + detail = distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO + ? AvatarData::SendAllData : AvatarData::CullSmallData; + nodeData->incrementAvatarInView(); + } - while (!sortedAvatars.empty()) { - AvatarPriority sortData = sortedAvatars.top(); - sortedAvatars.pop(); - const auto& avatarData = sortData.avatar; - avatarRank++; - remainingAvatars--; + bool includeThisAvatar = true; + auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID()); + QVector& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID()); + bool distanceAdjust = true; + glm::vec3 viewerPosition = myPosition; + AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray + bool dropFaceTracking = false; - auto otherNode = avatarDataToNodes[avatarData]; - assert(otherNode); // we can't have gotten here without the avatarData being a valid key in the map + quint64 start = usecTimestampNow(); + QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, + hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + quint64 end = usecTimestampNow(); + _stats.toByteArrayElapsedTime += (end - start); - // NOTE: Here's where we determine if we are over budget and drop to bare minimum data - int minimRemainingAvatarBytes = minimumBytesPerAvatar * remainingAvatars; - bool overBudget = (identityBytesSent + numAvatarDataBytes + minimRemainingAvatarBytes) > maxAvatarBytesPerFrame; + static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID); + if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { + qCWarning(avatars) << "otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data"; + + dropFaceTracking = true; // first try dropping the facial data + bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, + hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + + if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { + qCWarning(avatars) << "otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData"; + bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther, + hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + } + + if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { + qCWarning(avatars) << "otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!"; + includeThisAvatar = false; + } + } + + if (includeThisAvatar) { + numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122()); + numAvatarDataBytes += avatarPacketList->write(bytes); + + if (detail != AvatarData::NoData) { + _stats.numOthersIncluded++; + + // increment the number of avatars sent to this reciever + nodeData->incrementNumAvatarsSentLastFrame(); + + // set the last sent sequence number for this sender on the receiver + nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), + otherNodeData->getLastReceivedSequenceNumber()); + + // remember the last time we sent details about this other node to the receiver + nodeData->setLastBroadcastTime(otherNode->getUUID(), start); + } + } + + avatarPacketList->endSegment(); + + quint64 endAvatarDataPacking = usecTimestampNow(); + _stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking); + }; + + quint64 startPacketSending = usecTimestampNow(); + + // close the current packet so that we're always sending something + avatarPacketList->closeCurrentPacket(true); + + _stats.numPacketsSent += (int)avatarPacketList->getNumPackets(); + _stats.numBytesSent += numAvatarDataBytes; + + // send the avatar data PacketList + nodeList->sendPacketList(std::move(avatarPacketList), *node); + + // record the bytes sent for other avatar data in the AvatarMixerClientData + nodeData->recordSentAvatarData(numAvatarDataBytes); + + // record the number of avatars held back this frame + nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack); + nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames); + + quint64 endPacketSending = usecTimestampNow(); + _stats.packetSendingElapsedTime += (endPacketSending - startPacketSending); +} + +uint64_t REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US = 5 * 1000 * 1000; + +void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node) { + _stats.downstreamMixersBroadcastedTo++; + + AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); + + // setup a PacketList for the replicated bulk avatar data + auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData); + + int numAvatarDataBytes = 0; + + // reset the number of sent avatars + nodeData->resetNumAvatarsSentLastFrame(); + + std::for_each(_begin, _end, [&](const SharedNodePointer& agentNode) { + // collect agents that we have avatar data for that we are supposed to replicate + if (agentNode->getType() == NodeType::Agent && agentNode->getLinkedData() && agentNode->isReplicated()) { + const AvatarMixerClientData* agentNodeData = reinterpret_cast(agentNode->getLinkedData()); + + AvatarSharedPointer otherAvatar = agentNodeData->getAvatarSharedPointer(); quint64 startAvatarDataPacking = usecTimestampNow(); - ++numOtherAvatars; - - const AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - - // If the time that the mixer sent AVATAR DATA about Avatar B to Avatar A is BEFORE OR EQUAL TO - // the time that Avatar B flagged an IDENTITY DATA change, send IDENTITY DATA about Avatar B to Avatar A. - if (nodeData->getLastBroadcastTime(otherNode->getUUID()) <= otherNodeData->getIdentityChangeTimestamp()) { - identityBytesSent += sendIdentityPacket(otherNodeData, node); - } - - const AvatarData* otherAvatar = otherNodeData->getConstAvatarData(); - glm::vec3 otherPosition = otherAvatar->getClientGlobalPosition(); - - // determine if avatar is in view, to determine how much data to include... - glm::vec3 otherNodeBoxScale = (otherPosition - otherNodeData->getGlobalBoundingBoxCorner()) * 2.0f; - AABox otherNodeBox(otherNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); - bool isInView = nodeData->otherAvatarInView(otherNodeBox); - - // start a new segment in the PacketList for this avatar - avatarPacketList->startSegment(); - - AvatarData::AvatarDataDetail detail; - - if (overBudget) { - overBudgetAvatars++; - _stats.overBudgetAvatars++; - detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::NoData; - } else if (!isInView) { - detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::MinimumData; - nodeData->incrementAvatarOutOfView(); - } else { - detail = distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO - ? AvatarData::SendAllData : AvatarData::CullSmallData; - nodeData->incrementAvatarInView(); - } - - bool includeThisAvatar = true; - auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID()); - QVector& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID()); - bool distanceAdjust = true; - glm::vec3 viewerPosition = myPosition; - AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray - bool dropFaceTracking = false; + // we cannot send a downstream avatar mixer any updates that expect them to have previous state for this avatar + // since we have no idea if they're online and receiving our packets + // so we always send a full update for this avatar + quint64 start = usecTimestampNow(); - QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, - hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + AvatarDataPacket::HasFlags flagsOut; + + QVector emptyLastJointSendData { otherAvatar->getJointCount() }; + + QByteArray avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, emptyLastJointSendData, + flagsOut, false, false, glm::vec3(0), nullptr); quint64 end = usecTimestampNow(); _stats.toByteArrayElapsedTime += (end - start); - static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID); - if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { - qCWarning(avatars) << "otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data"; + auto lastBroadcastTime = nodeData->getLastBroadcastTime(agentNode->getUUID()); + if (lastBroadcastTime <= agentNodeData->getIdentityChangeTimestamp() + || (start - lastBroadcastTime) >= REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US) { + qDebug() << "Sending identity packet for " << agentNode->getUUID() << " to " << node->getUUID(); + sendReplicatedIdentityPacket(agentNodeData, node); + nodeData->setLastBroadcastTime(agentNode->getUUID(), start); + } - dropFaceTracking = true; // first try dropping the facial data - bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, - hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + // figure out how large our avatar byte array can be to fit in the packet list + // given that we need it and the avatar UUID and the size of the byte array (16 bit) + // to fit in a segment of the packet list + auto maxAvatarByteArraySize = avatarPacketList->getMaxSegmentSize(); + maxAvatarByteArraySize -= NUM_BYTES_RFC4122_UUID; + maxAvatarByteArraySize -= sizeof(quint16); - if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { - qCWarning(avatars) << "otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData"; - bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther, - hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); - } + auto sequenceNumberSize = sizeof(agentNodeData->getLastReceivedSequenceNumber()); + maxAvatarByteArraySize -= sequenceNumberSize; - if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { - qCWarning(avatars) << "otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!"; - includeThisAvatar = false; + if (avatarByteArray.size() > maxAvatarByteArraySize) { + qCWarning(avatars) << "Replicated avatar data too large for" << otherAvatar->getSessionUUID() + << "-" << avatarByteArray.size() << "bytes"; + + avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, emptyLastJointSendData, + flagsOut, true, false, glm::vec3(0), nullptr); + + if (avatarByteArray.size() > maxAvatarByteArraySize) { + qCWarning(avatars) << "Replicated avatar data without facial data still too large for" + << otherAvatar->getSessionUUID() << "-" << avatarByteArray.size() << "bytes"; + + avatarByteArray = otherAvatar->toByteArray(AvatarData::MinimumData, 0, emptyLastJointSendData, + flagsOut, true, false, glm::vec3(0), nullptr); } } - if (includeThisAvatar) { - numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122()); - numAvatarDataBytes += avatarPacketList->write(bytes); + if (avatarByteArray.size() <= maxAvatarByteArraySize) { + // increment the number of avatars sent to this reciever + nodeData->incrementNumAvatarsSentLastFrame(); - if (detail != AvatarData::NoData) { - _stats.numOthersIncluded++; + // set the last sent sequence number for this sender on the receiver + nodeData->setLastBroadcastSequenceNumber(agentNode->getUUID(), + agentNodeData->getLastReceivedSequenceNumber()); - // increment the number of avatars sent to this reciever - nodeData->incrementNumAvatarsSentLastFrame(); + // increment the number of avatars sent to this reciever + nodeData->incrementNumAvatarsSentLastFrame(); - // set the last sent sequence number for this sender on the receiver - nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), - otherNodeData->getLastReceivedSequenceNumber()); + // start a new segment in the packet list for this avatar + avatarPacketList->startSegment(); - // remember the last time we sent details about this other node to the receiver - nodeData->setLastBroadcastTime(otherNode->getUUID(), start); - } + // write the node's UUID, the size of the replicated avatar data, + // the sequence number of the replicated avatar data, and the replicated avatar data + numAvatarDataBytes += avatarPacketList->write(agentNode->getUUID().toRfc4122()); + numAvatarDataBytes += avatarPacketList->writePrimitive((quint16) (avatarByteArray.size() + sequenceNumberSize)); + numAvatarDataBytes += avatarPacketList->writePrimitive(agentNodeData->getLastReceivedSequenceNumber()); + numAvatarDataBytes += avatarPacketList->write(avatarByteArray); + + avatarPacketList->endSegment(); + + } else { + qCWarning(avatars) << "Could not fit minimum data avatar for" << otherAvatar->getSessionUUID() + << "to packet list -" << avatarByteArray.size() << "bytes"; } - avatarPacketList->endSegment(); - quint64 endAvatarDataPacking = usecTimestampNow(); _stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking); - }; + } + }); - quint64 startPacketSending = usecTimestampNow(); + quint64 startPacketSending = usecTimestampNow(); - // close the current packet so that we're always sending something - avatarPacketList->closeCurrentPacket(true); + // close the current packet so that we're always sending something + avatarPacketList->closeCurrentPacket(true); - _stats.numPacketsSent += (int)avatarPacketList->getNumPackets(); - _stats.numBytesSent += numAvatarDataBytes; + _stats.numPacketsSent += (int)avatarPacketList->getNumPackets(); + _stats.numBytesSent += numAvatarDataBytes; - // send the avatar data PacketList - nodeList->sendPacketList(std::move(avatarPacketList), *node); + // send the replicated bulk avatar data + auto nodeList = DependencyManager::get(); + nodeList->sendPacketList(std::move(avatarPacketList), node->getPublicSocket()); - // record the bytes sent for other avatar data in the AvatarMixerClientData - nodeData->recordSentAvatarData(numAvatarDataBytes); + // record the bytes sent for other avatar data in the AvatarMixerClientData + nodeData->recordSentAvatarData(numAvatarDataBytes); - // record the number of avatars held back this frame - nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack); - nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames); - - quint64 endPacketSending = usecTimestampNow(); - _stats.packetSendingElapsedTime += (endPacketSending - startPacketSending); - } - - quint64 end = usecTimestampNow(); - _stats.jobElapsedTime += (end - start); + quint64 endPacketSending = usecTimestampNow(); + _stats.packetSendingElapsedTime += (endPacketSending - startPacketSending); } diff --git a/assignment-client/src/avatars/AvatarMixerSlave.h b/assignment-client/src/avatars/AvatarMixerSlave.h index 04141d9d72..69c707bbf1 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.h +++ b/assignment-client/src/avatars/AvatarMixerSlave.h @@ -21,6 +21,7 @@ public: quint64 processIncomingPacketsElapsedTime { 0 }; int nodesBroadcastedTo { 0 }; + int downstreamMixersBroadcastedTo { 0 }; int numPacketsSent { 0 }; int numBytesSent { 0 }; int numIdentityPackets { 0 }; @@ -41,6 +42,7 @@ public: // sending job stats nodesBroadcastedTo = 0; + downstreamMixersBroadcastedTo = 0; numPacketsSent = 0; numBytesSent = 0; numIdentityPackets = 0; @@ -60,6 +62,7 @@ public: processIncomingPacketsElapsedTime += rhs.processIncomingPacketsElapsedTime; nodesBroadcastedTo += rhs.nodesBroadcastedTo; + downstreamMixersBroadcastedTo += rhs.downstreamMixersBroadcastedTo; numPacketsSent += rhs.numPacketsSent; numBytesSent += rhs.numBytesSent; numIdentityPackets += rhs.numIdentityPackets; @@ -92,6 +95,10 @@ public: private: int sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode); + int sendReplicatedIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode); + + void broadcastAvatarDataToAgent(const SharedNodePointer& node); + void broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node); // frame state ConstIter _begin; diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 07d4fa8851..cb5ae7735a 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -76,8 +76,8 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end } void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, - p_high_resolution_clock::time_point lastFrameTimestamp, - float maxKbpsPerNode, float throttlingRatio) { + p_high_resolution_clock::time_point lastFrameTimestamp, + float maxKbpsPerNode, float throttlingRatio) { _function = &AvatarMixerSlave::broadcastAvatarData; _configure = [&](AvatarMixerSlave& slave) { slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio); diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index 65f85215cb..0c0f8a5e55 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -2228,23 +2228,33 @@ void DomainServer::updateReplicatedNodes() { if (replicationSettings.contains("users")) { auto usersSettings = replicationSettings.value("users").toList(); for (auto& username : usersSettings) { - _replicatedUsernames.push_back(username.toString()); + _replicatedUsernames.push_back(username.toString().toLower()); } } } auto nodeList = DependencyManager::get(); - nodeList->eachNode([&](const SharedNodePointer& otherNode) { - if (shouldReplicateNode(*otherNode)) { - qDebug() << "Setting node to replicated: " << otherNode->getUUID(); - otherNode->setIsReplicated(true); + nodeList->eachMatchingNode([this](const SharedNodePointer& otherNode) -> bool { + return otherNode->getType() == NodeType::Agent; + }, [this](const SharedNodePointer& otherNode) { + auto shouldReplicate = shouldReplicateNode(*otherNode); + auto isReplicated = otherNode->isReplicated(); + if (isReplicated && !shouldReplicate) { + qDebug() << "Setting node to NOT be replicated:" + << otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID(); + } else if (!isReplicated && shouldReplicate) { + qDebug() << "Setting node to replicated:" + << otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID(); + } + otherNode->setIsReplicated(shouldReplicate); } - }); + ); } bool DomainServer::shouldReplicateNode(const Node& node) { QString verifiedUsername = node.getPermissions().getVerifiedUserName(); - qDebug() << "Verified username: " << verifiedUsername; + // Both the verified username and usernames in _replicatedUsernames are lowercase, so + // comparisons here are case-insensitive. auto it = find(_replicatedUsernames.cbegin(), _replicatedUsernames.cend(), verifiedUsername); return it != _replicatedUsernames.end() && node.getType() == NodeType::Agent; }; diff --git a/libraries/audio/src/InboundAudioStream.cpp b/libraries/audio/src/InboundAudioStream.cpp index 65cccf1fe0..da8997895c 100644 --- a/libraries/audio/src/InboundAudioStream.cpp +++ b/libraries/audio/src/InboundAudioStream.cpp @@ -127,6 +127,7 @@ int InboundAudioStream::parseData(ReceivedMessage& message) { // parse the info after the seq number and before the audio data (the stream properties) int prePropertyPosition = message.getPosition(); int propertyBytes = parseStreamProperties(message.getType(), message.readWithoutCopy(message.getBytesLeftToRead()), networkFrames); + message.seek(prePropertyPosition + propertyBytes); // handle this packet based on its arrival status. diff --git a/libraries/avatars/src/AvatarData.cpp b/libraries/avatars/src/AvatarData.cpp index 4407e12295..3c048e590d 100644 --- a/libraries/avatars/src/AvatarData.cpp +++ b/libraries/avatars/src/AvatarData.cpp @@ -1473,27 +1473,6 @@ QStringList AvatarData::getJointNames() const { return _jointNames; } -void AvatarData::parseAvatarIdentityPacket(const QByteArray& data, Identity& identityOut) { - QDataStream packetStream(data); - - packetStream >> identityOut.uuid - >> identityOut.skeletonModelURL - >> identityOut.attachmentData - >> identityOut.displayName - >> identityOut.sessionDisplayName - >> identityOut.avatarEntityData - >> identityOut.sequenceId; - -#ifdef WANT_DEBUG - qCDebug(avatars) << __FUNCTION__ - << "identityOut.uuid:" << identityOut.uuid - << "identityOut.skeletonModelURL:" << identityOut.skeletonModelURL - << "identityOut.displayName:" << identityOut.displayName - << "identityOut.sessionDisplayName:" << identityOut.sessionDisplayName; -#endif - -} - glm::quat AvatarData::getOrientationOutbound() const { return (getLocalOrientation()); } @@ -1504,61 +1483,106 @@ QUrl AvatarData::cannonicalSkeletonModelURL(const QUrl& emptyURL) const { return _skeletonModelURL.scheme() == "file" ? emptyURL : _skeletonModelURL; } -void AvatarData::processAvatarIdentity(const Identity& identity, bool& identityChanged, bool& displayNameChanged) { +void AvatarData::processAvatarIdentity(const QByteArray& identityData, bool& identityChanged, bool& displayNameChanged) { - if (identity.sequenceId < _identitySequenceId) { - qCDebug(avatars) << "Ignoring older identity packet for avatar" << getSessionUUID() - << "_identitySequenceId (" << _identitySequenceId << ") is greater than" << identity.sequenceId; - return; + QDataStream packetStream(identityData); + + QUuid avatarSessionID; + + // peek the sequence number, this will tell us if we should be processing this identity packet at all + udt::SequenceNumber::Type incomingSequenceNumberType; + packetStream >> avatarSessionID >> incomingSequenceNumberType; + udt::SequenceNumber incomingSequenceNumber(incomingSequenceNumberType); + + if (!_hasProcessedFirstIdentity) { + _lastIncomingSequenceNumber = incomingSequenceNumber - 1; + _hasProcessedFirstIdentity = true; + qCDebug(avatars) << "Processing first identity packet for" << avatarSessionID << "-" + << (udt::SequenceNumber::Type) incomingSequenceNumber; } - // otherwise, set the identitySequenceId to match the incoming identity - _identitySequenceId = identity.sequenceId; - if (_firstSkeletonCheck || (identity.skeletonModelURL != cannonicalSkeletonModelURL(emptyURL))) { - setSkeletonModelURL(identity.skeletonModelURL); - identityChanged = true; - if (_firstSkeletonCheck) { + if (incomingSequenceNumber > _lastIncomingSequenceNumber) { + Identity identity; + + packetStream >> identity.skeletonModelURL + >> identity.attachmentData + >> identity.displayName + >> identity.sessionDisplayName + >> identity.avatarEntityData; + + // set the store identity sequence number to match the incoming identity + _lastIncomingSequenceNumber = incomingSequenceNumber; + + if (_firstSkeletonCheck || (identity.skeletonModelURL != cannonicalSkeletonModelURL(emptyURL))) { + setSkeletonModelURL(identity.skeletonModelURL); + identityChanged = true; + if (_firstSkeletonCheck) { + displayNameChanged = true; + } + _firstSkeletonCheck = false; + } + + if (identity.displayName != _displayName) { + _displayName = identity.displayName; + identityChanged = true; displayNameChanged = true; } - _firstSkeletonCheck = false; - } + maybeUpdateSessionDisplayNameFromTransport(identity.sessionDisplayName); - if (identity.displayName != _displayName) { - _displayName = identity.displayName; - identityChanged = true; - displayNameChanged = true; - } - maybeUpdateSessionDisplayNameFromTransport(identity.sessionDisplayName); + if (identity.attachmentData != _attachmentData) { + setAttachmentData(identity.attachmentData); + identityChanged = true; + } - if (identity.attachmentData != _attachmentData) { - setAttachmentData(identity.attachmentData); - identityChanged = true; - } + bool avatarEntityDataChanged = false; + _avatarEntitiesLock.withReadLock([&] { + avatarEntityDataChanged = (identity.avatarEntityData != _avatarEntityData); + }); + + if (avatarEntityDataChanged) { + setAvatarEntityData(identity.avatarEntityData); + identityChanged = true; + } - bool avatarEntityDataChanged = false; - _avatarEntitiesLock.withReadLock([&] { - avatarEntityDataChanged = (identity.avatarEntityData != _avatarEntityData); - }); - if (avatarEntityDataChanged) { - setAvatarEntityData(identity.avatarEntityData); - identityChanged = true; - } +#ifdef WANT_DEBUG + qCDebug(avatars) << __FUNCTION__ + << "identity.uuid:" << identity.uuid + << "identity.skeletonModelURL:" << identity.skeletonModelURL + << "identity.displayName:" << identity.displayName + << "identity.sessionDisplayName:" << identity.sessionDisplayName; +#endif + } else { +#ifdef WANT_DEBUG + qCDebug(avatars) << "Refusing to process identity for" << uuidStringWithoutCurlyBraces(avatarSessionID) << "since" + << (udt::SequenceNumber::Type) _lastIncomingSequenceNumber + << "is >=" << (udt::SequenceNumber::Type) incomingSequenceNumber; +#endif + } } -QByteArray AvatarData::identityByteArray() const { +QByteArray AvatarData::identityByteArray(bool shouldForwardIncomingSequenceNumber) const { QByteArray identityData; QDataStream identityStream(&identityData, QIODevice::Append); const QUrl& urlToSend = cannonicalSkeletonModelURL(emptyURL); // depends on _skeletonModelURL + // we use the boolean flag to determine if this is an identity byte array for a mixer to send to an agent + // or an agent to send to a mixer + + // when mixers send identity packets to agents, they simply forward along the last incoming sequence number they received + // whereas agents send a fresh outgoing sequence number when identity data has changed + + udt::SequenceNumber identitySequenceNumber = + shouldForwardIncomingSequenceNumber ? _lastIncomingSequenceNumber : _lastOutgoingSequenceNumber; + _avatarEntitiesLock.withReadLock([&] { identityStream << getSessionUUID() - << urlToSend - << _attachmentData - << _displayName - << getSessionDisplayNameForTransport() // depends on _sessionDisplayName - << _avatarEntityData - << _identitySequenceId; + << (udt::SequenceNumber::Type) identitySequenceNumber + << urlToSend + << _attachmentData + << _displayName + << getSessionDisplayNameForTransport() // depends on _sessionDisplayName + << _avatarEntityData; }); return identityData; @@ -1734,6 +1758,12 @@ void AvatarData::sendAvatarDataPacket() { void AvatarData::sendIdentityPacket() { auto nodeList = DependencyManager::get(); + + if (_identityDataChanged) { + // if the identity data has changed, push the sequence number forwards + ++_lastOutgoingSequenceNumber; + } + QByteArray identityData = identityByteArray(); auto packetList = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); @@ -1744,7 +1774,7 @@ void AvatarData::sendIdentityPacket() { }, [&](const SharedNodePointer& node) { nodeList->sendPacketList(std::move(packetList), *node); - }); + }); _avatarEntityDataLocallyEdited = false; _identityDataChanged = false; diff --git a/libraries/avatars/src/AvatarData.h b/libraries/avatars/src/AvatarData.h index 4104615cfe..2d43c4d412 100644 --- a/libraries/avatars/src/AvatarData.h +++ b/libraries/avatars/src/AvatarData.h @@ -52,15 +52,16 @@ typedef unsigned long long quint64; #include #include #include -#include -#include -#include #include #include -#include +#include #include -#include +#include +#include +#include #include +#include +#include #include "AABox.h" #include "HeadData.h" @@ -525,22 +526,18 @@ public: const HeadData* getHeadData() const { return _headData; } struct Identity { - QUuid uuid; QUrl skeletonModelURL; QVector attachmentData; QString displayName; QString sessionDisplayName; AvatarEntityMap avatarEntityData; - quint64 sequenceId; }; - static void parseAvatarIdentityPacket(const QByteArray& data, Identity& identityOut); - // identityChanged returns true if identity has changed, false otherwise. // displayNameChanged returns true if displayName has changed, false otherwise. - void processAvatarIdentity(const Identity& identity, bool& identityChanged, bool& displayNameChanged); + void processAvatarIdentity(const QByteArray& identityData, bool& identityChanged, bool& displayNameChanged); - QByteArray identityByteArray() const; + QByteArray identityByteArray(bool shouldForwardIncomingSequenceNumber = false) const; const QUrl& getSkeletonModelURL() const { return _skeletonModelURL; } const QString& getDisplayName() const { return _displayName; } @@ -624,10 +621,7 @@ public: static float _avatarSortCoefficientAge; bool getIdentityDataChanged() const { return _identityDataChanged; } // has the identity data changed since the last time sendIdentityPacket() was called - void markIdentityDataChanged() { - _identityDataChanged = true; - _identitySequenceId++; - } + void markIdentityDataChanged() { _identityDataChanged = true; } float getDensity() const { return _density; } @@ -786,7 +780,9 @@ protected: float _audioAverageLoudness { 0.0f }; bool _identityDataChanged { false }; - quint64 _identitySequenceId { 0 }; + udt::SequenceNumber _lastIncomingSequenceNumber { 0 }; + udt::SequenceNumber _lastOutgoingSequenceNumber { 0 }; + bool _hasProcessedFirstIdentity { false }; float _density; private: diff --git a/libraries/avatars/src/AvatarHashMap.cpp b/libraries/avatars/src/AvatarHashMap.cpp index 2ccc64fee2..cfbf2a8806 100644 --- a/libraries/avatars/src/AvatarHashMap.cpp +++ b/libraries/avatars/src/AvatarHashMap.cpp @@ -126,8 +126,9 @@ AvatarSharedPointer AvatarHashMap::parseAvatarData(QSharedPointer message, SharedNodePointer sendingNode) { - AvatarData::Identity identity; - AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity); + + // peek the avatar UUID from the incoming packet + QUuid identityUUID = message->peek(NUM_BYTES_RFC4122_UUID); // make sure this isn't for an ignored avatar auto nodeList = DependencyManager::get(); @@ -136,20 +137,21 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer { QReadLocker locker(&_hashLock); auto me = _avatarHash.find(EMPTY); - if ((me != _avatarHash.end()) && (identity.uuid == me.value()->getSessionUUID())) { + if ((me != _avatarHash.end()) && (identityUUID == me.value()->getSessionUUID())) { // We add MyAvatar to _avatarHash with an empty UUID. Code relies on this. In order to correctly handle an // identity packet for ourself (such as when we are assigned a sessionDisplayName by the mixer upon joining), // we make things match here. - identity.uuid = EMPTY; + identityUUID = EMPTY; } } - if (!nodeList->isIgnoringNode(identity.uuid) || nodeList->getRequestsDomainListData()) { + + if (!nodeList->isIgnoringNode(identityUUID) || nodeList->getRequestsDomainListData()) { // mesh URL for a UUID, find avatar in our list - auto avatar = newOrExistingAvatar(identity.uuid, sendingNode); + auto avatar = newOrExistingAvatar(identityUUID, sendingNode); bool identityChanged = false; bool displayNameChanged = false; // In this case, the "sendingNode" is the Avatar Mixer. - avatar->processAvatarIdentity(identity, identityChanged, displayNameChanged); + avatar->processAvatarIdentity(message->getMessage(), identityChanged, displayNameChanged); } } diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 0494efc7b4..9c0754cf26 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -446,7 +446,8 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr packetList, return _nodeSocket.writePacketList(std::move(packetList), *activeSocket); } else { - qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node. Not sending."; + qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node " + << destinationNode.getUUID() << ". Not sending."; return ERROR_SENDING_PACKET_BYTES; } } @@ -454,7 +455,8 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr packetList, qint64 LimitedNodeList::sendPacket(std::unique_ptr packet, const Node& destinationNode, const HifiSockAddr& overridenSockAddr) { if (overridenSockAddr.isNull() && !destinationNode.getActiveSocket()) { - qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node. Not sending."; + qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node" + << destinationNode.getUUID() << ". Not sending."; return ERROR_SENDING_PACKET_BYTES; } diff --git a/libraries/networking/src/NLPacketList.h b/libraries/networking/src/NLPacketList.h index 48ce5ef81a..910d39f71b 100644 --- a/libraries/networking/src/NLPacketList.h +++ b/libraries/networking/src/NLPacketList.h @@ -23,6 +23,8 @@ public: PacketVersion getVersion() const { return _packetVersion; } const QUuid& getSourceID() const { return _sourceID; } + + qint64 getMaxSegmentSize() const override { return NLPacket::maxPayloadSize(_packetType, _isOrdered); } private: NLPacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, diff --git a/libraries/networking/src/ReceivedMessage.cpp b/libraries/networking/src/ReceivedMessage.cpp index 2c5a11334b..6ca249fb22 100644 --- a/libraries/networking/src/ReceivedMessage.cpp +++ b/libraries/networking/src/ReceivedMessage.cpp @@ -42,6 +42,20 @@ ReceivedMessage::ReceivedMessage(NLPacket& packet) { } +ReceivedMessage::ReceivedMessage(QByteArray byteArray, PacketType packetType, PacketVersion packetVersion, + const HifiSockAddr& senderSockAddr, QUuid sourceID) : + _data(byteArray), + _headData(_data.mid(0, HEAD_DATA_SIZE)), + _numPackets(1), + _sourceID(sourceID), + _packetType(packetType), + _packetVersion(packetVersion), + _senderSockAddr(senderSockAddr), + _isComplete(true) +{ + +} + void ReceivedMessage::setFailed() { _failed = true; _isComplete = true; diff --git a/libraries/networking/src/ReceivedMessage.h b/libraries/networking/src/ReceivedMessage.h index 3acb7163e7..ae51e7592a 100644 --- a/libraries/networking/src/ReceivedMessage.h +++ b/libraries/networking/src/ReceivedMessage.h @@ -24,6 +24,8 @@ class ReceivedMessage : public QObject { public: ReceivedMessage(const NLPacketList& packetList); ReceivedMessage(NLPacket& packet); + ReceivedMessage(QByteArray byteArray, PacketType packetType, PacketVersion packetVersion, + const HifiSockAddr& senderSockAddr, QUuid sourceID = QUuid()); QByteArray getMessage() const { return _data; } const char* getRawMessage() const { return _data.constData(); } diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp index 5aeb076c11..3e679f643a 100644 --- a/libraries/networking/src/ThreadedAssignment.cpp +++ b/libraries/networking/src/ThreadedAssignment.cpp @@ -134,7 +134,7 @@ void ThreadedAssignment::domainSettingsRequestFailed() { setFinished(true); } -void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType) { +void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, DownstreamNodeFoundCallback callback) { static const QString REPLICATION_GROUP_KEY = "replication"; static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers"; if (settingsObject.contains(REPLICATION_GROUP_KEY)) { @@ -161,8 +161,13 @@ void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObjec }; // manually add the downstream node to our node list - nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType), - downstreamServerAddr, downstreamServerAddr); + auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType), + downstreamServerAddr, downstreamServerAddr); + + // manually activate the public socket for the downstream node + node->activatePublicSocket(); + + callback(*node); } } } diff --git a/libraries/networking/src/ThreadedAssignment.h b/libraries/networking/src/ThreadedAssignment.h index f96755a776..0cc7b2f40c 100644 --- a/libraries/networking/src/ThreadedAssignment.h +++ b/libraries/networking/src/ThreadedAssignment.h @@ -18,6 +18,8 @@ #include "Assignment.h" +using DownstreamNodeFoundCallback = std::function; + class ThreadedAssignment : public Assignment { Q_OBJECT public: @@ -40,7 +42,8 @@ signals: protected: void commonInit(const QString& targetName, NodeType_t nodeType); - void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType); + void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, + DownstreamNodeFoundCallback callback = [](Node& downstreamNode) {}); bool _isFinished; QTimer _domainServerTimer; diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index 29cf89be35..7985df58bf 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -41,7 +41,8 @@ const QSet NON_SOURCED_PACKETS = QSet() << PacketType::ICEServerHeartbeatDenied << PacketType::AssignmentClientStatus << PacketType::StopNode << PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement << PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho - << PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame; + << PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame + << PacketType::ReplicatedAvatarIdentity << PacketType::ReplicatedKillAvatar << PacketType::ReplicatedBulkAvatarData; PacketVersion versionForPacketType(PacketType packetType) { switch (packetType) { @@ -58,7 +59,7 @@ PacketVersion versionForPacketType(PacketType packetType) { case PacketType::AvatarData: case PacketType::BulkAvatarData: case PacketType::KillAvatar: - return static_cast(AvatarMixerPacketVersion::MannequinDefaultAvatar); + return static_cast(AvatarMixerPacketVersion::AvatarIdentitySequenceFront); case PacketType::MessagesData: return static_cast(MessageDataVersion::TextOrBinaryData); case PacketType::ICEServerHeartbeat: @@ -121,7 +122,7 @@ static void ensureProtocolVersionsSignature() { std::call_once(once, [&] { QByteArray buffer; QDataStream stream(&buffer, QIODevice::WriteOnly); - uint8_t numberOfProtocols = static_cast(PacketType::LAST_PACKET_TYPE) + 1; + uint8_t numberOfProtocols = static_cast(PacketType::NUM_PACKET_TYPE); stream << numberOfProtocols; for (uint8_t packetType = 0; packetType < numberOfProtocols; packetType++) { uint8_t packetTypeVersion = static_cast(versionForPacketType(static_cast(packetType))); diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index 0a1db3700b..c080ab8e19 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -119,8 +119,10 @@ public: ReplicatedMicrophoneAudioWithEcho, ReplicatedInjectAudio, ReplicatedSilentAudioFrame, - LAST_PACKET_TYPE = ReplicatedSilentAudioFrame, - + ReplicatedAvatarIdentity, + ReplicatedKillAvatar, + ReplicatedBulkAvatarData, + NUM_PACKET_TYPE }; }; @@ -242,7 +244,8 @@ enum class AvatarMixerPacketVersion : PacketVersion { StickAndBallDefaultAvatar, IdentityPacketsIncludeUpdateTime, AvatarIdentitySequenceId, - MannequinDefaultAvatar + MannequinDefaultAvatar, + AvatarIdentitySequenceFront }; enum class DomainConnectRequestVersion : PacketVersion { diff --git a/libraries/networking/src/udt/PacketList.cpp b/libraries/networking/src/udt/PacketList.cpp index 8651f9eed4..d69ff39197 100644 --- a/libraries/networking/src/udt/PacketList.cpp +++ b/libraries/networking/src/udt/PacketList.cpp @@ -36,8 +36,8 @@ std::unique_ptr PacketList::fromReceivedPackets(std::list> _packets; + + bool _isOrdered = false; private: friend class ::LimitedNodeList; @@ -93,7 +97,6 @@ private: Packet::MessageNumber _messageNumber; bool _isReliable = false; - bool _isOrdered = false; std::unique_ptr _currentPacket; diff --git a/libraries/networking/src/udt/SequenceNumber.h b/libraries/networking/src/udt/SequenceNumber.h index 3abc80bdd8..2c82eccfa3 100644 --- a/libraries/networking/src/udt/SequenceNumber.h +++ b/libraries/networking/src/udt/SequenceNumber.h @@ -35,8 +35,8 @@ public: explicit SequenceNumber(char* value) { _value = (*reinterpret_cast(value)) & MAX; } explicit SequenceNumber(Type value) { _value = (value <= MAX) ? ((value >= 0) ? value : 0) : MAX; } explicit SequenceNumber(UType value) { _value = (value <= MAX) ? value : MAX; } - explicit operator Type() { return _value; } - explicit operator UType() { return static_cast(_value); } + explicit operator Type() const { return _value; } + explicit operator UType() const { return static_cast(_value); } inline SequenceNumber& operator++() { _value = (_value + 1) % (MAX + 1);