From 785156ad9fbb462ecea2954f5d1769c88c258fcf Mon Sep 17 00:00:00 2001
From: Ryan Huffman <ryanhuffman@gmail.com>
Date: Wed, 14 Jun 2017 16:54:04 -0700
Subject: [PATCH] Move downstram server adding to DS

---
 assignment-client/src/audio/AudioMixer.cpp    |  9 +-
 assignment-client/src/avatars/AvatarMixer.cpp | 15 ++-
 .../src/avatars/AvatarMixerSlave.cpp          |  4 +
 domain-server/src/DomainServer.cpp            | 94 +++++++++++++++++--
 domain-server/src/DomainServer.h              |  1 +
 libraries/networking/src/Node.cpp             |  5 +
 libraries/networking/src/NodeList.cpp         |  3 +-
 libraries/networking/src/NodeType.h           |  2 +
 .../networking/src/ThreadedAssignment.cpp     | 39 --------
 libraries/networking/src/ThreadedAssignment.h |  2 -
 10 files changed, 113 insertions(+), 61 deletions(-)

diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp
index a98c172ef3..e7323bb272 100644
--- a/assignment-client/src/audio/AudioMixer.cpp
+++ b/assignment-client/src/audio/AudioMixer.cpp
@@ -103,6 +103,11 @@ AudioMixer::AudioMixer(ReceivedMessage& message) :
     );
 
     connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
+    connect(nodeList.data(), &NodeList::nodeAdded, this, [this](SharedNodePointer node) {
+        if (node->getType() == NodeType::DownstreamAudioMixer) {
+            node->activatePublicSocket();
+        }
+    });
 }
 
 void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
@@ -391,7 +396,7 @@ void AudioMixer::start() {
     auto nodeList = DependencyManager::get<NodeList>();
 
     // prepare the NodeList
-    nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
+    nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer });
     nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); };
 
     // parse out any AudioMixer settings
@@ -768,8 +773,6 @@ void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) {
             }
         }
     }
-
-    parseDownstreamServers(settingsObject, NodeType::AudioMixer);
 }
 
 AudioMixer::Timer::Timing::Timing(uint64_t& sum) : _sum(sum) {
diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp
index 337faf2027..4b85e6b667 100644
--- a/assignment-client/src/avatars/AvatarMixer.cpp
+++ b/assignment-client/src/avatars/AvatarMixer.cpp
@@ -63,6 +63,12 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
 
     auto nodeList = DependencyManager::get<NodeList>();
     connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch);
+    connect(nodeList.data(), &NodeList::nodeAdded, this, [this](SharedNodePointer node) {
+        if (node->getType() == NodeType::DownstreamAvatarMixer) {
+            getOrCreateClientData(node);
+            node->activatePublicSocket();
+        }
+    });
 }
 
 SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) {
@@ -805,7 +811,7 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node
 
 void AvatarMixer::domainSettingsRequestComplete() {
     auto nodeList = DependencyManager::get<NodeList>();
-    nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
+    nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer });
 
     // parse the settings to pull out the values we need
     parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
@@ -877,11 +883,4 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
 
     qCDebug(avatars) << "This domain requires a minimum avatar scale of" << _domainMinimumScale
                      << "and a maximum avatar scale of" << _domainMaximumScale;
-
-
-    parseDownstreamServers(domainSettings, NodeType::AvatarMixer, [](Node& node) {
-        if (!node.getLinkedData()) {
-            node.setLinkedData(std::unique_ptr<NodeData> { new AvatarMixerClientData(node.getUUID()) });
-        }
-    });
 }
diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp
index dd9f46224b..398b28f7a6 100644
--- a/assignment-client/src/avatars/AvatarMixerSlave.cpp
+++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp
@@ -444,6 +444,10 @@ void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePoin
     _stats.downstreamMixersBroadcastedTo++;
 
     AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
+    if (!nodeData) {
+        qDebug() << "No node data";
+        return;
+    }
 
     // setup a PacketList for the replicated bulk avatar data
     auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData);
diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp
index 5643a4434a..24ae106b90 100644
--- a/domain-server/src/DomainServer.cpp
+++ b/domain-server/src/DomainServer.cpp
@@ -119,6 +119,8 @@ DomainServer::DomainServer(int argc, char* argv[]) :
             &_gatekeeper, &DomainGatekeeper::updateNodePermissions);
     connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
             this, &DomainServer::updateReplicatedNodes);
+    connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
+            this, &DomainServer::updateDownstreamNodes);
 
     setupGroupCacheRefresh();
 
@@ -132,6 +134,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
     setupNodeListAndAssignments();
 
     updateReplicatedNodes();
