From 3a945f87ddef1b07c0455d979ed23232907139dc Mon Sep 17 00:00:00 2001 From: Kalila L Date: Fri, 26 Feb 2021 21:01:22 -0500 Subject: [PATCH] Add messaging mixer rate limiting per node functionality. --- .../src/messages/MessagesMixer.cpp | 55 +++++++++++++++++++ .../src/messages/MessagesMixer.h | 14 ++++- .../resources/describe-settings.json | 18 ++++++ 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index bcf4881fcf..a4f5a65ce6 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -47,6 +47,13 @@ void MessagesMixer::handleMessages(QSharedPointer receivedMessa MessagesClient::decodeMessagesPacket(receivedMessage, channel, isText, message, data, senderID); auto nodeList = DependencyManager::get(); + if (_allSubscribers.value(senderNode->getUUID()) >= _maxMessagesPerSecond) { + // We will reject all messages that go over this limit for that second. + // FIXME: We will add logging options to offer analytics on this later. + return; + } + + _allSubscribers[senderNode->getUUID()] = _allSubscribers.value(senderNode->getUUID()) + 1; nodeList->eachMatchingNode( [&](const SharedNodePointer& node)->bool { @@ -62,6 +69,8 @@ void MessagesMixer::handleMessages(QSharedPointer receivedMessa void MessagesMixer::handleMessagesSubscribe(QSharedPointer message, SharedNodePointer senderNode) { QString channel = QString::fromUtf8(message->getMessage()); _channelSubscribers[channel] << senderNode->getUUID(); + + _allSubscribers[senderNode->getUUID()] << 0; } void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer message, SharedNodePointer senderNode) { @@ -69,6 +78,10 @@ void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer me if (_channelSubscribers.contains(channel)) { _channelSubscribers[channel].remove(senderNode->getUUID()); } + + if (_allSubscribers.contains(senderNode->getUUID())) { + _allSubscribers.remove(senderNode->getUUID()); + } } void MessagesMixer::sendStatsPacket() { @@ -88,7 +101,49 @@ void MessagesMixer::sendStatsPacket() { } void MessagesMixer::run() { + // wait until we have the domain-server settings, otherwise we bail + DomainHandler& domainHandler = DependencyManager::get()->getDomainHandler(); + connect(&domainHandler, &DomainHandler::settingsReceived, this, &MessagesMixer::domainSettingsRequestComplete); + ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer); + + startMaxMessagesProcessor(); +} + +void MessagesMixer::domainSettingsRequestComplete() { auto nodeList = DependencyManager::get(); nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); + + DomainHandler& domainHandler = nodeList->getDomainHandler(); + + // parse the settings to pull out the values we need + parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject()); +} + +void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { + const QString MESSAGES_MIXER_SETTINGS_KEY = "messages_mixer"; + QJsonObject messagesMixerGroupObject = domainSettings[MESSAGES_MIXER_SETTINGS_KEY].toObject(); + + const QString NODE_MESSAGES_PER_SECOND_KEY = "max_node_messages_per_second"; + QJsonValue maxMessagesPerSecondValue = messagesMixerGroupObject.value(NODE_MESSAGES_PER_SECOND_KEY); + if (!maxMessagesPerSecondValue.isUndefined()) { + _maxMessagesPerSecond = maxMessagesPerSecondValue.toInt(); + } else { + _maxMessagesPerSecond = DEFAULT_NODE_MESSAGES_PER_SECOND; + } +} + +void MessagesMixer::processMaxMessagesContainer() { + _allSubscribers.clear(); +} + +void MessagesMixer::startMaxMessagesProcessor() { + maxMessagesPerSecondTimer = new QTimer(); + connect(maxMessagesPerSecondTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer); + maxMessagesPerSecondTimer->start(1000); // Clear the container every second. +} + +void MessagesMixer::stopMaxMessagesProcessor() { + delete maxMessagesPerSecondTimer; + maxMessagesPerSecondTimer = NULL; } diff --git a/assignment-client/src/messages/MessagesMixer.h b/assignment-client/src/messages/MessagesMixer.h index 800d42199b..68c21e2c57 100644 --- a/assignment-client/src/messages/MessagesMixer.h +++ b/assignment-client/src/messages/MessagesMixer.h @@ -32,9 +32,21 @@ private slots: void handleMessages(QSharedPointer message, SharedNodePointer senderNode); void handleMessagesSubscribe(QSharedPointer message, SharedNodePointer senderNode); void handleMessagesUnsubscribe(QSharedPointer message, SharedNodePointer senderNode); + void parseDomainServerSettings(const QJsonObject& domainSettings); + void domainSettingsRequestComplete(); + + void startMaxMessagesProcessor(); + void stopMaxMessagesProcessor(); + void processMaxMessagesContainer(); private: - QHash> _channelSubscribers; + QHash> _channelSubscribers; + QHash _allSubscribers; + + const int DEFAULT_NODE_MESSAGES_PER_SECOND = 1000; + int _maxMessagesPerSecond{ 0 }; + + QTimer* maxMessagesPerSecondTimer; }; #endif // hifi_MessagesMixer_h diff --git a/domain-server/resources/describe-settings.json b/domain-server/resources/describe-settings.json index bb7acf344c..b2b56f8eff 100644 --- a/domain-server/resources/describe-settings.json +++ b/domain-server/resources/describe-settings.json @@ -1488,6 +1488,24 @@ } ] }, + { + "name": "messages_mixer", + "label": "Messages Mixer", + "assignment-types": [ + 4 + ], + "settings": [ + { + "name": "max_node_messages_per_second", + "type": "int", + "label": "Per-Node Send Messages", + "help": "Desired maximum send messages (in messages per second) per node", + "placeholder": 1000, + "default": 1000, + "advanced": true + } + ] + }, { "name": "entity_server_settings", "label": "Entities",