mirror of
https://github.com/lubosz/overte.git
synced 2025-04-14 10:08:20 +02:00
Merge pull request #1084 from digisomni/feature/messaging-mixer-rate-limiting
Add messaging mixer rate limiting per node functionality.
This commit is contained in:
commit
75ca7d86f9
3 changed files with 90 additions and 4 deletions
|
@ -20,6 +20,7 @@
|
|||
#include <udt/PacketHeaders.h>
|
||||
|
||||
const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer";
|
||||
const int MESSAGES_MIXER_RATE_LIMITER_INTERVAL = 1000; // 1 second
|
||||
|
||||
MessagesMixer::MessagesMixer(ReceivedMessage& message) : ThreadedAssignment(message)
|
||||
{
|
||||
|
@ -44,10 +45,20 @@ void MessagesMixer::handleMessages(QSharedPointer<ReceivedMessage> receivedMessa
|
|||
QByteArray data;
|
||||
QUuid senderID;
|
||||
bool isText;
|
||||
auto senderUUID = senderNode->getUUID();
|
||||
MessagesClient::decodeMessagesPacket(receivedMessage, channel, isText, message, data, senderID);
|
||||
|
||||
auto nodeList = DependencyManager::get<NodeList>();
|
||||
|
||||
auto itr = _allSubscribers.find(senderUUID);
|
||||
if (itr == _allSubscribers.end()) {
|
||||
_allSubscribers[senderUUID] = 1;
|
||||
} else if (*itr >= _maxMessagesPerSecond) {
|
||||
return;
|
||||
} else {
|
||||
*itr += 1;
|
||||
}
|
||||
|
||||
nodeList->eachMatchingNode(
|
||||
[&](const SharedNodePointer& node)->bool {
|
||||
return node->getActiveSocket() && _channelSubscribers[channel].contains(node->getUUID());
|
||||
|
@ -60,14 +71,18 @@ void MessagesMixer::handleMessages(QSharedPointer<ReceivedMessage> receivedMessa
|
|||
}
|
||||
|
||||
void MessagesMixer::handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
|
||||
auto senderUUID = senderNode->getUUID();
|
||||
QString channel = QString::fromUtf8(message->getMessage());
|
||||
_channelSubscribers[channel] << senderNode->getUUID();
|
||||
|
||||
_channelSubscribers[channel] << senderUUID;
|
||||
}
|
||||
|
||||
void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
|
||||
auto senderUUID = senderNode->getUUID();
|
||||
QString channel = QString::fromUtf8(message->getMessage());
|
||||
|
||||
if (_channelSubscribers.contains(channel)) {
|
||||
_channelSubscribers[channel].remove(senderNode->getUUID());
|
||||
_channelSubscribers[channel].remove(senderUUID);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,7 +103,48 @@ void MessagesMixer::sendStatsPacket() {
|
|||
}
|
||||
|
||||
void MessagesMixer::run() {
|
||||
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
|
||||
auto nodeList = DependencyManager::get<NodeList>();
|
||||
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
|
||||
DomainHandler& domainHandler = nodeList->getDomainHandler();
|
||||
connect(&domainHandler, &DomainHandler::settingsReceived, this, &MessagesMixer::domainSettingsRequestComplete);
|
||||
|
||||
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
|
||||
|
||||
startMaxMessagesProcessor();
|
||||
}
|
||||
|
||||
void MessagesMixer::domainSettingsRequestComplete() {
|
||||
auto nodeList = DependencyManager::get<NodeList>();
|
||||
|
||||
// parse the settings to pull out the values we need
|
||||
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
|
||||
}
|
||||
|
||||
void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
|
||||
const QString MESSAGES_MIXER_SETTINGS_KEY = "messages_mixer";
|
||||
QJsonObject messagesMixerGroupObject = domainSettings[MESSAGES_MIXER_SETTINGS_KEY].toObject();
|
||||
|
||||
const QString NODE_MESSAGES_PER_SECOND_KEY = "max_node_messages_per_second";
|
||||
QJsonValue maxMessagesPerSecondValue = messagesMixerGroupObject.value(NODE_MESSAGES_PER_SECOND_KEY);
|
||||
_maxMessagesPerSecond = maxMessagesPerSecondValue.toInt(DEFAULT_NODE_MESSAGES_PER_SECOND);
|
||||
}
|
||||
|
||||
void MessagesMixer::processMaxMessagesContainer() {
|
||||
_allSubscribers.clear();
|
||||
}
|
||||
|
||||
void MessagesMixer::startMaxMessagesProcessor() {
|
||||
if (_maxMessagesTimer) {
|
||||
stopMaxMessagesProcessor();
|
||||
}
|
||||
|
||||
_maxMessagesTimer = new QTimer();
|
||||
connect(_maxMessagesTimer, &QTimer::timeout, this, &MessagesMixer::processMaxMessagesContainer);
|
||||
_maxMessagesTimer->start(MESSAGES_MIXER_RATE_LIMITER_INTERVAL); // Clear the container every second.
|
||||
}
|
||||
|
||||
void MessagesMixer::stopMaxMessagesProcessor() {
|
||||
_maxMessagesTimer->stop();
|
||||
_maxMessagesTimer->deleteLater();
|
||||
_maxMessagesTimer = nullptr;
|
||||
}
|
||||
|
|
|
@ -32,9 +32,21 @@ private slots:
|
|||
void handleMessages(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
||||
void handleMessagesSubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
||||
void handleMessagesUnsubscribe(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
|
||||
void parseDomainServerSettings(const QJsonObject& domainSettings);
|
||||
void domainSettingsRequestComplete();
|
||||
|
||||
void startMaxMessagesProcessor();
|
||||
void stopMaxMessagesProcessor();
|
||||
void processMaxMessagesContainer();
|
||||
|
||||
private:
|
||||
QHash<QString,QSet<QUuid>> _channelSubscribers;
|
||||
QHash<QString, QSet<QUuid>> _channelSubscribers;
|
||||
QHash<QUuid, int> _allSubscribers;
|
||||
|
||||
const int DEFAULT_NODE_MESSAGES_PER_SECOND = 1000;
|
||||
int _maxMessagesPerSecond { 0 };
|
||||
|
||||
QTimer* _maxMessagesTimer { nullptr };
|
||||
};
|
||||
|
||||
#endif // hifi_MessagesMixer_h
|
||||
|
|
|
@ -1488,6 +1488,24 @@
|
|||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "messages_mixer",
|
||||
"label": "Messages Mixer",
|
||||
"assignment-types": [
|
||||
4
|
||||
],
|
||||
"settings": [
|
||||
{
|
||||
"name": "max_node_messages_per_second",
|
||||
"type": "int",
|
||||
"label": "Maximum Message Rate",
|
||||
"help": "Maximum message send rate (messages per second) per node",
|
||||
"placeholder": 1000,
|
||||
"default": 1000,
|
||||
"advanced": true
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "entity_server_settings",
|
||||
"label": "Entities",
|
||||
|
|
Loading…
Reference in a new issue