diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index a4f5a65ce6..c54229b7f2 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -20,6 +20,7 @@ #include const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; +const int MESSAGES_MIXER_RATE_LIMITER_INTERVAL = 1000; // 1 second MessagesMixer::MessagesMixer(ReceivedMessage& message) : ThreadedAssignment(message) { @@ -44,16 +45,17 @@ void MessagesMixer::handleMessages(QSharedPointer receivedMessa QByteArray data; QUuid senderID; bool isText; + auto senderUUID = senderNode->getUUID(); MessagesClient::decodeMessagesPacket(receivedMessage, channel, isText, message, data, senderID); auto nodeList = DependencyManager::get(); - if (_allSubscribers.value(senderNode->getUUID()) >= _maxMessagesPerSecond) { + if (_allSubscribers.value(senderUUID) >= _maxMessagesPerSecond) { // We will reject all messages that go over this limit for that second. // FIXME: We will add logging options to offer analytics on this later. return; } - _allSubscribers[senderNode->getUUID()] = _allSubscribers.value(senderNode->getUUID()) + 1; + _allSubscribers[senderUUID] += 1; nodeList->eachMatchingNode( [&](const SharedNodePointer& node)->bool { @@ -67,20 +69,32 @@ void MessagesMixer::handleMessages(QSharedPointer receivedMessa } void MessagesMixer::handleMessagesSubscribe(QSharedPointer message, SharedNodePointer senderNode) { + auto senderUUID = senderNode->getUUID(); QString channel = QString::fromUtf8(message->getMessage()); - _channelSubscribers[channel] << senderNode->getUUID(); - _allSubscribers[senderNode->getUUID()] << 0; + _channelSubscribers[channel] << senderUUID; + + _allSubscribers[senderUUID] = 0; } void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer message, SharedNodePointer senderNode) { + auto senderUUID = senderNode->getUUID(); QString channel = QString::fromUtf8(message->getMessage()); + if (_channelSubscribers.contains(channel)) { - _channelSubscribers[channel].remove(senderNode->getUUID()); + _channelSubscribers[channel].remove(senderUUID); } - if (_allSubscribers.contains(senderNode->getUUID())) { - _allSubscribers.remove(senderNode->getUUID()); + bool isSenderSubscribed = false; + QList> allChannels = _channelSubscribers.values(); + foreach (const QSet channel, allChannels) { + if (channel.contains(senderUUID)) { + isSenderSubscribed = true; + } + } + + if (!isSenderSubscribed && _allSubscribers.contains(senderUUID)) { + _allSubscribers.remove(senderUUID); } } @@ -114,8 +128,6 @@ void MessagesMixer::domainSettingsRequestComplete() { auto nodeList = DependencyManager::get(); nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer }); - DomainHandler& domainHandler = nodeList->getDomainHandler(); - // parse the settings to pull out the values we need parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject()); } @@ -126,11 +138,7 @@ void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) const QString NODE_MESSAGES_PER_SECOND_KEY = "max_node_messages_per_second"; QJsonValue maxMessagesPerSecondValue = messagesMixerGroupObject.value(NODE_MESSAGES_PER_SECOND_KEY); - if (!maxMessagesPerSecondValue.isUndefined()) { - _maxMessagesPerSecond = maxMessagesPerSecondValue.toInt(); - } else { - _maxMessagesPerSecond = DEFAULT_NODE_MESSAGES_PER_SECOND; - } + _maxMessagesPerSecond = maxMessagesPerSecondValue.toInt(DEFAULT_NODE_MESSAGES_PER_SECOND); } void MessagesMixer::processMaxMessagesContainer() { @@ -138,12 +146,13 @@ void MessagesMixer::processMaxMessagesContainer() { } void MessagesMixer::startMaxMessagesProcessor() { - maxMessagesPerSecondTimer = new QTimer(); - connect(maxMessagesPerSecondTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer); - maxMessagesPerSecondTimer->start(1000); // Clear the container every second. + _maxMessagesTimer = new QTimer(); + connect(_maxMessagesTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer); + _maxMessagesTimer->start(MESSAGES_MIXER_RATE_LIMITER_INTERVAL); // Clear the container every second. } void MessagesMixer::stopMaxMessagesProcessor() { - delete maxMessagesPerSecondTimer; - maxMessagesPerSecondTimer = NULL; + _maxMessagesTimer->stop(); + _maxMessagesTimer->deleteLater(); + _maxMessagesTimer = nullptr; } diff --git a/assignment-client/src/messages/MessagesMixer.h b/assignment-client/src/messages/MessagesMixer.h index 68c21e2c57..db64dbd0c8 100644 --- a/assignment-client/src/messages/MessagesMixer.h +++ b/assignment-client/src/messages/MessagesMixer.h @@ -44,9 +44,9 @@ private: QHash _allSubscribers; const int DEFAULT_NODE_MESSAGES_PER_SECOND = 1000; - int _maxMessagesPerSecond{ 0 }; + int _maxMessagesPerSecond { 0 }; - QTimer* maxMessagesPerSecondTimer; + QTimer* _maxMessagesTimer; }; #endif // hifi_MessagesMixer_h diff --git a/domain-server/resources/describe-settings.json b/domain-server/resources/describe-settings.json index b2b56f8eff..32868c9b80 100644 --- a/domain-server/resources/describe-settings.json +++ b/domain-server/resources/describe-settings.json @@ -1498,8 +1498,8 @@ { "name": "max_node_messages_per_second", "type": "int", - "label": "Per-Node Send Messages", - "help": "Desired maximum send messages (in messages per second) per node", + "label": "Maximum Message Rate", + "help": "Maximum message send rate (messages per second) per node", "placeholder": 1000, "default": 1000, "advanced": true