+    updateDownstreamNodes();
 
     if (_type != NonMetaverse) {
         // if we have a metaverse domain, we'll use an access token for API calls
@@ -2219,14 +2222,90 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer&
     _unfulfilledAssignments.enqueue(assignment);
 }
 
-void DomainServer::updateReplicatedNodes() {
-    static const QString REPLICATION_SETTINGS_KEY = "replication";
-    _replicatedUsernames.clear();
+static const QString REPLICATION_SETTINGS_KEY = "replication";
+
+void DomainServer::updateDownstreamNodes() {
     auto settings = _settingsManager.getSettingsMap();
     if (settings.contains(REPLICATION_SETTINGS_KEY)) {
         auto replicationSettings = settings.value(REPLICATION_SETTINGS_KEY).toMap();
-        if (replicationSettings.contains("users")) {
-            auto usersSettings = replicationSettings.value("users").toList();
+        if (replicationSettings.contains("downstream_servers")) {
+            auto serversSettings = replicationSettings.value("downstream_servers").toList();
+
+            auto nodeList = DependencyManager::get<LimitedNodeList>();
+            std::vector<HifiSockAddr> knownDownstreamNodes;
+            nodeList->eachNode([&](const SharedNodePointer& otherNode) {
+                if (NodeType::isDownstream(otherNode->getType())) {
+                    knownDownstreamNodes.push_back(otherNode->getPublicSocket());
+                }
+            });
+
+            std::vector<HifiSockAddr> downstreamNodesInSettings;
+
+            for (auto& server : serversSettings) {
+                auto downstreamServer = server.toMap();
+
+                static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
+                static const QString DOWNSTREAM_SERVER_PORT = "port";
+                static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
+
+                // make sure we have the settings we need for this downstream server
+                if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) {
+
+                    auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString());
+                    auto downstreamNodeType = NodeType::downstreamType(nodeType);
+
+                    // read the address and port and construct a HifiSockAddr from them
+                    HifiSockAddr downstreamServerAddr {
+                        downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
+                        (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
+                    };
+                    downstreamNodesInSettings.push_back(downstreamServerAddr);
+
+                    bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(),
+                                          downstreamServerAddr) != knownDownstreamNodes.cend();
+                    if (!knownNode) {
+                        // manually add the downstream node to our node list
+                        auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType,
+                                                              downstreamServerAddr, downstreamServerAddr);
+
+                        qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr;
+
+                        // manually activate the public socket for the downstream node
+                        node->activatePublicSocket();
+                    }
+                }
+
+            }
+            std::vector<SharedNodePointer> nodesToKill;
+            nodeList->eachNode([&](const SharedNodePointer& otherNode) {
+                if (NodeType::isDownstream(otherNode->getType())) {
+                    bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(),
+                                               otherNode->getPublicSocket()) != downstreamNodesInSettings.cend();
+                    if (!nodeInSettings) {
+                        qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket();
+                        nodesToKill.push_back(otherNode);
+                    }
+                }
+            });
+            for (auto& node : nodesToKill) {
+                handleKillNode(node);
+            }
+        }
+    }
+}
+
+void DomainServer::updateReplicatedNodes() {
+    // Make sure we have downstream nodes in our list
+    // TODO Move this to a different function
+    _replicatedUsernames.clear();
+    auto settings = _settingsManager.getSettingsMap();
+
+    static const QString REPLICATED_USERS_KEY = "users";
+    _replicatedUsernames.clear();
+    if (settings.contains(REPLICATION_SETTINGS_KEY)) {
+        auto replicationSettings = settings.value(REPLICATION_SETTINGS_KEY).toMap();
+        if (replicationSettings.contains(REPLICATED_USERS_KEY)) {
+            auto usersSettings = replicationSettings.value(REPLICATED_USERS_KEY).toList();
             for (auto& username : usersSettings) {
                 _replicatedUsernames.push_back(username.toString().toLower());
             }
@@ -2239,11 +2318,10 @@ void DomainServer::updateReplicatedNodes() {
         }, [this](const SharedNodePointer& otherNode) {
             auto shouldReplicate = shouldReplicateNode(*otherNode);
             auto isReplicated = otherNode->isReplicated();
-            qDebug() << "Checking " << otherNode->getPermissions().getVerifiedUserName();
             if (isReplicated && !shouldReplicate) {
-                qDebug() << "Setting node to NOT be replicated: " << otherNode->getUUID();
+                qDebug() << "Setting node to NOT be replicated:" << otherNode->getUUID();
             } else if (!isReplicated && shouldReplicate) {
-                qDebug() << "Setting node to replicated: " << otherNode->getUUID();
+                qDebug() << "Setting node to replicated:" << otherNode->getUUID();
             }
             otherNode->setIsReplicated(shouldReplicate);
         }
diff --git a/domain-server/src/DomainServer.h b/domain-server/src/DomainServer.h
index 92df7a88e0..7e43397e9c 100644
--- a/domain-server/src/DomainServer.h
+++ b/domain-server/src/DomainServer.h
@@ -103,6 +103,7 @@ private slots:
     void handleOctreeFileReplacement(QByteArray octreeFile);
 
     void updateReplicatedNodes();
+    void updateDownstreamNodes();
 
 signals:
     void iceServerChanged();
diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp
index 3435843814..f74be8adcd 100644
--- a/libraries/networking/src/Node.cpp
+++ b/libraries/networking/src/Node.cpp
@@ -67,6 +67,11 @@ NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
     }
 }
 
+NodeType_t NodeType::fromString(QString type) {
+    return TypeNameHash.key(type, NodeType::Unassigned);
+}
+
+
 Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket,
            const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated,
            const QUuid& connectionSecret, QObject* parent) :
diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp
index 73c405e3aa..80576f473a 100644
--- a/libraries/networking/src/NodeList.cpp
+++ b/libraries/networking/src/NodeList.cpp
@@ -771,7 +771,8 @@ void NodeList::sendKeepAlivePings() {
     // send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node
 
     eachMatchingNode([this](const SharedNodePointer& node)->bool {
-        return !node->isUpstream() && _nodeTypesOfInterest.contains(node->getType());
+        auto type = node->getType();
+        return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type);
     }, [&](const SharedNodePointer& node) {
         sendPacket(constructPingPacket(), *node);
     });
diff --git a/libraries/networking/src/NodeType.h b/libraries/networking/src/NodeType.h
index 12e74603ef..dacd5e1a68 100644
--- a/libraries/networking/src/NodeType.h
+++ b/libraries/networking/src/NodeType.h
@@ -34,6 +34,8 @@ namespace NodeType {
     const QString& getNodeTypeName(NodeType_t nodeType);
     bool isDownstream(NodeType_t nodeType);
     NodeType_t downstreamType(NodeType_t primaryType);
+
+    NodeType_t fromString(QString type);
 }
 
 typedef QSet<NodeType_t> NodeSet;
diff --git a/libraries/networking/src/ThreadedAssignment.cpp b/libraries/networking/src/ThreadedAssignment.cpp
index 3e679f643a..18e4593c91 100644
--- a/libraries/networking/src/ThreadedAssignment.cpp
+++ b/libraries/networking/src/ThreadedAssignment.cpp
@@ -133,42 +133,3 @@ void ThreadedAssignment::domainSettingsRequestFailed() {
     qCDebug(networking) << "Failed to retreive settings object from domain-server. Bailing on assignment.";
     setFinished(true);
 }
-
-void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, DownstreamNodeFoundCallback callback) {
-    static const QString REPLICATION_GROUP_KEY = "replication";
-    static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers";
-    if (settingsObject.contains(REPLICATION_GROUP_KEY)) {
-        const QJsonObject replicationObject = settingsObject[REPLICATION_GROUP_KEY].toObject();
-
-        const QJsonArray downstreamServers = replicationObject[DOWNSTREAM_SERVERS_SETTING_KEY].toArray();
-
-        auto nodeList = DependencyManager::get<NodeList>();
-
-        foreach(const QJsonValue& downstreamServerValue, downstreamServers) {
-            const QJsonObject downstreamServer = downstreamServerValue.toObject();
-
-            static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
-            static const QString DOWNSTREAM_SERVER_PORT = "port";
-            static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
-
-            // make sure we have the settings we need for this downstream server
-            if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)
-                && downstreamServer[DOWNSTREAM_SERVER_TYPE].toString() == NodeType::getNodeTypeName(nodeType)) {
-                // read the address and port and construct a HifiSockAddr from them
-                HifiSockAddr downstreamServerAddr {
-                    downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
-                    (quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
-                };
-
-                // manually add the downstream node to our node list
-                auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType),
-                                                      downstreamServerAddr, downstreamServerAddr);
-
-                // manually activate the public socket for the downstream node
-                node->activatePublicSocket();
-
-                callback(*node);
-            }
-        }
-    }
-}
diff --git a/libraries/networking/src/ThreadedAssignment.h b/libraries/networking/src/ThreadedAssignment.h
index 0cc7b2f40c..8b35acaac5 100644
--- a/libraries/networking/src/ThreadedAssignment.h
+++ b/libraries/networking/src/ThreadedAssignment.h
@@ -42,8 +42,6 @@ signals:
 
 protected:
     void commonInit(const QString& targetName, NodeType_t nodeType);
-    void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType,
-                                DownstreamNodeFoundCallback callback = [](Node& downstreamNode) {});
 
     bool _isFinished;
     QTimer _domainServerTimer;