Add new connection rate limitting

Limit rate of connection for new Agents
This commit is contained in:
Clement 2019-02-27 18:22:49 -08:00
parent abf2226750
commit 717c12fe16
5 changed files with 96 additions and 23 deletions
assignment-client/src/avatars
domain-server
libraries/networking/src

View file

@ -904,6 +904,14 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads.";
}
{
const QString CONNECTION_RATE = "connection_rate";
auto nodeList = DependencyManager::get<NodeList>();
auto defaultConnectionRate = nodeList->getMaxConnectionRate();
int connectionRate = avatarMixerGroupObject[CONNECTION_RATE].toInt(defaultConnectionRate);
nodeList->setMaxConnectionRate(connectionRate);
}
const QString AVATARS_SETTINGS_KEY = "avatars";
static const QString MIN_HEIGHT_OPTION = "min_avatar_height";

View file

@ -1302,6 +1302,14 @@
"placeholder": "1",
"default": "1",
"advanced": true
},
{
"name": "connection_rate",
"label": "Connection Rate",
"help": "Number of new agents that can connect to the mixer every second",
"placeholder": "50",
"default": "50",
"advanced": true
}
]
},

View file

@ -1243,12 +1243,11 @@ void DomainServer::broadcastNewNode(const SharedNodePointer& addedNode) {
limitedNodeList->eachMatchingNode(
[this, addedNode](const SharedNodePointer& node)->bool {
if (node->getLinkedData() && node->getActiveSocket() && node != addedNode) {
// is the added Node in this node's interest list?
return isInInterestSet(node, addedNode);
} else {
return false;
}
// is the added Node in this node's interest list?
return node->getLinkedData()
&& node->getActiveSocket()
&& node != addedNode
&& isInInterestSet(node, addedNode);
},
[this, &addNodePacket, connectionSecretIndex, addedNode, limitedNodeListWeak](const SharedNodePointer& node) {
// send off this packet to the node

View file

@ -37,7 +37,9 @@
#include "SharedUtil.h"
#include <Trace.h>
const int KEEPALIVE_PING_INTERVAL_MS = 1000;
using namespace std::chrono_literals;
static const std::chrono::milliseconds CONNECTION_RATE_INTERVAL = 1s;
static const std::chrono::milliseconds KEEPALIVE_PING_INTERVAL = 1s;
NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort) :
LimitedNodeList(socketListenPort, dtlsListenPort),
@ -104,7 +106,7 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort)
connect(this, &LimitedNodeList::nodeActivated, this, &NodeList::maybeSendIgnoreSetToNode);
// setup our timer to send keepalive pings (it's started and stopped on domain connect/disconnect)
_keepAlivePingTimer.setInterval(KEEPALIVE_PING_INTERVAL_MS); // 1s, Qt::CoarseTimer acceptable
_keepAlivePingTimer.setInterval(KEEPALIVE_PING_INTERVAL); // 1s, Qt::CoarseTimer acceptable
connect(&_keepAlivePingTimer, &QTimer::timeout, this, &NodeList::sendKeepAlivePings);
connect(&_domainHandler, SIGNAL(connectedToDomain(QUrl)), &_keepAlivePingTimer, SLOT(start()));
connect(&_domainHandler, &DomainHandler::disconnectedFromDomain, &_keepAlivePingTimer, &QTimer::stop);
@ -116,6 +118,11 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort)
// we definitely want STUN to update our public socket, so call the LNL to kick that off
startSTUNPublicSocketUpdate();
// check for local socket updates every so often
QTimer* delayedAddsFlushTimer = new QTimer(this);
connect(delayedAddsFlushTimer, &QTimer::timeout, this, &NodeList::processDelayedAdds);
delayedAddsFlushTimer->start(CONNECTION_RATE_INTERVAL);
auto& packetReceiver = getPacketReceiver();
packetReceiver.registerListener(PacketType::DomainList, this, "processDomainServerList");
packetReceiver.registerListener(PacketType::Ping, this, "processPingPacket");
@ -200,7 +207,6 @@ void NodeList::timePingReply(ReceivedMessage& message, const SharedNodePointer&
}
void NodeList::processPingPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
// send back a reply
auto replyPacket = constructPingReplyPacket(*message);
const HifiSockAddr& senderSockAddr = message->getSenderSockAddr();
@ -711,27 +717,38 @@ void NodeList::processDomainServerRemovedNode(QSharedPointer<ReceivedMessage> me
}
void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
// setup variables to read into from QDataStream
qint8 nodeType;
QUuid nodeUUID, connectionSecretUUID;
HifiSockAddr nodePublicSocket, nodeLocalSocket;
NodePermissions permissions;
bool isReplicated;
Node::LocalID sessionLocalID;
NewNodeInfo info;
packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions
>> isReplicated >> sessionLocalID;
packetStream >> info.type
>> info.uuid
>> info.publicSocket
>> info.localSocket
>> info.permissions
>> info.isReplicated
>> info.sessionLocalID
>> info.connectionSecretUUID;
// if the public socket address is 0 then it's reachable at the same IP
// as the domain server
if (nodePublicSocket.getAddress().isNull()) {
nodePublicSocket.setAddress(_domainHandler.getIP());
if (info.publicSocket.getAddress().isNull()) {
info.publicSocket.setAddress(_domainHandler.getIP());
}
packetStream >> connectionSecretUUID;
// Throttle connection of new agents.
if (info.type != NodeType::Agent
|| _nodesAddedInCurrentTimeSlice < _maxConnectionRate) {
SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket, nodeLocalSocket,
sessionLocalID, isReplicated, false, connectionSecretUUID, permissions);
addNewNode(info);
++_nodesAddedInCurrentTimeSlice;
} else {
delayNodeAdd(info);
}
}
void NodeList::addNewNode(NewNodeInfo info) {
SharedNodePointer node = addOrUpdateNode(info.uuid, info.type, info.publicSocket, info.localSocket,
info.sessionLocalID, info.isReplicated, false,
info.connectionSecretUUID, info.permissions);
// nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server
// and always have their public socket as their active socket
@ -739,6 +756,23 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
node->setLastHeardMicrostamp(usecTimestampNow());
node->activatePublicSocket();
}
};
void NodeList::delayNodeAdd(NewNodeInfo info) {
_delayedNodeAdds.push_back(info);
};
void NodeList::processDelayedAdds() {
_nodesAddedInCurrentTimeSlice = 0;
auto nodesToAdd = glm::min(_delayedNodeAdds.size(), _maxConnectionRate);
auto firstNodeToAdd = _delayedNodeAdds.begin();
auto lastNodeToAdd = firstNodeToAdd + nodesToAdd;
for (auto it = firstNodeToAdd; it != lastNodeToAdd; ++it) {
addNewNode(*it);
}
_delayedNodeAdds.erase(firstNodeToAdd, lastNodeToAdd);
}
void NodeList::sendAssignment(Assignment& assignment) {

View file

@ -38,6 +38,8 @@
const quint64 DOMAIN_SERVER_CHECK_IN_MSECS = 1 * 1000;
static const size_t DEFAULT_MAX_CONNECTION_RATE { 50 };
using PacketOrPacketList = std::pair<std::unique_ptr<NLPacket>, std::unique_ptr<NLPacketList>>;
using NodePacketOrPacketListPair = std::pair<SharedNodePointer, PacketOrPacketList>;
@ -93,6 +95,9 @@ public:
bool getSendDomainServerCheckInEnabled() { return _sendDomainServerCheckInEnabled; }
void setSendDomainServerCheckInEnabled(bool enabled) { _sendDomainServerCheckInEnabled = enabled; }
size_t getMaxConnectionRate() { return _maxConnectionRate; }
void setMaxConnectionRate(size_t rate) { _maxConnectionRate = rate; }
void removeFromIgnoreMuteSets(const QUuid& nodeID);
virtual bool isDomainServer() const override { return false; }
@ -146,6 +151,17 @@ private slots:
void maybeSendIgnoreSetToNode(SharedNodePointer node);
private:
struct NewNodeInfo {
qint8 type;
QUuid uuid;
HifiSockAddr publicSocket;
HifiSockAddr localSocket;
NodePermissions permissions;
bool isReplicated;
Node::LocalID sessionLocalID;
QUuid connectionSecretUUID;
};
NodeList() : LimitedNodeList(INVALID_PORT, INVALID_PORT) { assert(false); } // Not implemented, needed for DependencyManager templates compile
NodeList(char ownerType, int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT);
NodeList(NodeList const&) = delete; // Don't implement, needed to avoid copies of singleton
@ -164,6 +180,10 @@ private:
bool sockAddrBelongsToDomainOrNode(const HifiSockAddr& sockAddr);
void addNewNode(NewNodeInfo info);
void delayNodeAdd(NewNodeInfo info);
void processDelayedAdds();
std::atomic<NodeType_t> _ownerType;
NodeSet _nodeTypesOfInterest;
DomainHandler _domainHandler;
@ -181,6 +201,10 @@ private:
mutable QReadWriteLock _avatarGainMapLock;
tbb::concurrent_unordered_map<QUuid, float, UUIDHasher> _avatarGainMap;
size_t _maxConnectionRate { DEFAULT_MAX_CONNECTION_RATE };
size_t _nodesAddedInCurrentTimeSlice { 0 };
std::vector<NewNodeInfo> _delayedNodeAdds;
void sendIgnoreRadiusStateToNode(const SharedNodePointer& destinationNode);
#if defined(Q_OS_ANDROID)
Setting::Handle<bool> _ignoreRadiusEnabled { "IgnoreRadiusEnabled", false };