This commit is contained in:
Brad Hefta-Gaub 2015-11-20 11:20:36 -08:00
parent caa8b0b5b6
commit 7441e20f58
2 changed files with 11 additions and 29 deletions

View file

@ -12,18 +12,15 @@
#include <QtCore/QCoreApplication> #include <QtCore/QCoreApplication>
#include <QtCore/QJsonObject> #include <QtCore/QJsonObject>
#include <QBuffer> #include <QBuffer>
#include <LogHandler.h> #include <LogHandler.h>
#include <MessagesClient.h> #include <MessagesClient.h>
#include <NodeList.h> #include <NodeList.h>
#include <udt/PacketHeaders.h> #include <udt/PacketHeaders.h>
#include "MessagesMixer.h" #include "MessagesMixer.h"
const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer";
MessagesMixer::MessagesMixer(NLPacket& packet) : MessagesMixer::MessagesMixer(NLPacket& packet) : ThreadedAssignment(packet)
ThreadedAssignment(packet)
{ {
connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver(); auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
@ -32,9 +29,6 @@ MessagesMixer::MessagesMixer(NLPacket& packet) :
packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe"); packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe");
} }
MessagesMixer::~MessagesMixer() {
}
void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
for (auto& channel : _channelSubscribers) { for (auto& channel : _channelSubscribers) {
channel.remove(killedNode->getUUID()); channel.remove(killedNode->getUUID());
@ -42,8 +36,6 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
} }
void MessagesMixer::handleMessages(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesMixer::handleMessages(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
Q_ASSERT(packetList->getType() == PacketType::MessagesData);
QString channel, message; QString channel, message;
QUuid senderID; QUuid senderID;
MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID); MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID);
@ -62,43 +54,34 @@ void MessagesMixer::handleMessages(QSharedPointer<NLPacketList> packetList, Shar
} }
void MessagesMixer::handleMessagesSubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesMixer::handleMessagesSubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
Q_ASSERT(packetList->getType() == PacketType::MessagesSubscribe);
QString channel = QString::fromUtf8(packetList->getMessage()); QString channel = QString::fromUtf8(packetList->getMessage());
qDebug() << "Node [" << senderNode->getUUID() << "] subscribed to channel:" << channel;
_channelSubscribers[channel] << senderNode->getUUID(); _channelSubscribers[channel] << senderNode->getUUID();
} }
void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
Q_ASSERT(packetList->getType() == PacketType::MessagesUnsubscribe);
QString channel = QString::fromUtf8(packetList->getMessage()); QString channel = QString::fromUtf8(packetList->getMessage());
qDebug() << "Node [" << senderNode->getUUID() << "] unsubscribed from channel:" << channel;
if (_channelSubscribers.contains(channel)) { if (_channelSubscribers.contains(channel)) {
_channelSubscribers[channel].remove(senderNode->getUUID()); _channelSubscribers[channel].remove(senderNode->getUUID());
} }
} }
// FIXME - make these stats relevant
void MessagesMixer::sendStatsPacket() { void MessagesMixer::sendStatsPacket() {
QJsonObject statsObject; QJsonObject statsObject, messagesMixerObject;
QJsonObject messagesObject;
auto nodeList = DependencyManager::get<NodeList>();
// add stats for each listerner // add stats for each listerner
nodeList->eachNode([&](const SharedNodePointer& node) { DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node) {
QJsonObject messagesStats; QJsonObject clientStats;
messagesStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); clientStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID());
messagesStats["outbound_kbps"] = node->getOutboundBandwidth(); clientStats["outbound_kbps"] = node->getOutboundBandwidth();
messagesStats["inbound_kbps"] = node->getInboundBandwidth(); clientStats["inbound_kbps"] = node->getInboundBandwidth();
messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats; messagesMixerObject[uuidStringWithoutCurlyBraces(node->getUUID())] = clientStats;
}); });
statsObject["messages"] = messagesObject; statsObject["messages"] = messagesMixerObject;
ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject); ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject);
} }
void MessagesMixer::run() { void MessagesMixer::run() {
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer); ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
auto nodeList = DependencyManager::get<NodeList>(); DependencyManager::get<NodeList>()->addNodeTypeToInterestSet(NodeType::Agent);
nodeList->addNodeTypeToInterestSet(NodeType::Agent); }
}

View file

@ -22,7 +22,6 @@ class MessagesMixer : public ThreadedAssignment {
Q_OBJECT Q_OBJECT
public: public:
MessagesMixer(NLPacket& packet); MessagesMixer(NLPacket& packet);
~MessagesMixer();
public slots: public slots:
void run(); void run();