Merge pull request #10709 from birarda/feat/downstream-in-nodelist

send downstream nodes in domain list, fix identity packet bug
This commit is contained in:
Clément Brisset 2017-06-15 15:32:26 -07:00 committed by GitHub
commit 8c0bb4140e
14 changed files with 175 additions and 95 deletions

View file

@ -103,6 +103,11 @@ AudioMixer::AudioMixer(ReceivedMessage& message) :
);
connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
if (node->getType() == NodeType::DownstreamAudioMixer) {
node->activatePublicSocket();
}
});
}
void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
@ -393,7 +398,7 @@ void AudioMixer::start() {
auto nodeList = DependencyManager::get<NodeList>();
// prepare the NodeList
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer });
nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); };
// parse out any AudioMixer settings
@ -770,8 +775,6 @@ void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) {
}
}
}
parseDownstreamServers(settingsObject, NodeType::AudioMixer);
}
AudioMixer::Timer::Timing::Timing(uint64_t& sum) : _sum(sum) {

View file

@ -63,6 +63,12 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
auto nodeList = DependencyManager::get<NodeList>();
connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch);
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
if (node->getType() == NodeType::DownstreamAvatarMixer) {
getOrCreateClientData(node);
node->activatePublicSocket();
}
});
}
SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) {
@ -219,7 +225,9 @@ void AvatarMixer::start() {
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
std::for_each(cbegin, cend, [&](const SharedNodePointer& node) {
manageDisplayName(node);
if (node->getType() == NodeType::Agent && !node->isUpstream()) {
manageDisplayName(node);
}
++_sumListeners;
});
}, &lockWait, &nodeTransform, &functor);
@ -802,7 +810,7 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node
void AvatarMixer::domainSettingsRequestComplete() {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer });
// parse the settings to pull out the values we need
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
@ -874,11 +882,4 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
qCDebug(avatars) << "This domain requires a minimum avatar scale of" << _domainMinimumScale
<< "and a maximum avatar scale of" << _domainMaximumScale;
parseDownstreamServers(domainSettings, NodeType::AvatarMixer, [](Node& node) {
if (!node.getLinkedData()) {
node.setLinkedData(std::unique_ptr<NodeData> { new AvatarMixerClientData(node.getUUID()) });
}
});
}

View file

@ -440,6 +440,9 @@ void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePoin
_stats.downstreamMixersBroadcastedTo++;
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
if (!nodeData) {
return;
}
// setup a PacketList for the replicated bulk avatar data
auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData);
@ -476,7 +479,6 @@ void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePoin
auto lastBroadcastTime = nodeData->getLastBroadcastTime(agentNode->getUUID());
if (lastBroadcastTime <= agentNodeData->getIdentityChangeTimestamp()
|| (start - lastBroadcastTime) >= REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US) {
qDebug() << "Sending identity packet for " << agentNode->getUUID() << " to " << node->getUUID();
sendReplicatedIdentityPacket(agentNodeData, node);
nodeData->setLastBroadcastTime(agentNode->getUUID(), start);
}
@ -540,22 +542,24 @@ void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePoin
}
});
quint64 startPacketSending = usecTimestampNow();
if (avatarPacketList->getNumPackets() > 0) {
quint64 startPacketSending = usecTimestampNow();
// close the current packet so that we're always sending something
avatarPacketList->closeCurrentPacket(true);
// close the current packet so that we're always sending something
avatarPacketList->closeCurrentPacket(true);
_stats.numPacketsSent += (int)avatarPacketList->getNumPackets();
_stats.numBytesSent += numAvatarDataBytes;
_stats.numPacketsSent += (int)avatarPacketList->getNumPackets();
_stats.numBytesSent += numAvatarDataBytes;
// send the replicated bulk avatar data
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(avatarPacketList), node->getPublicSocket());
// send the replicated bulk avatar data
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(avatarPacketList), node->getPublicSocket());
// record the bytes sent for other avatar data in the AvatarMixerClientData
nodeData->recordSentAvatarData(numAvatarDataBytes);
// record the bytes sent for other avatar data in the AvatarMixerClientData
nodeData->recordSentAvatarData(numAvatarDataBytes);
quint64 endPacketSending = usecTimestampNow();
_stats.packetSendingElapsedTime += (endPacketSending - startPacketSending);
quint64 endPacketSending = usecTimestampNow();
_stats.packetSendingElapsedTime += (endPacketSending - startPacketSending);
}
}

View file

