Move downstram server adding to DS

This commit is contained in:
Ryan Huffman 2017-06-14 16:54:04 -07:00
parent 7f75a5f7f5
commit 785156ad9f
10 changed files with 113 additions and 61 deletions

View file

@ -103,6 +103,11 @@ AudioMixer::AudioMixer(ReceivedMessage& message) :
);
connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](SharedNodePointer node) {
if (node->getType() == NodeType::DownstreamAudioMixer) {
node->activatePublicSocket();
}
});
}
void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
@ -391,7 +396,7 @@ void AudioMixer::start() {
auto nodeList = DependencyManager::get<NodeList>();
// prepare the NodeList
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer });
nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); };
// parse out any AudioMixer settings
@ -768,8 +773,6 @@ void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) {
}
}
}
parseDownstreamServers(settingsObject, NodeType::AudioMixer);
}
AudioMixer::Timer::Timing::Timing(uint64_t& sum) : _sum(sum) {

View file

@ -63,6 +63,12 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
auto nodeList = DependencyManager::get<NodeList>();
connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch);
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](SharedNodePointer node) {
if (node->getType() == NodeType::DownstreamAvatarMixer) {
getOrCreateClientData(node);
node->activatePublicSocket();
}
});
}
SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) {
@ -805,7 +811,7 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node
void AvatarMixer::domainSettingsRequestComplete() {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer });
// parse the settings to pull out the values we need
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
@ -877,11 +883,4 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
qCDebug(avatars) << "This domain requires a minimum avatar scale of" << _domainMinimumScale
<< "and a maximum avatar scale of" << _domainMaximumScale;
parseDownstreamServers(domainSettings, NodeType::AvatarMixer, [](Node& node) {
if (!node.getLinkedData()) {
node.setLinkedData(std::unique_ptr<NodeData> { new AvatarMixerClientData(node.getUUID()) });
}
});
}

View file

@ -444,6 +444,10 @@ void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePoin
_stats.downstreamMixersBroadcastedTo++;
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
if (!nodeData) {
qDebug() << "No node data";
return;
}
// setup a PacketList for the replicated bulk avatar data
auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData);

View file

@ -119,6 +119,8 @@ DomainServer::DomainServer(int argc, char* argv[]) :
&_gatekeeper, &DomainGatekeeper::updateNodePermissions);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateReplicatedNodes);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateDownstreamNodes);
setupGroupCacheRefresh();
@ -132,6 +134,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
setupNodeListAndAssignments();
updateReplicatedNodes();
updateDownstreamNodes();
if (_type != NonMetaverse) {
// if we have a metaverse domain, we'll use an access token for API calls
@ -2219,14 +2222,90 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer&
_unfulfilledAssignments.enqueue(assignment);
}
void DomainServer::updateReplicatedNodes() {
static const QString REPLICATION_SETTINGS_KEY = "replication";
_replicatedUsernames.clear();
static const QString REPLICATION_SETTINGS_KEY = "replication";
void DomainServer::updateDownstreamNodes() {
auto settings = _settingsManager.getSettingsMap();
if (settings.contains(REPLICATION_SETTINGS_KEY)) {
auto replicationSettings = settings.value(REPLICATION_SETTINGS_KEY).toMap();
if (replicationSettings.contains("users")) {
auto usersSettings = replicationSettings.value("users").toList();
if (replicationSettings.contains("downstream_servers")) {
auto serversSettings = replicationSettings.value("downstream_servers").toList();
auto nodeList = DependencyManager::get<LimitedNodeList>();
std::vector<HifiSockAddr> knownDownstreamNodes;
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) {
knownDownstreamNodes.push_back(otherNode->getPublicSocket());
}
});
std::vector<HifiSockAddr> downstreamNodesInSettings;
for (auto& server : serversSettings) {
auto downstreamServer = server.toMap();
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
static const QString DOWNSTREAM_SERVER_PORT = "port";
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
// make sure we have the settings we need for this downstream server
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) {
auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString());
auto downstreamNodeType = NodeType::downstreamType(nodeType);
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
downstreamNodesInSettings.push_back(downstreamServerAddr);
bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(),
downstreamServerAddr) != knownDownstreamNodes.cend();
if (!knownNode) {
// manually add the downstream node to our node list
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType,
downstreamServerAddr, downstreamServerAddr);
qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr;
// manually activate the public socket for the downstream node
node->activatePublicSocket();
}
}
}
std::vector<SharedNodePointer> nodesToKill;
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) {
bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(),
otherNode->getPublicSocket()) != downstreamNodesInSettings.cend();
if (!nodeInSettings) {
qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket();
nodesToKill.push_back(otherNode);
}
}
});
for (auto& node : nodesToKill) {
handleKillNode(node);
}
}
}
}
void DomainServer::updateReplicatedNodes() {
// Make sure we have downstream nodes in our list
// TODO Move this to a different function
_replicatedUsernames.clear();
auto settings = _settingsManager.getSettingsMap();
static const QString REPLICATED_USERS_KEY = "users";
_replicatedUsernames.clear();
if (settings.contains(REPLICATION_SETTINGS_KEY)) {
auto replicationSettings = settings.value(REPLICATION_SETTINGS_KEY).toMap();
if (replicationSettings.contains(REPLICATED_USERS_KEY)) {
auto usersSettings = replicationSettings.value(REPLICATED_USERS_KEY).toList();
for (auto& username : usersSettings) {
_replicatedUsernames.push_back(username.toString().toLower());
}
@ -2239,11 +2318,10 @@ void DomainServer::updateReplicatedNodes() {
}, [this](const SharedNodePointer& otherNode) {
auto shouldReplicate = shouldReplicateNode(*otherNode);
auto isReplicated = otherNode->isReplicated();
qDebug() << "Checking " << otherNode->getPermissions().getVerifiedUserName();
if (isReplicated && !shouldReplicate) {
qDebug() << "Setting node to NOT be replicated: " << otherNode->getUUID();
qDebug() << "Setting node to NOT be replicated:" << otherNode->getUUID();
} else if (!isReplicated && shouldReplicate) {
qDebug() << "Setting node to replicated: " << otherNode->getUUID();
qDebug() << "Setting node to replicated:" << otherNode->getUUID();
}
otherNode->setIsReplicated(shouldReplicate);
}

View file

@ -103,6 +103,7 @@ private slots:
void handleOctreeFileReplacement(QByteArray octreeFile);
void updateReplicatedNodes();
void updateDownstreamNodes();
signals:
void iceServerChanged();

View file

@ -67,6 +67,11 @@ NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
}
}
NodeType_t NodeType::fromString(QString type) {
return TypeNameHash.key(type, NodeType::Unassigned);
}
Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket,
const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated,
const QUuid& connectionSecret, QObject* parent) :

