add upstream nodes to domain nodelist

This commit is contained in:
seefo 2017-06-19 15:07:24 -07:00 committed by Stephen Birarda
parent 71cc76e94f
commit 48b5989b60
5 changed files with 112 additions and 38 deletions

View file

@ -1339,6 +1339,7 @@
{ {
"name": "broadcasting", "name": "broadcasting",
"label": "Broadcasting", "label": "Broadcasting",
"restart": false,
"settings": [ "settings": [
{ {
"name": "users", "name": "users",
@ -1397,7 +1398,7 @@
] ]
}, },
{ {
"name": "broadcasting_servers", "name": "upstream_servers",
"label": "Broadcasting Servers", "label": "Broadcasting Servers",
"assignment-types": [0,1], "assignment-types": [0,1],
"type": "table", "type": "table",

View file

@ -121,6 +121,8 @@ DomainServer::DomainServer(int argc, char* argv[]) :
this, &DomainServer::updateReplicatedNodes); this, &DomainServer::updateReplicatedNodes);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated, connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateDownstreamNodes); this, &DomainServer::updateDownstreamNodes);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateUpstreamNodes);
setupGroupCacheRefresh(); setupGroupCacheRefresh();
@ -135,6 +137,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
updateReplicatedNodes(); updateReplicatedNodes();
updateDownstreamNodes(); updateDownstreamNodes();
updateUpstreamNodes();
if (_type != NonMetaverse) { if (_type != NonMetaverse) {
// if we have a metaverse domain, we'll use an access token for API calls // if we have a metaverse domain, we'll use an access token for API calls
@ -2229,53 +2232,83 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer&
static const QString BROADCASTING_SETTINGS_KEY = "broadcasting"; static const QString BROADCASTING_SETTINGS_KEY = "broadcasting";
void DomainServer::updateDownstreamNodes() { struct ReplicationServerInfo {
NodeType_t nodeType;
HifiSockAddr sockAddr;
};
ReplicationServerInfo serverInformationFromSettings(QVariantMap serverMap, ReplicationServerDirection direction) {
static const QString REPLICATION_SERVER_ADDRESS = "address";
static const QString REPLICATION_SERVER_PORT = "port";
static const QString REPLICATION_SERVER_TYPE = "server_type";
if (serverMap.contains(REPLICATION_SERVER_ADDRESS) && serverMap.contains(REPLICATION_SERVER_PORT)) {
auto nodeType = NodeType::fromString(serverMap[REPLICATION_SERVER_TYPE].toString());
ReplicationServerInfo serverInfo;
if (direction == Upstream) {
serverInfo.nodeType = NodeType::upstreamType(nodeType);
} else if (direction == Downstream) {
serverInfo.nodeType = NodeType::downstreamType(nodeType);
}
// read the address and port and construct a HifiSockAddr from them
serverInfo.sockAddr = {
serverMap[REPLICATION_SERVER_ADDRESS].toString(),
(quint16) serverMap[REPLICATION_SERVER_PORT].toString().toInt()
};
return serverInfo;
}
return { NodeType::Unassigned, HifiSockAddr() };
}
void DomainServer::updateReplicationNodes(ReplicationServerDirection direction) {
auto settings = _settingsManager.getSettingsMap(); auto settings = _settingsManager.getSettingsMap();
if (settings.contains(BROADCASTING_SETTINGS_KEY)) { if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
auto nodeList = DependencyManager::get<LimitedNodeList>(); auto nodeList = DependencyManager::get<LimitedNodeList>();
std::vector<HifiSockAddr> downstreamNodesInSettings; std::vector<HifiSockAddr> replicationNodesInSettings;
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
if (replicationSettings.contains("downstream_servers")) {
auto serversSettings = replicationSettings.value("downstream_servers").toList();
std::vector<HifiSockAddr> knownDownstreamNodes; auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
QString serversKey = direction == Upstream ? "upstream_servers" : "downstream_servers";
QString replicationDirection = direction == Upstream ? "upstream" : "downstream";
if (replicationSettings.contains(serversKey)) {
auto serversSettings = replicationSettings.value(serversKey).toList();
std::vector<HifiSockAddr> knownReplicationNodes;
nodeList->eachNode([&](const SharedNodePointer& otherNode) { nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) { if ((direction == Upstream && NodeType::isUpstream(otherNode->getType()))
knownDownstreamNodes.push_back(otherNode->getPublicSocket()); || (direction == Downstream && NodeType::isDownstream(otherNode->getType()))) {
knownReplicationNodes.push_back(otherNode->getPublicSocket());
} }
}); });
for (auto& server : serversSettings) { for (auto& server : serversSettings) {
auto downstreamServer = server.toMap(); auto replicationServer = serverInformationFromSettings(server.toMap(), direction);
static const QString DOWNSTREAM_SERVER_ADDRESS = "address"; if (!replicationServer.sockAddr.isNull() && replicationServer.nodeType != NodeType::Unassigned) {
static const QString DOWNSTREAM_SERVER_PORT = "port"; // make sure we have the settings we need for this replication server
static const QString DOWNSTREAM_SERVER_TYPE = "server_type"; replicationNodesInSettings.push_back(replicationServer.sockAddr);
// make sure we have the settings we need for this downstream server bool knownNode = find(knownReplicationNodes.cbegin(), knownReplicationNodes.cend(),
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) { replicationServer.sockAddr) != knownReplicationNodes.cend();
auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString());
auto downstreamNodeType = NodeType::downstreamType(nodeType);
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
downstreamNodesInSettings.push_back(downstreamServerAddr);
bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(),
downstreamServerAddr) != knownDownstreamNodes.cend();
if (!knownNode) { if (!knownNode) {
// manually add the downstream node to our node list // manually add the replication node to our node list
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType, auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), replicationServer.nodeType,
downstreamServerAddr, downstreamServerAddr); replicationServer.sockAddr, replicationServer.sockAddr);
node->setIsForcedNeverSilent(true); node->setIsForcedNeverSilent(true);
qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr; qDebug() << "Adding" << (direction == Upstream ? "upstream" : "downstream")
<< "node:" << node->getUUID() << replicationServer.sockAddr;
// manually activate the public socket for the downstream node // manually activate the public socket for the replication node
node->activatePublicSocket(); node->activatePublicSocket();
} }
} }
@ -2288,11 +2321,13 @@ void DomainServer::updateDownstreamNodes() {
// we cannot recursively take the write lock required by handleKillNode) // we cannot recursively take the write lock required by handleKillNode)
std::vector<SharedNodePointer> nodesToKill; std::vector<SharedNodePointer> nodesToKill;
nodeList->eachNode([&](const SharedNodePointer& otherNode) { nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) { if ((direction == Upstream && NodeType::isUpstream(otherNode->getType()))
bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(), || (direction == Downstream && NodeType::isDownstream(otherNode->getType()))) {
otherNode->getPublicSocket()) != downstreamNodesInSettings.cend(); bool nodeInSettings = find(replicationNodesInSettings.cbegin(), replicationNodesInSettings.cend(),
otherNode->getPublicSocket()) != replicationNodesInSettings.cend();
if (!nodeInSettings) { if (!nodeInSettings) {
qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket(); qDebug() << "Removing" << replicationDirection
<< "node:" << otherNode->getUUID() << otherNode->getPublicSocket();
nodesToKill.push_back(otherNode); nodesToKill.push_back(otherNode);
} }
} }
@ -2304,6 +2339,14 @@ void DomainServer::updateDownstreamNodes() {
} }
} }
void DomainServer::updateDownstreamNodes() {
updateReplicationNodes(Downstream);
}
void DomainServer::updateUpstreamNodes() {
updateReplicationNodes(Upstream);
}
void DomainServer::updateReplicatedNodes() { void DomainServer::updateReplicatedNodes() {
// Make sure we have downstream nodes in our list // Make sure we have downstream nodes in our list
auto settings = _settingsManager.getSettingsMap(); auto settings = _settingsManager.getSettingsMap();

View file

@ -39,6 +39,11 @@ typedef QMultiHash<QUuid, WalletTransaction*> TransactionHash;
using Subnet = QPair<QHostAddress, int>; using Subnet = QPair<QHostAddress, int>;
using SubnetList = std::vector<Subnet>; using SubnetList = std::vector<Subnet>;
enum ReplicationServerDirection {
Upstream,
Downstream
};
class DomainServer : public QCoreApplication, public HTTPSRequestHandler { class DomainServer : public QCoreApplication, public HTTPSRequestHandler {
Q_OBJECT Q_OBJECT
public: public:
@ -104,6 +109,7 @@ private slots:
void updateReplicatedNodes(); void updateReplicatedNodes();
void updateDownstreamNodes(); void updateDownstreamNodes();
void updateUpstreamNodes();
signals: signals:
void iceServerChanged(); void iceServerChanged();
@ -170,6 +176,8 @@ private:
QString pathForRedirect(QString path = QString()) const; QString pathForRedirect(QString path = QString()) const;
void updateReplicationNodes(ReplicationServerDirection direction);
SubnetList _acSubnetWhitelist; SubnetList _acSubnetWhitelist;
std::vector<QString> _replicatedUsernames; std::vector<QString> _replicatedUsernames;

View file

@ -42,6 +42,8 @@ void NodeType::init() {
TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer"); TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer");
TypeNameHash.insert(NodeType::AssetServer, "Asset Server"); TypeNameHash.insert(NodeType::AssetServer, "Asset Server");
TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server"); TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server");
TypeNameHash.insert(NodeType::UpstreamAudioMixer, "Upstream Audio Mixer");
TypeNameHash.insert(NodeType::UpstreamAvatarMixer, "Upstream Avatar Mixer");
TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer"); TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer");
TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer"); TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer");
TypeNameHash.insert(NodeType::Unassigned, "Unassigned"); TypeNameHash.insert(NodeType::Unassigned, "Unassigned");
@ -52,8 +54,23 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) {
return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME; return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME;
} }
bool NodeType::isUpstream(NodeType_t nodeType) {
return nodeType == NodeType::UpstreamAudioMixer || nodeType == NodeType::UpstreamAvatarMixer;
}
bool NodeType::isDownstream(NodeType_t nodeType) { bool NodeType::isDownstream(NodeType_t nodeType) {
return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer; return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer;
}
NodeType_t NodeType::upstreamType(NodeType_t primaryType) {
switch (primaryType) {
case AudioMixer:
return UpstreamAudioMixer;
case AvatarMixer:
return UpstreamAvatarMixer;
default:
return Unassigned;
}
} }
NodeType_t NodeType::downstreamType(NodeType_t primaryType) { NodeType_t NodeType::downstreamType(NodeType_t primaryType) {

View file

@ -25,6 +25,8 @@ namespace NodeType {
const NodeType_t AssetServer = 'A'; const NodeType_t AssetServer = 'A';
const NodeType_t MessagesMixer = 'm'; const NodeType_t MessagesMixer = 'm';
const NodeType_t EntityScriptServer = 'S'; const NodeType_t EntityScriptServer = 'S';
const NodeType_t UpstreamAudioMixer = 'B';
const NodeType_t UpstreamAvatarMixer = 'C';
const NodeType_t DownstreamAudioMixer = 'a'; const NodeType_t DownstreamAudioMixer = 'a';
const NodeType_t DownstreamAvatarMixer = 'w'; const NodeType_t DownstreamAvatarMixer = 'w';
const NodeType_t Unassigned = 1; const NodeType_t Unassigned = 1;
@ -32,9 +34,12 @@ namespace NodeType {
void init(); void init();
const QString& getNodeTypeName(NodeType_t nodeType); const QString& getNodeTypeName(NodeType_t nodeType);
bool isUpstream(NodeType_t nodeType);
bool isDownstream(NodeType_t nodeType); bool isDownstream(NodeType_t nodeType);
NodeType_t upstreamType(NodeType_t primaryType);
NodeType_t downstreamType(NodeType_t primaryType); NodeType_t downstreamType(NodeType_t primaryType);
NodeType_t fromString(QString type); NodeType_t fromString(QString type);
} }