@ -39,7 +39,8 @@ var Settings = {
ACCESS_TOKEN_SELECTOR: '[name="metaverse.access_token"]',
PLACES_TABLE_ID: 'places-table',
FORM_ID: 'settings-form',
INVALID_ROW_CLASS: 'invalid-input'
INVALID_ROW_CLASS: 'invalid-input',
DATA_ROW_INDEX: 'data-row-index'
};
var viewHelpers = {
@ -223,10 +224,10 @@ $(document).ready(function(){
// set focus to the first input in the new row
$target.closest('table').find('tr.inputs input:first').focus();
}
var tableRows = sibling.parent();
var tableBody = tableRows.parent();
// if theres no more siblings, we should jump to a new row
if (sibling.next().length == 0 && tableRows.nextAll().length == 1) {
tableBody.find("." + Settings.ADD_ROW_BUTTON_CLASS).click();
@ -1005,7 +1006,7 @@ function saveSettings() {
var password = formJSON["security"]["http_password"];
var verify_password = formJSON["security"]["verify_http_password"];
// if they've only emptied out the default password field, we should go ahead and acknowledge
// if they've only emptied out the default password field, we should go ahead and acknowledge
// the verify password field
if (password != undefined && verify_password == undefined) {
verify_password = "";
@ -1158,8 +1159,9 @@ function makeTable(setting, keypath, setting_value) {
}
html += "<tr class='" + Settings.DATA_ROW_CLASS + "' " +
(isCategorized ? ("data-category='" + categoryValue + "'") : "") + " " +
(isArray ? "" : "name='" + keypath + "." + rowIndexOrName + "'") + ">";
(isCategorized ? ("data-category='" + categoryValue + "'") : "") + " " +
(isArray ? "" : "name='" + keypath + "." + rowIndexOrName + "'") +
(isArray ? Settings.DATA_ROW_INDEX + "='" + (row_num - 1) + "'" : "" ) + ">";
if (setting.numbered === true) {
html += "<td class='numbered'>" + row_num + "</td>"
@ -1292,12 +1294,12 @@ function makeTableHiddenInputs(setting, initialValues, categoryValue) {
} else if (col.type === "select") {
html += "<td class='" + Settings.DATA_COL_CLASS + "'name='" + col.name + "'>"
html += "<select style='display: none;' class='form-control' data-hidden-input='" + col.name + "'>'"
for (var i in col.options) {
var option = col.options[i];
html += "<option value='" + option.value + "' " + (option.value == defaultValue ? 'selected' : '') + ">" + option.label + "</option>";
}
html += "</select>";
html += "<input type='hidden' class='table-dropdown form-control trigger-change' name='" + col.name + "' value='" + defaultValue + "'></td>";
} else {
@ -1398,6 +1400,15 @@ function addTableRow(row) {
var setting_name = table.attr("name");
row.addClass(Settings.DATA_ROW_CLASS + " " + Settings.NEW_ROW_CLASS);
// if this is an array, add the row index (which is the index of the last row + 1)
// as a data attribute to the row
var row_index = 0;
if (isArray) {
var previous_row_index = parseInt(row.siblings('.' + Settings.DATA_ROW_CLASS + ':last').attr(Settings.DATA_ROW_INDEX), 10);
row_index = previous_row_index + 1;
row.attr(Settings.DATA_ROW_INDEX, row_index);
}
var focusChanged = false;
_.each(row.children(), function(element) {
@ -1430,7 +1441,6 @@ function addTableRow(row) {
var isDropdown = input.hasClass("table-dropdown");
if (isArray) {
var row_index = row.siblings('.' + Settings.DATA_ROW_CLASS).length
var key = $(element).attr('name');
// are there multiple columns or just one?
@ -1438,16 +1448,13 @@ function addTableRow(row) {
var num_columns = row.children('.' + Settings.DATA_COL_CLASS).length
var newName = setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : "");
if (isCheckbox) {
input.attr("name", newName)
} else {
if (isDropdown) {
// default values for hidden inputs inside child selects gets cleared so we need to remind it
var selectElement = $(element).children("select");
selectElement.attr("data-hidden-input", newName);
$(element).children("input").val(selectElement.val());
}
input.attr("name", newName);
input.attr("name", newName);
if (isDropdown) {
// default values for hidden inputs inside child selects gets cleared so we need to remind it
var selectElement = $(element).children("select");
selectElement.attr("data-hidden-input", newName);
$(element).children("input").val(selectElement.val());
}
} else {
// because the name of the setting in question requires the key
@ -1462,10 +1469,10 @@ function addTableRow(row) {
input.focus();
focusChanged = true;
}
// if we are adding a dropdown, we should go ahead and make its select
// element is visible
if (isDropdown) {
if (isDropdown) {
$(element).children("select").attr("style", "");
}

View file

@ -119,6 +119,8 @@ DomainServer::DomainServer(int argc, char* argv[]) :
&_gatekeeper, &DomainGatekeeper::updateNodePermissions);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateReplicatedNodes);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateDownstreamNodes);
setupGroupCacheRefresh();
@ -132,6 +134,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
setupNodeListAndAssignments();
updateReplicatedNodes();
updateDownstreamNodes();
if (_type != NonMetaverse) {
// if we have a metaverse domain, we'll use an access token for API calls
@ -2219,14 +2222,90 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer&
_unfulfilledAssignments.enqueue(assignment);
}
void DomainServer::updateReplicatedNodes() {
static const QString BROADCASTING_SETTINGS_KEY = "broadcasting";
_replicatedUsernames.clear();
static const QString BROADCASTING_SETTINGS_KEY = "broadcasting";
void DomainServer::updateDownstreamNodes() {
auto settings = _settingsManager.getSettingsMap();
if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
auto nodeList = DependencyManager::get<LimitedNodeList>();
std::vector<HifiSockAddr> downstreamNodesInSettings;
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
if (replicationSettings.contains("users")) {
auto usersSettings = replicationSettings.value("users").toList();
if (replicationSettings.contains("downstream_servers")) {
auto serversSettings = replicationSettings.value("downstream_servers").toList();
std::vector<HifiSockAddr> knownDownstreamNodes;
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) {
knownDownstreamNodes.push_back(otherNode->getPublicSocket());
}
});
for (auto& server : serversSettings) {
auto downstreamServer = server.toMap();
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
static const QString DOWNSTREAM_SERVER_PORT = "port";
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
// make sure we have the settings we need for this downstream server
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) {
auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString());
auto downstreamNodeType = NodeType::downstreamType(nodeType);
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
downstreamNodesInSettings.push_back(downstreamServerAddr);
bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(),
downstreamServerAddr) != knownDownstreamNodes.cend();
if (!knownNode) {
// manually add the downstream node to our node list
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType,
downstreamServerAddr, downstreamServerAddr);
node->setIsForcedNeverSilent(true);
qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr;
// manually activate the public socket for the downstream node
node->activatePublicSocket();
}
}
}
}
std::vector<SharedNodePointer> nodesToKill;
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) {
bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(),
otherNode->getPublicSocket()) != downstreamNodesInSettings.cend();
if (!nodeInSettings) {
qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket();
nodesToKill.push_back(otherNode);
}
}
});
for (auto& node : nodesToKill) {
handleKillNode(node);
}
}
}
void DomainServer::updateReplicatedNodes() {
// Make sure we have downstream nodes in our list
// TODO Move this to a different function
_replicatedUsernames.clear();
auto settings = _settingsManager.getSettingsMap();
static const QString REPLICATED_USERS_KEY = "users";
_replicatedUsernames.clear();
if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
if (replicationSettings.contains(REPLICATED_USERS_KEY)) {
auto usersSettings = replicationSettings.value(REPLICATED_USERS_KEY).toList();
for (auto& username : usersSettings) {
_replicatedUsernames.push_back(username.toString().toLower());
}
@ -2240,6 +2319,9 @@ void DomainServer::updateReplicatedNodes() {
auto shouldReplicate = shouldReplicateNode(*otherNode);
auto isReplicated = otherNode->isReplicated();
if (isReplicated && !shouldReplicate) {
qDebug() << "Setting node to NOT be replicated:" << otherNode->getUUID();
} else if (!isReplicated && shouldReplicate) {
qDebug() << "Setting node to replicated:" << otherNode->getUUID();
qDebug() << "Setting node to NOT be replicated:"
<< otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID();
} else if (!isReplicated && shouldReplicate) {

View file

@ -103,6 +103,7 @@ private slots:
void handleOctreeFileReplacement(QByteArray octreeFile);
void updateReplicatedNodes();
void updateDownstreamNodes();
signals:
void iceServerChanged();

View file

@ -128,7 +128,12 @@ AvatarSharedPointer AvatarHashMap::parseAvatarData(QSharedPointer<ReceivedMessag
void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
// peek the avatar UUID from the incoming packet
QUuid identityUUID = message->peek(NUM_BYTES_RFC4122_UUID);
QUuid identityUUID = QUuid::fromRfc4122(message->peek(NUM_BYTES_RFC4122_UUID));
if (identityUUID.isNull()) {
qCDebug(avatars) << "Refusing to process identity packet for null avatar ID";
return;
}
// make sure this isn't for an ignored avatar
auto nodeList = DependencyManager::get<NodeList>();

View file

@ -749,7 +749,7 @@ void LimitedNodeList::removeSilentNodes() {
SharedNodePointer node = it->second;
node->getMutex().lock();
if (!NodeType::isDownstream(node->getType())
if (!node->isForcedNeverSilent()
&& (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) {
// call the NodeHash erase to get rid of this node
it = _nodeHash.unsafe_erase(it);

View file

@ -76,6 +76,9 @@ public:
float getOutboundBandwidth() const; // in kbps
float getInboundBandwidth() const; // in kbps
bool isForcedNeverSilent() const { return _isForcedNeverSilent; }
void setIsForcedNeverSilent(bool isForcedNeverSilent) { _isForcedNeverSilent = isForcedNeverSilent; }
friend QDataStream& operator<<(QDataStream& out, const NetworkPeer& peer);
friend QDataStream& operator>>(QDataStream& in, NetworkPeer& peer);
public slots:
@ -103,6 +106,8 @@ protected:
QTimer* _pingTimer = NULL;
int _connectionAttempts;
bool _isForcedNeverSilent { false };
};
QDebug operator<<(QDebug debug, const NetworkPeer &peer);

View file

@ -67,6 +67,11 @@ NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
}
}
NodeType_t NodeType::fromString(QString type) {
return TypeNameHash.key(type, NodeType::Unassigned);
}
Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket,
const HifiSockAddr& localSocket, const NodePermissions& permissions, bool isReplicated,
const QUuid& connectionSecret, QObject* parent) :

View file

@ -668,6 +668,11 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket,
nodeLocalSocket, permissions, isReplicated, connectionUUID);
// nodes that are downstream of our own type are kept alive when we hear about them from the domain server
if (node->getType() == NodeType::downstreamType(_ownerType)) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
}
void NodeList::sendAssignment(Assignment& assignment) {
@ -771,7 +776,8 @@ void NodeList::sendKeepAlivePings() {
// send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node
eachMatchingNode([this](const SharedNodePointer& node)->bool {
return !node->isUpstream() && _nodeTypesOfInterest.contains(node->getType());
auto type = node->getType();
return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type);
}, [&](const SharedNodePointer& node) {
sendPacket(constructPingPacket(), *node);
});

View file

@ -34,6 +34,8 @@ namespace NodeType {
const QString& getNodeTypeName(NodeType_t nodeType);
bool isDownstream(NodeType_t nodeType);
NodeType_t downstreamType(NodeType_t primaryType);
NodeType_t fromString(QString type);
}
typedef QSet<NodeType_t> NodeSet;

View file

@ -133,42 +133,3 @@ void ThreadedAssignment::domainSettingsRequestFailed() {
qCDebug(networking) << "Failed to retreive settings object from domain-server. Bailing on assignment.";
setFinished(true);
}
void ThreadedAssignment::parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType, DownstreamNodeFoundCallback callback) {
static const QString BROADCASTING_GROUP_KEY = "broadcasting";
static const QString DOWNSTREAM_SERVERS_SETTING_KEY = "downstream_servers";
if (settingsObject.contains(BROADCASTING_GROUP_KEY)) {
const QJsonObject replicationObject = settingsObject[BROADCASTING_GROUP_KEY].toObject();
const QJsonArray downstreamServers = replicationObject[DOWNSTREAM_SERVERS_SETTING_KEY].toArray();
auto nodeList = DependencyManager::get<NodeList>();
foreach(const QJsonValue& downstreamServerValue, downstreamServers) {
const QJsonObject downstreamServer = downstreamServerValue.toObject();
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
static const QString DOWNSTREAM_SERVER_PORT = "port";
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
// make sure we have the settings we need for this downstream server
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)
&& downstreamServer[DOWNSTREAM_SERVER_TYPE].toString() == NodeType::getNodeTypeName(nodeType)) {
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
// manually add the downstream node to our node list
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), NodeType::downstreamType(nodeType),
downstreamServerAddr, downstreamServerAddr);
// manually activate the public socket for the downstream node
node->activatePublicSocket();
callback(*node);
}
}
}
}

View file

@ -42,8 +42,6 @@ signals:
protected:
void commonInit(const QString& targetName, NodeType_t nodeType);
void parseDownstreamServers(const QJsonObject& settingsObject, NodeType_t nodeType,
DownstreamNodeFoundCallback callback = [](Node& downstreamNode) {});
bool _isFinished;
QTimer _domainServerTimer;