diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 6522841dc5..58100a6336 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -954,6 +954,14 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads."; } + { + const QString CONNECTION_RATE = "connection_rate"; + auto nodeList = DependencyManager::get(); + auto defaultConnectionRate = nodeList->getMaxConnectionRate(); + int connectionRate = avatarMixerGroupObject[CONNECTION_RATE].toInt((int)defaultConnectionRate); + nodeList->setMaxConnectionRate(connectionRate); + } + const QString AVATARS_SETTINGS_KEY = "avatars"; static const QString MIN_HEIGHT_OPTION = "min_avatar_height"; diff --git a/domain-server/resources/describe-settings.json b/domain-server/resources/describe-settings.json index 49023c9af8..140c7d6c17 100644 --- a/domain-server/resources/describe-settings.json +++ b/domain-server/resources/describe-settings.json @@ -1302,6 +1302,14 @@ "placeholder": "1", "default": "1", "advanced": true + }, + { + "name": "connection_rate", + "label": "Connection Rate", + "help": "Number of new agents that can connect to the mixer every second", + "placeholder": "50", + "default": "50", + "advanced": true } ] }, diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index 85b116129c..8d5cb165cb 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -1243,12 +1243,11 @@ void DomainServer::broadcastNewNode(const SharedNodePointer& addedNode) { limitedNodeList->eachMatchingNode( [this, addedNode](const SharedNodePointer& node)->bool { - if (node->getLinkedData() && node->getActiveSocket() && node != addedNode) { - // is the added Node in this node's interest list? - return isInInterestSet(node, addedNode); - } else { - return false; - } + // is the added Node in this node's interest list? + return node->getLinkedData() + && node->getActiveSocket() + && node != addedNode + && isInInterestSet(node, addedNode); }, [this, &addNodePacket, connectionSecretIndex, addedNode, limitedNodeListWeak](const SharedNodePointer& node) { // send off this packet to the node diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index eaa02f059e..01c7a16166 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -40,6 +40,9 @@ static Setting::Handle LIMITED_NODELIST_LOCAL_PORT("LimitedNodeList.LocalPort", 0); +using namespace std::chrono_literals; +static const std::chrono::milliseconds CONNECTION_RATE_INTERVAL_MS = 1s; + const std::set SOLO_NODE_TYPES = { NodeType::AvatarMixer, NodeType::AudioMixer, @@ -88,6 +91,11 @@ LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) : connect(statsSampleTimer, &QTimer::timeout, this, &LimitedNodeList::sampleConnectionStats); statsSampleTimer->start(CONNECTION_STATS_SAMPLE_INTERVAL_MSECS); + // Flush delayed adds every second + QTimer* delayedAddsFlushTimer = new QTimer(this); + connect(delayedAddsFlushTimer, &QTimer::timeout, this, &NodeList::processDelayedAdds); + delayedAddsFlushTimer->start(CONNECTION_RATE_INTERVAL_MS.count()); + // check the local socket right now updateLocalSocket(); @@ -367,7 +375,7 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe return true; - } else { + } else if (!isDelayedNode(sourceID)){ HIFI_FCDEBUG(networking(), "Packet of type" << headerType << "received from unknown node with Local ID" << sourceLocalID); } @@ -736,6 +744,53 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t return newNodePointer; } +void LimitedNodeList::addNewNode(NewNodeInfo info) { + // Throttle connection of new agents. + if (info.type == NodeType::Agent && _nodesAddedInCurrentTimeSlice >= _maxConnectionRate) { + delayNodeAdd(info); + return; + } + + SharedNodePointer node = addOrUpdateNode(info.uuid, info.type, info.publicSocket, info.localSocket, + info.sessionLocalID, info.isReplicated, false, + info.connectionSecretUUID, info.permissions); + + ++_nodesAddedInCurrentTimeSlice; +} + +void LimitedNodeList::delayNodeAdd(NewNodeInfo info) { + _delayedNodeAdds.push_back(info); +} + +void LimitedNodeList::removeDelayedAdd(QUuid nodeUUID) { + auto it = std::find_if(_delayedNodeAdds.begin(), _delayedNodeAdds.end(), [&](auto info) { + return info.uuid == nodeUUID; + }); + if (it != _delayedNodeAdds.end()) { + _delayedNodeAdds.erase(it); + } +} + +bool LimitedNodeList::isDelayedNode(QUuid nodeUUID) { + auto it = std::find_if(_delayedNodeAdds.begin(), _delayedNodeAdds.end(), [&](auto info) { + return info.uuid == nodeUUID; + }); + return it != _delayedNodeAdds.end(); +} + +void LimitedNodeList::processDelayedAdds() { + _nodesAddedInCurrentTimeSlice = 0; + + auto nodesToAdd = glm::min(_delayedNodeAdds.size(), _maxConnectionRate); + auto firstNodeToAdd = _delayedNodeAdds.begin(); + auto lastNodeToAdd = firstNodeToAdd + nodesToAdd; + + for (auto it = firstNodeToAdd; it != lastNodeToAdd; ++it) { + addNewNode(*it); + } + _delayedNodeAdds.erase(firstNodeToAdd, lastNodeToAdd); +} + std::unique_ptr LimitedNodeList::constructPingPacket(const QUuid& nodeId, PingType_t pingType) { int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(int64_t); @@ -793,13 +848,13 @@ unsigned int LimitedNodeList::broadcastToNodes(std::unique_ptr packet, eachNode([&](const SharedNodePointer& node){ if (node && destinationNodeTypes.contains(node->getType())) { - if (packet->isReliable()) { - auto packetCopy = NLPacket::createCopy(*packet); - sendPacket(std::move(packetCopy), *node); - } else { - sendUnreliablePacket(*packet, *node); - } - ++n; + if (packet->isReliable()) { + auto packetCopy = NLPacket::createCopy(*packet); + sendPacket(std::move(packetCopy), *node); + } else { + sendUnreliablePacket(*packet, *node); + } + ++n; } }); diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 450fad96a9..eb1a3e2dde 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -51,6 +51,8 @@ const int INVALID_PORT = -1; const quint64 NODE_SILENCE_THRESHOLD_MSECS = 5 * 1000; +static const size_t DEFAULT_MAX_CONNECTION_RATE { std::numeric_limits::max() }; + extern const std::set SOLO_NODE_TYPES; const char DEFAULT_ASSIGNMENT_SERVER_HOSTNAME[] = "localhost"; @@ -316,6 +318,9 @@ public: void sendFakedHandshakeRequestToNode(SharedNodePointer node); #endif + size_t getMaxConnectionRate() const { return _maxConnectionRate; } + void setMaxConnectionRate(size_t rate) { _maxConnectionRate = rate; } + int getInboundPPS() const { return _inboundPPS; } int getOutboundPPS() const { return _outboundPPS; } float getInboundKbps() const { return _inboundKbps; } @@ -367,7 +372,20 @@ protected slots: void clientConnectionToSockAddrReset(const HifiSockAddr& sockAddr); + void processDelayedAdds(); + protected: + struct NewNodeInfo { + qint8 type; + QUuid uuid; + HifiSockAddr publicSocket; + HifiSockAddr localSocket; + NodePermissions permissions; + bool isReplicated; + Node::LocalID sessionLocalID; + QUuid connectionSecretUUID; + }; + LimitedNodeList(int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT); LimitedNodeList(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton void operator=(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton @@ -390,6 +408,11 @@ protected: bool sockAddrBelongsToNode(const HifiSockAddr& sockAddr); + void addNewNode(NewNodeInfo info); + void delayNodeAdd(NewNodeInfo info); + void removeDelayedAdd(QUuid nodeUUID); + bool isDelayedNode(QUuid nodeUUID); + NodeHash _nodeHash; mutable QReadWriteLock _nodeMutex { QReadWriteLock::Recursive }; udt::Socket _nodeSocket; @@ -440,6 +463,10 @@ private: Node::LocalID _sessionLocalID { 0 }; bool _flagTimeForConnectionStep { false }; // only keep track in interface + size_t _maxConnectionRate { DEFAULT_MAX_CONNECTION_RATE }; + size_t _nodesAddedInCurrentTimeSlice { 0 }; + std::vector _delayedNodeAdds; + int _inboundPPS { 0 }; int _outboundPPS { 0 }; float _inboundKbps { 0.0f }; diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 5e8909db2b..d45e466291 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -200,7 +200,6 @@ void NodeList::timePingReply(ReceivedMessage& message, const SharedNodePointer& } void NodeList::processPingPacket(QSharedPointer message, SharedNodePointer sendingNode) { - // send back a reply auto replyPacket = constructPingReplyPacket(*message); const HifiSockAddr& senderSockAddr = message->getSenderSockAddr(); @@ -708,37 +707,28 @@ void NodeList::processDomainServerRemovedNode(QSharedPointer me QUuid nodeUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); qCDebug(networking) << "Received packet from domain-server to remove node with UUID" << uuidStringWithoutCurlyBraces(nodeUUID); killNodeWithUUID(nodeUUID); + removeDelayedAdd(nodeUUID); } void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { - // setup variables to read into from QDataStream - qint8 nodeType; - QUuid nodeUUID, connectionSecretUUID; - HifiSockAddr nodePublicSocket, nodeLocalSocket; - NodePermissions permissions; - bool isReplicated; - Node::LocalID sessionLocalID; + NewNodeInfo info; - packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions - >> isReplicated >> sessionLocalID; + packetStream >> info.type + >> info.uuid + >> info.publicSocket + >> info.localSocket + >> info.permissions + >> info.isReplicated + >> info.sessionLocalID + >> info.connectionSecretUUID; // if the public socket address is 0 then it's reachable at the same IP // as the domain server - if (nodePublicSocket.getAddress().isNull()) { - nodePublicSocket.setAddress(_domainHandler.getIP()); + if (info.publicSocket.getAddress().isNull()) { + info.publicSocket.setAddress(_domainHandler.getIP()); } - packetStream >> connectionSecretUUID; - - SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket, nodeLocalSocket, - sessionLocalID, isReplicated, false, connectionSecretUUID, permissions); - - // nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server - // and always have their public socket as their active socket - if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) { - node->setLastHeardMicrostamp(usecTimestampNow()); - node->activatePublicSocket(); - } + addNewNode(info); } void NodeList::sendAssignment(Assignment& assignment) { @@ -785,7 +775,6 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { } void NodeList::startNodeHolePunch(const SharedNodePointer& node) { - // we don't hole punch to downstream servers, since it is assumed that we have a direct line to them // we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them @@ -799,6 +788,14 @@ void NodeList::startNodeHolePunch(const SharedNodePointer& node) { // ping this node immediately pingPunchForInactiveNode(node); } + + // nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server + // and always have their public socket as their active socket + if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) { + node->setLastHeardMicrostamp(usecTimestampNow()); + node->activatePublicSocket(); + } + } void NodeList::handleNodePingTimeout() {