This commit is contained in:
Kalila L 2021-03-12 10:50:50 -05:00
parent 3a945f87dd
commit 4b61d3328d
3 changed files with 32 additions and 23 deletions

View file

@ -20,6 +20,7 @@
#include <udt/PacketHeaders.h>
const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer";
const int MESSAGES_MIXER_RATE_LIMITER_INTERVAL = 1000; // 1 second
MessagesMixer::MessagesMixer(ReceivedMessage& message) : ThreadedAssignment(message)
{
@ -44,16 +45,17 @@ void MessagesMixer::handleMessages(QSharedPointer<ReceivedMessage> receivedMessa
QByteArray data;
QUuid senderID;
bool isText;
auto senderUUID = senderNode->getUUID();
MessagesClient::decodeMessagesPacket(receivedMessage, channel, isText, message, data, senderID);
auto nodeList = DependencyManager::get<NodeList>();
if (_allSubscribers.value(senderNode->getUUID()) >= _maxMessagesPerSecond) {
if (_allSubscribers.value(senderUUID) >= _maxMessagesPerSecond) {
// We will reject all messages that go over this limit for that second.
// FIXME: We will add logging options to offer analytics on this later.
return;
}
_allSubscribers[senderNode->getUUID()] = _allSubscribers.value(senderNode->getUUID()) + 1;
_allSubscribers[senderUUID] += 1;
nodeList->eachMatchingNode(
[&](const SharedNodePointer& node)->bool {
@ -67,20 +69,32 @@ void MessagesMixer::handleMessages(QSharedPointer<ReceivedMessage> receivedMessa
}
void MessagesMixer::handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
auto senderUUID = senderNode->getUUID();
QString channel = QString::fromUtf8(message->getMessage());
_channelSubscribers[channel] << senderNode->getUUID();
_allSubscribers[senderNode->getUUID()] << 0;
_channelSubscribers[channel] << senderUUID;
_allSubscribers[senderUUID] = 0;
}
void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
auto senderUUID = senderNode->getUUID();
QString channel = QString::fromUtf8(message->getMessage());
if (_channelSubscribers.contains(channel)) {
_channelSubscribers[channel].remove(senderNode->getUUID());
_channelSubscribers[channel].remove(senderUUID);
}
if (_allSubscribers.contains(senderNode->getUUID())) {
_allSubscribers.remove(senderNode->getUUID());
bool isSenderSubscribed = false;
QList<QSet<QUuid>> allChannels = _channelSubscribers.values();
foreach (const QSet<QUuid> channel, allChannels) {
if (channel.contains(senderUUID)) {
isSenderSubscribed = true;
}
}
if (!isSenderSubscribed && _allSubscribers.contains(senderUUID)) {
_allSubscribers.remove(senderUUID);
}
}
@ -114,8 +128,6 @@ void MessagesMixer::domainSettingsRequestComplete() {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
DomainHandler& domainHandler = nodeList->getDomainHandler();
// parse the settings to pull out the values we need
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
}
@ -126,11 +138,7 @@ void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings)
const QString NODE_MESSAGES_PER_SECOND_KEY = "max_node_messages_per_second";
QJsonValue maxMessagesPerSecondValue = messagesMixerGroupObject.value(NODE_MESSAGES_PER_SECOND_KEY);
if (!maxMessagesPerSecondValue.isUndefined()) {
_maxMessagesPerSecond = maxMessagesPerSecondValue.toInt();
} else {
_maxMessagesPerSecond = DEFAULT_NODE_MESSAGES_PER_SECOND;
}
_maxMessagesPerSecond = maxMessagesPerSecondValue.toInt(DEFAULT_NODE_MESSAGES_PER_SECOND);
}
void MessagesMixer::processMaxMessagesContainer() {
@ -138,12 +146,13 @@ void MessagesMixer::processMaxMessagesContainer() {
}
void MessagesMixer::startMaxMessagesProcessor() {
maxMessagesPerSecondTimer = new QTimer();
connect(maxMessagesPerSecondTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer);
maxMessagesPerSecondTimer->start(1000); // Clear the container every second.
_maxMessagesTimer = new QTimer();
connect(_maxMessagesTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer);
_maxMessagesTimer->start(MESSAGES_MIXER_RATE_LIMITER_INTERVAL); // Clear the container every second.
}
void MessagesMixer::stopMaxMessagesProcessor() {
delete maxMessagesPerSecondTimer;
maxMessagesPerSecondTimer = NULL;
_maxMessagesTimer->stop();
_maxMessagesTimer->deleteLater();
_maxMessagesTimer = nullptr;
}

View file

@ -44,9 +44,9 @@ private:
QHash<QUuid, int> _allSubscribers;
const int DEFAULT_NODE_MESSAGES_PER_SECOND = 1000;
int _maxMessagesPerSecond{ 0 };
int _maxMessagesPerSecond { 0 };
QTimer* maxMessagesPerSecondTimer;
QTimer* _maxMessagesTimer;
};
#endif // hifi_MessagesMixer_h

View file

@ -1498,8 +1498,8 @@
{
"name": "max_node_messages_per_second",
"type": "int",
"label": "Per-Node Send Messages",
"help": "Desired maximum send messages (in messages per second) per node",
"label": "Maximum Message Rate",
"help": "Maximum message send rate (messages per second) per node",
"placeholder": 1000,
"default": 1000,
"advanced": true