Merge pull request #6448 from ZappoMan/messaging

fix senderID in messages, dry up code
This commit is contained in:
Brad Davis 2015-11-20 11:42:30 -08:00
commit a356f96a7b
4 changed files with 66 additions and 88 deletions

View file

@ -12,30 +12,23 @@
#include <QtCore/QCoreApplication> #include <QtCore/QCoreApplication>
#include <QtCore/QJsonObject> #include <QtCore/QJsonObject>
#include <QBuffer> #include <QBuffer>
#include <LogHandler.h> #include <LogHandler.h>
#include <MessagesClient.h>
#include <NodeList.h> #include <NodeList.h>
#include <udt/PacketHeaders.h> #include <udt/PacketHeaders.h>
#include "MessagesMixer.h" #include "MessagesMixer.h"
const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer";
MessagesMixer::MessagesMixer(NLPacket& packet) : MessagesMixer::MessagesMixer(NLPacket& packet) : ThreadedAssignment(packet)
ThreadedAssignment(packet)
{ {
// make sure we hear about node kills so we can tell the other nodes
connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver(); auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessages"); packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessages");
packetReceiver.registerMessageListener(PacketType::MessagesSubscribe, this, "handleMessagesSubscribe"); packetReceiver.registerMessageListener(PacketType::MessagesSubscribe, this, "handleMessagesSubscribe");
packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe"); packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe");
} }
MessagesMixer::~MessagesMixer() {
}
void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
for (auto& channel : _channelSubscribers) { for (auto& channel : _channelSubscribers) {
channel.remove(killedNode->getUUID()); channel.remove(killedNode->getUUID());
@ -43,92 +36,52 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
} }
void MessagesMixer::handleMessages(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesMixer::handleMessages(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
Q_ASSERT(packetList->getType() == PacketType::MessagesData); QString channel, message;
QUuid senderID;
MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID);
QByteArray packetData = packetList->getMessage();
QBuffer packet{ &packetData };
packet.open(QIODevice::ReadOnly);
quint16 channelLength;
packet.read(reinterpret_cast<char*>(&channelLength), sizeof(channelLength));
auto channelData = packet.read(channelLength);
QString channel = QString::fromUtf8(channelData);
quint16 messageLength;
packet.read(reinterpret_cast<char*>(&messageLength), sizeof(messageLength));
auto messageData = packet.read(messageLength);
QString message = QString::fromUtf8(messageData);
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
nodeList->eachMatchingNode( nodeList->eachMatchingNode(
[&](const SharedNodePointer& node)->bool { [&](const SharedNodePointer& node)->bool {
return node->getType() == NodeType::Agent && node->getActiveSocket() && return node->getType() == NodeType::Agent && node->getActiveSocket() &&
_channelSubscribers[channel].contains(node->getUUID()); _channelSubscribers[channel].contains(node->getUUID());
}, },
[&](const SharedNodePointer& node) { [&](const SharedNodePointer& node) {
auto packetList = MessagesClient::encodeMessagesPacket(channel, message, senderID);
auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true);
auto channelUtf8 = channel.toUtf8();
quint16 channelLength = channelUtf8.length();
packetList->writePrimitive(channelLength);
packetList->write(channelUtf8);
auto messageUtf8 = message.toUtf8();
quint16 messageLength = messageUtf8.length();
packetList->writePrimitive(messageLength);
packetList->write(messageUtf8);
nodeList->sendPacketList(std::move(packetList), *node); nodeList->sendPacketList(std::move(packetList), *node);
}); });
} }
void MessagesMixer::handleMessagesSubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesMixer::handleMessagesSubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
Q_ASSERT(packetList->getType() == PacketType::MessagesSubscribe);
QString channel = QString::fromUtf8(packetList->getMessage()); QString channel = QString::fromUtf8(packetList->getMessage());
qDebug() << "Node [" << senderNode->getUUID() << "] subscribed to channel:" << channel;
_channelSubscribers[channel] << senderNode->getUUID(); _channelSubscribers[channel] << senderNode->getUUID();
} }
void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
Q_ASSERT(packetList->getType() == PacketType::MessagesUnsubscribe);
QString channel = QString::fromUtf8(packetList->getMessage()); QString channel = QString::fromUtf8(packetList->getMessage());
qDebug() << "Node [" << senderNode->getUUID() << "] unsubscribed from channel:" << channel;
if (_channelSubscribers.contains(channel)) { if (_channelSubscribers.contains(channel)) {
_channelSubscribers[channel].remove(senderNode->getUUID()); _channelSubscribers[channel].remove(senderNode->getUUID());
} }
} }
// FIXME - make these stats relevant
void MessagesMixer::sendStatsPacket() { void MessagesMixer::sendStatsPacket() {
QJsonObject statsObject; QJsonObject statsObject, messagesMixerObject;
QJsonObject messagesObject;
auto nodeList = DependencyManager::get<NodeList>();
// add stats for each listerner // add stats for each listerner
nodeList->eachNode([&](const SharedNodePointer& node) { DependencyManager::get<NodeList>()->eachNode([&](const SharedNodePointer& node) {
QJsonObject messagesStats; QJsonObject clientStats;
clientStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID());
// add the key to ask the domain-server for a username replacement, if it has it clientStats["outbound_kbps"] = node->getOutboundBandwidth();
messagesStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); clientStats["inbound_kbps"] = node->getInboundBandwidth();
messagesStats["outbound_kbps"] = node->getOutboundBandwidth(); messagesMixerObject[uuidStringWithoutCurlyBraces(node->getUUID())] = clientStats;
messagesStats["inbound_kbps"] = node->getInboundBandwidth();
messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats;
}); });
statsObject["messages"] = messagesObject; statsObject["messages"] = messagesMixerObject;
ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject); ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject);
} }
void MessagesMixer::run() { void MessagesMixer::run() {
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer); ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
DependencyManager::get<NodeList>()->addNodeTypeToInterestSet(NodeType::Agent);
auto nodeList = DependencyManager::get<NodeList>(); }
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
// The messages-mixer currently does currently have any domain settings. If it did, they would be
// synchronously grabbed here.
}

