mirror of
https://github.com/overte-org/overte.git
synced 2025-04-13 16:05:17 +02:00
Add messaging mixer rate limiting per node functionality.
This commit is contained in:
parent
14292a042b
commit
3a945f87dd
3 changed files with 86 additions and 1 deletions
|
@ -47,6 +47,13 @@ void MessagesMixer::handleMessages(QSharedPointer<ReceivedMessage> receivedMessa
|
||||||
MessagesClient::decodeMessagesPacket(receivedMessage, channel, isText, message, data, senderID);
|
MessagesClient::decodeMessagesPacket(receivedMessage, channel, isText, message, data, senderID);
|
||||||
|
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
|
if (_allSubscribers.value(senderNode->getUUID()) >= _maxMessagesPerSecond) {
|
||||||
|
// We will reject all messages that go over this limit for that second.
|
||||||
|
// FIXME: We will add logging options to offer analytics on this later.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_allSubscribers[senderNode->getUUID()] = _allSubscribers.value(senderNode->getUUID()) + 1;
|
||||||
|
|
||||||
nodeList->eachMatchingNode(
|
nodeList->eachMatchingNode(
|
||||||
[&](const SharedNodePointer& node)->bool {
|
[&](const SharedNodePointer& node)->bool {
|
||||||
|
@ -62,6 +69,8 @@ void MessagesMixer::handleMessages(QSharedPointer<ReceivedMessage> receivedMessa
|
||||||
void MessagesMixer::handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
|
void MessagesMixer::handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
|
||||||
QString channel = QString::fromUtf8(message->getMessage());
|
QString channel = QString::fromUtf8(message->getMessage());
|
||||||
_channelSubscribers[channel] << senderNode->getUUID();
|
_channelSubscribers[channel] << senderNode->getUUID();
|
||||||
|
|
||||||
|
_allSubscribers[senderNode->getUUID()] << 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
|
void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
|
||||||
|
@ -69,6 +78,10 @@ void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> me
|
||||||
if (_channelSubscribers.contains(channel)) {
|
if (_channelSubscribers.contains(channel)) {
|
||||||
_channelSubscribers[channel].remove(senderNode->getUUID());
|
_channelSubscribers[channel].remove(senderNode->getUUID());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (_allSubscribers.contains(senderNode->getUUID())) {
|
||||||
|
_allSubscribers.remove(senderNode->getUUID());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MessagesMixer::sendStatsPacket() {
|
void MessagesMixer::sendStatsPacket() {
|
||||||
|
@ -88,7 +101,49 @@ void MessagesMixer::sendStatsPacket() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void MessagesMixer::run() {
|
void MessagesMixer::run() {
|
||||||
|
// wait until we have the domain-server settings, otherwise we bail
|
||||||
|
DomainHandler& domainHandler = DependencyManager::get<NodeList>()->getDomainHandler();
|
||||||
|
connect(&domainHandler, &DomainHandler::settingsReceived, this, &MessagesMixer::domainSettingsRequestComplete);
|
||||||
|
|
||||||
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
|
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
|
||||||
|
|
||||||
|
startMaxMessagesProcessor();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MessagesMixer::domainSettingsRequestComplete() {
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
|
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
|
||||||
|
|
||||||
|
DomainHandler& domainHandler = nodeList->getDomainHandler();
|
||||||
|
|
||||||
|
// parse the settings to pull out the values we need
|
||||||
|
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
|
||||||
|
}
|
||||||
|
|
||||||
|
void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
|
||||||
|
const QString MESSAGES_MIXER_SETTINGS_KEY = "messages_mixer";
|
||||||
|
QJsonObject messagesMixerGroupObject = domainSettings[MESSAGES_MIXER_SETTINGS_KEY].toObject();
|
||||||
|
|
||||||
|
const QString NODE_MESSAGES_PER_SECOND_KEY = "max_node_messages_per_second";
|
||||||
|
QJsonValue maxMessagesPerSecondValue = messagesMixerGroupObject.value(NODE_MESSAGES_PER_SECOND_KEY);
|
||||||
|
if (!maxMessagesPerSecondValue.isUndefined()) {
|
||||||
|
_maxMessagesPerSecond = maxMessagesPerSecondValue.toInt();
|
||||||
|
} else {
|
||||||
|
_maxMessagesPerSecond = DEFAULT_NODE_MESSAGES_PER_SECOND;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MessagesMixer::processMaxMessagesContainer() {
|
||||||
|
_allSubscribers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void MessagesMixer::startMaxMessagesProcessor() {
|
||||||
|
maxMessagesPerSecondTimer = new QTimer();
|
||||||
|
connect(maxMessagesPerSecondTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer);
|
||||||
|
maxMessagesPerSecondTimer->start(1000); // Clear the container every second.
|
||||||
|
}
|
||||||
|
|
||||||
|
void MessagesMixer::stopMaxMessagesProcessor() {
|
||||||
|
delete maxMessagesPerSecondTimer;
|
||||||
|
maxMessagesPerSecondTimer = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,9 +32,21 @@ private slots:
|
||||||
void handleMessages(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
void handleMessages(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
||||||
void handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
void handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
||||||
void handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
void handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
||||||
|
void parseDomainServerSettings(const QJsonObject& domainSettings);
|
||||||
|
void domainSettingsRequestComplete();
|
||||||
|
|
||||||
|
void startMaxMessagesProcessor();
|
||||||
|
void stopMaxMessagesProcessor();
|
||||||
|
void processMaxMessagesContainer();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
QHash<QString,QSet<QUuid>> _channelSubscribers;
|
QHash<QString, QSet<QUuid>> _channelSubscribers;
|
||||||
|
QHash<QUuid, int> _allSubscribers;
|
||||||
|
|
||||||
|
const int DEFAULT_NODE_MESSAGES_PER_SECOND = 1000;
|
||||||
|
int _maxMessagesPerSecond{ 0 };
|
||||||
|
|
||||||
|
QTimer* maxMessagesPerSecondTimer;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif // hifi_MessagesMixer_h
|
#endif // hifi_MessagesMixer_h
|
||||||
|
|
|
@ -1488,6 +1488,24 @@
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name": "messages_mixer",
|
||||||
|
"label": "Messages Mixer",
|
||||||
|
"assignment-types": [
|
||||||
|
4
|
||||||
|
],
|
||||||
|
"settings": [
|
||||||
|
{
|
||||||
|
"name": "max_node_messages_per_second",
|
||||||
|
"type": "int",
|
||||||
|
"label": "Per-Node Send Messages",
|
||||||
|
"help": "Desired maximum send messages (in messages per second) per node",
|
||||||
|
"placeholder": 1000,
|
||||||
|
"default": 1000,
|
||||||
|
"advanced": true
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name": "entity_server_settings",
|
"name": "entity_server_settings",
|
||||||
"label": "Entities",
|
"label": "Entities",
|
||||||
|
|
Loading…
Reference in a new issue