diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index ab4b20b9cc..5e0cd740e6 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -103,11 +103,6 @@ AudioMixer::AudioMixer(ReceivedMessage& message) : ); connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled); - connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) { - if (node->getType() == NodeType::DownstreamAudioMixer) { - node->activatePublicSocket(); - } - }); } void AudioMixer::queueAudioPacket(QSharedPointer message, SharedNodePointer node) { @@ -389,7 +384,10 @@ void AudioMixer::start() { auto nodeList = DependencyManager::get(); // prepare the NodeList - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer }); + nodeList->addSetOfNodeTypesToNodeInterestSet({ + NodeType::Agent, NodeType::EntityScriptServer, + NodeType::UpstreamAudioMixer, NodeType::DownstreamAudioMixer + }); nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); }; // parse out any AudioMixer settings diff --git a/assignment-client/src/audio/AudioMixerClientData.cpp b/assignment-client/src/audio/AudioMixerClientData.cpp index bf19a02d9a..0467b82d40 100644 --- a/assignment-client/src/audio/AudioMixerClientData.cpp +++ b/assignment-client/src/audio/AudioMixerClientData.cpp @@ -67,7 +67,6 @@ void AudioMixerClientData::processPackets() { case PacketType::MicrophoneAudioNoEcho: case PacketType::MicrophoneAudioWithEcho: case PacketType::InjectAudio: - case PacketType::AudioStreamStats: case PacketType::SilentAudioFrame: case PacketType::ReplicatedMicrophoneAudioNoEcho: case PacketType::ReplicatedMicrophoneAudioWithEcho: @@ -80,11 +79,17 @@ void AudioMixerClientData::processPackets() { QMutexLocker lock(&getMutex()); parseData(*packet); - + optionallyReplicatePacket(*packet, *node); break; } + case PacketType::AudioStreamStats: { + QMutexLocker lock(&getMutex()); + parseData(*packet); + + break; + } case PacketType::NegotiateAudioFormat: negotiateAudioFormat(*packet, node); break; diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 28ede7a77d..40cc3a1c0e 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -66,7 +66,6 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) : connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) { if (node->getType() == NodeType::DownstreamAvatarMixer) { getOrCreateClientData(node); - node->activatePublicSocket(); } }); } @@ -165,10 +164,6 @@ void AvatarMixer::queueIncomingPacket(QSharedPointer message, S _queueIncomingPacketElapsedTime += (end - start); } - -AvatarMixer::~AvatarMixer() { -} - void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) { if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) { QByteArray individualData = nodeData->getAvatar().identityByteArray(); @@ -862,7 +857,10 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node void AvatarMixer::domainSettingsRequestComplete() { auto nodeList = DependencyManager::get(); - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer }); + nodeList->addSetOfNodeTypesToNodeInterestSet({ + NodeType::Agent, NodeType::EntityScriptServer, + NodeType::UpstreamAvatarMixer, NodeType::DownstreamAvatarMixer + }); // parse the settings to pull out the values we need parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject()); diff --git a/assignment-client/src/avatars/AvatarMixer.h b/assignment-client/src/avatars/AvatarMixer.h index 58d4487652..ecff7597f6 100644 --- a/assignment-client/src/avatars/AvatarMixer.h +++ b/assignment-client/src/avatars/AvatarMixer.h @@ -28,7 +28,6 @@ class AvatarMixer : public ThreadedAssignment { Q_OBJECT public: AvatarMixer(ReceivedMessage& message); - ~AvatarMixer(); public slots: /// runs the avatar mixer void run() override; diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 8092518454..b2c6b6a341 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -83,9 +83,9 @@ int AvatarMixerSlave::sendReplicatedIdentityPacket(const AvatarMixerClientData* if (destinationNode->getType() == NodeType::DownstreamAvatarMixer) { QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray(); individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious - auto identityPacket = NLPacket::create(PacketType::ReplicatedAvatarIdentity); + auto identityPacket = NLPacketList::create(PacketType::ReplicatedAvatarIdentity, QByteArray(), true, true); identityPacket->write(individualData); - DependencyManager::get()->sendUnreliablePacket(*identityPacket, *destinationNode); + DependencyManager::get()->sendPacketList(std::move(identityPacket), *destinationNode); _stats.numIdentityPackets++; return individualData.size(); } else { diff --git a/domain-server/resources/describe-settings.json b/domain-server/resources/describe-settings.json index c7dc97f595..bc67a31c02 100644 --- a/domain-server/resources/describe-settings.json +++ b/domain-server/resources/describe-settings.json @@ -1339,6 +1339,7 @@ { "name": "broadcasting", "label": "Broadcasting", + "restart": false, "settings": [ { "name": "users", @@ -1395,6 +1396,46 @@ ] } ] + }, + { + "name": "upstream_servers", + "label": "Broadcasting Servers", + "assignment-types": [0,1], + "type": "table", + "advanced": true, + "can_add_new_rows": true, + "help": "Servers that broadcast data to this domain", + "numbered": false, + "columns": [ + { + "name": "address", + "label": "Address", + "can_set": true + }, + { + "name": "port", + "label": "Port", + "can_set": true + }, + { + "name": "server_type", + "label": "Server Type", + "type": "select", + "placeholder": "Audio Mixer", + "default": "Audio Mixer", + "can_set": true, + "options": [ + { + "value": "Audio Mixer", + "label": "Audio Mixer" + }, + { + "value": "Avatar Mixer", + "label": "Avatar Mixer" + } + ] + } + ] } ] } diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index 7c14d2e924..f842b251d4 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -121,6 +121,8 @@ DomainServer::DomainServer(int argc, char* argv[]) : this, &DomainServer::updateReplicatedNodes); connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, this, &DomainServer::updateDownstreamNodes); + connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, + this, &DomainServer::updateUpstreamNodes); setupGroupCacheRefresh(); @@ -135,6 +137,7 @@ DomainServer::DomainServer(int argc, char* argv[]) : updateReplicatedNodes(); updateDownstreamNodes(); + updateUpstreamNodes(); if (_type != NonMetaverse) { // if we have a metaverse domain, we'll use an access token for API calls @@ -2229,53 +2232,84 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer& static const QString BROADCASTING_SETTINGS_KEY = "broadcasting"; -void DomainServer::updateDownstreamNodes() { +struct ReplicationServerInfo { + NodeType_t nodeType; + HifiSockAddr sockAddr; +}; + +ReplicationServerInfo serverInformationFromSettings(QVariantMap serverMap, ReplicationServerDirection direction) { + static const QString REPLICATION_SERVER_ADDRESS = "address"; + static const QString REPLICATION_SERVER_PORT = "port"; + static const QString REPLICATION_SERVER_TYPE = "server_type"; + + if (serverMap.contains(REPLICATION_SERVER_ADDRESS) && serverMap.contains(REPLICATION_SERVER_PORT) + && serverMap.contains(REPLICATION_SERVER_TYPE)) { + + auto nodeType = NodeType::fromString(serverMap[REPLICATION_SERVER_TYPE].toString()); + + ReplicationServerInfo serverInfo; + + if (direction == Upstream) { + serverInfo.nodeType = NodeType::upstreamType(nodeType); + } else if (direction == Downstream) { + serverInfo.nodeType = NodeType::downstreamType(nodeType); + } + + // read the address and port and construct a HifiSockAddr from them + serverInfo.sockAddr = { + serverMap[REPLICATION_SERVER_ADDRESS].toString(), + (quint16) serverMap[REPLICATION_SERVER_PORT].toString().toInt() + }; + + return serverInfo; + } + + return { NodeType::Unassigned, HifiSockAddr() }; +} + +void DomainServer::updateReplicationNodes(ReplicationServerDirection direction) { auto settings = _settingsManager.getSettingsMap(); + if (settings.contains(BROADCASTING_SETTINGS_KEY)) { auto nodeList = DependencyManager::get(); - std::vector downstreamNodesInSettings; - auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap(); - if (replicationSettings.contains("downstream_servers")) { - auto serversSettings = replicationSettings.value("downstream_servers").toList(); + std::vector replicationNodesInSettings; - std::vector knownDownstreamNodes; + auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap(); + + QString serversKey = direction == Upstream ? "upstream_servers" : "downstream_servers"; + QString replicationDirection = direction == Upstream ? "upstream" : "downstream"; + + if (replicationSettings.contains(serversKey)) { + auto serversSettings = replicationSettings.value(serversKey).toList(); + + std::vector knownReplicationNodes; nodeList->eachNode([&](const SharedNodePointer& otherNode) { - if (NodeType::isDownstream(otherNode->getType())) { - knownDownstreamNodes.push_back(otherNode->getPublicSocket()); + if ((direction == Upstream && NodeType::isUpstream(otherNode->getType())) + || (direction == Downstream && NodeType::isDownstream(otherNode->getType()))) { + knownReplicationNodes.push_back(otherNode->getPublicSocket()); } }); for (auto& server : serversSettings) { - auto downstreamServer = server.toMap(); + auto replicationServer = serverInformationFromSettings(server.toMap(), direction); - static const QString DOWNSTREAM_SERVER_ADDRESS = "address"; - static const QString DOWNSTREAM_SERVER_PORT = "port"; - static const QString DOWNSTREAM_SERVER_TYPE = "server_type"; + if (!replicationServer.sockAddr.isNull() && replicationServer.nodeType != NodeType::Unassigned) { + // make sure we have the settings we need for this replication server + replicationNodesInSettings.push_back(replicationServer.sockAddr); - // make sure we have the settings we need for this downstream server - if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) { - - auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString()); - auto downstreamNodeType = NodeType::downstreamType(nodeType); - - // read the address and port and construct a HifiSockAddr from them - HifiSockAddr downstreamServerAddr { - downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(), - (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt() - }; - downstreamNodesInSettings.push_back(downstreamServerAddr); - - bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(), - downstreamServerAddr) != knownDownstreamNodes.cend(); + bool knownNode = find(knownReplicationNodes.cbegin(), knownReplicationNodes.cend(), + replicationServer.sockAddr) != knownReplicationNodes.cend(); if (!knownNode) { - // manually add the downstream node to our node list - auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType, - downstreamServerAddr, downstreamServerAddr); + // manually add the replication node to our node list + auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), replicationServer.nodeType, + replicationServer.sockAddr, replicationServer.sockAddr, + false, direction == Upstream); node->setIsForcedNeverSilent(true); - qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr; + qDebug() << "Adding" << (direction == Upstream ? "upstream" : "downstream") + << "node:" << node->getUUID() << replicationServer.sockAddr; - // manually activate the public socket for the downstream node + // manually activate the public socket for the replication node node->activatePublicSocket(); } } @@ -2288,11 +2322,13 @@ void DomainServer::updateDownstreamNodes() { // we cannot recursively take the write lock required by handleKillNode) std::vector nodesToKill; nodeList->eachNode([&](const SharedNodePointer& otherNode) { - if (NodeType::isDownstream(otherNode->getType())) { - bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(), - otherNode->getPublicSocket()) != downstreamNodesInSettings.cend(); + if ((direction == Upstream && NodeType::isUpstream(otherNode->getType())) + || (direction == Downstream && NodeType::isDownstream(otherNode->getType()))) { + bool nodeInSettings = find(replicationNodesInSettings.cbegin(), replicationNodesInSettings.cend(), + otherNode->getPublicSocket()) != replicationNodesInSettings.cend(); if (!nodeInSettings) { - qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket(); + qDebug() << "Removing" << replicationDirection + << "node:" << otherNode->getUUID() << otherNode->getPublicSocket(); nodesToKill.push_back(otherNode); } } @@ -2304,6 +2340,14 @@ void DomainServer::updateDownstreamNodes() { } } +void DomainServer::updateDownstreamNodes() { + updateReplicationNodes(Downstream); +} + +void DomainServer::updateUpstreamNodes() { + updateReplicationNodes(Upstream); +} + void DomainServer::updateReplicatedNodes() { // Make sure we have downstream nodes in our list auto settings = _settingsManager.getSettingsMap(); diff --git a/domain-server/src/DomainServer.h b/domain-server/src/DomainServer.h index 7e43397e9c..8851e3380b 100644 --- a/domain-server/src/DomainServer.h +++ b/domain-server/src/DomainServer.h @@ -39,6 +39,11 @@ typedef QMultiHash TransactionHash; using Subnet = QPair; using SubnetList = std::vector; +enum ReplicationServerDirection { + Upstream, + Downstream +}; + class DomainServer : public QCoreApplication, public HTTPSRequestHandler { Q_OBJECT public: @@ -104,6 +109,7 @@ private slots: void updateReplicatedNodes(); void updateDownstreamNodes(); + void updateUpstreamNodes(); signals: void iceServerChanged(); @@ -170,6 +176,8 @@ private: QString pathForRedirect(QString path = QString()) const; + void updateReplicationNodes(ReplicationServerDirection direction); + SubnetList _acSubnetWhitelist; std::vector _replicatedUsernames; diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index d2bae28820..eab0e5e41f 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -257,8 +257,40 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe PacketType headerType = NLPacket::typeInHeader(packet); if (NON_SOURCED_PACKETS.contains(headerType)) { - emit dataReceived(NodeType::Unassigned, packet.getPayloadSize()); - return true; + if (REPLICATED_PACKET_MAPPING.key(headerType) != PacketType::Unknown) { + // this is a replicated packet type - make sure the socket that sent it to us matches + // one from one of our current upstream nodes + + NodeType_t sendingNodeType { NodeType::Unassigned }; + + eachNodeBreakable([&packet, &sendingNodeType](const SharedNodePointer& node){ + if (NodeType::isUpstream(node->getType()) && node->getPublicSocket() == packet.getSenderSockAddr()) { + sendingNodeType = node->getType(); + return false; + } else { + return true; + } + }); + + if (sendingNodeType != NodeType::Unassigned) { + emit dataReceived(sendingNodeType, packet.getPayloadSize()); + return true; + } else { + static const QString UNSOLICITED_REPLICATED_REGEX = + "Replicated packet of type \\d+ \\([\\sa-zA-Z:]+\\) received from unknown upstream"; + static QString repeatedMessage + = LogHandler::getInstance().addRepeatedMessageRegex(UNSOLICITED_REPLICATED_REGEX); + + qCDebug(networking) << "Replicated packet of type" << headerType + << "received from unknown upstream" << packet.getSenderSockAddr(); + + return false; + } + + } else { + emit dataReceived(NodeType::Unassigned, packet.getPayloadSize()); + return true; + } } else { QUuid sourceID = NLPacket::sourceIDInHeader(packet); @@ -583,14 +615,14 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t matchingNode->setPermissions(permissions); matchingNode->setConnectionSecret(connectionSecret); matchingNode->setIsReplicated(isReplicated); - matchingNode->setIsUpstream(isUpstream); + matchingNode->setIsUpstream(isUpstream || NodeType::isUpstream(nodeType)); return matchingNode; } else { // we didn't have this node, so add them Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); newNode->setIsReplicated(isReplicated); - newNode->setIsUpstream(isUpstream); + newNode->setIsUpstream(isUpstream || NodeType::isUpstream(nodeType)); newNode->setConnectionSecret(connectionSecret); newNode->setPermissions(permissions); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index ea1b6e0eb5..d7f8f404e6 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -42,6 +42,8 @@ void NodeType::init() { TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer"); TypeNameHash.insert(NodeType::AssetServer, "Asset Server"); TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server"); + TypeNameHash.insert(NodeType::UpstreamAudioMixer, "Upstream Audio Mixer"); + TypeNameHash.insert(NodeType::UpstreamAvatarMixer, "Upstream Avatar Mixer"); TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer"); TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer"); TypeNameHash.insert(NodeType::Unassigned, "Unassigned"); @@ -52,8 +54,23 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) { return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME; } +bool NodeType::isUpstream(NodeType_t nodeType) { + return nodeType == NodeType::UpstreamAudioMixer || nodeType == NodeType::UpstreamAvatarMixer; +} + bool NodeType::isDownstream(NodeType_t nodeType) { - return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer; + return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer; +} + +NodeType_t NodeType::upstreamType(NodeType_t primaryType) { + switch (primaryType) { + case AudioMixer: + return UpstreamAudioMixer; + case AvatarMixer: + return UpstreamAvatarMixer; + default: + return Unassigned; + } } NodeType_t NodeType::downstreamType(NodeType_t primaryType) { diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index f502cb2a3c..e8506e5263 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -669,9 +669,11 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket, nodeLocalSocket, isReplicated, false, connectionUUID, permissions); - // nodes that are downstream of our own type are kept alive when we hear about them from the domain server - if (node->getType() == NodeType::downstreamType(_ownerType)) { + // nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server + // and always have their public socket as their active socket + if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) { node->setLastHeardMicrostamp(usecTimestampNow()); + node->activatePublicSocket(); } } diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index dacd5e1a68..0130e92cfc 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -25,6 +25,8 @@ namespace NodeType { const NodeType_t AssetServer = 'A'; const NodeType_t MessagesMixer = 'm'; const NodeType_t EntityScriptServer = 'S'; + const NodeType_t UpstreamAudioMixer = 'B'; + const NodeType_t UpstreamAvatarMixer = 'C'; const NodeType_t DownstreamAudioMixer = 'a'; const NodeType_t DownstreamAvatarMixer = 'w'; const NodeType_t Unassigned = 1; @@ -32,9 +34,12 @@ namespace NodeType { void init(); const QString& getNodeTypeName(NodeType_t nodeType); + bool isUpstream(NodeType_t nodeType); bool isDownstream(NodeType_t nodeType); + NodeType_t upstreamType(NodeType_t primaryType); NodeType_t downstreamType(NodeType_t primaryType); + NodeType_t fromString(QString type); } diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index fb416cac4d..ad15d04db1 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -51,6 +51,7 @@ const QHash REPLICATED_PACKET_MAPPING { { PacketType::SilentAudioFrame, PacketType::ReplicatedSilentAudioFrame }, { PacketType::AvatarIdentity, PacketType::ReplicatedAvatarIdentity }, { PacketType::KillAvatar, PacketType::ReplicatedKillAvatar }, + { PacketType::BulkAvatarData, PacketType::ReplicatedBulkAvatarData } }; PacketVersion versionForPacketType(PacketType packetType) {