diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index bb03a6ec93..ab4b20b9cc 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -55,7 +55,8 @@ QVector AudioMixer::_zoneSettings; QVector AudioMixer::_zoneReverbSettings; AudioMixer::AudioMixer(ReceivedMessage& message) : - ThreadedAssignment(message) { + ThreadedAssignment(message) +{ // Always clear settings first // This prevents previous assignment settings from sticking around @@ -92,7 +93,21 @@ AudioMixer::AudioMixer(ReceivedMessage& message) : packetReceiver.registerListener(PacketType::NodeMuteRequest, this, "handleNodeMuteRequestPacket"); packetReceiver.registerListener(PacketType::KillAvatar, this, "handleKillAvatarPacket"); + packetReceiver.registerListenerForTypes({ + PacketType::ReplicatedMicrophoneAudioNoEcho, + PacketType::ReplicatedMicrophoneAudioWithEcho, + PacketType::ReplicatedInjectAudio, + PacketType::ReplicatedSilentAudioFrame + }, + this, "queueReplicatedAudioPacket" + ); + connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled); + connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) { + if (node->getType() == NodeType::DownstreamAudioMixer) { + node->activatePublicSocket(); + } + }); } void AudioMixer::queueAudioPacket(QSharedPointer message, SharedNodePointer node) { @@ -103,6 +118,33 @@ void AudioMixer::queueAudioPacket(QSharedPointer message, Share getOrCreateClientData(node.data())->queuePacket(message, node); } +void AudioMixer::queueReplicatedAudioPacket(QSharedPointer message) { + // make sure we have a replicated node for the original sender of the packet + auto nodeList = DependencyManager::get(); + + QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); + + auto replicatedNode = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, + message->getSenderSockAddr(), message->getSenderSockAddr(), + true, true); + replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); + + // construct a "fake" audio received message from the byte array and packet list information + auto audioData = message->getMessage().mid(NUM_BYTES_RFC4122_UUID); + + PacketType rewrittenType = REPLICATED_PACKET_MAPPING.key(message->getType()); + + if (rewrittenType == PacketType::Unknown) { + qDebug() << "Cannot unwrap replicated packet type not present in REPLICATED_PACKET_WRAPPING"; + } + + auto replicatedMessage = QSharedPointer::create(audioData, rewrittenType, + versionForPacketType(rewrittenType), + message->getSenderSockAddr(), nodeID); + + getOrCreateClientData(replicatedNode.data())->queuePacket(replicatedMessage, replicatedNode); +} + void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer message, SharedNodePointer sendingNode) { auto nodeList = DependencyManager::get(); @@ -347,7 +389,7 @@ void AudioMixer::start() { auto nodeList = DependencyManager::get(); // prepare the NodeList - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); + nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer }); nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); }; // parse out any AudioMixer settings @@ -506,7 +548,7 @@ void AudioMixer::clearDomainSettings() { _zoneReverbSettings.clear(); } -void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) { +void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) { qDebug() << "AVX2 Support:" << (cpuSupportsAVX2() ? "enabled" : "disabled"); if (settingsObject.contains(AUDIO_THREADING_GROUP_KEY)) { diff --git a/assignment-client/src/audio/AudioMixer.h b/assignment-client/src/audio/AudioMixer.h index 18e754016e..e542d82a6a 100644 --- a/assignment-client/src/audio/AudioMixer.h +++ b/assignment-client/src/audio/AudioMixer.h @@ -63,6 +63,7 @@ private slots: void handleKillAvatarPacket(QSharedPointer packet, SharedNodePointer sendingNode); void queueAudioPacket(QSharedPointer packet, SharedNodePointer sendingNode); + void queueReplicatedAudioPacket(QSharedPointer packet); void removeHRTFsForFinishedInjector(const QUuid& streamID); void start(); diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index d5e06504a6..bf19a02d9a 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -68,9 +68,21 @@ void AudioMixerClientData::processPackets() { case PacketType::MicrophoneAudioWithEcho: case PacketType::InjectAudio: case PacketType::AudioStreamStats: - case PacketType::SilentAudioFrame: { + case PacketType::SilentAudioFrame: + case PacketType::ReplicatedMicrophoneAudioNoEcho: + case PacketType::ReplicatedMicrophoneAudioWithEcho: + case PacketType::ReplicatedInjectAudio: + case PacketType::ReplicatedSilentAudioFrame: { + + if (node->isUpstream() && !_hasSetupCodecForUpstreamNode) { + setupCodecForReplicatedAgent(packet); + } + QMutexLocker lock(&getMutex()); parseData(*packet); + + optionallyReplicatePacket(*packet, *node); + break; } case PacketType::NegotiateAudioFormat: @@ -97,6 +109,59 @@ void AudioMixerClientData::processPackets() { assert(_packetQueue.empty()); } +bool isReplicatedPacket(PacketType packetType) { + return packetType == PacketType::ReplicatedMicrophoneAudioNoEcho + || packetType == PacketType::ReplicatedMicrophoneAudioWithEcho + || packetType == PacketType::ReplicatedInjectAudio + || packetType == PacketType::ReplicatedSilentAudioFrame; +} + +void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) { + + // first, make sure that this is a packet from a node we are supposed to replicate + if (node.isReplicated()) { + + // now make sure it's a packet type that we want to replicate + + // first check if it is an original type that we should replicate + PacketType mirroredType = REPLICATED_PACKET_MAPPING.value(message.getType()); + + if (mirroredType == PacketType::Unknown) { + // if it wasn't check if it is a replicated type that we should re-replicate + if (REPLICATED_PACKET_MAPPING.key(message.getType()) != PacketType::Unknown) { + mirroredType = message.getType(); + } else { + qDebug() << "Packet passed to optionallyReplicatePacket was not a replicatable type - returning"; + return; + } + } + + std::unique_ptr packet; + auto nodeList = DependencyManager::get(); + + // enumerate the downstream audio mixers and send them the replicated version of this packet + nodeList->unsafeEachNode([&](const SharedNodePointer& downstreamNode) { + if (downstreamNode->getType() == NodeType::DownstreamAudioMixer) { + // construct the packet only once, if we have any downstream audio mixers to send to + if (!packet) { + // construct an NLPacket to send to the replicant that has the contents of the received packet + packet = NLPacket::create(mirroredType); + + if (!isReplicatedPacket(message.getType())) { + // since this packet will be non-sourced, we add the replicated node's ID here + packet->write(node.getUUID().toRfc4122()); + } + + packet->write(message.getMessage()); + } + + nodeList->sendUnreliablePacket(*packet, *downstreamNode); + } + }); + } + +} + void AudioMixerClientData::negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node) { quint8 numberOfCodecs; message.readPrimitive(&numberOfCodecs); @@ -188,8 +253,11 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { bool isMicStream = false; if (packetType == PacketType::MicrophoneAudioWithEcho + || packetType == PacketType::ReplicatedMicrophoneAudioWithEcho || packetType == PacketType::MicrophoneAudioNoEcho - || packetType == PacketType::SilentAudioFrame) { + || packetType == PacketType::ReplicatedMicrophoneAudioNoEcho + || packetType == PacketType::SilentAudioFrame + || packetType == PacketType::ReplicatedSilentAudioFrame) { QWriteLocker writeLocker { &_streamsLock }; @@ -209,7 +277,8 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { avatarAudioStream->setupCodec(_codec, _selectedCodecName, AudioConstants::MONO); qDebug() << "creating new AvatarAudioStream... codec:" << _selectedCodecName; - connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec, this, &AudioMixerClientData::handleMismatchAudioFormat); + connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec, + this, &AudioMixerClientData::handleMismatchAudioFormat); auto emplaced = _audioStreams.emplace( QUuid(), @@ -224,10 +293,12 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { writeLocker.unlock(); isMicStream = true; - } else if (packetType == PacketType::InjectAudio) { + } else if (packetType == PacketType::InjectAudio + || packetType == PacketType::ReplicatedInjectAudio) { // this is injected audio // grab the stream identifier for this injected audio message.seek(sizeof(quint16)); + QUuid streamIdentifier = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID)); bool isStereo; @@ -608,3 +679,22 @@ bool AudioMixerClientData::shouldIgnore(const SharedNodePointer self, const Shar return shouldIgnore; } + +void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointer message) { + // hop past the sequence number that leads the packet + message->seek(sizeof(quint16)); + + // pull the codec string from the packet + auto codecString = message->readString(); + + qDebug() << "Manually setting codec for replicated agent" << uuidStringWithoutCurlyBraces(getNodeID()) + << "-" << codecString; + + const std::pair codec = AudioMixer::negotiateCodec({ codecString }); + setupCodec(codec.second, codec.first); + + _hasSetupCodecForUpstreamNode = true; + + // seek back to the beginning of the message so other readers are in the right place + message->seek(0); +} diff --git a/assignment-client/src/audio/AudioMixerClientData.h b/assignment-client/src/audio/AudioMixerClientData.h index 8d76cda2f1..76a9cd6aa7 100644 --- a/assignment-client/src/audio/AudioMixerClientData.h +++ b/assignment-client/src/audio/AudioMixerClientData.h @@ -26,7 +26,6 @@ #include "PositionalAudioStream.h" #include "AvatarAudioStream.h" - class AudioMixerClientData : public NodeData { Q_OBJECT public: @@ -108,6 +107,8 @@ public: bool getRequestsDomainListData() { return _requestsDomainListData; } void setRequestsDomainListData(bool requesting) { _requestsDomainListData = requesting; } + void setupCodecForReplicatedAgent(QSharedPointer message); + signals: void injectorStreamFinished(const QUuid& streamIdentifier); @@ -124,6 +125,8 @@ private: QReadWriteLock _streamsLock; AudioStreamMap _audioStreams; // microphone stream from avatar is stored under key of null UUID + void optionallyReplicatePacket(ReceivedMessage& packet, const Node& node); + using IgnoreZone = AABox; class IgnoreZoneMemo { public: @@ -181,6 +184,8 @@ private: bool _shouldMuteClient { false }; bool _requestsDomainListData { false }; + + bool _hasSetupCodecForUpstreamNode { false }; }; #endif // hifi_AudioMixerClientData_h diff --git a/assignment-client/src/audio/AudioMixerSlave.cpp b/assignment-client/src/audio/AudioMixerSlave.cpp index d01d961e33..2d800c3561 100644 --- a/assignment-client/src/audio/AudioMixerSlave.cpp +++ b/assignment-client/src/audio/AudioMixerSlave.cpp @@ -74,6 +74,10 @@ void AudioMixerSlave::mix(const SharedNodePointer& node) { return; } + if (node->isUpstream()) { + return; + } + // check that the stream is valid auto avatarStream = data->getAvatarAudioStream(); if (avatarStream == nullptr) { diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index c8b68a740c..f1e30f4442 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -54,8 +54,108 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) : packetReceiver.registerListener(PacketType::RadiusIgnoreRequest, this, "handleRadiusIgnoreRequestPacket"); packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket"); + packetReceiver.registerListenerForTypes({ + PacketType::ReplicatedAvatarIdentity, + PacketType::ReplicatedKillAvatar + }, this, "handleReplicatedPacket"); + + packetReceiver.registerListener(PacketType::ReplicatedBulkAvatarData, this, "handleReplicatedBulkAvatarPacket"); + auto nodeList = DependencyManager::get(); connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch); + connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) { + if (node->getType() == NodeType::DownstreamAvatarMixer) { + getOrCreateClientData(node); + node->activatePublicSocket(); + } + }); +} + +SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) { + auto replicatedNode = DependencyManager::get()->addOrUpdateNode(nodeID, NodeType::Agent, + senderSockAddr, + senderSockAddr, + true, true); + + replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); + + return replicatedNode; +} + +void AvatarMixer::handleReplicatedPacket(QSharedPointer message) { + auto nodeList = DependencyManager::get(); + auto nodeID = QUuid::fromRfc4122(message->peek(NUM_BYTES_RFC4122_UUID)); + + auto replicatedNode = addOrUpdateReplicatedNode(nodeID, message->getSenderSockAddr()); + + if (message->getType() == PacketType::ReplicatedAvatarIdentity) { + handleAvatarIdentityPacket(message, replicatedNode); + } else if (message->getType() == PacketType::ReplicatedKillAvatar) { + handleKillAvatarPacket(message, replicatedNode); + } +} + +void AvatarMixer::handleReplicatedBulkAvatarPacket(QSharedPointer message) { + while (message->getBytesLeftToRead()) { + // first, grab the node ID for this replicated avatar + auto nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); + + // make sure we have an upstream replicated node that matches + auto replicatedNode = addOrUpdateReplicatedNode(nodeID, message->getSenderSockAddr()); + + // grab the size of the avatar byte array so we know how much to read + quint16 avatarByteArraySize; + message->readPrimitive(&avatarByteArraySize); + + // read the avatar byte array + auto avatarByteArray = message->read(avatarByteArraySize); + + // construct a "fake" avatar data received message from the byte array and packet list information + auto replicatedMessage = QSharedPointer::create(avatarByteArray, PacketType::AvatarData, + versionForPacketType(PacketType::AvatarData), + message->getSenderSockAddr(), nodeID); + + // queue up the replicated avatar data with the client data for the replicated node + auto start = usecTimestampNow(); + getOrCreateClientData(replicatedNode)->queuePacket(replicatedMessage, replicatedNode); + auto end = usecTimestampNow(); + _queueIncomingPacketElapsedTime += (end - start); + } +} + +void AvatarMixer::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) { + // first, make sure that this is a packet from a node we are supposed to replicate + if (node.isReplicated()) { + + // check if this is a packet type we replicate + // which means it must be a packet type present in REPLICATED_PACKET_MAPPING or must be the + // replicated version of one of those packet types + PacketType replicatedType = REPLICATED_PACKET_MAPPING.value(message.getType()); + + if (replicatedType == PacketType::Unknown) { + if (REPLICATED_PACKET_MAPPING.key(message.getType()) != PacketType::Unknown) { + replicatedType = message.getType(); + } else { + qDebug() << __FUNCTION__ << "called without replicatable packet type - returning"; + return; + } + } + + std::unique_ptr packet; + + auto nodeList = DependencyManager::get(); + nodeList->eachMatchingNode([&](const SharedNodePointer& node) { + return node->getType() == NodeType::DownstreamAvatarMixer; + }, [&](const SharedNodePointer& node) { + if (!packet) { + // construct an NLPacket to send to the replicant that has the contents of the received packet + packet = NLPacket::create(replicatedType, message.getSize()); + packet->write(message.getMessage()); + } + + nodeList->sendUnreliablePacket(*packet, *node); + }); + } } void AvatarMixer::queueIncomingPacket(QSharedPointer message, SharedNodePointer node) { @@ -70,12 +170,14 @@ AvatarMixer::~AvatarMixer() { } void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { - QByteArray individualData = nodeData->getAvatar().identityByteArray(); - individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); - auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); - identityPackets->write(individualData); - DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); - ++_sumIdentityPackets; + if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) { + QByteArray individualData = nodeData->getAvatar().identityByteArray(); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); + auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); + identityPackets->write(individualData); + DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); + ++_sumIdentityPackets; + } } std::chrono::microseconds AvatarMixer::timeFrame(p_high_resolution_clock::time_point& timestamp) { @@ -132,7 +234,10 @@ void AvatarMixer::start() { auto start = usecTimestampNow(); nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { std::for_each(cbegin, cend, [&](const SharedNodePointer& node) { - manageIdentityData(node); + if (node->getType() == NodeType::Agent && !node->isUpstream()) { + manageIdentityData(node); + } + ++_sumListeners; }); }, &lockWait, &nodeTransform, &functor); @@ -310,13 +415,38 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) { } } + std::unique_ptr killPacket; + std::unique_ptr replicatedKillPacket; + // this was an avatar we were sending to other people // send a kill packet for it to our other nodes - auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); - killPacket->write(killedNode->getUUID().toRfc4122()); - killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); + nodeList->eachMatchingNode([&](const SharedNodePointer& node) { + // we relay avatar kill packets to agents that are not upstream + // and downstream avatar mixers, if the node that was just killed was being replicated + return (node->getType() == NodeType::Agent && !node->isUpstream()) + || (killedNode->isReplicated() && node->getType() == NodeType::DownstreamAvatarMixer); + }, [&](const SharedNodePointer& node) { + if (node->getType() == NodeType::Agent) { + if (!killPacket) { + killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); + killPacket->write(killedNode->getUUID().toRfc4122()); + killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); + } + + nodeList->sendUnreliablePacket(*killPacket, *node); + } else { + // send a replicated kill packet to the downstream avatar mixer + if (!replicatedKillPacket) { + replicatedKillPacket = NLPacket::create(PacketType::ReplicatedKillAvatar, + NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason)); + replicatedKillPacket->write(killedNode->getUUID().toRfc4122()); + replicatedKillPacket->writePrimitive(KillAvatarReason::AvatarDisconnected); + } + + nodeList->sendUnreliablePacket(*replicatedKillPacket, *node); + } + }); - nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); // we also want to remove sequence number data for this avatar on our other avatars // so invoke the appropriate method on the AvatarMixerClientData for other avatars @@ -429,12 +559,11 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer mes AvatarData& avatar = nodeData->getAvatar(); // parse the identity packet and update the change timestamp if appropriate - AvatarData::Identity identity; - AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity); bool identityChanged = false; bool displayNameChanged = false; bool skeletonModelUrlChanged = false; - avatar.processAvatarIdentity(identity, identityChanged, displayNameChanged, skeletonModelUrlChanged); + avatar.processAvatarIdentity(message->getMessage(), identityChanged, displayNameChanged, skeletonModelUrlChanged); + if (identityChanged) { QMutexLocker nodeDataLocker(&nodeData->getMutex()); nodeData->flagIdentityChange(); @@ -451,11 +580,13 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer mes _handleAvatarIdentityPacketElapsedTime += (end - start); } -void AvatarMixer::handleKillAvatarPacket(QSharedPointer message) { +void AvatarMixer::handleKillAvatarPacket(QSharedPointer message, SharedNodePointer node) { auto start = usecTimestampNow(); DependencyManager::get()->processKillNode(*message); auto end = usecTimestampNow(); _handleKillAvatarPacketElapsedTime += (end - start); + + optionallyReplicatePacket(*message, *node); } void AvatarMixer::handleNodeIgnoreRequestPacket(QSharedPointer message, SharedNodePointer senderNode) { @@ -707,7 +838,6 @@ void AvatarMixer::run() { connect(&domainHandler, &DomainHandler::settingsReceiveFail, this, &AvatarMixer::domainSettingsRequestFailed); ThreadedAssignment::commonInit(AVATAR_MIXER_LOGGING_NAME, NodeType::AvatarMixer); - } AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node) { @@ -726,7 +856,7 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node void AvatarMixer::domainSettingsRequestComplete() { auto nodeList = DependencyManager::get(); - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); + nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer }); // parse the settings to pull out the values we need parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject()); @@ -803,12 +933,13 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { static const QString AVATAR_WHITELIST_OPTION = "avatar_whitelist"; _avatarWhitelist = domainSettings[AVATARS_SETTINGS_KEY].toObject()[AVATAR_WHITELIST_OPTION].toString(AVATAR_WHITELIST_DEFAULT).split(',', QString::KeepEmptyParts); - static const QString REPLACEMENT_AVATAR_OPTION = "replacement_avatar"; + static const QString REPLACEMENT_AVATAR_OPTION = "replacement_avatar"; _replacementAvatar = domainSettings[AVATARS_SETTINGS_KEY].toObject()[REPLACEMENT_AVATAR_OPTION].toString(REPLACEMENT_AVATAR_DEFAULT); if ((_avatarWhitelist.count() == 1) && _avatarWhitelist[0].isEmpty()) { _avatarWhitelist.clear(); // KeepEmptyParts above will parse "," as ["", ""] (which is ok), but "" as [""] (which is not ok). } + if (_avatarWhitelist.isEmpty()) { qCDebug(avatars) << "All avatars are allowed."; } else { diff --git a/assignment-client/src/avatars/AvatarMixer.h b/assignment-client/src/avatars/AvatarMixer.h index f8ebe419a9..58d4487652 100644 --- a/assignment-client/src/avatars/AvatarMixer.h +++ b/assignment-client/src/avatars/AvatarMixer.h @@ -42,10 +42,12 @@ private slots: void handleAdjustAvatarSorting(QSharedPointer message, SharedNodePointer senderNode); void handleViewFrustumPacket(QSharedPointer message, SharedNodePointer senderNode); void handleAvatarIdentityPacket(QSharedPointer message, SharedNodePointer senderNode); - void handleKillAvatarPacket(QSharedPointer message); + void handleKillAvatarPacket(QSharedPointer message, SharedNodePointer senderNode); void handleNodeIgnoreRequestPacket(QSharedPointer message, SharedNodePointer senderNode); void handleRadiusIgnoreRequestPacket(QSharedPointer packet, SharedNodePointer sendingNode); void handleRequestsDomainListDataPacket(QSharedPointer message, SharedNodePointer senderNode); + void handleReplicatedPacket(QSharedPointer message); + void handleReplicatedBulkAvatarPacket(QSharedPointer message); void domainSettingsRequestComplete(); void handlePacketVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID); void start(); @@ -66,6 +68,8 @@ private: QStringList _avatarWhitelist { }; QString _replacementAvatar { REPLACEMENT_AVATAR_DEFAULT }; + void optionallyReplicatePacket(ReceivedMessage& message, const Node& node); + p_high_resolution_clock::time_point _lastFrameTimestamp; // FIXME - new throttling - use these values somehow diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 2ad8bb58ed..8092518454 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -65,15 +65,32 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) { _stats.processIncomingPacketsElapsedTime += (end - start); } - int AvatarMixerSlave::sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { - QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(); - individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious - auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); - identityPackets->write(individualData); - DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); - _stats.numIdentityPackets++; - return individualData.size(); + if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) { + QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious + auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); + identityPackets->write(individualData); + DependencyManager::get()->sendPacketList(std::move(identityPackets), *destinationNode); + _stats.numIdentityPackets++; + return individualData.size(); + } else { + return 0; + } +} + +int AvatarMixerSlave::sendReplicatedIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { + if (destinationNode->getType() == NodeType::DownstreamAvatarMixer) { + QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious + auto identityPacket = NLPacket::create(PacketType::ReplicatedAvatarIdentity); + identityPacket->write(individualData); + DependencyManager::get()->sendUnreliablePacket(*identityPacket, *destinationNode); + _stats.numIdentityPackets++; + return individualData.size(); + } else { + return 0; + } } static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45; @@ -81,6 +98,18 @@ static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45; void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) { quint64 start = usecTimestampNow(); + if (node->getType() == NodeType::Agent && node->getLinkedData() && node->getActiveSocket() && !node->isUpstream()) { + broadcastAvatarDataToAgent(node); + } else if (node->getType() == NodeType::DownstreamAvatarMixer) { + broadcastAvatarDataToDownstreamMixer(node); + } + + quint64 end = usecTimestampNow(); + _stats.jobElapsedTime += (end - start); +} + +void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) { + auto nodeList = DependencyManager::get(); // setup for distributed random floating point values @@ -88,308 +117,432 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) { std::mt19937 generator(randomDevice()); std::uniform_real_distribution distribution; - if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) { - _stats.nodesBroadcastedTo++; + _stats.nodesBroadcastedTo++; - AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); + AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); - nodeData->resetInViewStats(); + nodeData->resetInViewStats(); - const AvatarData& avatar = nodeData->getAvatar(); - glm::vec3 myPosition = avatar.getClientGlobalPosition(); + const AvatarData& avatar = nodeData->getAvatar(); + glm::vec3 myPosition = avatar.getClientGlobalPosition(); - // reset the internal state for correct random number distribution - distribution.reset(); + // reset the internal state for correct random number distribution + distribution.reset(); - // reset the number of sent avatars - nodeData->resetNumAvatarsSentLastFrame(); + // reset the number of sent avatars + nodeData->resetNumAvatarsSentLastFrame(); - // keep a counter of the number of considered avatars - int numOtherAvatars = 0; + // keep a counter of the number of considered avatars + int numOtherAvatars = 0; - // keep track of outbound data rate specifically for avatar data - int numAvatarDataBytes = 0; - int identityBytesSent = 0; + // keep track of outbound data rate specifically for avatar data + int numAvatarDataBytes = 0; + int identityBytesSent = 0; - // max number of avatarBytes per frame - auto maxAvatarBytesPerFrame = (_maxKbpsPerNode * BYTES_PER_KILOBIT) / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND; + // max number of avatarBytes per frame + auto maxAvatarBytesPerFrame = (_maxKbpsPerNode * BYTES_PER_KILOBIT) / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND; - // FIXME - find a way to not send the sessionID for every avatar - int minimumBytesPerAvatar = AvatarDataPacket::AVATAR_HAS_FLAGS_SIZE + NUM_BYTES_RFC4122_UUID; + // FIXME - find a way to not send the sessionID for every avatar + int minimumBytesPerAvatar = AvatarDataPacket::AVATAR_HAS_FLAGS_SIZE + NUM_BYTES_RFC4122_UUID; - int overBudgetAvatars = 0; + int overBudgetAvatars = 0; - // keep track of the number of other avatars held back in this frame - int numAvatarsHeldBack = 0; + // keep track of the number of other avatars held back in this frame + int numAvatarsHeldBack = 0; - // keep track of the number of other avatar frames skipped - int numAvatarsWithSkippedFrames = 0; + // keep track of the number of other avatar frames skipped + int numAvatarsWithSkippedFrames = 0; - // When this is true, the AvatarMixer will send Avatar data to a client - // about avatars they've ignored or that are out of view - bool PALIsOpen = nodeData->getRequestsDomainListData(); + // When this is true, the AvatarMixer will send Avatar data to a client + // about avatars they've ignored or that are out of view + bool PALIsOpen = nodeData->getRequestsDomainListData(); - // When this is true, the AvatarMixer will send Avatar data to a client about avatars that have ignored them - bool getsAnyIgnored = PALIsOpen && node->getCanKick(); + // When this is true, the AvatarMixer will send Avatar data to a client about avatars that have ignored them + bool getsAnyIgnored = PALIsOpen && node->getCanKick(); - if (PALIsOpen) { - // Increase minimumBytesPerAvatar if the PAL is open - minimumBytesPerAvatar += sizeof(AvatarDataPacket::AvatarGlobalPosition) + - sizeof(AvatarDataPacket::AudioLoudness); - } + if (PALIsOpen) { + // Increase minimumBytesPerAvatar if the PAL is open + minimumBytesPerAvatar += sizeof(AvatarDataPacket::AvatarGlobalPosition) + + sizeof(AvatarDataPacket::AudioLoudness); + } - // setup a PacketList for the avatarPackets - auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData); + // setup a PacketList for the avatarPackets + auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData); - // Define the minimum bubble size - static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f); - // Define the scale of the box for the current node - glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f; - // Set up the bounding box for the current node - AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale); - // Clamp the size of the bounding box to a minimum scale - if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) { - nodeBox.setScaleStayCentered(minBubbleSize); - } - // Quadruple the scale of both bounding boxes - nodeBox.embiggen(4.0f); + // Define the minimum bubble size + static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f); + // Define the scale of the box for the current node + glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f; + // Set up the bounding box for the current node + AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale); + // Clamp the size of the bounding box to a minimum scale + if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) { + nodeBox.setScaleStayCentered(minBubbleSize); + } + // Quadruple the scale of both bounding boxes + nodeBox.embiggen(4.0f); - // setup list of AvatarData as well as maps to map betweeen the AvatarData and the original nodes - // for calling the AvatarData::sortAvatars() function and getting our sorted list of client nodes - QList avatarList; - std::unordered_map avatarDataToNodes; + // setup list of AvatarData as well as maps to map betweeen the AvatarData and the original nodes + // for calling the AvatarData::sortAvatars() function and getting our sorted list of client nodes + QList avatarList; + std::unordered_map avatarDataToNodes; - std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) { + std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) { + // make sure this is an agent that we have avatar data for before considering it for inclusion + if (otherNode->getType() == NodeType::Agent + && otherNode->getLinkedData()) { const AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - // theoretically it's possible for a Node to be in the NodeList (and therefore end up here), - // but not have yet sent data that's linked to the node. Check for that case and don't - // consider those nodes. - if (otherNodeData) { - AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer(); - avatarList << otherAvatar; - avatarDataToNodes[otherAvatar] = otherNode; + AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer(); + avatarList << otherAvatar; + avatarDataToNodes[otherAvatar] = otherNode; + } + }); + + AvatarSharedPointer thisAvatar = nodeData->getAvatarSharedPointer(); + ViewFrustum cameraView = nodeData->getViewFrustom(); + std::priority_queue sortedAvatars; + AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars, + [&](AvatarSharedPointer avatar)->uint64_t { + auto avatarNode = avatarDataToNodes[avatar]; + assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map + return nodeData->getLastBroadcastTime(avatarNode->getUUID()); + }, [&](AvatarSharedPointer avatar)->float{ + glm::vec3 nodeBoxHalfScale = (avatar->getPosition() - avatar->getGlobalBoundingBoxCorner()); + return glm::max(nodeBoxHalfScale.x, glm::max(nodeBoxHalfScale.y, nodeBoxHalfScale.z)); + }, [&](AvatarSharedPointer avatar)->bool { + if (avatar == thisAvatar) { + return true; // ignore ourselves... + } + + bool shouldIgnore = false; + + // We will also ignore other nodes for a couple of different reasons: + // 1) ignore bubbles and ignore specific node + // 2) the node hasn't really updated it's frame data recently, this can + // happen if for example the avatar is connected on a desktop and sending + // updates at ~30hz. So every 3 frames we skip a frame. + auto avatarNode = avatarDataToNodes[avatar]; + + assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map + + const AvatarMixerClientData* avatarNodeData = reinterpret_cast(avatarNode->getLinkedData()); + assert(avatarNodeData); // we can't have gotten here without avatarNode having valid data + quint64 startIgnoreCalculation = usecTimestampNow(); + + // make sure we have data for this avatar, that it isn't the same node, + // and isn't an avatar that the viewing node has ignored + // or that has ignored the viewing node + if (!avatarNode->getLinkedData() + || avatarNode->getUUID() == node->getUUID() + || (node->isIgnoringNodeWithID(avatarNode->getUUID()) && !PALIsOpen) + || (avatarNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) { + shouldIgnore = true; + } else { + + // Check to see if the space bubble is enabled + // Don't bother with these checks if the other avatar has their bubble enabled and we're gettingAnyIgnored + if (node->isIgnoreRadiusEnabled() || (avatarNode->isIgnoreRadiusEnabled() && !getsAnyIgnored)) { + + // Define the scale of the box for the current other node + glm::vec3 otherNodeBoxScale = (avatarNodeData->getPosition() - avatarNodeData->getGlobalBoundingBoxCorner()) * 2.0f; + // Set up the bounding box for the current other node + AABox otherNodeBox(avatarNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); + // Clamp the size of the bounding box to a minimum scale + if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) { + otherNodeBox.setScaleStayCentered(minBubbleSize); + } + // Quadruple the scale of both bounding boxes + otherNodeBox.embiggen(4.0f); + + // Perform the collision check between the two bounding boxes + if (nodeBox.touches(otherNodeBox)) { + nodeData->ignoreOther(node, avatarNode); + shouldIgnore = !getsAnyIgnored; + } } - }); + // Not close enough to ignore + if (!shouldIgnore) { + nodeData->removeFromRadiusIgnoringSet(node, avatarNode->getUUID()); + } + } + quint64 endIgnoreCalculation = usecTimestampNow(); + _stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation); - AvatarSharedPointer thisAvatar = nodeData->getAvatarSharedPointer(); - ViewFrustum cameraView = nodeData->getViewFrustom(); - std::priority_queue sortedAvatars; - AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars, + if (!shouldIgnore) { + AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(avatarNode->getUUID()); + AvatarDataSequenceNumber lastSeqFromSender = avatarNodeData->getLastReceivedSequenceNumber(); - [&](AvatarSharedPointer avatar)->uint64_t{ - auto avatarNode = avatarDataToNodes[avatar]; - assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map - return nodeData->getLastBroadcastTime(avatarNode->getUUID()); - }, + // FIXME - This code does appear to be working. But it seems brittle. + // It supports determining if the frame of data for this "other" + // avatar has already been sent to the reciever. This has been + // verified to work on a desktop display that renders at 60hz and + // therefore sends to mixer at 30hz. Each second you'd expect to + // have 15 (45hz-30hz) duplicate frames. In this case, the stat + // avg_other_av_skips_per_second does report 15. + // + // make sure we haven't already sent this data from this sender to this receiver + // or that somehow we haven't sent + if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) { + ++numAvatarsHeldBack; + shouldIgnore = true; + } else if (lastSeqFromSender - lastSeqToReceiver > 1) { + // this is a skip - we still send the packet but capture the presence of the skip so we see it happening + ++numAvatarsWithSkippedFrames; + } + } + return shouldIgnore; + }); - [&](AvatarSharedPointer avatar)->float{ - glm::vec3 nodeBoxHalfScale = (avatar->getPosition() - avatar->getGlobalBoundingBoxCorner()); - return glm::max(nodeBoxHalfScale.x, glm::max(nodeBoxHalfScale.y, nodeBoxHalfScale.z)); - }, + // loop through our sorted avatars and allocate our bandwidth to them accordingly + int avatarRank = 0; - [&](AvatarSharedPointer avatar)->bool{ - if (avatar == thisAvatar) { - return true; // ignore ourselves... - } + // this is overly conservative, because it includes some avatars we might not consider + int remainingAvatars = (int)sortedAvatars.size(); - bool shouldIgnore = false; + while (!sortedAvatars.empty()) { + AvatarPriority sortData = sortedAvatars.top(); + sortedAvatars.pop(); + const auto& avatarData = sortData.avatar; + avatarRank++; + remainingAvatars--; - // We will also ignore other nodes for a couple of different reasons: - // 1) ignore bubbles and ignore specific node - // 2) the node hasn't really updated it's frame data recently, this can - // happen if for example the avatar is connected on a desktop and sending - // updates at ~30hz. So every 3 frames we skip a frame. - auto avatarNode = avatarDataToNodes[avatar]; + auto otherNode = avatarDataToNodes[avatarData]; + assert(otherNode); // we can't have gotten here without the avatarData being a valid key in the map - assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map + // NOTE: Here's where we determine if we are over budget and drop to bare minimum data + int minimRemainingAvatarBytes = minimumBytesPerAvatar * remainingAvatars; + bool overBudget = (identityBytesSent + numAvatarDataBytes + minimRemainingAvatarBytes) > maxAvatarBytesPerFrame; - const AvatarMixerClientData* avatarNodeData = reinterpret_cast(avatarNode->getLinkedData()); - assert(avatarNodeData); // we can't have gotten here without avatarNode having valid data - quint64 startIgnoreCalculation = usecTimestampNow(); + quint64 startAvatarDataPacking = usecTimestampNow(); - // make sure we have data for this avatar, that it isn't the same node, - // and isn't an avatar that the viewing node has ignored - // or that has ignored the viewing node - if (!avatarNode->getLinkedData() - || avatarNode->getUUID() == node->getUUID() - || (node->isIgnoringNodeWithID(avatarNode->getUUID()) && !PALIsOpen) - || (avatarNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) { - shouldIgnore = true; - } else { + ++numOtherAvatars; - // Check to see if the space bubble is enabled - // Don't bother with these checks if the other avatar has their bubble enabled and we're gettingAnyIgnored - if (node->isIgnoreRadiusEnabled() || (avatarNode->isIgnoreRadiusEnabled() && !getsAnyIgnored)) { + const AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - // Define the scale of the box for the current other node - glm::vec3 otherNodeBoxScale = (avatarNodeData->getPosition() - avatarNodeData->getGlobalBoundingBoxCorner()) * 2.0f; - // Set up the bounding box for the current other node - AABox otherNodeBox(avatarNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); - // Clamp the size of the bounding box to a minimum scale - if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) { - otherNodeBox.setScaleStayCentered(minBubbleSize); - } - // Quadruple the scale of both bounding boxes - otherNodeBox.embiggen(4.0f); + // If the time that the mixer sent AVATAR DATA about Avatar B to Avatar A is BEFORE OR EQUAL TO + // the time that Avatar B flagged an IDENTITY DATA change, send IDENTITY DATA about Avatar B to Avatar A. + if (nodeData->getLastBroadcastTime(otherNode->getUUID()) <= otherNodeData->getIdentityChangeTimestamp()) { + identityBytesSent += sendIdentityPacket(otherNodeData, node); + } - // Perform the collision check between the two bounding boxes - if (nodeBox.touches(otherNodeBox)) { - nodeData->ignoreOther(node, avatarNode); - shouldIgnore = !getsAnyIgnored; - } - } - // Not close enough to ignore - if (!shouldIgnore) { - nodeData->removeFromRadiusIgnoringSet(node, avatarNode->getUUID()); - } - } - quint64 endIgnoreCalculation = usecTimestampNow(); - _stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation); + const AvatarData* otherAvatar = otherNodeData->getConstAvatarData(); + glm::vec3 otherPosition = otherAvatar->getClientGlobalPosition(); - if (!shouldIgnore) { - AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(avatarNode->getUUID()); - AvatarDataSequenceNumber lastSeqFromSender = avatarNodeData->getLastReceivedSequenceNumber(); + // determine if avatar is in view, to determine how much data to include... + glm::vec3 otherNodeBoxScale = (otherPosition - otherNodeData->getGlobalBoundingBoxCorner()) * 2.0f; + AABox otherNodeBox(otherNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); + bool isInView = nodeData->otherAvatarInView(otherNodeBox); - // FIXME - This code does appear to be working. But it seems brittle. - // It supports determining if the frame of data for this "other" - // avatar has already been sent to the reciever. This has been - // verified to work on a desktop display that renders at 60hz and - // therefore sends to mixer at 30hz. Each second you'd expect to - // have 15 (45hz-30hz) duplicate frames. In this case, the stat - // avg_other_av_skips_per_second does report 15. - // - // make sure we haven't already sent this data from this sender to this receiver - // or that somehow we haven't sent - if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) { - ++numAvatarsHeldBack; - shouldIgnore = true; - } else if (lastSeqFromSender - lastSeqToReceiver > 1) { - // this is a skip - we still send the packet but capture the presence of the skip so we see it happening - ++numAvatarsWithSkippedFrames; - } - } - return shouldIgnore; - }); + // start a new segment in the PacketList for this avatar + avatarPacketList->startSegment(); - // loop through our sorted avatars and allocate our bandwidth to them accordingly - int avatarRank = 0; + AvatarData::AvatarDataDetail detail; - // this is overly conservative, because it includes some avatars we might not consider - int remainingAvatars = (int)sortedAvatars.size(); + if (overBudget) { + overBudgetAvatars++; + _stats.overBudgetAvatars++; + detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::NoData; + } else if (!isInView) { + detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::MinimumData; + nodeData->incrementAvatarOutOfView(); + } else { + detail = distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO + ? AvatarData::SendAllData : AvatarData::CullSmallData; + nodeData->incrementAvatarInView(); + } - while (!sortedAvatars.empty()) { - AvatarPriority sortData = sortedAvatars.top(); - sortedAvatars.pop(); - const auto& avatarData = sortData.avatar; - avatarRank++; - remainingAvatars--; + bool includeThisAvatar = true; + auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID()); + QVector& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID()); + bool distanceAdjust = true; + glm::vec3 viewerPosition = myPosition; + AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray + bool dropFaceTracking = false; - auto otherNode = avatarDataToNodes[avatarData]; - assert(otherNode); // we can't have gotten here without the avatarData being a valid key in the map + quint64 start = usecTimestampNow(); + QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, + hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + quint64 end = usecTimestampNow(); + _stats.toByteArrayElapsedTime += (end - start); - // NOTE: Here's where we determine if we are over budget and drop to bare minimum data - int minimRemainingAvatarBytes = minimumBytesPerAvatar * remainingAvatars; - bool overBudget = (identityBytesSent + numAvatarDataBytes + minimRemainingAvatarBytes) > maxAvatarBytesPerFrame; + static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID); + if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { + qCWarning(avatars) << "otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data"; + + dropFaceTracking = true; // first try dropping the facial data + bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, + hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + + if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { + qCWarning(avatars) << "otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData"; + bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther, + hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + } + + if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { + qCWarning(avatars) << "otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!"; + includeThisAvatar = false; + } + } + + if (includeThisAvatar) { + numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122()); + numAvatarDataBytes += avatarPacketList->write(bytes); + + if (detail != AvatarData::NoData) { + _stats.numOthersIncluded++; + + // increment the number of avatars sent to this reciever + nodeData->incrementNumAvatarsSentLastFrame(); + + // set the last sent sequence number for this sender on the receiver + nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), + otherNodeData->getLastReceivedSequenceNumber()); + + // remember the last time we sent details about this other node to the receiver + nodeData->setLastBroadcastTime(otherNode->getUUID(), start); + } + } + + avatarPacketList->endSegment(); + + quint64 endAvatarDataPacking = usecTimestampNow(); + _stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking); + }; + + quint64 startPacketSending = usecTimestampNow(); + + // close the current packet so that we're always sending something + avatarPacketList->closeCurrentPacket(true); + + _stats.numPacketsSent += (int)avatarPacketList->getNumPackets(); + _stats.numBytesSent += numAvatarDataBytes; + + // send the avatar data PacketList + nodeList->sendPacketList(std::move(avatarPacketList), *node); + + // record the bytes sent for other avatar data in the AvatarMixerClientData + nodeData->recordSentAvatarData(numAvatarDataBytes); + + // record the number of avatars held back this frame + nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack); + nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames); + + quint64 endPacketSending = usecTimestampNow(); + _stats.packetSendingElapsedTime += (endPacketSending - startPacketSending); +} + +uint64_t REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US = 5 * 1000 * 1000; + +void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node) { + _stats.downstreamMixersBroadcastedTo++; + + AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); + if (!nodeData) { + return; + } + + // setup a PacketList for the replicated bulk avatar data + auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData); + + int numAvatarDataBytes = 0; + + // reset the number of sent avatars + nodeData->resetNumAvatarsSentLastFrame(); + + std::for_each(_begin, _end, [&](const SharedNodePointer& agentNode) { + // collect agents that we have avatar data for that we are supposed to replicate + if (agentNode->getType() == NodeType::Agent && agentNode->getLinkedData() && agentNode->isReplicated()) { + const AvatarMixerClientData* agentNodeData = reinterpret_cast(agentNode->getLinkedData()); + + AvatarSharedPointer otherAvatar = agentNodeData->getAvatarSharedPointer(); quint64 startAvatarDataPacking = usecTimestampNow(); - ++numOtherAvatars; - - const AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - - // If the time that the mixer sent AVATAR DATA about Avatar B to Avatar A is BEFORE OR EQUAL TO - // the time that Avatar B flagged an IDENTITY DATA change, send IDENTITY DATA about Avatar B to Avatar A. - if (nodeData->getLastBroadcastTime(otherNode->getUUID()) <= otherNodeData->getIdentityChangeTimestamp()) { - identityBytesSent += sendIdentityPacket(otherNodeData, node); - } - - const AvatarData* otherAvatar = otherNodeData->getConstAvatarData(); - glm::vec3 otherPosition = otherAvatar->getClientGlobalPosition(); - - // determine if avatar is in view, to determine how much data to include... - glm::vec3 otherNodeBoxScale = (otherPosition - otherNodeData->getGlobalBoundingBoxCorner()) * 2.0f; - AABox otherNodeBox(otherNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale); - bool isInView = nodeData->otherAvatarInView(otherNodeBox); - - // start a new segment in the PacketList for this avatar - avatarPacketList->startSegment(); - - AvatarData::AvatarDataDetail detail; - - if (overBudget) { - overBudgetAvatars++; - _stats.overBudgetAvatars++; - detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::NoData; - } else if (!isInView) { - detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::MinimumData; - nodeData->incrementAvatarOutOfView(); - } else { - detail = distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO - ? AvatarData::SendAllData : AvatarData::CullSmallData; - nodeData->incrementAvatarInView(); - } - - bool includeThisAvatar = true; - auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID()); - QVector& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID()); - bool distanceAdjust = true; - glm::vec3 viewerPosition = myPosition; - AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray - bool dropFaceTracking = false; + // we cannot send a downstream avatar mixer any updates that expect them to have previous state for this avatar + // since we have no idea if they're online and receiving our packets + // so we always send a full update for this avatar + quint64 start = usecTimestampNow(); - QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, - hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + AvatarDataPacket::HasFlags flagsOut; + + QVector emptyLastJointSendData { otherAvatar->getJointCount() }; + + QByteArray avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, emptyLastJointSendData, + flagsOut, false, false, glm::vec3(0), nullptr); quint64 end = usecTimestampNow(); _stats.toByteArrayElapsedTime += (end - start); - static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID); - if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { - qCWarning(avatars) << "otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data"; + auto lastBroadcastTime = nodeData->getLastBroadcastTime(agentNode->getUUID()); + if (lastBroadcastTime <= agentNodeData->getIdentityChangeTimestamp() + || (start - lastBroadcastTime) >= REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US) { + sendReplicatedIdentityPacket(agentNodeData, node); + nodeData->setLastBroadcastTime(agentNode->getUUID(), start); + } - dropFaceTracking = true; // first try dropping the facial data - bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther, - hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); + // figure out how large our avatar byte array can be to fit in the packet list + // given that we need it and the avatar UUID and the size of the byte array (16 bit) + // to fit in a segment of the packet list + auto maxAvatarByteArraySize = avatarPacketList->getMaxSegmentSize(); + maxAvatarByteArraySize -= NUM_BYTES_RFC4122_UUID; + maxAvatarByteArraySize -= sizeof(quint16); - if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { - qCWarning(avatars) << "otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData"; - bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther, - hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther); - } + auto sequenceNumberSize = sizeof(agentNodeData->getLastReceivedSequenceNumber()); + maxAvatarByteArraySize -= sequenceNumberSize; - if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) { - qCWarning(avatars) << "otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!"; - includeThisAvatar = false; + if (avatarByteArray.size() > maxAvatarByteArraySize) { + qCWarning(avatars) << "Replicated avatar data too large for" << otherAvatar->getSessionUUID() + << "-" << avatarByteArray.size() << "bytes"; + + avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, emptyLastJointSendData, + flagsOut, true, false, glm::vec3(0), nullptr); + + if (avatarByteArray.size() > maxAvatarByteArraySize) { + qCWarning(avatars) << "Replicated avatar data without facial data still too large for" + << otherAvatar->getSessionUUID() << "-" << avatarByteArray.size() << "bytes"; + + avatarByteArray = otherAvatar->toByteArray(AvatarData::MinimumData, 0, emptyLastJointSendData, + flagsOut, true, false, glm::vec3(0), nullptr); } } - if (includeThisAvatar) { - numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122()); - numAvatarDataBytes += avatarPacketList->write(bytes); + if (avatarByteArray.size() <= maxAvatarByteArraySize) { + // increment the number of avatars sent to this reciever + nodeData->incrementNumAvatarsSentLastFrame(); - if (detail != AvatarData::NoData) { - _stats.numOthersIncluded++; + // set the last sent sequence number for this sender on the receiver + nodeData->setLastBroadcastSequenceNumber(agentNode->getUUID(), + agentNodeData->getLastReceivedSequenceNumber()); - // increment the number of avatars sent to this reciever - nodeData->incrementNumAvatarsSentLastFrame(); + // increment the number of avatars sent to this reciever + nodeData->incrementNumAvatarsSentLastFrame(); - // set the last sent sequence number for this sender on the receiver - nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), - otherNodeData->getLastReceivedSequenceNumber()); + // start a new segment in the packet list for this avatar + avatarPacketList->startSegment(); - // remember the last time we sent details about this other node to the receiver - nodeData->setLastBroadcastTime(otherNode->getUUID(), start); - } + // write the node's UUID, the size of the replicated avatar data, + // the sequence number of the replicated avatar data, and the replicated avatar data + numAvatarDataBytes += avatarPacketList->write(agentNode->getUUID().toRfc4122()); + numAvatarDataBytes += avatarPacketList->writePrimitive((quint16) (avatarByteArray.size() + sequenceNumberSize)); + numAvatarDataBytes += avatarPacketList->writePrimitive(agentNodeData->getLastReceivedSequenceNumber()); + numAvatarDataBytes += avatarPacketList->write(avatarByteArray); + + avatarPacketList->endSegment(); + + } else { + qCWarning(avatars) << "Could not fit minimum data avatar for" << otherAvatar->getSessionUUID() + << "to packet list -" << avatarByteArray.size() << "bytes"; } - avatarPacketList->endSegment(); - quint64 endAvatarDataPacking = usecTimestampNow(); _stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking); - }; + } + }); + if (avatarPacketList->getNumPackets() > 0) { quint64 startPacketSending = usecTimestampNow(); // close the current packet so that we're always sending something @@ -398,21 +551,15 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) { _stats.numPacketsSent += (int)avatarPacketList->getNumPackets(); _stats.numBytesSent += numAvatarDataBytes; - // send the avatar data PacketList - nodeList->sendPacketList(std::move(avatarPacketList), *node); + // send the replicated bulk avatar data + auto nodeList = DependencyManager::get(); + nodeList->sendPacketList(std::move(avatarPacketList), node->getPublicSocket()); // record the bytes sent for other avatar data in the AvatarMixerClientData nodeData->recordSentAvatarData(numAvatarDataBytes); - // record the number of avatars held back this frame - nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack); - nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames); - quint64 endPacketSending = usecTimestampNow(); _stats.packetSendingElapsedTime += (endPacketSending - startPacketSending); } - - quint64 end = usecTimestampNow(); - _stats.jobElapsedTime += (end - start); } diff --git a/assignment-client/src/avatars/AvatarMixerSlave.h b/assignment-client/src/avatars/AvatarMixerSlave.h index 04141d9d72..69c707bbf1 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.h +++ b/assignment-client/src/avatars/AvatarMixerSlave.h @@ -21,6 +21,7 @@ public: quint64 processIncomingPacketsElapsedTime { 0 }; int nodesBroadcastedTo { 0 }; + int downstreamMixersBroadcastedTo { 0 }; int numPacketsSent { 0 }; int numBytesSent { 0 }; int numIdentityPackets { 0 }; @@ -41,6 +42,7 @@ public: // sending job stats nodesBroadcastedTo = 0; + downstreamMixersBroadcastedTo = 0; numPacketsSent = 0; numBytesSent = 0; numIdentityPackets = 0; @@ -60,6 +62,7 @@ public: processIncomingPacketsElapsedTime += rhs.processIncomingPacketsElapsedTime; nodesBroadcastedTo += rhs.nodesBroadcastedTo; + downstreamMixersBroadcastedTo += rhs.downstreamMixersBroadcastedTo; numPacketsSent += rhs.numPacketsSent; numBytesSent += rhs.numBytesSent; numIdentityPackets += rhs.numIdentityPackets; @@ -92,6 +95,10 @@ public: private: int sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode); + int sendReplicatedIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode); + + void broadcastAvatarDataToAgent(const SharedNodePointer& node); + void broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node); // frame state ConstIter _begin; diff --git a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp index 07d4fa8851..cb5ae7735a 100644 --- a/assignment-client/src/avatars/AvatarMixerSlavePool.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlavePool.cpp @@ -76,8 +76,8 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end } void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, - p_high_resolution_clock::time_point lastFrameTimestamp, - float maxKbpsPerNode, float throttlingRatio) { + p_high_resolution_clock::time_point lastFrameTimestamp, + float maxKbpsPerNode, float throttlingRatio) { _function = &AvatarMixerSlave::broadcastAvatarData; _configure = [&](AvatarMixerSlave& slave) { slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio); diff --git a/domain-server/resources/describe-settings.json b/domain-server/resources/describe-settings.json index 817333bb72..fd0e107558 100644 --- a/domain-server/resources/describe-settings.json +++ b/domain-server/resources/describe-settings.json @@ -1335,6 +1335,66 @@ "advanced": true } ] + }, + { + "name": "broadcasting", + "label": "Broadcasting", + "settings": [ + { + "name": "users", + "label": "Broadcasted Users", + "type": "table", + "can_add_new_rows": true, + "help": "Users that are broadcasted to downstream servers", + "numbered": false, + "columns": [ + { + "name": "user", + "label": "User", + "can_set": true + } + ] + }, + { + "name": "downstream_servers", + "label": "Receiving Servers", + "assignment-types": [0,1], + "type": "table", + "can_add_new_rows": true, + "help": "Servers that receive data for broadcasted users", + "numbered": false, + "columns": [ + { + "name": "address", + "label": "Address", + "can_set": true + }, + { + "name": "port", + "label": "Port", + "can_set": true + }, + { + "name": "server_type", + "label": "Server Type", + "type": "select", + "placeholder": "Audio Mixer", + "default": "Audio Mixer", + "can_set": true, + "options": [ + { + "value": "Audio Mixer", + "label": "Audio Mixer" + }, + { + "value": "Avatar Mixer", + "label": "Avatar Mixer" + } + ] + } + ] + } + ] } ] } diff --git a/domain-server/resources/web/settings/js/settings.js b/domain-server/resources/web/settings/js/settings.js index 0057df721a..ccd36da757 100644 --- a/domain-server/resources/web/settings/js/settings.js +++ b/domain-server/resources/web/settings/js/settings.js @@ -39,7 +39,8 @@ var Settings = { ACCESS_TOKEN_SELECTOR: '[name="metaverse.access_token"]', PLACES_TABLE_ID: 'places-table', FORM_ID: 'settings-form', - INVALID_ROW_CLASS: 'invalid-input' + INVALID_ROW_CLASS: 'invalid-input', + DATA_ROW_INDEX: 'data-row-index' }; var viewHelpers = { @@ -223,10 +224,10 @@ $(document).ready(function(){ // set focus to the first input in the new row $target.closest('table').find('tr.inputs input:first').focus(); } - + var tableRows = sibling.parent(); var tableBody = tableRows.parent(); - + // if theres no more siblings, we should jump to a new row if (sibling.next().length == 0 && tableRows.nextAll().length == 1) { tableBody.find("." + Settings.ADD_ROW_BUTTON_CLASS).click(); @@ -1005,7 +1006,7 @@ function saveSettings() { var password = formJSON["security"]["http_password"]; var verify_password = formJSON["security"]["verify_http_password"]; - // if they've only emptied out the default password field, we should go ahead and acknowledge + // if they've only emptied out the default password field, we should go ahead and acknowledge // the verify password field if (password != undefined && verify_password == undefined) { verify_password = ""; @@ -1158,8 +1159,9 @@ function makeTable(setting, keypath, setting_value) { } html += ""; + (isCategorized ? ("data-category='" + categoryValue + "'") : "") + " " + + (isArray ? "" : "name='" + keypath + "." + rowIndexOrName + "'") + + (isArray ? Settings.DATA_ROW_INDEX + "='" + (row_num - 1) + "'" : "" ) + ">"; if (setting.numbered === true) { html += "" + row_num + "" @@ -1289,6 +1291,17 @@ function makeTableHiddenInputs(setting, initialValues, categoryValue) { "" + ""; + } else if (col.type === "select") { + html += "" + html += ""; + html += ""; } else { html += " 0) { + row_index = parseInt(previous_row.attr(Settings.DATA_ROW_INDEX), 10) + 1; + } else { + row_index = 0; + } + + row.attr(Settings.DATA_ROW_INDEX, row_index); + } + var focusChanged = false; _.each(row.children(), function(element) { @@ -1440,19 +1468,23 @@ function addTableRow(row) { input.show(); var isCheckbox = input.hasClass("table-checkbox"); + var isDropdown = input.hasClass("table-dropdown"); if (isArray) { - var row_index = row.siblings('.' + Settings.DATA_ROW_CLASS).length var key = $(element).attr('name'); // are there multiple columns or just one? // with multiple we have an array of Objects, with one we have an array of whatever the value type is var num_columns = row.children('.' + Settings.DATA_COL_CLASS).length + var newName = setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : ""); - if (isCheckbox) { - input.attr("name", setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : "")) - } else { - input.attr("name", setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : "")) + input.attr("name", newName); + + if (isDropdown) { + // default values for hidden inputs inside child selects gets cleared so we need to remind it + var selectElement = $(element).children("select"); + selectElement.attr("data-hidden-input", newName); + $(element).children("input").val(selectElement.val()); } } else { // because the name of the setting in question requires the key @@ -1468,6 +1500,12 @@ function addTableRow(row) { focusChanged = true; } + // if we are adding a dropdown, we should go ahead and make its select + // element is visible + if (isDropdown) { + $(element).children("select").attr("style", ""); + } + if (isCheckbox) { $(input).find("input").attr("data-changed", "true"); } else { diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index 0c1ebbf189..7c14d2e924 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -117,6 +117,10 @@ DomainServer::DomainServer(int argc, char* argv[]) : // if permissions are updated, relay the changes to the Node datastructures connect(&_settingsManager, &DomainServerSettingsManager::updateNodePermissions, &_gatekeeper, &DomainGatekeeper::updateNodePermissions); + connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, + this, &DomainServer::updateReplicatedNodes); + connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, + this, &DomainServer::updateDownstreamNodes); setupGroupCacheRefresh(); @@ -129,6 +133,9 @@ DomainServer::DomainServer(int argc, char* argv[]) : setupNodeListAndAssignments(); + updateReplicatedNodes(); + updateDownstreamNodes(); + if (_type != NonMetaverse) { // if we have a metaverse domain, we'll use an access token for API calls resetAccountManagerAccessToken(); @@ -958,6 +965,11 @@ void DomainServer::handleConnectedNode(SharedNodePointer newNode) { emit userConnected(); } + if (shouldReplicateNode(*newNode)) { + qDebug() << "Setting node to replicated: " << newNode->getUUID(); + newNode->setIsReplicated(true); + } + // send out this node to our other connected nodes broadcastNewNode(newNode); } @@ -2215,6 +2227,131 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer& _unfulfilledAssignments.enqueue(assignment); } +static const QString BROADCASTING_SETTINGS_KEY = "broadcasting"; + +void DomainServer::updateDownstreamNodes() { + auto settings = _settingsManager.getSettingsMap(); + if (settings.contains(BROADCASTING_SETTINGS_KEY)) { + auto nodeList = DependencyManager::get(); + std::vector downstreamNodesInSettings; + auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap(); + if (replicationSettings.contains("downstream_servers")) { + auto serversSettings = replicationSettings.value("downstream_servers").toList(); + + std::vector knownDownstreamNodes; + nodeList->eachNode([&](const SharedNodePointer& otherNode) { + if (NodeType::isDownstream(otherNode->getType())) { + knownDownstreamNodes.push_back(otherNode->getPublicSocket()); + } + }); + + for (auto& server : serversSettings) { + auto downstreamServer = server.toMap(); + + static const QString DOWNSTREAM_SERVER_ADDRESS = "address"; + static const QString DOWNSTREAM_SERVER_PORT = "port"; + static const QString DOWNSTREAM_SERVER_TYPE = "server_type"; + + // make sure we have the settings we need for this downstream server + if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) { + + auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString()); + auto downstreamNodeType = NodeType::downstreamType(nodeType); + + // read the address and port and construct a HifiSockAddr from them + HifiSockAddr downstreamServerAddr { + downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(), + (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt() + }; + downstreamNodesInSettings.push_back(downstreamServerAddr); + + bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(), + downstreamServerAddr) != knownDownstreamNodes.cend(); + if (!knownNode) { + // manually add the downstream node to our node list + auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType, + downstreamServerAddr, downstreamServerAddr); + node->setIsForcedNeverSilent(true); + + qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr; + + // manually activate the public socket for the downstream node + node->activatePublicSocket(); + } + } + + } + } + + // enumerate the nodes to determine which are no longer downstream for this domain + // collect them in a vector to separately remove them with handleKillNode (since eachNode has a read lock and + // we cannot recursively take the write lock required by handleKillNode) + std::vector nodesToKill; + nodeList->eachNode([&](const SharedNodePointer& otherNode) { + if (NodeType::isDownstream(otherNode->getType())) { + bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(), + otherNode->getPublicSocket()) != downstreamNodesInSettings.cend(); + if (!nodeInSettings) { + qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket(); + nodesToKill.push_back(otherNode); + } + } + }); + + for (auto& node : nodesToKill) { + handleKillNode(node); + } + } +} + +void DomainServer::updateReplicatedNodes() { + // Make sure we have downstream nodes in our list + auto settings = _settingsManager.getSettingsMap(); + + static const QString REPLICATED_USERS_KEY = "users"; + _replicatedUsernames.clear(); + + if (settings.contains(BROADCASTING_SETTINGS_KEY)) { + auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap(); + if (replicationSettings.contains(REPLICATED_USERS_KEY)) { + auto usersSettings = replicationSettings.value(REPLICATED_USERS_KEY).toList(); + for (auto& username : usersSettings) { + _replicatedUsernames.push_back(username.toString().toLower()); + } + } + } + + auto nodeList = DependencyManager::get(); + nodeList->eachMatchingNode([this](const SharedNodePointer& otherNode) -> bool { + return otherNode->getType() == NodeType::Agent; + }, [this](const SharedNodePointer& otherNode) { + auto shouldReplicate = shouldReplicateNode(*otherNode); + auto isReplicated = otherNode->isReplicated(); + if (isReplicated && !shouldReplicate) { + qDebug() << "Setting node to NOT be replicated:" + << otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID(); + } else if (!isReplicated && shouldReplicate) { + qDebug() << "Setting node to replicated:" + << otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID(); + } + otherNode->setIsReplicated(shouldReplicate); + } + ); +} + +bool DomainServer::shouldReplicateNode(const Node& node) { + if (node.getType() == NodeType::Agent) { + QString verifiedUsername = node.getPermissions().getVerifiedUserName(); + + // Both the verified username and usernames in _replicatedUsernames are lowercase, so + // comparisons here are case-insensitive. + auto it = find(_replicatedUsernames.cbegin(), _replicatedUsernames.cend(), verifiedUsername); + return it != _replicatedUsernames.end(); + } else { + return false; + } +}; + void DomainServer::nodeAdded(SharedNodePointer node) { // we don't use updateNodeWithData, so add the DomainServerNodeData to the node here node->setLinkedData(std::unique_ptr { new DomainServerNodeData() }); diff --git a/domain-server/src/DomainServer.h b/domain-server/src/DomainServer.h index 63b82cb37d..7e43397e9c 100644 --- a/domain-server/src/DomainServer.h +++ b/domain-server/src/DomainServer.h @@ -102,6 +102,9 @@ private slots: void handleOctreeFileReplacement(QByteArray octreeFile); + void updateReplicatedNodes(); + void updateDownstreamNodes(); + signals: void iceServerChanged(); void userConnected(); @@ -161,12 +164,16 @@ private: QJsonObject jsonForSocket(const HifiSockAddr& socket); QJsonObject jsonObjectForNode(const SharedNodePointer& node); + bool shouldReplicateNode(const Node& node); + void setupGroupCacheRefresh(); QString pathForRedirect(QString path = QString()) const; SubnetList _acSubnetWhitelist; + std::vector _replicatedUsernames; + DomainGatekeeper _gatekeeper; HTTPManager _httpManager; diff --git a/domain-server/src/DomainServerSettingsManager.cpp b/domain-server/src/DomainServerSettingsManager.cpp index d6b57b450a..9279648319 100644 --- a/domain-server/src/DomainServerSettingsManager.cpp +++ b/domain-server/src/DomainServerSettingsManager.cpp @@ -991,6 +991,7 @@ bool DomainServerSettingsManager::handleAuthenticatedHTTPRequest(HTTPConnection unpackPermissions(); apiRefreshGroupInformation(); emit updateNodePermissions(); + emit settingsUpdated(); } return true; @@ -1196,13 +1197,14 @@ QJsonObject DomainServerSettingsManager::settingDescriptionFromGroup(const QJson bool DomainServerSettingsManager::recurseJSONObjectAndOverwriteSettings(const QJsonObject& postedObject) { static const QString SECURITY_ROOT_KEY = "security"; static const QString AC_SUBNET_WHITELIST_KEY = "ac_subnet_whitelist"; + static const QString BROADCASTING_KEY = "broadcasting"; auto& settingsVariant = _configMap.getConfig(); bool needRestart = false; // Iterate on the setting groups foreach(const QString& rootKey, postedObject.keys()) { - QJsonValue rootValue = postedObject[rootKey]; + const QJsonValue& rootValue = postedObject[rootKey]; if (!settingsVariant.contains(rootKey)) { // we don't have a map below this key yet, so set it up now @@ -1247,7 +1249,7 @@ bool DomainServerSettingsManager::recurseJSONObjectAndOverwriteSettings(const QJ if (!matchingDescriptionObject.isEmpty()) { updateSetting(rootKey, rootValue, *thisMap, matchingDescriptionObject); - if (rootKey != SECURITY_ROOT_KEY) { + if (rootKey != SECURITY_ROOT_KEY && rootKey != BROADCASTING_KEY) { needRestart = true; } } else { @@ -1261,9 +1263,10 @@ bool DomainServerSettingsManager::recurseJSONObjectAndOverwriteSettings(const QJ // if we matched the setting then update the value if (!matchingDescriptionObject.isEmpty()) { - QJsonValue settingValue = rootValue.toObject()[settingKey]; + const QJsonValue& settingValue = rootValue.toObject()[settingKey]; updateSetting(settingKey, settingValue, *thisMap, matchingDescriptionObject); - if (rootKey != SECURITY_ROOT_KEY || settingKey == AC_SUBNET_WHITELIST_KEY) { + if ((rootKey != SECURITY_ROOT_KEY && rootKey != BROADCASTING_KEY) + || settingKey == AC_SUBNET_WHITELIST_KEY) { needRestart = true; } } else { diff --git a/domain-server/src/DomainServerSettingsManager.h b/domain-server/src/DomainServerSettingsManager.h index d56a786d4b..4c7d8dfbc9 100644 --- a/domain-server/src/DomainServerSettingsManager.h +++ b/domain-server/src/DomainServerSettingsManager.h @@ -108,6 +108,7 @@ public: signals: void updateNodePermissions(); + void settingsUpdated(); public slots: void apiGetGroupIDJSONCallback(QNetworkReply& requestReply); diff --git a/libraries/audio/src/InboundAudioStream.cpp b/libraries/audio/src/InboundAudioStream.cpp index 88ec7e7bc0..da8997895c 100644 --- a/libraries/audio/src/InboundAudioStream.cpp +++ b/libraries/audio/src/InboundAudioStream.cpp @@ -127,6 +127,7 @@ int InboundAudioStream::parseData(ReceivedMessage& message) { // parse the info after the seq number and before the audio data (the stream properties) int prePropertyPosition = message.getPosition(); int propertyBytes = parseStreamProperties(message.getType(), message.readWithoutCopy(message.getBytesLeftToRead()), networkFrames); + message.seek(prePropertyPosition + propertyBytes); // handle this packet based on its arrival status. @@ -147,7 +148,8 @@ int InboundAudioStream::parseData(ReceivedMessage& message) { } case SequenceNumberStats::OnTime: { // Packet is on time; parse its data to the ringbuffer - if (message.getType() == PacketType::SilentAudioFrame) { + if (message.getType() == PacketType::SilentAudioFrame + || message.getType() == PacketType::ReplicatedSilentAudioFrame) { // If we recieved a SilentAudioFrame from our sender, we might want to drop // some of the samples in order to catch up to our desired jitter buffer size. writeDroppableSilentFrames(networkFrames); @@ -168,7 +170,10 @@ int InboundAudioStream::parseData(ReceivedMessage& message) { // inform others of the mismatch auto sendingNode = DependencyManager::get()->nodeWithUUID(message.getSourceID()); - emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket); + if (sendingNode) { + emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket); + } + } } break; diff --git a/libraries/avatars/src/AvatarData.cpp b/libraries/avatars/src/AvatarData.cpp index 7731d53ec3..eb4a02cb62 100644 --- a/libraries/avatars/src/AvatarData.cpp +++ b/libraries/avatars/src/AvatarData.cpp @@ -1473,27 +1473,6 @@ QStringList AvatarData::getJointNames() const { return _jointNames; } -void AvatarData::parseAvatarIdentityPacket(const QByteArray& data, Identity& identityOut) { - QDataStream packetStream(data); - - packetStream >> identityOut.uuid - >> identityOut.skeletonModelURL - >> identityOut.attachmentData - >> identityOut.displayName - >> identityOut.sessionDisplayName - >> identityOut.avatarEntityData - >> identityOut.sequenceId; - -#ifdef WANT_DEBUG - qCDebug(avatars) << __FUNCTION__ - << "identityOut.uuid:" << identityOut.uuid - << "identityOut.skeletonModelURL:" << identityOut.skeletonModelURL - << "identityOut.displayName:" << identityOut.displayName - << "identityOut.sessionDisplayName:" << identityOut.sessionDisplayName; -#endif - -} - glm::quat AvatarData::getOrientationOutbound() const { return (getLocalOrientation()); } @@ -1504,47 +1483,82 @@ QUrl AvatarData::cannonicalSkeletonModelURL(const QUrl& emptyURL) const { return _skeletonModelURL.scheme() == "file" ? emptyURL : _skeletonModelURL; } -void AvatarData::processAvatarIdentity(const Identity& identity, bool& identityChanged, bool& displayNameChanged, bool& skeletonModelUrlChanged) { +void AvatarData::processAvatarIdentity(const QByteArray& identityData, bool& identityChanged, + bool& displayNameChanged, bool& skeletonModelUrlChanged) { - if (identity.sequenceId < _identitySequenceId) { - qCDebug(avatars) << "Ignoring older identity packet for avatar" << getSessionUUID() - << "_identitySequenceId (" << _identitySequenceId << ") is greater than" << identity.sequenceId; - return; + QDataStream packetStream(identityData); + + QUuid avatarSessionID; + + // peek the sequence number, this will tell us if we should be processing this identity packet at all + udt::SequenceNumber::Type incomingSequenceNumberType; + packetStream >> avatarSessionID >> incomingSequenceNumberType; + udt::SequenceNumber incomingSequenceNumber(incomingSequenceNumberType); + + if (!_hasProcessedFirstIdentity) { + _lastSequenceNumber = incomingSequenceNumber - 1; + _hasProcessedFirstIdentity = true; + qCDebug(avatars) << "Processing first identity packet for" << avatarSessionID << "-" + << (udt::SequenceNumber::Type) incomingSequenceNumber; } - // otherwise, set the identitySequenceId to match the incoming identity - _identitySequenceId = identity.sequenceId; - if (_firstSkeletonCheck || (identity.skeletonModelURL != cannonicalSkeletonModelURL(emptyURL))) { - setSkeletonModelURL(identity.skeletonModelURL); - identityChanged = true; - skeletonModelUrlChanged = true; - if (_firstSkeletonCheck) { + if (incomingSequenceNumber > _lastSequenceNumber) { + Identity identity; + + packetStream >> identity.skeletonModelURL + >> identity.attachmentData + >> identity.displayName + >> identity.sessionDisplayName + >> identity.avatarEntityData; + + // set the store identity sequence number to match the incoming identity + _lastSequenceNumber = incomingSequenceNumber; + + if (_firstSkeletonCheck || (identity.skeletonModelURL != cannonicalSkeletonModelURL(emptyURL))) { + setSkeletonModelURL(identity.skeletonModelURL); + identityChanged = true; + skeletonModelUrlChanged = true; + if (_firstSkeletonCheck) { + displayNameChanged = true; + } + _firstSkeletonCheck = false; + } + + if (identity.displayName != _displayName) { + _displayName = identity.displayName; + identityChanged = true; displayNameChanged = true; } - _firstSkeletonCheck = false; - } + maybeUpdateSessionDisplayNameFromTransport(identity.sessionDisplayName); - if (identity.displayName != _displayName) { - _displayName = identity.displayName; - identityChanged = true; - displayNameChanged = true; - } - maybeUpdateSessionDisplayNameFromTransport(identity.sessionDisplayName); + if (identity.attachmentData != _attachmentData) { + setAttachmentData(identity.attachmentData); + identityChanged = true; + } - if (identity.attachmentData != _attachmentData) { - setAttachmentData(identity.attachmentData); - identityChanged = true; - } + bool avatarEntityDataChanged = false; + _avatarEntitiesLock.withReadLock([&] { + avatarEntityDataChanged = (identity.avatarEntityData != _avatarEntityData); + }); + + if (avatarEntityDataChanged) { + setAvatarEntityData(identity.avatarEntityData); + identityChanged = true; + } - bool avatarEntityDataChanged = false; - _avatarEntitiesLock.withReadLock([&] { - avatarEntityDataChanged = (identity.avatarEntityData != _avatarEntityData); - }); - if (avatarEntityDataChanged) { - setAvatarEntityData(identity.avatarEntityData); - identityChanged = true; - } +#ifdef WANT_DEBUG + qCDebug(avatars) << __FUNCTION__ + << "identity.uuid:" << identity.uuid + << "identity.skeletonModelURL:" << identity.skeletonModelURL + << "identity.displayName:" << identity.displayName + << "identity.sessionDisplayName:" << identity.sessionDisplayName; + } else { + qCDebug(avatars) << "Refusing to process identity for" << uuidStringWithoutCurlyBraces(avatarSessionID) << "since" + << (udt::SequenceNumber::Type) _lastSequenceNumber + << "is >=" << (udt::SequenceNumber::Type) incomingSequenceNumber; +#endif + } } QByteArray AvatarData::identityByteArray() const { @@ -1552,14 +1566,17 @@ QByteArray AvatarData::identityByteArray() const { QDataStream identityStream(&identityData, QIODevice::Append); const QUrl& urlToSend = cannonicalSkeletonModelURL(emptyURL); // depends on _skeletonModelURL + // when mixers send identity packets to agents, they simply forward along the last incoming sequence number they received + // whereas agents send a fresh outgoing sequence number when identity data has changed + _avatarEntitiesLock.withReadLock([&] { identityStream << getSessionUUID() - << urlToSend - << _attachmentData - << _displayName - << getSessionDisplayNameForTransport() // depends on _sessionDisplayName - << _avatarEntityData - << _identitySequenceId; + << (udt::SequenceNumber::Type) _lastSequenceNumber + << urlToSend + << _attachmentData + << _displayName + << getSessionDisplayNameForTransport() // depends on _sessionDisplayName + << _avatarEntityData; }); return identityData; @@ -1735,6 +1752,12 @@ void AvatarData::sendAvatarDataPacket() { void AvatarData::sendIdentityPacket() { auto nodeList = DependencyManager::get(); + + if (_identityDataChanged) { + // if the identity data has changed, push the sequence number forwards + ++_lastSequenceNumber; + } + QByteArray identityData = identityByteArray(); auto packetList = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true); @@ -1745,7 +1768,7 @@ void AvatarData::sendIdentityPacket() { }, [&](const SharedNodePointer& node) { nodeList->sendPacketList(std::move(packetList), *node); - }); + }); _avatarEntityDataLocallyEdited = false; _identityDataChanged = false; diff --git a/libraries/avatars/src/AvatarData.h b/libraries/avatars/src/AvatarData.h index 8941d9d95f..8d09f936b5 100644 --- a/libraries/avatars/src/AvatarData.h +++ b/libraries/avatars/src/AvatarData.h @@ -52,15 +52,16 @@ typedef unsigned long long quint64; #include #include #include -#include -#include -#include #include #include -#include +#include #include -#include +#include +#include +#include #include +#include +#include #include "AABox.h" #include "HeadData.h" @@ -526,19 +527,17 @@ public: const HeadData* getHeadData() const { return _headData; } struct Identity { - QUuid uuid; QUrl skeletonModelURL; QVector attachmentData; QString displayName; QString sessionDisplayName; AvatarEntityMap avatarEntityData; - quint64 sequenceId; }; - static void parseAvatarIdentityPacket(const QByteArray& data, Identity& identityOut); - + // identityChanged returns true if identity has changed, false otherwise. // identityChanged returns true if identity has changed, false otherwise. Similarly for displayNameChanged and skeletonModelUrlChange. - void processAvatarIdentity(const Identity& identity, bool& identityChanged, bool& displayNameChanged, bool& skeletonModelUrlChanged); + void processAvatarIdentity(const QByteArray& identityData, bool& identityChanged, + bool& displayNameChanged, bool& skeletonModelUrlChanged); QByteArray identityByteArray() const; @@ -624,10 +623,7 @@ public: static float _avatarSortCoefficientAge; bool getIdentityDataChanged() const { return _identityDataChanged; } // has the identity data changed since the last time sendIdentityPacket() was called - void markIdentityDataChanged() { - _identityDataChanged = true; - _identitySequenceId++; - } + void markIdentityDataChanged() { _identityDataChanged = true; } float getDensity() const { return _density; } @@ -785,7 +781,8 @@ protected: float _audioAverageLoudness { 0.0f }; bool _identityDataChanged { false }; - quint64 _identitySequenceId { 0 }; + udt::SequenceNumber _lastSequenceNumber { 0 }; + bool _hasProcessedFirstIdentity { false }; float _density; private: diff --git a/libraries/avatars/src/AvatarHashMap.cpp b/libraries/avatars/src/AvatarHashMap.cpp index fb954f4731..e8c37bdaa8 100644 --- a/libraries/avatars/src/AvatarHashMap.cpp +++ b/libraries/avatars/src/AvatarHashMap.cpp @@ -126,8 +126,14 @@ AvatarSharedPointer AvatarHashMap::parseAvatarData(QSharedPointer message, SharedNodePointer sendingNode) { - AvatarData::Identity identity; - AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity); + + // peek the avatar UUID from the incoming packet + QUuid identityUUID = QUuid::fromRfc4122(message->peek(NUM_BYTES_RFC4122_UUID)); + + if (identityUUID.isNull()) { + qCDebug(avatars) << "Refusing to process identity packet for null avatar ID"; + return; + } // make sure this isn't for an ignored avatar auto nodeList = DependencyManager::get(); @@ -136,21 +142,22 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer { QReadLocker locker(&_hashLock); auto me = _avatarHash.find(EMPTY); - if ((me != _avatarHash.end()) && (identity.uuid == me.value()->getSessionUUID())) { + if ((me != _avatarHash.end()) && (identityUUID == me.value()->getSessionUUID())) { // We add MyAvatar to _avatarHash with an empty UUID. Code relies on this. In order to correctly handle an // identity packet for ourself (such as when we are assigned a sessionDisplayName by the mixer upon joining), // we make things match here. - identity.uuid = EMPTY; + identityUUID = EMPTY; } } - if (!nodeList->isIgnoringNode(identity.uuid) || nodeList->getRequestsDomainListData()) { + + if (!nodeList->isIgnoringNode(identityUUID) || nodeList->getRequestsDomainListData()) { // mesh URL for a UUID, find avatar in our list - auto avatar = newOrExistingAvatar(identity.uuid, sendingNode); + auto avatar = newOrExistingAvatar(identityUUID, sendingNode); bool identityChanged = false; bool displayNameChanged = false; bool skeletonModelUrlChanged = false; // In this case, the "sendingNode" is the Avatar Mixer. - avatar->processAvatarIdentity(identity, identityChanged, displayNameChanged, skeletonModelUrlChanged); + avatar->processAvatarIdentity(message->getMessage(), identityChanged, displayNameChanged, skeletonModelUrlChanged); } } diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index f9baff0daf..d2bae28820 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -446,7 +446,8 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr packetList, return _nodeSocket.writePacketList(std::move(packetList), *activeSocket); } else { - qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node. Not sending."; + qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node " + << destinationNode.getUUID() << ". Not sending."; return ERROR_SENDING_PACKET_BYTES; } } @@ -454,7 +455,8 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr packetList, qint64 LimitedNodeList::sendPacket(std::unique_ptr packet, const Node& destinationNode, const HifiSockAddr& overridenSockAddr) { if (overridenSockAddr.isNull() && !destinationNode.getActiveSocket()) { - qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node. Not sending."; + qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node" + << destinationNode.getUUID() << ". Not sending."; return ERROR_SENDING_PACKET_BYTES; } @@ -568,8 +570,8 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) { SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, - const NodePermissions& permissions, - const QUuid& connectionSecret) { + bool isReplicated, bool isUpstream, + const QUuid& connectionSecret, const NodePermissions& permissions) { QReadLocker readLocker(&_nodeMutex); NodeHash::const_iterator it = _nodeHash.find(uuid); @@ -580,11 +582,20 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t matchingNode->setLocalSocket(localSocket); matchingNode->setPermissions(permissions); matchingNode->setConnectionSecret(connectionSecret); + matchingNode->setIsReplicated(isReplicated); + matchingNode->setIsUpstream(isUpstream); return matchingNode; } else { // we didn't have this node, so add them - Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket, permissions, connectionSecret); + Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); + newNode->setIsReplicated(isReplicated); + newNode->setIsUpstream(isUpstream); + newNode->setConnectionSecret(connectionSecret); + newNode->setPermissions(permissions); + + // move the newly constructed node to the LNL thread + newNode->moveToThread(thread()); if (nodeType == NodeType::AudioMixer) { LimitedNodeList::flagTimeForConnectionStep(LimitedNodeList::AddedAudioMixer); @@ -742,7 +753,8 @@ void LimitedNodeList::removeSilentNodes() { SharedNodePointer node = it->second; node->getMutex().lock(); - if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) { + if (!node->isForcedNeverSilent() + && (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) { // call the NodeHash erase to get rid of this node it = _nodeHash.unsafe_erase(it); diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 554386f786..ab61c71952 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -145,8 +145,9 @@ public: SharedNodePointer addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, - const NodePermissions& permissions = DEFAULT_AGENT_PERMISSIONS, - const QUuid& connectionSecret = QUuid()); + bool isReplicated = false, bool isUpstream = false, + const QUuid& connectionSecret = QUuid(), + const NodePermissions& permissions = DEFAULT_AGENT_PERMISSIONS); static bool parseSTUNResponse(udt::BasePacket* packet, QHostAddress& newPublicAddress, uint16_t& newPublicPort); bool hasCompletedInitialSTUN() const { return _hasCompletedInitialSTUN; } @@ -257,6 +258,16 @@ public: return SharedNodePointer(); } + // This is unsafe because it does not take a lock + // Must only be called when you know that a read lock on the node mutex is held + // and will be held for the duration of your iteration + template + void unsafeEachNode(NodeLambda functor) { + for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) { + functor(it->second); + } + } + void putLocalPortIntoSharedMemory(const QString key, QObject* parent, quint16 localPort); bool getLocalServerPortFromSharedMemory(const QString key, quint16& localPort); @@ -386,6 +397,7 @@ protected: } } + private slots: void flagTimeForConnectionStep(ConnectionStep connectionStep, quint64 timestamp); void possiblyTimeoutSTUNAddressLookup(); diff --git a/libraries/networking/src/NLPacket.h b/libraries/networking/src/NLPacket.h index 33de262dfb..f9056bbfaa 100644 --- a/libraries/networking/src/NLPacket.h +++ b/libraries/networking/src/NLPacket.h @@ -42,6 +42,7 @@ public: static std::unique_ptr fromReceivedPacket(std::unique_ptr data, qint64 size, const HifiSockAddr& senderSockAddr); + static std::unique_ptr fromBase(std::unique_ptr packet); // Provided for convenience, try to limit use diff --git a/libraries/networking/src/NLPacketList.h b/libraries/networking/src/NLPacketList.h index 48ce5ef81a..910d39f71b 100644 --- a/libraries/networking/src/NLPacketList.h +++ b/libraries/networking/src/NLPacketList.h @@ -23,6 +23,8 @@ public: PacketVersion getVersion() const { return _packetVersion; } const QUuid& getSourceID() const { return _sourceID; } + + qint64 getMaxSegmentSize() const override { return NLPacket::maxPayloadSize(_packetType, _isOrdered); } private: NLPacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false, diff --git a/libraries/networking/src/NetworkPeer.h b/libraries/networking/src/NetworkPeer.h index 7185ffef29..9842768b37 100644 --- a/libraries/networking/src/NetworkPeer.h +++ b/libraries/networking/src/NetworkPeer.h @@ -76,6 +76,14 @@ public: float getOutboundBandwidth() const; // in kbps float getInboundBandwidth() const; // in kbps + // Typically the LimitedNodeList removes nodes after they are "silent" + // meaning that we have not received any packets (including simple keepalive pings) from them for a set interval. + // The _isForcedNeverSilent flag tells the LimitedNodeList that a Node should never be killed by removeSilentNodes() + // even if its the timestamp of when it was last heard from has never been updated. + + bool isForcedNeverSilent() const { return _isForcedNeverSilent; } + void setIsForcedNeverSilent(bool isForcedNeverSilent) { _isForcedNeverSilent = isForcedNeverSilent; } + friend QDataStream& operator<<(QDataStream& out, const NetworkPeer& peer); friend QDataStream& operator>>(QDataStream& in, NetworkPeer& peer); public slots: @@ -103,6 +111,8 @@ protected: QTimer* _pingTimer = NULL; int _connectionAttempts; + + bool _isForcedNeverSilent { false }; }; QDebug operator<<(QDebug debug, const NetworkPeer &peer); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index 60227eeaa1..ea1b6e0eb5 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -42,6 +42,8 @@ void NodeType::init() { TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer"); TypeNameHash.insert(NodeType::AssetServer, "Asset Server"); TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server"); + TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer"); + TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer"); TypeNameHash.insert(NodeType::Unassigned, "Unassigned"); } @@ -50,17 +52,34 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) { return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME; } +bool NodeType::isDownstream(NodeType_t nodeType) { + return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer; +} + +NodeType_t NodeType::downstreamType(NodeType_t primaryType) { + switch (primaryType) { + case AudioMixer: + return DownstreamAudioMixer; + case AvatarMixer: + return DownstreamAvatarMixer; + default: + return Unassigned; + } +} + +NodeType_t NodeType::fromString(QString type) { + return TypeNameHash.key(type, NodeType::Unassigned); +} + + Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket, - const HifiSockAddr& localSocket, const NodePermissions& permissions, const QUuid& connectionSecret, - QObject* parent) : + const HifiSockAddr& localSocket, QObject* parent) : NetworkPeer(uuid, publicSocket, localSocket, parent), _type(type), - _connectionSecret(connectionSecret), _pingMs(-1), // "Uninitialized" _clockSkewUsec(0), _mutex(), - _clockSkewMovingPercentile(30, 0.8f), // moving 80th percentile of 30 samples - _permissions(permissions) + _clockSkewMovingPercentile(30, 0.8f) // moving 80th percentile of 30 samples { // Update socket's object name setType(_type); @@ -135,6 +154,7 @@ QDataStream& operator<<(QDataStream& out, const Node& node) { out << node._publicSocket; out << node._localSocket; out << node._permissions; + out << node._isReplicated; return out; } @@ -144,6 +164,7 @@ QDataStream& operator>>(QDataStream& in, Node& node) { in >> node._publicSocket; in >> node._localSocket; in >> node._permissions; + in >> node._isReplicated; return in; } diff --git a/libraries/networking/src/Node.h b/libraries/networking/src/Node.h index 1bb3a0cdc8..c20ff5a395 100644 --- a/libraries/networking/src/Node.h +++ b/libraries/networking/src/Node.h @@ -37,9 +37,9 @@ class Node : public NetworkPeer { Q_OBJECT public: + Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, - const NodePermissions& permissions, const QUuid& connectionSecret = QUuid(), QObject* parent = nullptr); bool operator==(const Node& otherNode) const { return _uuid == otherNode._uuid; } @@ -48,6 +48,12 @@ public: char getType() const { return _type; } void setType(char type); + bool isReplicated() const { return _isReplicated; } + void setIsReplicated(bool isReplicated) { _isReplicated = isReplicated; } + + bool isUpstream() const { return _isUpstream; } + void setIsUpstream(bool isUpstream) { _isUpstream = isUpstream; } + const QUuid& getConnectionSecret() const { return _connectionSecret; } void setConnectionSecret(const QUuid& connectionSecret) { _connectionSecret = connectionSecret; } @@ -89,13 +95,16 @@ private: QUuid _connectionSecret; std::unique_ptr _linkedData; + bool _isReplicated { false }; int _pingMs; qint64 _clockSkewUsec; QMutex _mutex; MovingPercentile _clockSkewMovingPercentile; NodePermissions _permissions; + bool _isUpstream { false }; tbb::concurrent_unordered_set _ignoredNodeIDSet; mutable QReadWriteLock _ignoredNodeIDSetLock; + std::vector _replicatedUsernames { }; std::atomic_bool _ignoreRadiusEnabled; }; diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 2aa30b84aa..f502cb2a3c 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -654,8 +654,9 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { QUuid nodeUUID, connectionUUID; HifiSockAddr nodePublicSocket, nodeLocalSocket; NodePermissions permissions; + bool isReplicated; - packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions; + packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions >> isReplicated; // if the public socket address is 0 then it's reachable at the same IP // as the domain server @@ -666,7 +667,12 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { packetStream >> connectionUUID; SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket, - nodeLocalSocket, permissions, connectionUUID); + nodeLocalSocket, isReplicated, false, connectionUUID, permissions); + + // nodes that are downstream of our own type are kept alive when we hear about them from the domain server + if (node->getType() == NodeType::downstreamType(_ownerType)) { + node->setLastHeardMicrostamp(usecTimestampNow()); + } } void NodeList::sendAssignment(Assignment& assignment) { @@ -711,14 +717,20 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { } void NodeList::startNodeHolePunch(const SharedNodePointer& node) { - // connect to the correct signal on this node so we know when to ping it - connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); - // start the ping timer for this node - node->startPingTimer(); + // we don't hole punch to downstream servers, since it is assumed that we have a direct line to them + // we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them - // ping this node immediately - pingPunchForInactiveNode(node); + if (!NodeType::isDownstream(node->getType()) && !node->isUpstream()) { + // connect to the correct signal on this node so we know when to ping it + connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); + + // start the ping timer for this node + node->startPingTimer(); + + // ping this node immediately + pingPunchForInactiveNode(node); + } } void NodeList::handleNodePingTimeout() { @@ -761,8 +773,11 @@ void NodeList::stopKeepalivePingTimer() { } void NodeList::sendKeepAlivePings() { + // send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node + eachMatchingNode([this](const SharedNodePointer& node)->bool { - return _nodeTypesOfInterest.contains(node->getType()); + auto type = node->getType(); + return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type); }, [&](const SharedNodePointer& node) { sendPacket(constructPingPacket(), *node); }); @@ -1120,4 +1135,4 @@ void NodeList::setRequestsDomainListData(bool isRequesting) { void NodeList::startThread() { moveToNewNamedThread(this, "NodeList Thread", QThread::TimeCriticalPriority); -} \ No newline at end of file +} diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index 5ae7a835b6..dacd5e1a68 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -25,10 +25,17 @@ namespace NodeType { const NodeType_t AssetServer = 'A'; const NodeType_t MessagesMixer = 'm'; const NodeType_t EntityScriptServer = 'S'; + const NodeType_t DownstreamAudioMixer = 'a'; + const NodeType_t DownstreamAvatarMixer = 'w'; const NodeType_t Unassigned = 1; void init(); + const QString& getNodeTypeName(NodeType_t nodeType); + bool isDownstream(NodeType_t nodeType); + NodeType_t downstreamType(NodeType_t primaryType); + + NodeType_t fromString(QString type); } typedef QSet NodeSet; diff --git a/libraries/networking/src/ReceivedMessage.cpp b/libraries/networking/src/ReceivedMessage.cpp index 2c5a11334b..6ca249fb22 100644 --- a/libraries/networking/src/ReceivedMessage.cpp +++ b/libraries/networking/src/ReceivedMessage.cpp @@ -42,6 +42,20 @@ ReceivedMessage::ReceivedMessage(NLPacket& packet) { } +ReceivedMessage::ReceivedMessage(QByteArray byteArray, PacketType packetType, PacketVersion packetVersion, + const HifiSockAddr& senderSockAddr, QUuid sourceID) : + _data(byteArray), + _headData(_data.mid(0, HEAD_DATA_SIZE)), + _numPackets(1), + _sourceID(sourceID), + _packetType(packetType), + _packetVersion(packetVersion), + _senderSockAddr(senderSockAddr), + _isComplete(true) +{ + +} + void ReceivedMessage::setFailed() { _failed = true; _isComplete = true; diff --git a/libraries/networking/src/ReceivedMessage.h b/libraries/networking/src/ReceivedMessage.h index 3acb7163e7..ae51e7592a 100644 --- a/libraries/networking/src/ReceivedMessage.h +++ b/libraries/networking/src/ReceivedMessage.h @@ -24,6 +24,8 @@ class ReceivedMessage : public QObject { public: ReceivedMessage(const NLPacketList& packetList); ReceivedMessage(NLPacket& packet); + ReceivedMessage(QByteArray byteArray, PacketType packetType, PacketVersion packetVersion, + const HifiSockAddr& senderSockAddr, QUuid sourceID = QUuid()); QByteArray getMessage() const { return _data; } const char* getRawMessage() const { return _data.constData(); } diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp index 9db540cae2..18e4593c91 100644 --- a/libraries/networking/src/ThreadedAssignment.cpp +++ b/libraries/networking/src/ThreadedAssignment.cpp @@ -10,6 +10,7 @@ // #include +#include #include #include #include diff --git a/libraries/networking/src/ThreadedAssignment.h b/libraries/networking/src/ThreadedAssignment.h index e9832a2a91..8b35acaac5 100644 --- a/libraries/networking/src/ThreadedAssignment.h +++ b/libraries/networking/src/ThreadedAssignment.h @@ -18,6 +18,8 @@ #include "Assignment.h" +using DownstreamNodeFoundCallback = std::function; + class ThreadedAssignment : public Assignment { Q_OBJECT public: @@ -40,6 +42,7 @@ signals: protected: void commonInit(const QString& targetName, NodeType_t nodeType); + bool _isFinished; QTimer _domainServerTimer; QTimer _statsTimer; diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index d59da2f726..fb416cac4d 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -39,7 +39,19 @@ const QSet NON_SOURCED_PACKETS = QSet() << PacketType::ICEServerPeerInformation << PacketType::ICEServerQuery << PacketType::ICEServerHeartbeat << PacketType::ICEServerHeartbeatACK << PacketType::ICEPing << PacketType::ICEPingReply << PacketType::ICEServerHeartbeatDenied << PacketType::AssignmentClientStatus << PacketType::StopNode - << PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement; + << PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement + << PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho + << PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame + << PacketType::ReplicatedAvatarIdentity << PacketType::ReplicatedKillAvatar << PacketType::ReplicatedBulkAvatarData; + +const QHash REPLICATED_PACKET_MAPPING { + { PacketType::MicrophoneAudioNoEcho, PacketType::ReplicatedMicrophoneAudioNoEcho }, + { PacketType::MicrophoneAudioWithEcho, PacketType::ReplicatedMicrophoneAudioWithEcho }, + { PacketType::InjectAudio, PacketType::ReplicatedInjectAudio }, + { PacketType::SilentAudioFrame, PacketType::ReplicatedSilentAudioFrame }, + { PacketType::AvatarIdentity, PacketType::ReplicatedAvatarIdentity }, + { PacketType::KillAvatar, PacketType::ReplicatedKillAvatar }, +}; PacketVersion versionForPacketType(PacketType packetType) { switch (packetType) { @@ -56,7 +68,7 @@ PacketVersion versionForPacketType(PacketType packetType) { case PacketType::AvatarData: case PacketType::BulkAvatarData: case PacketType::KillAvatar: - return static_cast(AvatarMixerPacketVersion::MannequinDefaultAvatar); + return static_cast(AvatarMixerPacketVersion::AvatarIdentitySequenceFront); case PacketType::MessagesData: return static_cast(MessageDataVersion::TextOrBinaryData); case PacketType::ICEServerHeartbeat: @@ -119,7 +131,7 @@ static void ensureProtocolVersionsSignature() { std::call_once(once, [&] { QByteArray buffer; QDataStream stream(&buffer, QIODevice::WriteOnly); - uint8_t numberOfProtocols = static_cast(PacketType::LAST_PACKET_TYPE) + 1; + uint8_t numberOfProtocols = static_cast(PacketType::NUM_PACKET_TYPE); stream << numberOfProtocols; for (uint8_t packetType = 0; packetType < numberOfProtocols; packetType++) { uint8_t packetTypeVersion = static_cast(versionForPacketType(static_cast(packetType))); diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index fa1151e0a6..2944c1ce93 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -115,12 +115,21 @@ public: AdjustAvatarSorting, OctreeFileReplacement, CollisionEventChanges, - LAST_PACKET_TYPE = CollisionEventChanges + ReplicatedMicrophoneAudioNoEcho, + ReplicatedMicrophoneAudioWithEcho, + ReplicatedInjectAudio, + ReplicatedSilentAudioFrame, + ReplicatedAvatarIdentity, + ReplicatedKillAvatar, + ReplicatedBulkAvatarData, + NUM_PACKET_TYPE }; }; using PacketType = PacketTypeEnum::Value; +extern const QHash REPLICATED_PACKET_MAPPING; + const int NUM_BYTES_MD5_HASH = 16; typedef char PacketVersion; @@ -237,7 +246,8 @@ enum class AvatarMixerPacketVersion : PacketVersion { StickAndBallDefaultAvatar, IdentityPacketsIncludeUpdateTime, AvatarIdentitySequenceId, - MannequinDefaultAvatar + MannequinDefaultAvatar, + AvatarIdentitySequenceFront }; enum class DomainConnectRequestVersion : PacketVersion { diff --git a/libraries/networking/src/udt/PacketList.cpp b/libraries/networking/src/udt/PacketList.cpp index 8651f9eed4..d69ff39197 100644 --- a/libraries/networking/src/udt/PacketList.cpp +++ b/libraries/networking/src/udt/PacketList.cpp @@ -36,8 +36,8 @@ std::unique_ptr PacketList::fromReceivedPackets(std::list> _packets; + + bool _isOrdered = false; private: friend class ::LimitedNodeList; @@ -93,7 +97,6 @@ private: Packet::MessageNumber _messageNumber; bool _isReliable = false; - bool _isOrdered = false; std::unique_ptr _currentPacket; diff --git a/libraries/networking/src/udt/SequenceNumber.h b/libraries/networking/src/udt/SequenceNumber.h index 3abc80bdd8..2c82eccfa3 100644 --- a/libraries/networking/src/udt/SequenceNumber.h +++ b/libraries/networking/src/udt/SequenceNumber.h @@ -35,8 +35,8 @@ public: explicit SequenceNumber(char* value) { _value = (*reinterpret_cast(value)) & MAX; } explicit SequenceNumber(Type value) { _value = (value <= MAX) ? ((value >= 0) ? value : 0) : MAX; } explicit SequenceNumber(UType value) { _value = (value <= MAX) ? value : MAX; } - explicit operator Type() { return _value; } - explicit operator UType() { return static_cast(_value); } + explicit operator Type() const { return _value; } + explicit operator UType() const { return static_cast(_value); } inline SequenceNumber& operator++() { _value = (_value + 1) % (MAX + 1);