From abf169ebd9fe7328883efc07a6af6dad7e599ced Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Fri, 20 Nov 2015 09:03:21 -0800 Subject: [PATCH 1/5] fix senderID in messages, dry up code --- .../src/messages/MessagesMixer.cpp | 43 +++------------ libraries/networking/src/MessagesClient.cpp | 55 ++++++++++++------- libraries/networking/src/MessagesClient.h | 12 ++-- 3 files changed, 51 insertions(+), 59 deletions(-) diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index d3662f3fb5..a811631617 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -24,9 +25,7 @@ const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; MessagesMixer::MessagesMixer(NLPacket& packet) : ThreadedAssignment(packet) { - // make sure we hear about node kills so we can tell the other nodes connect(DependencyManager::get().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); - auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessages"); packetReceiver.registerMessageListener(PacketType::MessagesSubscribe, this, "handleMessagesSubscribe"); @@ -45,42 +44,20 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { void MessagesMixer::handleMessages(QSharedPointer packetList, SharedNodePointer senderNode) { Q_ASSERT(packetList->getType() == PacketType::MessagesData); - QByteArray packetData = packetList->getMessage(); - QBuffer packet{ &packetData }; - packet.open(QIODevice::ReadOnly); + QString channel, message; + QUuid senderID; + MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID); + Q_ASSERT(senderNode->getUUID() == senderID); // NOTE: do we want to reject messages that come from bogus senders - quint16 channelLength; - packet.read(reinterpret_cast(&channelLength), sizeof(channelLength)); - auto channelData = packet.read(channelLength); - QString channel = QString::fromUtf8(channelData); - - quint16 messageLength; - packet.read(reinterpret_cast(&messageLength), sizeof(messageLength)); - auto messageData = packet.read(messageLength); - QString message = QString::fromUtf8(messageData); - auto nodeList = DependencyManager::get(); nodeList->eachMatchingNode( [&](const SharedNodePointer& node)->bool { - return node->getType() == NodeType::Agent && node->getActiveSocket() && _channelSubscribers[channel].contains(node->getUUID()); }, [&](const SharedNodePointer& node) { - - auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); - - auto channelUtf8 = channel.toUtf8(); - quint16 channelLength = channelUtf8.length(); - packetList->writePrimitive(channelLength); - packetList->write(channelUtf8); - - auto messageUtf8 = message.toUtf8(); - quint16 messageLength = messageUtf8.length(); - packetList->writePrimitive(messageLength); - packetList->write(messageUtf8); - + auto packetList = MessagesClient::encodeMessagesPacket(channel, message, senderID); nodeList->sendPacketList(std::move(packetList), *node); }); } @@ -107,15 +84,13 @@ void MessagesMixer::sendStatsPacket() { QJsonObject statsObject; QJsonObject messagesObject; auto nodeList = DependencyManager::get(); + // add stats for each listerner nodeList->eachNode([&](const SharedNodePointer& node) { QJsonObject messagesStats; - - // add the key to ask the domain-server for a username replacement, if it has it messagesStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); messagesStats["outbound_kbps"] = node->getOutboundBandwidth(); messagesStats["inbound_kbps"] = node->getInboundBandwidth(); - messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats; }); @@ -125,10 +100,6 @@ void MessagesMixer::sendStatsPacket() { void MessagesMixer::run() { ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer); - auto nodeList = DependencyManager::get(); nodeList->addNodeTypeToInterestSet(NodeType::Agent); - - // The messages-mixer currently does currently have any domain settings. If it did, they would be - // synchronously grabbed here. } diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 2e8b3bbb09..fc1729b4c5 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -36,7 +36,7 @@ void MessagesClient::init() { } } -void MessagesClient::handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode) { +void MessagesClient::decodeMessagesPacket(QSharedPointer packetList, QString& channel, QString& message, QUuid& senderID) { QByteArray packetData = packetList->getMessage(); QBuffer packet{ &packetData }; packet.open(QIODevice::ReadOnly); @@ -44,38 +44,55 @@ void MessagesClient::handleMessagesPacket(QSharedPointer packetLis quint16 channelLength; packet.read(reinterpret_cast(&channelLength), sizeof(channelLength)); auto channelData = packet.read(channelLength); - QString channel = QString::fromUtf8(channelData); + channel = QString::fromUtf8(channelData); quint16 messageLength; packet.read(reinterpret_cast(&messageLength), sizeof(messageLength)); auto messageData = packet.read(messageLength); - QString message = QString::fromUtf8(messageData); + message = QString::fromUtf8(messageData); - emit messageReceived(channel, message, senderNode->getUUID()); + QByteArray bytesSenderID = packet.read(NUM_BYTES_RFC4122_UUID); + senderID = QUuid::fromRfc4122(bytesSenderID); } -void MessagesClient::sendMessage(const QString& channel, const QString& message) { +std::unique_ptr MessagesClient::encodeMessagesPacket(QString channel, QString message, QUuid senderID) { + auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); + + auto channelUtf8 = channel.toUtf8(); + quint16 channelLength = channelUtf8.length(); + packetList->writePrimitive(channelLength); + packetList->write(channelUtf8); + + auto messageUtf8 = message.toUtf8(); + quint16 messageLength = messageUtf8.length(); + packetList->writePrimitive(messageLength); + packetList->write(messageUtf8); + + packetList->write(senderID.toRfc4122()); + + return packetList; +} + + +void MessagesClient::handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode) { + QString channel, message; + QUuid senderID; + decodeMessagesPacket(packetList, channel, message, senderID); + emit messageReceived(channel, message, senderID); +} + +void MessagesClient::sendMessage(QString channel, QString message) { auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); if (messagesMixer) { - auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); - - auto channelUtf8 = channel.toUtf8(); - quint16 channelLength = channelUtf8.length(); - packetList->writePrimitive(channelLength); - packetList->write(channelUtf8); - - auto messageUtf8 = message.toUtf8(); - quint16 messageLength = messageUtf8.length(); - packetList->writePrimitive(messageLength); - packetList->write(messageUtf8); - + QUuid senderID = nodeList->getSessionUUID(); + auto packetList = encodeMessagesPacket(channel, message, senderID); nodeList->sendPacketList(std::move(packetList), *messagesMixer); } } -void MessagesClient::subscribe(const QString& channel) { +void MessagesClient::subscribe(QString channel) { _subscribedChannels << channel; auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); @@ -87,7 +104,7 @@ void MessagesClient::subscribe(const QString& channel) { } } -void MessagesClient::unsubscribe(const QString& channel) { +void MessagesClient::unsubscribe(QString channel) { _subscribedChannels.remove(channel); auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index a1ae4cb5ba..d5d94f62c4 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -28,12 +28,16 @@ public: Q_INVOKABLE void init(); - Q_INVOKABLE void sendMessage(const QString& channel, const QString& message); - Q_INVOKABLE void subscribe(const QString& channel); - Q_INVOKABLE void unsubscribe(const QString& channel); + Q_INVOKABLE void sendMessage(QString channel, QString message); + Q_INVOKABLE void subscribe(QString channel); + Q_INVOKABLE void unsubscribe(QString channel); + + static void decodeMessagesPacket(QSharedPointer packetList, QString& channel, QString& message, QUuid& senderID); + static std::unique_ptr encodeMessagesPacket(QString channel, QString message, QUuid senderID); + signals: - void messageReceived(const QString& channel, const QString& message, const QUuid& senderUUID); + void messageReceived(QString channel, QString message, QUuid senderUUID); private slots: void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); From 0e5e33446e0fcf26d746c6b0734cfc0c1c85614e Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Fri, 20 Nov 2015 09:57:30 -0800 Subject: [PATCH 2/5] handle old protocol case --- libraries/networking/src/MessagesClient.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index fc1729b4c5..95b61cf8d7 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -52,7 +52,11 @@ void MessagesClient::decodeMessagesPacket(QSharedPointer packetLis message = QString::fromUtf8(messageData); QByteArray bytesSenderID = packet.read(NUM_BYTES_RFC4122_UUID); - senderID = QUuid::fromRfc4122(bytesSenderID); + if (bytesSenderID.length() == NUM_BYTES_RFC4122_UUID) { + senderID = QUuid::fromRfc4122(bytesSenderID); + } else { + senderID = QUuid::QUuid(); // packet was missing UUID use default instead + } } std::unique_ptr MessagesClient::encodeMessagesPacket(QString channel, QString message, QUuid senderID) { From e530aa6545b925f35e1651a9c5def71f9f0bc38d Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Fri, 20 Nov 2015 09:58:28 -0800 Subject: [PATCH 3/5] don't assert on sender mismatch --- assignment-client/src/messages/MessagesMixer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index a811631617..684c5f262c 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -47,7 +47,6 @@ void MessagesMixer::handleMessages(QSharedPointer packetList, Shar QString channel, message; QUuid senderID; MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID); - Q_ASSERT(senderNode->getUUID() == senderID); // NOTE: do we want to reject messages that come from bogus senders auto nodeList = DependencyManager::get(); From caa8b0b5b6ff02dca0a7d71bd10cbeb26ccbdc1d Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Fri, 20 Nov 2015 11:07:01 -0800 Subject: [PATCH 4/5] fix unix build --- libraries/networking/src/MessagesClient.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 95b61cf8d7..ef8ecb534c 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -55,7 +55,8 @@ void MessagesClient::decodeMessagesPacket(QSharedPointer packetLis if (bytesSenderID.length() == NUM_BYTES_RFC4122_UUID) { senderID = QUuid::fromRfc4122(bytesSenderID); } else { - senderID = QUuid::QUuid(); // packet was missing UUID use default instead + QUuid emptyUUID; + senderID = emptyUUID; // packet was missing UUID use default instead } } From 7441e20f5842ab2b7d1fe8ff175a05feafd41d5d Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Fri, 20 Nov 2015 11:20:36 -0800 Subject: [PATCH 5/5] cleanup --- .../src/messages/MessagesMixer.cpp | 39 ++++++------------- .../src/messages/MessagesMixer.h | 1 - 2 files changed, 11 insertions(+), 29 deletions(-) diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index 684c5f262c..8f2b8a5475 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -12,18 +12,15 @@ #include #include #include - #include #include #include #include - #include "MessagesMixer.h" const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; -MessagesMixer::MessagesMixer(NLPacket& packet) : - ThreadedAssignment(packet) +MessagesMixer::MessagesMixer(NLPacket& packet) : ThreadedAssignment(packet) { connect(DependencyManager::get().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); @@ -32,9 +29,6 @@ MessagesMixer::MessagesMixer(NLPacket& packet) : packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe"); } -MessagesMixer::~MessagesMixer() { -} - void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { for (auto& channel : _channelSubscribers) { channel.remove(killedNode->getUUID()); @@ -42,8 +36,6 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { } void MessagesMixer::handleMessages(QSharedPointer packetList, SharedNodePointer senderNode) { - Q_ASSERT(packetList->getType() == PacketType::MessagesData); - QString channel, message; QUuid senderID; MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID); @@ -62,43 +54,34 @@ void MessagesMixer::handleMessages(QSharedPointer packetList, Shar } void MessagesMixer::handleMessagesSubscribe(QSharedPointer packetList, SharedNodePointer senderNode) { - Q_ASSERT(packetList->getType() == PacketType::MessagesSubscribe); QString channel = QString::fromUtf8(packetList->getMessage()); - qDebug() << "Node [" << senderNode->getUUID() << "] subscribed to channel:" << channel; _channelSubscribers[channel] << senderNode->getUUID(); } void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer packetList, SharedNodePointer senderNode) { - Q_ASSERT(packetList->getType() == PacketType::MessagesUnsubscribe); QString channel = QString::fromUtf8(packetList->getMessage()); - qDebug() << "Node [" << senderNode->getUUID() << "] unsubscribed from channel:" << channel; - if (_channelSubscribers.contains(channel)) { _channelSubscribers[channel].remove(senderNode->getUUID()); } } -// FIXME - make these stats relevant void MessagesMixer::sendStatsPacket() { - QJsonObject statsObject; - QJsonObject messagesObject; - auto nodeList = DependencyManager::get(); + QJsonObject statsObject, messagesMixerObject; // add stats for each listerner - nodeList->eachNode([&](const SharedNodePointer& node) { - QJsonObject messagesStats; - messagesStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); - messagesStats["outbound_kbps"] = node->getOutboundBandwidth(); - messagesStats["inbound_kbps"] = node->getInboundBandwidth(); - messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats; + DependencyManager::get()->eachNode([&](const SharedNodePointer& node) { + QJsonObject clientStats; + clientStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); + clientStats["outbound_kbps"] = node->getOutboundBandwidth(); + clientStats["inbound_kbps"] = node->getInboundBandwidth(); + messagesMixerObject[uuidStringWithoutCurlyBraces(node->getUUID())] = clientStats; }); - statsObject["messages"] = messagesObject; + statsObject["messages"] = messagesMixerObject; ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject); } void MessagesMixer::run() { ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer); - auto nodeList = DependencyManager::get(); - nodeList->addNodeTypeToInterestSet(NodeType::Agent); -} + DependencyManager::get()->addNodeTypeToInterestSet(NodeType::Agent); +} \ No newline at end of file diff --git a/assignment-client/src/messages/MessagesMixer.h b/assignment-client/src/messages/MessagesMixer.h index 65419a8ca6..cf5fc79e17 100644 --- a/assignment-client/src/messages/MessagesMixer.h +++ b/assignment-client/src/messages/MessagesMixer.h @@ -22,7 +22,6 @@ class MessagesMixer : public ThreadedAssignment { Q_OBJECT public: MessagesMixer(NLPacket& packet); - ~MessagesMixer(); public slots: void run();