diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index a98c172ef3..e7323bb272 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -103,6 +103,11 @@ AudioMixer::AudioMixer(ReceivedMessage& message) : ); connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled); + connect(nodeList.data(), &NodeList::nodeAdded, this, [this](SharedNodePointer node) { + if (node->getType() == NodeType::DownstreamAudioMixer) { + node->activatePublicSocket(); + } + }); } void AudioMixer::queueAudioPacket(QSharedPointer message, SharedNodePointer node) { @@ -391,7 +396,7 @@ void AudioMixer::start() { auto nodeList = DependencyManager::get(); // prepare the NodeList - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); + nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer }); nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); }; // parse out any AudioMixer settings @@ -768,8 +773,6 @@ void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) { } } } - - parseDownstreamServers(settingsObject, NodeType::AudioMixer); } AudioMixer::Timer::Timing::Timing(uint64_t& sum) : _sum(sum) { diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 337faf2027..4b85e6b667 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -63,6 +63,12 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) : auto nodeList = DependencyManager::get(); connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch); + connect(nodeList.data(), &NodeList::nodeAdded, this, [this](SharedNodePointer node) { + if (node->getType() == NodeType::DownstreamAvatarMixer) { + getOrCreateClientData(node); + node->activatePublicSocket(); + } + }); } SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) { @@ -805,7 +811,7 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node void AvatarMixer::domainSettingsRequestComplete() { auto nodeList = DependencyManager::get(); - nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); + nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer }); // parse the settings to pull out the values we need parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject()); @@ -877,11 +883,4 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { qCDebug(avatars) << "This domain requires a minimum avatar scale of" << _domainMinimumScale << "and a maximum avatar scale of" << _domainMaximumScale; - - - parseDownstreamServers(domainSettings, NodeType::AvatarMixer, [](Node& node) { - if (!node.getLinkedData()) { - node.setLinkedData(std::unique_ptr { new AvatarMixerClientData(node.getUUID()) }); - } - }); } diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index dd9f46224b..398b28f7a6 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -444,6 +444,10 @@ void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePoin _stats.downstreamMixersBroadcastedTo++; AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); + if (!nodeData) { + qDebug() << "No node data"; + return; + } // setup a PacketList for the replicated bulk avatar data auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData); diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index 5643a4434a..24ae106b90 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -119,6 +119,8 @@ DomainServer::DomainServer(int argc, char* argv[]) : &_gatekeeper, &DomainGatekeeper::updateNodePermissions); connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, this, &DomainServer::updateReplicatedNodes); + connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, + this, &DomainServer::updateDownstreamNodes); setupGroupCacheRefresh(); @@ -132,6 +134,7 @@ DomainServer::DomainServer(int argc, char* argv[]) : setupNodeListAndAssignments(); updateReplicatedNodes(); + updateDownstreamNodes(); if (_type != NonMetaverse) { // if we have a metaverse domain, we'll use an access token for API calls @@ -2219,14 +2222,90 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer& _unfulfilledAssignments.enqueue(assignment); } -void DomainServer::updateReplicatedNodes() { - static const QString REPLICATION_SETTINGS_KEY = "replication"; - _replicatedUsernames.clear(); +static const QString REPLICATION_SETTINGS_KEY = "replication"; + +void DomainServer::updateDownstreamNodes() { auto settings = _settingsManager.getSettingsMap(); if (settings.contains(REPLICATION_SETTINGS_KEY)) { auto replicationSettings = settings.value(REPLICATION_SETTINGS_KEY).toMap(); - if (replicationSettings.contains("users")) { - auto usersSettings = replicationSettings.value("users").toList(); + if (replicationSettings.contains("downstream_servers")) { + auto serversSettings = replicationSettings.value("downstream_servers").toList(); + + auto nodeList = DependencyManager::get(); + std::vector knownDownstreamNodes; + nodeList->eachNode([&](const SharedNodePointer& otherNode) { + if (NodeType::isDownstream(otherNode->getType())) { + knownDownstreamNodes.push_back(otherNode->getPublicSocket()); + } + }); + + std::vector downstreamNodesInSettings; + + for (auto& server : serversSettings) { + auto downstreamServer = server.toMap(); + + static const QString DOWNSTREAM_SERVER_ADDRESS = "address"; + static const QString DOWNSTREAM_SERVER_PORT = "port"; + static const QString DOWNSTREAM_SERVER_TYPE = "server_type"; + + // make sure we have the settings we need for this downstream server + if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) { + + auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString()); + auto downstreamNodeType = NodeType::downstreamType(nodeType); + + // read the address and port and construct a HifiSockAddr from them + HifiSockAddr downstreamServerAddr { + downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(), + (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt() + }; + downstreamNodesInSettings.push_back(downstreamServerAddr); + + bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(), + downstreamServerAddr) != knownDownstreamNodes.cend(); + if (!knownNode) { + // manually add the downstream node to our node list + auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType, + downstreamServerAddr, downstreamServerAddr); + + qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr; + + // manually activate the public socket for the downstream node + node->activatePublicSocket(); + } + } + + } + std::vector nodesToKill; + nodeList->eachNode([&](const SharedNodePointer& otherNode) { + if (NodeType::isDownstream(otherNode->getType())) { + bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(), + otherNode->getPublicSocket()) != downstreamNodesInSettings.cend(); + if (!nodeInSettings) { + qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket(); + nodesToKill.push_back(otherNode); + } + } + }); + for (auto& node : nodesToKill) { + handleKillNode(node); + } + } + } +} + +void DomainServer::updateReplicatedNodes() { + // Make sure we have downstream nodes in our list + // TODO Move this to a different function + _replicatedUsernames.clear(); + auto settings = _settingsManager.getSettingsMap(); + + static const QString REPLICATED_USERS_KEY = "users"; + _replicatedUsernames.clear(); + if (settings.contains(REPLICATION_SETTINGS_KEY)) { + auto replicationSettings = settings.value(REPLICATION_SETTINGS_KEY).toMap(); + if (replicationSettings.contains(REPLICATED_USERS_KEY)) { + auto usersSettings = replicationSettings.value(REPLICATED_USERS_KEY).toList(); for (auto& username : usersSettings) { _replicatedUsernames.push_back(username.toString().toLower()); } @@ -2239,11 +2318,10 @@ void DomainServer::updateReplicatedNodes() { }, [this](const SharedNodePointer& otherNode) { auto shouldReplicate = shouldReplicateNode(*otherNode); auto isReplicated = otherNode->isReplicated(); - qDebug() << "Checking " << otherNode->getPermissions().getVerifiedUserName(); if (isReplicated && !shouldReplicate) { - qDebug() << "Setting node to NOT be replicated: " << otherNode->getUUID(); + qDebug() << "Setting node to NOT be replicated:" << otherNode->getUUID(); } else if (!isReplicated && shouldReplicate) { - qDebug() << "Setting node to replicated: " << otherNode->getUUID(); + qDebug() << "Setting node to replicated:" << otherNode->getUUID(); } otherNode->setIsReplicated(shouldReplicate); } diff --git a/domain-server/src/DomainServer.h b/domain-server/src/DomainServer.h index 92df7a88e0..7e43397e9c 100644 --- a/domain-server/src/DomainServer.h +++ b/domain-server/src/DomainServer.h @@ -103,6 +103,7 @@ private slots: void handleOctreeFileReplacement(QByteArray octreeFile); void updateReplicatedNodes(); + void updateDownstreamNodes(); signals: void iceServerChanged(); diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index 3435843814..f74be8adcd 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -67,6 +67,11 @@ NodeType_t NodeType::downstreamType(NodeType_t primaryType) { } } +NodeType_t NodeType::fromString(QString type) { + return TypeNameHash.key(type, NodeType::Unassigned); +} + + Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated, const QUuid& connectionSecret, QObject* parent) : diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 73c405e3aa..80576f473a 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -771,7 +771,8 @@ void NodeList::sendKeepAlivePings() { // send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node eachMatchingNode([this](const SharedNodePointer& node)->bool { - return !node->isUpstream() && _nodeTypesOfInterest.contains(node->getType()); + auto type = node->getType(); + return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type); }, [&](const SharedNodePointer& node) { sendPacket(constructPingPacket(), *node); }); diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h index 12e74603ef..dacd5e1a68 100644 --- a/libraries/networking/src/NodeType.h +++ b/libraries/networking/src/NodeType.h @@ -34,6 +34,8 @@ namespace NodeType { const QString& getNodeTypeName(NodeType_t nodeType); bool isDownstream(NodeType_t nodeType); NodeType_t downstreamType(NodeType_t primaryType); + + NodeType_t fromString(QString type); } typedef QSet NodeSet; diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp index 3e679f643a..18e4593c91 100644 --- a/libraries/networking/src/ThreadedAssignment.cpp +++ b/libraries/networking/src/ThreadedAssignment.cpp @@ -133,42 +133,3 @@ void ThreadedAssignment::domainSettingsRequestFailed() { qCDebug(networking) << "Failed to retreive settings object from domain-server. Bailing on assignment."; setFinished(true); } - -void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, DownstreamNodeFoundCallback callback) { - static const QString REPLICATION_GROUP_KEY = "replication"; - static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers"; - if (settingsObject.contains(REPLICATION_GROUP_KEY)) { - const QJsonObject replicationObject = settingsObject[REPLICATION_GROUP_KEY].toObject(); - - const QJsonArray downstreamServers = replicationObject[DOWNSTREAM_SERVERS_SETTING_KEY].toArray(); - - auto nodeList = DependencyManager::get(); - - foreach(const QJsonValue& downstreamServerValue, downstreamServers) { - const QJsonObject downstreamServer = downstreamServerValue.toObject(); - - static const QString DOWNSTREAM_SERVER_ADDRESS = "address"; - static const QString DOWNSTREAM_SERVER_PORT = "port"; - static const QString DOWNSTREAM_SERVER_TYPE = "server_type"; - - // make sure we have the settings we need for this downstream server - if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT) - && downstreamServer[DOWNSTREAM_SERVER_TYPE].toString() == NodeType::getNodeTypeName(nodeType)) { - // read the address and port and construct a HifiSockAddr from them - HifiSockAddr downstreamServerAddr { - downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(), - (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt() - }; - - // manually add the downstream node to our node list - auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType), - downstreamServerAddr, downstreamServerAddr); - - // manually activate the public socket for the downstream node - node->activatePublicSocket(); - - callback(*node); - } - } - } -} diff --git a/libraries/networking/src/ThreadedAssignment.h b/libraries/networking/src/ThreadedAssignment.h index 0cc7b2f40c..8b35acaac5 100644 --- a/libraries/networking/src/ThreadedAssignment.h +++ b/libraries/networking/src/ThreadedAssignment.h @@ -42,8 +42,6 @@ signals: protected: void commonInit(const QString& targetName, NodeType_t nodeType); - void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, - DownstreamNodeFoundCallback callback = [](Node& downstreamNode) {}); bool _isFinished; QTimer _domainServerTimer;