View file

@ -771,7 +771,8 @@ void NodeList::sendKeepAlivePings() {
// send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node
eachMatchingNode([this](const SharedNodePointer& node)->bool {
return !node->isUpstream() && _nodeTypesOfInterest.contains(node->getType());
auto type = node->getType();
return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type);
}, [&](const SharedNodePointer& node) {
sendPacket(constructPingPacket(), *node);
});

View file

@ -34,6 +34,8 @@ namespace NodeType {
const QString& getNodeTypeName(NodeType_t nodeType);
bool isDownstream(NodeType_t nodeType);
NodeType_t downstreamType(NodeType_t primaryType);
NodeType_t fromString(QString type);
}
typedef QSet<NodeType_t> NodeSet;

View file

@ -133,42 +133,3 @@ void ThreadedAssignment::domainSettingsRequestFailed() {
qCDebug(networking) << "Failed to retreive settings object from domain-server. Bailing on assignment.";
setFinished(true);
}
void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, DownstreamNodeFoundCallback callback) {
static const QString REPLICATION_GROUP_KEY = "replication";
static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers";
if (settingsObject.contains(REPLICATION_GROUP_KEY)) {
const QJsonObject replicationObject = settingsObject[REPLICATION_GROUP_KEY].toObject();
const QJsonArray downstreamServers = replicationObject[DOWNSTREAM_SERVERS_SETTING_KEY].toArray();
auto nodeList = DependencyManager::get<NodeList>();
foreach(const QJsonValue& downstreamServerValue, downstreamServers) {
const QJsonObject downstreamServer = downstreamServerValue.toObject();
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
static const QString DOWNSTREAM_SERVER_PORT = "port";
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
// make sure we have the settings we need for this downstream server
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)
&& downstreamServer[DOWNSTREAM_SERVER_TYPE].toString() == NodeType::getNodeTypeName(nodeType)) {
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
// manually add the downstream node to our node list
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType),
downstreamServerAddr, downstreamServerAddr);
// manually activate the public socket for the downstream node
node->activatePublicSocket();
callback(*node);
}
}
}
}

View file

@ -42,8 +42,6 @@ signals:
protected:
void commonInit(const QString& targetName, NodeType_t nodeType);
void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType,
DownstreamNodeFoundCallback callback = [](Node& downstreamNode) {});
bool _isFinished;
QTimer _domainServerTimer;