From c7b3b79a235c5ea7fd2f1f2c03da5c6abde2a4dd Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 11:13:43 -0700 Subject: [PATCH 01/23] use replicant nodes for audio packet replication --- .../src/audio/AudioMixerClientData.cpp | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 70c9b615d8..f28e8dfa0b 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -108,30 +108,34 @@ void AudioMixerClientData::processPackets() { void AudioMixerClientData::replicatePacket(ReceivedMessage& message) { auto nodeList = DependencyManager::get(); - if (!nodeList->getMirrorSocket().isNull()) { - PacketType mirroredType; - if (message.getType() == PacketType::MicrophoneAudioNoEcho) { - mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; - } else if (message.getType() == PacketType::MicrophoneAudioWithEcho) { - mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; - } else if (message.getType() == PacketType::InjectAudio) { - mirroredType = PacketType::ReplicatedInjectAudio; - } else if (message.getType() == PacketType::SilentAudioFrame) { - mirroredType = PacketType::ReplicatedSilentAudioFrame; - } else if (message.getType() == PacketType::NegotiateAudioFormat) { - mirroredType = PacketType::ReplicatedNegotiateAudioFormat; - } else { - return; - } + PacketType mirroredType; - // construct an NLPacket to send to the replicant that has the contents of the received packet - auto packet = NLPacket::create(mirroredType, message.getSize() + NUM_BYTES_RFC4122_UUID); - packet->write(message.getSourceID().toRfc4122()); - packet->write(message.getMessage()); - - nodeList->sendPacket(std::move(packet), nodeList->getMirrorSocket()); + if (message.getType() == PacketType::MicrophoneAudioNoEcho) { + mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; + } else if (message.getType() == PacketType::MicrophoneAudioWithEcho) { + mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; + } else if (message.getType() == PacketType::InjectAudio) { + mirroredType = PacketType::ReplicatedInjectAudio; + } else if (message.getType() == PacketType::SilentAudioFrame) { + mirroredType = PacketType::ReplicatedSilentAudioFrame; + } else if (message.getType() == PacketType::NegotiateAudioFormat) { + mirroredType = PacketType::ReplicatedNegotiateAudioFormat; + } else { + return; } + + // construct an NLPacket to send to the replicant that has the contents of the received packet + auto packet = NLPacket::create(mirroredType, message.getSize() + NUM_BYTES_RFC4122_UUID); + packet->write(message.getSourceID().toRfc4122()); + packet->write(message.getMessage()); + + // enumerate the replicant audio mixers and send them the replicated version of this packet + nodeList->eachMatchingNode([&](const SharedNodePointer& node)->bool { + return node->getType() == NodeType::ReplicantAudioMixer; + }, [&](const SharedNodePointer& node) { + nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); + }); } void AudioMixerClientData::negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node) { From d5b466e3ae36eec5ef4b2621e27d92b18ee5c3b6 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 11:31:35 -0700 Subject: [PATCH 02/23] fix replicant handling slot, add node strings for replicants --- assignment-client/src/audio/AudioMixer.cpp | 3 ++- libraries/networking/src/LimitedNodeList.cpp | 3 ++- libraries/networking/src/Node.cpp | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 1246617540..0927e02655 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -100,10 +100,11 @@ AudioMixer::AudioMixer(ReceivedMessage& message) : PacketType::ReplicatedSilentAudioFrame, PacketType::ReplicatedNegotiateAudioFormat }, - this, "queueMirroredAudioPacket" + this, "queueReplicatedAudioPacket" ); connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled); + } void AudioMixer::queueAudioPacket(QSharedPointer message, SharedNodePointer node) { diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 7b0b2d5bc5..afb0dce8af 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -744,7 +744,8 @@ void LimitedNodeList::removeSilentNodes() { SharedNodePointer node = it->second; node->getMutex().lock(); - if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) { + if (node->getType() != NodeType::ReplicantAudioMixer && node->getType() != NodeType::ReplicantAvatarMixer + && (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) { // call the NodeHash erase to get rid of this node it = _nodeHash.unsafe_erase(it); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index 49f032b823..a023e488c4 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -42,6 +42,8 @@ void NodeType::init() { TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer"); TypeNameHash.insert(NodeType::AssetServer, "Asset Server"); TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server"); + TypeNameHash.insert(NodeType::ReplicantAudioMixer, "Replicant Audio Mixer"); + TypeNameHash.insert(NodeType::ReplicantAvatarMixer, "Replicant Avatar Mixer"); TypeNameHash.insert(NodeType::Unassigned, "Unassigned"); } From 2929573cb09f46018121d3815f1ea16f5f864b78 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 11:42:46 -0700 Subject: [PATCH 03/23] do not punch to replicant nodes --- libraries/networking/src/NodeList.cpp | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 9b7a7c91dd..5e06ad2b5c 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -712,14 +712,18 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { } void NodeList::startNodeHolePunch(const SharedNodePointer& node) { - // connect to the correct signal on this node so we know when to ping it - connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); - // start the ping timer for this node - node->startPingTimer(); + // we don't hole punch to replicants, since it is assumed that we have a direct line to them + if (node->getType() != NodeType::ReplicantAudioMixer && node->getType() != NodeType::ReplicantAvatarMixer) { + // connect to the correct signal on this node so we know when to ping it + connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); - // ping this node immediately - pingPunchForInactiveNode(node); + // start the ping timer for this node + node->startPingTimer(); + + // ping this node immediately + pingPunchForInactiveNode(node); + } } void NodeList::handleNodePingTimeout() { @@ -1121,4 +1125,4 @@ void NodeList::setRequestsDomainListData(bool isRequesting) { void NodeList::startThread() { moveToNewNamedThread(this, "NodeList Thread", QThread::TimeCriticalPriority); -} \ No newline at end of file +} From fe668b1bb11d6ad49784abf4c90f7df00448c442 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 12:14:59 -0700 Subject: [PATCH 04/23] make codec negotiation stateless for replicated agents --- assignment-client/src/audio/AudioMixer.cpp | 9 +++-- .../src/audio/AudioMixerClientData.cpp | 33 ++++++++++++++++--- .../src/audio/AudioMixerClientData.h | 2 ++ libraries/networking/src/Node.cpp | 1 + libraries/networking/src/Node.h | 5 --- libraries/networking/src/NodeType.h | 1 + .../networking/src/udt/PacketHeaders.cpp | 3 +- libraries/networking/src/udt/PacketHeaders.h | 1 - 8 files changed, 38 insertions(+), 17 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 0927e02655..d997c3504b 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -97,8 +97,7 @@ AudioMixer::AudioMixer(ReceivedMessage& message) : PacketType::ReplicatedMicrophoneAudioNoEcho, PacketType::ReplicatedMicrophoneAudioWithEcho, PacketType::ReplicatedInjectAudio, - PacketType::ReplicatedSilentAudioFrame, - PacketType::ReplicatedNegotiateAudioFormat + PacketType::ReplicatedSilentAudioFrame }, this, "queueReplicatedAudioPacket" ); @@ -121,10 +120,10 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer mess QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); - auto node = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, - message->getSenderSockAddr(), message->getSenderSockAddr()); + auto node = nodeList->addOrUpdateNode(nodeID, NodeType::ReplicatedAgent, + message->getSenderSockAddr(), message->getSenderSockAddr(), + DEFAULT_AGENT_PERMISSIONS, true); node->setIsUpstream(true); - node->setIsMirror(true); node->setLastHeardMicrostamp(usecTimestampNow()); getOrCreateClientData(node.data())->queuePacket(message, node); diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index f28e8dfa0b..8ee7f9f383 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -73,6 +73,11 @@ void AudioMixerClientData::processPackets() { case PacketType::ReplicatedMicrophoneAudioWithEcho: case PacketType::ReplicatedInjectAudio: case PacketType::ReplicatedSilentAudioFrame: { + + if (node->getType() == NodeType::ReplicatedAgent && !_codec) { + setupCodecForReplicatedAgent(packet); + } + QMutexLocker lock(&getMutex()); parseData(*packet); @@ -81,9 +86,7 @@ void AudioMixerClientData::processPackets() { break; } case PacketType::NegotiateAudioFormat: - case PacketType::ReplicatedNegotiateAudioFormat: negotiateAudioFormat(*packet, node); - replicatePacket(*packet); break; case PacketType::RequestsDomainListData: parseRequestsDomainListData(*packet); @@ -119,15 +122,20 @@ void AudioMixerClientData::replicatePacket(ReceivedMessage& message) { mirroredType = PacketType::ReplicatedInjectAudio; } else if (message.getType() == PacketType::SilentAudioFrame) { mirroredType = PacketType::ReplicatedSilentAudioFrame; - } else if (message.getType() == PacketType::NegotiateAudioFormat) { - mirroredType = PacketType::ReplicatedNegotiateAudioFormat; } else { return; } // construct an NLPacket to send to the replicant that has the contents of the received packet auto packet = NLPacket::create(mirroredType, message.getSize() + NUM_BYTES_RFC4122_UUID); + + // since this packet will be non-sourced, we add the replicated node's ID here packet->write(message.getSourceID().toRfc4122()); + + // we won't negotiate an audio format with the replicant, because we aren't a listener + // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs + packet->writeString(_selectedCodecName); + packet->write(message.getMessage()); // enumerate the replicant audio mixers and send them the replicated version of this packet @@ -311,7 +319,12 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { || packetType == PacketType::ReplicatedMicrophoneAudioNoEcho || packetType == PacketType::ReplicatedSilentAudioFrame || packetType == PacketType::ReplicatedInjectAudio) { + + // skip past source ID for the replicated packet message.seek(NUM_BYTES_RFC4122_UUID); + + // skip past the codec string + message.readString(); } // check the overflow count before we parse data @@ -660,3 +673,15 @@ bool AudioMixerClientData::shouldIgnore(const SharedNodePointer self, const Shar return shouldIgnore; } + +void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointer message) { + // first pull the codec string from the packet + + // read the string for the codec + auto codecString = message->readString(); + + qDebug() << "Manually setting codec for replicated agent" << codecString; + + const std::pair codec = AudioMixer::negotiateCodec({ codecString }); + setupCodec(codec.second, codec.first); +} diff --git a/assignment-client/src/audio/AudioMixerClientData.h b/assignment-client/src/audio/AudioMixerClientData.h index 5ab55dd48e..72761e64f2 100644 --- a/assignment-client/src/audio/AudioMixerClientData.h +++ b/assignment-client/src/audio/AudioMixerClientData.h @@ -108,6 +108,8 @@ public: bool getRequestsDomainListData() { return _requestsDomainListData; } void setRequestsDomainListData(bool requesting) { _requestsDomainListData = requesting; } + void setupCodecForReplicatedAgent(QSharedPointer message); + signals: void injectorStreamFinished(const QUuid& streamIdentifier); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index a023e488c4..a6b6bc18b2 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -44,6 +44,7 @@ void NodeType::init() { TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server"); TypeNameHash.insert(NodeType::ReplicantAudioMixer, "Replicant Audio Mixer"); TypeNameHash.insert(NodeType::ReplicantAvatarMixer, "Replicant Avatar Mixer"); + TypeNameHash.insert(NodeType::ReplicatedAgent, "Replicated Agent"); TypeNameHash.insert(NodeType::Unassigned, "Unassigned"); } diff --git a/libraries/networking/src/Node.h b/libraries/networking/src/Node.h index 11f88b0f52..c905c9d551 100644 --- a/libraries/networking/src/Node.h +++ b/libraries/networking/src/Node.h @@ -86,9 +86,6 @@ public: bool isIgnoreRadiusEnabled() const { return _ignoreRadiusEnabled; } - bool isMirror() const { return _isMirror; } - void setIsMirror(bool isMirror) { _isMirror = isMirror; } - private: // privatize copy and assignment operator to disallow Node copying Node(const Node &otherNode); @@ -109,8 +106,6 @@ private: mutable QReadWriteLock _ignoredNodeIDSetLock; std::atomic_bool _ignoreRadiusEnabled; - - bool _isMirror { false }; }; Q_DECLARE_METATYPE(Node*) diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index 7324dcaf14..7c327e08f7 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -27,6 +27,7 @@ namespace NodeType { const NodeType_t EntityScriptServer = 'S'; const NodeType_t ReplicantAudioMixer = 'a'; const NodeType_t ReplicantAvatarMixer = 'w'; + const NodeType_t ReplicatedAgent = 'z'; const NodeType_t Unassigned = 1; void init(); diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index ebc8e80e45..f2f0062861 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -41,8 +41,7 @@ const QSet NON_SOURCED_PACKETS = QSet() << PacketType::ICEServerHeartbeatDenied << PacketType::AssignmentClientStatus << PacketType::StopNode << PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement << PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho - << PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame - << PacketType::ReplicatedNegotiateAudioFormat; + << PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame; PacketVersion versionForPacketType(PacketType packetType) { switch (packetType) { diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index 3ace60d502..448ae812ff 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -115,7 +115,6 @@ public: AdjustAvatarSorting, OctreeFileReplacement, CollisionEventChanges, - ReplicatedNegotiateAudioFormat, ReplicatedMicrophoneAudioNoEcho, ReplicatedMicrophoneAudioWithEcho, ReplicatedInjectAudio, From a4aa9689a6098fcf8a445f03ed878189897f56a2 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 12:51:18 -0700 Subject: [PATCH 05/23] don't construct a packet of exact size for replicated audio packets --- assignment-client/src/audio/AudioMixer.cpp | 1 - assignment-client/src/audio/AudioMixerClientData.cpp | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index d997c3504b..2f2b424948 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -103,7 +103,6 @@ AudioMixer::AudioMixer(ReceivedMessage& message) : ); connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled); - } void AudioMixer::queueAudioPacket(QSharedPointer message, SharedNodePointer node) { diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 8ee7f9f383..6ef795f57b 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -127,7 +127,7 @@ void AudioMixerClientData::replicatePacket(ReceivedMessage& message) { } // construct an NLPacket to send to the replicant that has the contents of the received packet - auto packet = NLPacket::create(mirroredType, message.getSize() + NUM_BYTES_RFC4122_UUID); + auto packet = NLPacket::create(mirroredType); // since this packet will be non-sourced, we add the replicated node's ID here packet->write(message.getSourceID().toRfc4122()); From 88af8b584252996a218014f82a946c8e61e34709 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 12:56:15 -0700 Subject: [PATCH 06/23] use new downstream/upstream nomeclature --- assignment-client/src/audio/AudioMixer.cpp | 2 +- assignment-client/src/audio/AudioMixerClientData.cpp | 4 ++-- libraries/networking/src/LimitedNodeList.cpp | 2 +- libraries/networking/src/Node.cpp | 5 ++--- libraries/networking/src/NodeList.cpp | 2 +- libraries/networking/src/NodeType.h | 5 ++--- 6 files changed, 9 insertions(+), 11 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 2f2b424948..9fb0a96382 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -119,7 +119,7 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer mess QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); - auto node = nodeList->addOrUpdateNode(nodeID, NodeType::ReplicatedAgent, + auto node = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, message->getSenderSockAddr(), message->getSenderSockAddr(), DEFAULT_AGENT_PERMISSIONS, true); node->setIsUpstream(true); diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 6ef795f57b..b95ca5b616 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -74,7 +74,7 @@ void AudioMixerClientData::processPackets() { case PacketType::ReplicatedInjectAudio: case PacketType::ReplicatedSilentAudioFrame: { - if (node->getType() == NodeType::ReplicatedAgent && !_codec) { + if (node->isUpstream() && !_codec) { setupCodecForReplicatedAgent(packet); } @@ -140,7 +140,7 @@ void AudioMixerClientData::replicatePacket(ReceivedMessage& message) { // enumerate the replicant audio mixers and send them the replicated version of this packet nodeList->eachMatchingNode([&](const SharedNodePointer& node)->bool { - return node->getType() == NodeType::ReplicantAudioMixer; + return node->getType() == NodeType::DownstreamAudioMixer; }, [&](const SharedNodePointer& node) { nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); }); diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index afb0dce8af..7ca991667e 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -744,7 +744,7 @@ void LimitedNodeList::removeSilentNodes() { SharedNodePointer node = it->second; node->getMutex().lock(); - if (node->getType() != NodeType::ReplicantAudioMixer && node->getType() != NodeType::ReplicantAvatarMixer + if (node->getType() != NodeType::DownstreamAudioMixer && node->getType() != NodeType::DownstreamAvatarMixer && (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) { // call the NodeHash erase to get rid of this node it = _nodeHash.unsafe_erase(it); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index a6b6bc18b2..9331487db9 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -42,9 +42,8 @@ void NodeType::init() { TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer"); TypeNameHash.insert(NodeType::AssetServer, "Asset Server"); TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server"); - TypeNameHash.insert(NodeType::ReplicantAudioMixer, "Replicant Audio Mixer"); - TypeNameHash.insert(NodeType::ReplicantAvatarMixer, "Replicant Avatar Mixer"); - TypeNameHash.insert(NodeType::ReplicatedAgent, "Replicated Agent"); + TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer"); + TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer"); TypeNameHash.insert(NodeType::Unassigned, "Unassigned"); } diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 5e06ad2b5c..ddd2137422 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -714,7 +714,7 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { void NodeList::startNodeHolePunch(const SharedNodePointer& node) { // we don't hole punch to replicants, since it is assumed that we have a direct line to them - if (node->getType() != NodeType::ReplicantAudioMixer && node->getType() != NodeType::ReplicantAvatarMixer) { + if (node->getType() != NodeType::DownstreamAudioMixer && node->getType() != NodeType::DownstreamAvatarMixer) { // connect to the correct signal on this node so we know when to ping it connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index 7c327e08f7..7f0d5f0636 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -25,9 +25,8 @@ namespace NodeType { const NodeType_t AssetServer = 'A'; const NodeType_t MessagesMixer = 'm'; const NodeType_t EntityScriptServer = 'S'; - const NodeType_t ReplicantAudioMixer = 'a'; - const NodeType_t ReplicantAvatarMixer = 'w'; - const NodeType_t ReplicatedAgent = 'z'; + const NodeType_t DownstreamAudioMixer = 'a'; + const NodeType_t DownstreamAvatarMixer = 'w'; const NodeType_t Unassigned = 1; void init(); From 91c25d4270646862b186d34b464eff76d17cd00e Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 13:15:02 -0700 Subject: [PATCH 07/23] don't continously set codec for upstream agent --- assignment-client/src/audio/AudioMixerClientData.cpp | 7 +++++-- assignment-client/src/audio/AudioMixerClientData.h | 2 ++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index b95ca5b616..082ac364be 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -74,7 +74,7 @@ void AudioMixerClientData::processPackets() { case PacketType::ReplicatedInjectAudio: case PacketType::ReplicatedSilentAudioFrame: { - if (node->isUpstream() && !_codec) { + if (node->isUpstream() && !_hasSetupCodecForUpstreamNode) { setupCodecForReplicatedAgent(packet); } @@ -261,7 +261,8 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) { avatarAudioStream->setupCodec(_codec, _selectedCodecName, AudioConstants::MONO); qDebug() << "creating new AvatarAudioStream... codec:" << _selectedCodecName; - connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec, this, &AudioMixerClientData::handleMismatchAudioFormat); + connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec, + this, &AudioMixerClientData::handleMismatchAudioFormat); auto emplaced = _audioStreams.emplace( QUuid(), @@ -684,4 +685,6 @@ void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointer codec = AudioMixer::negotiateCodec({ codecString }); setupCodec(codec.second, codec.first); + + _hasSetupCodecForUpstreamNode = true; } diff --git a/assignment-client/src/audio/AudioMixerClientData.h b/assignment-client/src/audio/AudioMixerClientData.h index 72761e64f2..0f9e4c4c18 100644 --- a/assignment-client/src/audio/AudioMixerClientData.h +++ b/assignment-client/src/audio/AudioMixerClientData.h @@ -185,6 +185,8 @@ private: bool _shouldMuteClient { false }; bool _requestsDomainListData { false }; + + bool _hasSetupCodecForUpstreamNode { false }; }; #endif // hifi_AudioMixerClientData_h From 7ed9483467083190f715e8b348b1017f3cef563e Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 13:57:55 -0700 Subject: [PATCH 08/23] use blocking queued for addOrUpdateNode because of parenting --- assignment-client/src/audio/AudioMixer.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 9fb0a96382..1e93d0515c 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -119,13 +119,13 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer mess QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); - auto node = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, - message->getSenderSockAddr(), message->getSenderSockAddr(), - DEFAULT_AGENT_PERMISSIONS, true); - node->setIsUpstream(true); - node->setLastHeardMicrostamp(usecTimestampNow()); + auto replicatedNode = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, + message->getSenderSockAddr(), message->getSenderSockAddr(), + DEFAULT_AGENT_PERMISSIONS, true); + replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); + replicatedNode->setIsUpstream(true); - getOrCreateClientData(node.data())->queuePacket(message, node); + getOrCreateClientData(replicatedNode.data())->queuePacket(message, replicatedNode); } void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer message, SharedNodePointer sendingNode) { From 1868971cfc4c66f40993676c7793ef5c69fc2667 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 14:46:53 -0700 Subject: [PATCH 09/23] fix debug for manual codec in upstream agents --- assignment-client/src/audio/AudioMixerClientData.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 082ac364be..44c51c4ae2 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -681,7 +681,8 @@ void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointerreadString(); - qDebug() << "Manually setting codec for replicated agent" << codecString; + qDebug() << "Manually setting codec for replicated agent" << uuidStringWithoutCurlyBraces(getNodeID()) + << "-" << codecString; const std::pair codec = AudioMixer::negotiateCodec({ codecString }); setupCodec(codec.second, codec.first); From 03a8d7b8c849547ed7f3be7dc5ecef4437da5d46 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 14:56:28 -0700 Subject: [PATCH 10/23] only replicate packets for agents being replicated --- .../src/audio/AudioMixerClientData.cpp | 77 +++++++++++-------- .../src/audio/AudioMixerClientData.h | 2 +- 2 files changed, 45 insertions(+), 34 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 44c51c4ae2..359c7e1b9e 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -80,8 +80,8 @@ void AudioMixerClientData::processPackets() { QMutexLocker lock(&getMutex()); parseData(*packet); - - replicatePacket(*packet); + + optionallyReplicatePacket(*packet, *node); break; } @@ -109,41 +109,52 @@ void AudioMixerClientData::processPackets() { assert(_packetQueue.empty()); } -void AudioMixerClientData::replicatePacket(ReceivedMessage& message) { - auto nodeList = DependencyManager::get(); +void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) { - PacketType mirroredType; + // first, make sure that this is a packet from a node we are supposed to replicate + if (node.isReplicated()) { + auto nodeList = DependencyManager::get(); - if (message.getType() == PacketType::MicrophoneAudioNoEcho) { - mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; - } else if (message.getType() == PacketType::MicrophoneAudioWithEcho) { - mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; - } else if (message.getType() == PacketType::InjectAudio) { - mirroredType = PacketType::ReplicatedInjectAudio; - } else if (message.getType() == PacketType::SilentAudioFrame) { - mirroredType = PacketType::ReplicatedSilentAudioFrame; - } else { - return; + // now make sure it's a packet type that we want to replicate + PacketType mirroredType; + + if (message.getType() == PacketType::MicrophoneAudioNoEcho) { + mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; + } else if (message.getType() == PacketType::MicrophoneAudioWithEcho) { + mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; + } else if (message.getType() == PacketType::InjectAudio) { + mirroredType = PacketType::ReplicatedInjectAudio; + } else if (message.getType() == PacketType::SilentAudioFrame) { + mirroredType = PacketType::ReplicatedSilentAudioFrame; + } else { + return; + } + + std::unique_ptr packet; + + // enumerate the replicant audio mixers and send them the replicated version of this packet + nodeList->eachMatchingNode([&](const SharedNodePointer& node)->bool { + return node->getType() == NodeType::DownstreamAudioMixer; + }, [&](const SharedNodePointer& node) { + // construct the packet only once, if we have any downstream audio mixers to send to + if (!packet) { + // construct an NLPacket to send to the replicant that has the contents of the received packet + packet = NLPacket::create(mirroredType); + + // since this packet will be non-sourced, we add the replicated node's ID here + packet->write(message.getSourceID().toRfc4122()); + + // we won't negotiate an audio format with the replicant, because we aren't a listener + // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs + packet->writeString(_selectedCodecName); + + packet->write(message.getMessage()); + } + + nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); + }); } - // construct an NLPacket to send to the replicant that has the contents of the received packet - auto packet = NLPacket::create(mirroredType); - - // since this packet will be non-sourced, we add the replicated node's ID here - packet->write(message.getSourceID().toRfc4122()); - - // we won't negotiate an audio format with the replicant, because we aren't a listener - // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs - packet->writeString(_selectedCodecName); - - packet->write(message.getMessage()); - - // enumerate the replicant audio mixers and send them the replicated version of this packet - nodeList->eachMatchingNode([&](const SharedNodePointer& node)->bool { - return node->getType() == NodeType::DownstreamAudioMixer; - }, [&](const SharedNodePointer& node) { - nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); - }); } void AudioMixerClientData::negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node) { diff --git a/assignment-client/src/audio/AudioMixerClientData.h b/assignment-client/src/audio/AudioMixerClientData.h index 0f9e4c4c18..b397ab0b8e 100644 --- a/assignment-client/src/audio/AudioMixerClientData.h +++ b/assignment-client/src/audio/AudioMixerClientData.h @@ -126,7 +126,7 @@ private: QReadWriteLock _streamsLock; AudioStreamMap _audioStreams; // microphone stream from avatar is stored under key of null UUID - void replicatePacket(ReceivedMessage& packet); + void optionallyReplicatePacket(ReceivedMessage& packet, const Node& node); using IgnoreZone = AABox; class IgnoreZoneMemo { From 4042e0494644cd443ce8c2adca02df0ee3cf6c9a Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 15:32:09 -0700 Subject: [PATCH 11/23] add isDownstream for cleaner conditionals --- libraries/networking/src/LimitedNodeList.cpp | 2 +- libraries/networking/src/Node.cpp | 4 ++++ libraries/networking/src/NodeList.cpp | 2 +- libraries/networking/src/NodeType.h | 1 + 4 files changed, 7 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 7ca991667e..09300cd919 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -744,7 +744,7 @@ void LimitedNodeList::removeSilentNodes() { SharedNodePointer node = it->second; node->getMutex().lock(); - if (node->getType() != NodeType::DownstreamAudioMixer && node->getType() != NodeType::DownstreamAvatarMixer + if (!NodeType::isDownstream(node->getType()) && (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) { // call the NodeHash erase to get rid of this node it = _nodeHash.unsafe_erase(it); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index 9331487db9..26e75f36f5 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -52,6 +52,10 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) { return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME; } +bool NodeType::isDownstream(NodeType_t nodeType) { + return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer; +} + Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated, const QUuid& connectionSecret, QObject* parent) : diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index ddd2137422..38d8c79251 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -714,7 +714,7 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { void NodeList::startNodeHolePunch(const SharedNodePointer& node) { // we don't hole punch to replicants, since it is assumed that we have a direct line to them - if (node->getType() != NodeType::DownstreamAudioMixer && node->getType() != NodeType::DownstreamAvatarMixer) { + if (!NodeType::isDownstream(node->getType())) { // connect to the correct signal on this node so we know when to ping it connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index 7f0d5f0636..ef0aa78357 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -31,6 +31,7 @@ namespace NodeType { void init(); const QString& getNodeTypeName(NodeType_t nodeType); + bool isDownstream(NodeType_t nodeType); } typedef QSet NodeSet; From 9fa97d611a7b89c775fe3713cf6b0c3b8fac4098 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 15:36:46 -0700 Subject: [PATCH 12/23] remove invoked addOrUpdate and move node to node list thread --- assignment-client/src/audio/AudioMixer.cpp | 5 +++++ libraries/networking/src/LimitedNodeList.cpp | 3 +++ 2 files changed, 8 insertions(+) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 1e93d0515c..b4e06863db 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -120,8 +120,13 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer mess QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); auto replicatedNode = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, +<<<<<<< HEAD message->getSenderSockAddr(), message->getSenderSockAddr(), DEFAULT_AGENT_PERMISSIONS, true); +======= + message->getSenderSockAddr(), message->getSenderSockAddr()); + +>>>>>>> remove invoked addOrUpdate and move node to node list thread replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); replicatedNode->setIsUpstream(true); diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 09300cd919..0494efc7b4 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -588,6 +588,9 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t // we didn't have this node, so add them Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket, permissions, isReplicated, connectionSecret); + // move the newly constructed node to the LNL thread + newNode->moveToThread(thread()); + if (nodeType == NodeType::AudioMixer) { LimitedNodeList::flagTimeForConnectionStep(LimitedNodeList::AddedAudioMixer); } From 4539d615d73df45c174c8dea5733ecc5ed751ca8 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 15:41:34 -0700 Subject: [PATCH 13/23] add downstream server settings handling to ThreadedAssignment --- assignment-client/src/audio/AudioMixer.cpp | 9 ++--- libraries/networking/src/Node.cpp | 11 ++++++ libraries/networking/src/Node.h | 1 + libraries/networking/src/NodeType.h | 2 ++ .../networking/src/ThreadedAssignment.cpp | 35 +++++++++++++++++++ libraries/networking/src/ThreadedAssignment.h | 2 ++ 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index b4e06863db..f8c2cf86e8 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -120,13 +120,8 @@ void AudioMixer::queueReplicatedAudioPacket(QSharedPointer mess QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); auto replicatedNode = nodeList->addOrUpdateNode(nodeID, NodeType::Agent, -<<<<<<< HEAD message->getSenderSockAddr(), message->getSenderSockAddr(), DEFAULT_AGENT_PERMISSIONS, true); -======= - message->getSenderSockAddr(), message->getSenderSockAddr()); - ->>>>>>> remove invoked addOrUpdate and move node to node list thread replicatedNode->setLastHeardMicrostamp(usecTimestampNow()); replicatedNode->setIsUpstream(true); @@ -536,7 +531,7 @@ void AudioMixer::clearDomainSettings() { _zoneReverbSettings.clear(); } -void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) { +void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) { qDebug() << "AVX2 Support:" << (cpuSupportsAVX2() ? "enabled" : "disabled"); if (settingsObject.contains(AUDIO_THREADING_GROUP_KEY)) { @@ -754,6 +749,8 @@ void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) { } } } + + parseDownstreamServers(settingsObject, NodeType::AudioMixer); } AudioMixer::Timer::Timing::Timing(uint64_t& sum) : _sum(sum) { diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index 26e75f36f5..3435843814 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -56,6 +56,17 @@ bool NodeType::isDownstream(NodeType_t nodeType) { return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer; } +NodeType_t NodeType::downstreamType(NodeType_t primaryType) { + switch (primaryType) { + case AudioMixer: + return DownstreamAudioMixer; + case AvatarMixer: + return DownstreamAvatarMixer; + default: + return Unassigned; + } +} + Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated, const QUuid& connectionSecret, QObject* parent) : diff --git a/libraries/networking/src/Node.h b/libraries/networking/src/Node.h index c905c9d551..fd2cf6b65b 100644 --- a/libraries/networking/src/Node.h +++ b/libraries/networking/src/Node.h @@ -37,6 +37,7 @@ class Node : public NetworkPeer { Q_OBJECT public: + Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated, const QUuid& connectionSecret = QUuid(), diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index ef0aa78357..12e74603ef 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -30,8 +30,10 @@ namespace NodeType { const NodeType_t Unassigned = 1; void init(); + const QString& getNodeTypeName(NodeType_t nodeType); bool isDownstream(NodeType_t nodeType); + NodeType_t downstreamType(NodeType_t primaryType); } typedef QSet NodeSet; diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp index 9db540cae2..12dc59beed 100644 --- a/libraries/networking/src/ThreadedAssignment.cpp +++ b/libraries/networking/src/ThreadedAssignment.cpp @@ -10,6 +10,7 @@ // #include +#include #include #include #include @@ -132,3 +133,37 @@ void ThreadedAssignment::domainSettingsRequestFailed() { qCDebug(networking) << "Failed to retreive settings object from domain-server. Bailing on assignment."; setFinished(true); } + +void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType) { + static const QString REPLICATION_GROUP_KEY = "replication"; + static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers"; + if (settingsObject.contains(REPLICATION_GROUP_KEY)) { + const QJsonObject replicationObject = settingsObject[REPLICATION_GROUP_KEY].toObject(); + + const QJsonArray downstreamServers = replicationObject[DOWNSTREAM_SERVERS_SETTING_KEY].toArray(); + + auto nodeList = DependencyManager::get(); + + foreach(const QJsonValue& downstreamServerValue, downstreamServers) { + const QJsonArray downstreamServer = downstreamServerValue.toArray(); + + // make sure we have the number of values we need + if (downstreamServer.size() >= 3) { + // make sure this is a downstream server that matches our type + if (downstreamServer[2].toString() == NodeType::getNodeTypeName(nodeType)) { + // read the address and port and construct a HifiSockAddr from them + HifiSockAddr downstreamServerAddr { + downstreamServer[0].toString(), + static_cast(downstreamServer[1].toInt()) + }; + + // manually add the downstream node to our node list + nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType), + downstreamServerAddr, downstreamServerAddr); + + } + } + + } + } +} diff --git a/libraries/networking/src/ThreadedAssignment.h b/libraries/networking/src/ThreadedAssignment.h index e9832a2a91..f96755a776 100644 --- a/libraries/networking/src/ThreadedAssignment.h +++ b/libraries/networking/src/ThreadedAssignment.h @@ -40,6 +40,8 @@ signals: protected: void commonInit(const QString& targetName, NodeType_t nodeType); + void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType); + bool _isFinished; QTimer _domainServerTimer; QTimer _statsTimer; From 4688fe4c394fb7c734a28cda3e68b65391010a3f Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 15:54:09 -0700 Subject: [PATCH 14/23] fix comment for change from replicant to downstream --- assignment-client/src/audio/AudioMixerClientData.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 359c7e1b9e..de7bdbc780 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -132,7 +132,7 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c std::unique_ptr packet; - // enumerate the replicant audio mixers and send them the replicated version of this packet + // enumerate the downstream audio mixers and send them the replicated version of this packet nodeList->eachMatchingNode([&](const SharedNodePointer& node)->bool { return node->getType() == NodeType::DownstreamAudioMixer; }, [&](const SharedNodePointer& node) { From 880157695a708671b44cccd3e32df09f9cd0079a Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 16:18:15 -0700 Subject: [PATCH 15/23] do not attempt to hole punch an upstream node --- libraries/networking/src/NodeList.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 38d8c79251..06548499e8 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -713,8 +713,10 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { void NodeList::startNodeHolePunch(const SharedNodePointer& node) { - // we don't hole punch to replicants, since it is assumed that we have a direct line to them - if (!NodeType::isDownstream(node->getType())) { + // we don't hole punch to downstream servers, since it is assumed that we have a direct line to them + // we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them + + if (!NodeType::isDownstream(node->getType()) && !node->isUpstream()) { // connect to the correct signal on this node so we know when to ping it connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout); From 63c8273a4164cb8c57fc408ee0027abcb0537655 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 16:23:57 -0700 Subject: [PATCH 16/23] do not send keep alive pings to upstream nodes --- libraries/networking/src/NodeList.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 06548499e8..73c405e3aa 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -768,8 +768,10 @@ void NodeList::stopKeepalivePingTimer() { } void NodeList::sendKeepAlivePings() { + // send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node + eachMatchingNode([this](const SharedNodePointer& node)->bool { - return _nodeTypesOfInterest.contains(node->getType()); + return !node->isUpstream() && _nodeTypesOfInterest.contains(node->getType()); }, [&](const SharedNodePointer& node) { sendPacket(constructPingPacket(), *node); }); From 682fa247450fe00bcd8130c3531433fee5916ad0 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 16:51:25 -0700 Subject: [PATCH 17/23] fix downstream server parsing from settings --- .../networking/src/ThreadedAssignment.cpp | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp index 12dc59beed..5aeb076c11 100644 --- a/libraries/networking/src/ThreadedAssignment.cpp +++ b/libraries/networking/src/ThreadedAssignment.cpp @@ -145,25 +145,25 @@ void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObjec auto nodeList = DependencyManager::get(); foreach(const QJsonValue& downstreamServerValue, downstreamServers) { - const QJsonArray downstreamServer = downstreamServerValue.toArray(); + const QJsonObject downstreamServer = downstreamServerValue.toObject(); - // make sure we have the number of values we need - if (downstreamServer.size() >= 3) { - // make sure this is a downstream server that matches our type - if (downstreamServer[2].toString() == NodeType::getNodeTypeName(nodeType)) { - // read the address and port and construct a HifiSockAddr from them - HifiSockAddr downstreamServerAddr { - downstreamServer[0].toString(), - static_cast(downstreamServer[1].toInt()) - }; + static const QString DOWNSTREAM_SERVER_ADDRESS = "address"; + static const QString DOWNSTREAM_SERVER_PORT = "port"; + static const QString DOWNSTREAM_SERVER_TYPE = "server_type"; - // manually add the downstream node to our node list - nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType), - downstreamServerAddr, downstreamServerAddr); + // make sure we have the settings we need for this downstream server + if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT) + && downstreamServer[DOWNSTREAM_SERVER_TYPE].toString() == NodeType::getNodeTypeName(nodeType)) { + // read the address and port and construct a HifiSockAddr from them + HifiSockAddr downstreamServerAddr { + downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(), + (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt() + }; - } + // manually add the downstream node to our node list + nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType), + downstreamServerAddr, downstreamServerAddr); } - } } } From 30d2e9fd237f7833efd63c78f0b4a766c36fc158 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 17:51:54 -0700 Subject: [PATCH 18/23] add unsafeEachNode to iterate nodes when read lock held elsewhere --- .../src/audio/AudioMixerClientData.cpp | 31 ++++++++++--------- libraries/networking/src/LimitedNodeList.h | 10 ++++++ 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index de7bdbc780..c51f240b54 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -113,6 +113,7 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c // first, make sure that this is a packet from a node we are supposed to replicate if (node.isReplicated()) { + auto nodeList = DependencyManager::get(); // now make sure it's a packet type that we want to replicate @@ -133,25 +134,25 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c std::unique_ptr packet; // enumerate the downstream audio mixers and send them the replicated version of this packet - nodeList->eachMatchingNode([&](const SharedNodePointer& node)->bool { - return node->getType() == NodeType::DownstreamAudioMixer; - }, [&](const SharedNodePointer& node) { - // construct the packet only once, if we have any downstream audio mixers to send to - if (!packet) { - // construct an NLPacket to send to the replicant that has the contents of the received packet - packet = NLPacket::create(mirroredType); + nodeList->unsafeEachNode([&](const SharedNodePointer& node) { + if (node->getType() == NodeType::DownstreamAudioMixer) { + // construct the packet only once, if we have any downstream audio mixers to send to + if (!packet) { + // construct an NLPacket to send to the replicant that has the contents of the received packet + packet = NLPacket::create(mirroredType); - // since this packet will be non-sourced, we add the replicated node's ID here - packet->write(message.getSourceID().toRfc4122()); + // since this packet will be non-sourced, we add the replicated node's ID here + packet->write(message.getSourceID().toRfc4122()); - // we won't negotiate an audio format with the replicant, because we aren't a listener - // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs - packet->writeString(_selectedCodecName); + // we won't negotiate an audio format with the replicant, because we aren't a listener + // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs + packet->writeString(_selectedCodecName); + + packet->write(message.getMessage()); + } - packet->write(message.getMessage()); + nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); } - - nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); }); } diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 1ace6b2e23..5e2c88ed94 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -257,6 +257,16 @@ public: return SharedNodePointer(); } + // This is unsafe because it does not take a lock + // Must only be called when you know that a read lock on the node mutex is held + // and will be held for the duration of your iteration + template + void unsafeEachNode(NodeLambda functor) { + for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) { + functor(it->second); + } + } + void putLocalPortIntoSharedMemory(const QString key, QObject* parent, quint16 localPort); bool getLocalServerPortFromSharedMemory(const QString key, quint16& localPort); From ab3a0ddba2b7c9a8ee04a5163dcbd63255dd3a48 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Mon, 12 Jun 2017 19:49:02 -0700 Subject: [PATCH 19/23] don't emit codec mismatch upstream, handle replicated silent frames --- libraries/audio/src/InboundAudioStream.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libraries/audio/src/InboundAudioStream.cpp b/libraries/audio/src/InboundAudioStream.cpp index 88ec7e7bc0..65cccf1fe0 100644 --- a/libraries/audio/src/InboundAudioStream.cpp +++ b/libraries/audio/src/InboundAudioStream.cpp @@ -147,7 +147,8 @@ int InboundAudioStream::parseData(ReceivedMessage& message) { } case SequenceNumberStats::OnTime: { // Packet is on time; parse its data to the ringbuffer - if (message.getType() == PacketType::SilentAudioFrame) { + if (message.getType() == PacketType::SilentAudioFrame + || message.getType() == PacketType::ReplicatedSilentAudioFrame) { // If we recieved a SilentAudioFrame from our sender, we might want to drop // some of the samples in order to catch up to our desired jitter buffer size. writeDroppableSilentFrames(networkFrames); @@ -168,7 +169,10 @@ int InboundAudioStream::parseData(ReceivedMessage& message) { // inform others of the mismatch auto sendingNode = DependencyManager::get()->nodeWithUUID(message.getSourceID()); - emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket); + if (sendingNode) { + emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket); + } + } } break; From aa9574fc5a0316e0ca06e86d65cac0539ccdcd07 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 13 Jun 2017 09:46:12 -0700 Subject: [PATCH 20/23] add re-replication support to audio mixer --- .../src/audio/AudioMixerClientData.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index c51f240b54..d0e749563a 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -119,6 +119,23 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c // now make sure it's a packet type that we want to replicate PacketType mirroredType; + switch (message.getType()) { + case PacketType::MicrophoneAudioNoEcho: + case PacketType::ReplicatedMicrophoneAudioNoEcho: + mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; + case PacketType::MicrophoneAudioWithEcho: + case PacketType::ReplicatedMicrophoneAudioWithEcho: + mirroredType = PacketType::ReplicatedMicrophoneAudioWithEcho; + case PacketType::InjectAudio: + case PacketType::ReplicatedInjectAudio: + mirroredType = PacketType::ReplicatedInjectAudio; + case PacketType::SilentAudioFrame: + case PacketType::ReplicatedSilentAudioFrame: + mirroredType = PacketType::ReplicatedSilentAudioFrame; + default: + return; + } + if (message.getType() == PacketType::MicrophoneAudioNoEcho) { mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; } else if (message.getType() == PacketType::MicrophoneAudioWithEcho) { @@ -142,7 +159,7 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c packet = NLPacket::create(mirroredType); // since this packet will be non-sourced, we add the replicated node's ID here - packet->write(message.getSourceID().toRfc4122()); + packet->write(node->getUUID().toRfc4122()); // we won't negotiate an audio format with the replicant, because we aren't a listener // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs From eca35ce01300d0b55a010bf529a5e66211274bda Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 13 Jun 2017 11:18:59 -0700 Subject: [PATCH 21/23] fix double check for packet types --- .../src/audio/AudioMixerClientData.cpp | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index d0e749563a..e7b45b2d4e 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -123,31 +123,23 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c case PacketType::MicrophoneAudioNoEcho: case PacketType::ReplicatedMicrophoneAudioNoEcho: mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; + break; case PacketType::MicrophoneAudioWithEcho: case PacketType::ReplicatedMicrophoneAudioWithEcho: mirroredType = PacketType::ReplicatedMicrophoneAudioWithEcho; + break; case PacketType::InjectAudio: case PacketType::ReplicatedInjectAudio: mirroredType = PacketType::ReplicatedInjectAudio; + break; case PacketType::SilentAudioFrame: case PacketType::ReplicatedSilentAudioFrame: mirroredType = PacketType::ReplicatedSilentAudioFrame; + break; default: return; } - if (message.getType() == PacketType::MicrophoneAudioNoEcho) { - mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; - } else if (message.getType() == PacketType::MicrophoneAudioWithEcho) { - mirroredType = PacketType::ReplicatedMicrophoneAudioNoEcho; - } else if (message.getType() == PacketType::InjectAudio) { - mirroredType = PacketType::ReplicatedInjectAudio; - } else if (message.getType() == PacketType::SilentAudioFrame) { - mirroredType = PacketType::ReplicatedSilentAudioFrame; - } else { - return; - } - std::unique_ptr packet; // enumerate the downstream audio mixers and send them the replicated version of this packet From c1bbb2a084d4f4aa11764ee1f8b4471d85c8e3bc Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 13 Jun 2017 11:24:05 -0700 Subject: [PATCH 22/23] fix incorrect UUID in replicated packets --- assignment-client/src/audio/AudioMixerClientData.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index e7b45b2d4e..3942c3177b 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -143,15 +143,15 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c std::unique_ptr packet; // enumerate the downstream audio mixers and send them the replicated version of this packet - nodeList->unsafeEachNode([&](const SharedNodePointer& node) { - if (node->getType() == NodeType::DownstreamAudioMixer) { + nodeList->unsafeEachNode([&](const SharedNodePointer& downstreamNode) { + if (downstreamNode->getType() == NodeType::DownstreamAudioMixer) { // construct the packet only once, if we have any downstream audio mixers to send to if (!packet) { // construct an NLPacket to send to the replicant that has the contents of the received packet packet = NLPacket::create(mirroredType); // since this packet will be non-sourced, we add the replicated node's ID here - packet->write(node->getUUID().toRfc4122()); + packet->write(node.getUUID().toRfc4122()); // we won't negotiate an audio format with the replicant, because we aren't a listener // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs @@ -160,7 +160,7 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c packet->write(message.getMessage()); } - nodeList->sendUnreliablePacket(*packet, node->getPublicSocket()); + nodeList->sendUnreliablePacket(*packet, downstreamNode->getPublicSocket()); } }); } From 3f3cc89b8d0a9408fc94fc2f0bb4e7ba978eae44 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Tue, 13 Jun 2017 11:58:08 -0700 Subject: [PATCH 23/23] fix for header of re-replicated packets --- .../src/audio/AudioMixerClientData.cpp | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index 3942c3177b..848dc969d8 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -109,6 +109,13 @@ void AudioMixerClientData::processPackets() { assert(_packetQueue.empty()); } +bool isReplicatedPacket(PacketType packetType) { + return packetType == PacketType::ReplicatedMicrophoneAudioNoEcho + || packetType == PacketType::ReplicatedMicrophoneAudioWithEcho + || packetType == PacketType::ReplicatedInjectAudio + || packetType == PacketType::ReplicatedSilentAudioFrame; +} + void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) { // first, make sure that this is a packet from a node we are supposed to replicate @@ -126,7 +133,7 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c break; case PacketType::MicrophoneAudioWithEcho: case PacketType::ReplicatedMicrophoneAudioWithEcho: - mirroredType = PacketType::ReplicatedMicrophoneAudioWithEcho; + mirroredType = PacketType::ReplicatedMicrophoneAudioWithEcho; break; case PacketType::InjectAudio: case PacketType::ReplicatedInjectAudio: @@ -150,12 +157,14 @@ void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, c // construct an NLPacket to send to the replicant that has the contents of the received packet packet = NLPacket::create(mirroredType); - // since this packet will be non-sourced, we add the replicated node's ID here - packet->write(node.getUUID().toRfc4122()); + if (!isReplicatedPacket(message.getType())) { + // since this packet will be non-sourced, we add the replicated node's ID here + packet->write(node.getUUID().toRfc4122()); - // we won't negotiate an audio format with the replicant, because we aren't a listener - // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs - packet->writeString(_selectedCodecName); + // we won't negotiate an audio format with the replicant, because we aren't a listener + // so pack the codec string here so that it can statelessly setup a decoder for this string when it needs + packet->writeString(_selectedCodecName); + } packet->write(message.getMessage()); }