From 717c12fe1696a41b01b123b62e38822f1eb67a4d Mon Sep 17 00:00:00 2001 From: Clement Date: Wed, 27 Feb 2019 18:22:49 -0800 Subject: [PATCH] Add new connection rate limitting Limit rate of connection for new Agents --- assignment-client/src/avatars/AvatarMixer.cpp | 8 +++ .../resources/describe-settings.json | 8 +++ domain-server/src/DomainServer.cpp | 11 ++- libraries/networking/src/NodeList.cpp | 68 ++++++++++++++----- libraries/networking/src/NodeList.h | 24 +++++++ 5 files changed, 96 insertions(+), 23 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 500772c1b5..7c21daefc3 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -904,6 +904,14 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads."; } + { + const QString CONNECTION_RATE = "connection_rate"; + auto nodeList = DependencyManager::get(); + auto defaultConnectionRate = nodeList->getMaxConnectionRate(); + int connectionRate = avatarMixerGroupObject[CONNECTION_RATE].toInt(defaultConnectionRate); + nodeList->setMaxConnectionRate(connectionRate); + } + const QString AVATARS_SETTINGS_KEY = "avatars"; static const QString MIN_HEIGHT_OPTION = "min_avatar_height"; diff --git a/domain-server/resources/describe-settings.json b/domain-server/resources/describe-settings.json index 49023c9af8..140c7d6c17 100644 --- a/domain-server/resources/describe-settings.json +++ b/domain-server/resources/describe-settings.json @@ -1302,6 +1302,14 @@ "placeholder": "1", "default": "1", "advanced": true + }, + { + "name": "connection_rate", + "label": "Connection Rate", + "help": "Number of new agents that can connect to the mixer every second", + "placeholder": "50", + "default": "50", + "advanced": true } ] }, diff --git a/domain-server/src/DomainServer.cpp b/domain-server/src/DomainServer.cpp index 258038b8f1..8ba0cbc115 100644 --- a/domain-server/src/DomainServer.cpp +++ b/domain-server/src/DomainServer.cpp @@ -1243,12 +1243,11 @@ void DomainServer::broadcastNewNode(const SharedNodePointer& addedNode) { limitedNodeList->eachMatchingNode( [this, addedNode](const SharedNodePointer& node)->bool { - if (node->getLinkedData() && node->getActiveSocket() && node != addedNode) { - // is the added Node in this node's interest list? - return isInInterestSet(node, addedNode); - } else { - return false; - } + // is the added Node in this node's interest list? + return node->getLinkedData() + && node->getActiveSocket() + && node != addedNode + && isInInterestSet(node, addedNode); }, [this, &addNodePacket, connectionSecretIndex, addedNode, limitedNodeListWeak](const SharedNodePointer& node) { // send off this packet to the node diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 5e8909db2b..1de710e8ca 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -37,7 +37,9 @@ #include "SharedUtil.h" #include -const int KEEPALIVE_PING_INTERVAL_MS = 1000; +using namespace std::chrono_literals; +static const std::chrono::milliseconds CONNECTION_RATE_INTERVAL = 1s; +static const std::chrono::milliseconds KEEPALIVE_PING_INTERVAL = 1s; NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort) : LimitedNodeList(socketListenPort, dtlsListenPort), @@ -104,7 +106,7 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort) connect(this, &LimitedNodeList::nodeActivated, this, &NodeList::maybeSendIgnoreSetToNode); // setup our timer to send keepalive pings (it's started and stopped on domain connect/disconnect) - _keepAlivePingTimer.setInterval(KEEPALIVE_PING_INTERVAL_MS); // 1s, Qt::CoarseTimer acceptable + _keepAlivePingTimer.setInterval(KEEPALIVE_PING_INTERVAL); // 1s, Qt::CoarseTimer acceptable connect(&_keepAlivePingTimer, &QTimer::timeout, this, &NodeList::sendKeepAlivePings); connect(&_domainHandler, SIGNAL(connectedToDomain(QUrl)), &_keepAlivePingTimer, SLOT(start())); connect(&_domainHandler, &DomainHandler::disconnectedFromDomain, &_keepAlivePingTimer, &QTimer::stop); @@ -116,6 +118,11 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort) // we definitely want STUN to update our public socket, so call the LNL to kick that off startSTUNPublicSocketUpdate(); + // check for local socket updates every so often + QTimer* delayedAddsFlushTimer = new QTimer(this); + connect(delayedAddsFlushTimer, &QTimer::timeout, this, &NodeList::processDelayedAdds); + delayedAddsFlushTimer->start(CONNECTION_RATE_INTERVAL); + auto& packetReceiver = getPacketReceiver(); packetReceiver.registerListener(PacketType::DomainList, this, "processDomainServerList"); packetReceiver.registerListener(PacketType::Ping, this, "processPingPacket"); @@ -200,7 +207,6 @@ void NodeList::timePingReply(ReceivedMessage& message, const SharedNodePointer& } void NodeList::processPingPacket(QSharedPointer message, SharedNodePointer sendingNode) { - // send back a reply auto replyPacket = constructPingReplyPacket(*message); const HifiSockAddr& senderSockAddr = message->getSenderSockAddr(); @@ -711,27 +717,38 @@ void NodeList::processDomainServerRemovedNode(QSharedPointer me } void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { - // setup variables to read into from QDataStream - qint8 nodeType; - QUuid nodeUUID, connectionSecretUUID; - HifiSockAddr nodePublicSocket, nodeLocalSocket; - NodePermissions permissions; - bool isReplicated; - Node::LocalID sessionLocalID; + NewNodeInfo info; - packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions - >> isReplicated >> sessionLocalID; + packetStream >> info.type + >> info.uuid + >> info.publicSocket + >> info.localSocket + >> info.permissions + >> info.isReplicated + >> info.sessionLocalID + >> info.connectionSecretUUID; // if the public socket address is 0 then it's reachable at the same IP // as the domain server - if (nodePublicSocket.getAddress().isNull()) { - nodePublicSocket.setAddress(_domainHandler.getIP()); + if (info.publicSocket.getAddress().isNull()) { + info.publicSocket.setAddress(_domainHandler.getIP()); } - packetStream >> connectionSecretUUID; + // Throttle connection of new agents. + if (info.type != NodeType::Agent + || _nodesAddedInCurrentTimeSlice < _maxConnectionRate) { - SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket, nodeLocalSocket, - sessionLocalID, isReplicated, false, connectionSecretUUID, permissions); + addNewNode(info); + ++_nodesAddedInCurrentTimeSlice; + } else { + delayNodeAdd(info); + } +} + +void NodeList::addNewNode(NewNodeInfo info) { + SharedNodePointer node = addOrUpdateNode(info.uuid, info.type, info.publicSocket, info.localSocket, + info.sessionLocalID, info.isReplicated, false, + info.connectionSecretUUID, info.permissions); // nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server // and always have their public socket as their active socket @@ -739,6 +756,23 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { node->setLastHeardMicrostamp(usecTimestampNow()); node->activatePublicSocket(); } +}; + +void NodeList::delayNodeAdd(NewNodeInfo info) { + _delayedNodeAdds.push_back(info); +}; + +void NodeList::processDelayedAdds() { + _nodesAddedInCurrentTimeSlice = 0; + + auto nodesToAdd = glm::min(_delayedNodeAdds.size(), _maxConnectionRate); + auto firstNodeToAdd = _delayedNodeAdds.begin(); + auto lastNodeToAdd = firstNodeToAdd + nodesToAdd; + + for (auto it = firstNodeToAdd; it != lastNodeToAdd; ++it) { + addNewNode(*it); + } + _delayedNodeAdds.erase(firstNodeToAdd, lastNodeToAdd); } void NodeList::sendAssignment(Assignment& assignment) { diff --git a/libraries/networking/src/NodeList.h b/libraries/networking/src/NodeList.h index e135bc937d..f913b7c3b9 100644 --- a/libraries/networking/src/NodeList.h +++ b/libraries/networking/src/NodeList.h @@ -38,6 +38,8 @@ const quint64 DOMAIN_SERVER_CHECK_IN_MSECS = 1 * 1000; +static const size_t DEFAULT_MAX_CONNECTION_RATE { 50 }; + using PacketOrPacketList = std::pair, std::unique_ptr>; using NodePacketOrPacketListPair = std::pair; @@ -93,6 +95,9 @@ public: bool getSendDomainServerCheckInEnabled() { return _sendDomainServerCheckInEnabled; } void setSendDomainServerCheckInEnabled(bool enabled) { _sendDomainServerCheckInEnabled = enabled; } + size_t getMaxConnectionRate() { return _maxConnectionRate; } + void setMaxConnectionRate(size_t rate) { _maxConnectionRate = rate; } + void removeFromIgnoreMuteSets(const QUuid& nodeID); virtual bool isDomainServer() const override { return false; } @@ -146,6 +151,17 @@ private slots: void maybeSendIgnoreSetToNode(SharedNodePointer node); private: + struct NewNodeInfo { + qint8 type; + QUuid uuid; + HifiSockAddr publicSocket; + HifiSockAddr localSocket; + NodePermissions permissions; + bool isReplicated; + Node::LocalID sessionLocalID; + QUuid connectionSecretUUID; + }; + NodeList() : LimitedNodeList(INVALID_PORT, INVALID_PORT) { assert(false); } // Not implemented, needed for DependencyManager templates compile NodeList(char ownerType, int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT); NodeList(NodeList const&) = delete; // Don't implement, needed to avoid copies of singleton @@ -164,6 +180,10 @@ private: bool sockAddrBelongsToDomainOrNode(const HifiSockAddr& sockAddr); + void addNewNode(NewNodeInfo info); + void delayNodeAdd(NewNodeInfo info); + void processDelayedAdds(); + std::atomic _ownerType; NodeSet _nodeTypesOfInterest; DomainHandler _domainHandler; @@ -181,6 +201,10 @@ private: mutable QReadWriteLock _avatarGainMapLock; tbb::concurrent_unordered_map _avatarGainMap; + size_t _maxConnectionRate { DEFAULT_MAX_CONNECTION_RATE }; + size_t _nodesAddedInCurrentTimeSlice { 0 }; + std::vector _delayedNodeAdds; + void sendIgnoreRadiusStateToNode(const SharedNodePointer& destinationNode); #if defined(Q_OS_ANDROID) Setting::Handle _ignoreRadiusEnabled { "IgnoreRadiusEnabled", false };