View file

@ -22,7 +22,6 @@ class MessagesMixer : public ThreadedAssignment {
Q_OBJECT Q_OBJECT
public: public:
MessagesMixer(NLPacket& packet); MessagesMixer(NLPacket& packet);
~MessagesMixer();
public slots: public slots:
void run(); void run();

View file

@ -36,7 +36,7 @@ void MessagesClient::init() {
} }
} }
void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) { void MessagesClient::decodeMessagesPacket(QSharedPointer<NLPacketList> packetList, QString& channel, QString& message, QUuid& senderID) {
QByteArray packetData = packetList->getMessage(); QByteArray packetData = packetList->getMessage();
QBuffer packet{ &packetData }; QBuffer packet{ &packetData };
packet.open(QIODevice::ReadOnly); packet.open(QIODevice::ReadOnly);
@ -44,38 +44,60 @@ void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacketList> packetLis
quint16 channelLength; quint16 channelLength;
packet.read(reinterpret_cast<char*>(&channelLength), sizeof(channelLength)); packet.read(reinterpret_cast<char*>(&channelLength), sizeof(channelLength));
auto channelData = packet.read(channelLength); auto channelData = packet.read(channelLength);
QString channel = QString::fromUtf8(channelData); channel = QString::fromUtf8(channelData);
quint16 messageLength; quint16 messageLength;
packet.read(reinterpret_cast<char*>(&messageLength), sizeof(messageLength)); packet.read(reinterpret_cast<char*>(&messageLength), sizeof(messageLength));
auto messageData = packet.read(messageLength); auto messageData = packet.read(messageLength);
QString message = QString::fromUtf8(messageData); message = QString::fromUtf8(messageData);
emit messageReceived(channel, message, senderNode->getUUID()); QByteArray bytesSenderID = packet.read(NUM_BYTES_RFC4122_UUID);
if (bytesSenderID.length() == NUM_BYTES_RFC4122_UUID) {
senderID = QUuid::fromRfc4122(bytesSenderID);
} else {
QUuid emptyUUID;
senderID = emptyUUID; // packet was missing UUID use default instead
}
} }
void MessagesClient::sendMessage(const QString& channel, const QString& message) { std::unique_ptr<NLPacketList> MessagesClient::encodeMessagesPacket(QString channel, QString message, QUuid senderID) {
auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true);
auto channelUtf8 = channel.toUtf8();
quint16 channelLength = channelUtf8.length();
packetList->writePrimitive(channelLength);
packetList->write(channelUtf8);
auto messageUtf8 = message.toUtf8();
quint16 messageLength = messageUtf8.length();
packetList->writePrimitive(messageLength);
packetList->write(messageUtf8);
packetList->write(senderID.toRfc4122());
return packetList;
}
void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
QString channel, message;
QUuid senderID;
decodeMessagesPacket(packetList, channel, message, senderID);
emit messageReceived(channel, message, senderID);
}
void MessagesClient::sendMessage(QString channel, QString message) {
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
if (messagesMixer) { if (messagesMixer) {
auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); QUuid senderID = nodeList->getSessionUUID();
auto packetList = encodeMessagesPacket(channel, message, senderID);
auto channelUtf8 = channel.toUtf8();
quint16 channelLength = channelUtf8.length();
packetList->writePrimitive(channelLength);
packetList->write(channelUtf8);
auto messageUtf8 = message.toUtf8();
quint16 messageLength = messageUtf8.length();
packetList->writePrimitive(messageLength);
packetList->write(messageUtf8);
nodeList->sendPacketList(std::move(packetList), *messagesMixer); nodeList->sendPacketList(std::move(packetList), *messagesMixer);
} }
} }
void MessagesClient::subscribe(const QString& channel) { void MessagesClient::subscribe(QString channel) {
_subscribedChannels << channel; _subscribedChannels << channel;
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
@ -87,7 +109,7 @@ void MessagesClient::subscribe(const QString& channel) {
} }
} }
void MessagesClient::unsubscribe(const QString& channel) { void MessagesClient::unsubscribe(QString channel) {
_subscribedChannels.remove(channel); _subscribedChannels.remove(channel);
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);

View file

@ -28,12 +28,16 @@ public:
Q_INVOKABLE void init(); Q_INVOKABLE void init();
Q_INVOKABLE void sendMessage(const QString& channel, const QString& message); Q_INVOKABLE void sendMessage(QString channel, QString message);
Q_INVOKABLE void subscribe(const QString& channel); Q_INVOKABLE void subscribe(QString channel);
Q_INVOKABLE void unsubscribe(const QString& channel); Q_INVOKABLE void unsubscribe(QString channel);
static void decodeMessagesPacket(QSharedPointer<NLPacketList> packetList, QString& channel, QString& message, QUuid& senderID);
static std::unique_ptr<NLPacketList> encodeMessagesPacket(QString channel, QString message, QUuid senderID);
signals: signals:
void messageReceived(const QString& channel, const QString& message, const QUuid& senderUUID); void messageReceived(QString channel, QString message, QUuid senderUUID);
private slots: private slots:
void handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode); void handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode);