diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index d3662f3fb5..8f2b8a5475 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -12,30 +12,23 @@ #include #include #include - #include +#include #include #include - #include "MessagesMixer.h" const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; -MessagesMixer::MessagesMixer(NLPacket& packet) : - ThreadedAssignment(packet) +MessagesMixer::MessagesMixer(NLPacket& packet) : ThreadedAssignment(packet) { - // make sure we hear about node kills so we can tell the other nodes connect(DependencyManager::get().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); - auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessages"); packetReceiver.registerMessageListener(PacketType::MessagesSubscribe, this, "handleMessagesSubscribe"); packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe"); } -MessagesMixer::~MessagesMixer() { -} - void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { for (auto& channel : _channelSubscribers) { channel.remove(killedNode->getUUID()); @@ -43,92 +36,52 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { } void MessagesMixer::handleMessages(QSharedPointer packetList, SharedNodePointer senderNode) { - Q_ASSERT(packetList->getType() == PacketType::MessagesData); + QString channel, message; + QUuid senderID; + MessagesClient::decodeMessagesPacket(packetList, channel, message, senderID); - QByteArray packetData = packetList->getMessage(); - QBuffer packet{ &packetData }; - packet.open(QIODevice::ReadOnly); - - quint16 channelLength; - packet.read(reinterpret_cast(&channelLength), sizeof(channelLength)); - auto channelData = packet.read(channelLength); - QString channel = QString::fromUtf8(channelData); - - quint16 messageLength; - packet.read(reinterpret_cast(&messageLength), sizeof(messageLength)); - auto messageData = packet.read(messageLength); - QString message = QString::fromUtf8(messageData); - auto nodeList = DependencyManager::get(); nodeList->eachMatchingNode( [&](const SharedNodePointer& node)->bool { - return node->getType() == NodeType::Agent && node->getActiveSocket() && _channelSubscribers[channel].contains(node->getUUID()); }, [&](const SharedNodePointer& node) { - - auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); - - auto channelUtf8 = channel.toUtf8(); - quint16 channelLength = channelUtf8.length(); - packetList->writePrimitive(channelLength); - packetList->write(channelUtf8); - - auto messageUtf8 = message.toUtf8(); - quint16 messageLength = messageUtf8.length(); - packetList->writePrimitive(messageLength); - packetList->write(messageUtf8); - + auto packetList = MessagesClient::encodeMessagesPacket(channel, message, senderID); nodeList->sendPacketList(std::move(packetList), *node); }); } void MessagesMixer::handleMessagesSubscribe(QSharedPointer packetList, SharedNodePointer senderNode) { - Q_ASSERT(packetList->getType() == PacketType::MessagesSubscribe); QString channel = QString::fromUtf8(packetList->getMessage()); - qDebug() << "Node [" << senderNode->getUUID() << "] subscribed to channel:" << channel; _channelSubscribers[channel] << senderNode->getUUID(); } void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer packetList, SharedNodePointer senderNode) { - Q_ASSERT(packetList->getType() == PacketType::MessagesUnsubscribe); QString channel = QString::fromUtf8(packetList->getMessage()); - qDebug() << "Node [" << senderNode->getUUID() << "] unsubscribed from channel:" << channel; - if (_channelSubscribers.contains(channel)) { _channelSubscribers[channel].remove(senderNode->getUUID()); } } -// FIXME - make these stats relevant void MessagesMixer::sendStatsPacket() { - QJsonObject statsObject; - QJsonObject messagesObject; - auto nodeList = DependencyManager::get(); + QJsonObject statsObject, messagesMixerObject; + // add stats for each listerner - nodeList->eachNode([&](const SharedNodePointer& node) { - QJsonObject messagesStats; - - // add the key to ask the domain-server for a username replacement, if it has it - messagesStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); - messagesStats["outbound_kbps"] = node->getOutboundBandwidth(); - messagesStats["inbound_kbps"] = node->getInboundBandwidth(); - - messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats; + DependencyManager::get()->eachNode([&](const SharedNodePointer& node) { + QJsonObject clientStats; + clientStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID()); + clientStats["outbound_kbps"] = node->getOutboundBandwidth(); + clientStats["inbound_kbps"] = node->getInboundBandwidth(); + messagesMixerObject[uuidStringWithoutCurlyBraces(node->getUUID())] = clientStats; }); - statsObject["messages"] = messagesObject; + statsObject["messages"] = messagesMixerObject; ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject); } void MessagesMixer::run() { ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer); - - auto nodeList = DependencyManager::get(); - nodeList->addNodeTypeToInterestSet(NodeType::Agent); - - // The messages-mixer currently does currently have any domain settings. If it did, they would be - // synchronously grabbed here. -} + DependencyManager::get()->addNodeTypeToInterestSet(NodeType::Agent); +} \ No newline at end of file diff --git a/assignment-client/src/messages/MessagesMixer.h b/assignment-client/src/messages/MessagesMixer.h index 65419a8ca6..cf5fc79e17 100644 --- a/assignment-client/src/messages/MessagesMixer.h +++ b/assignment-client/src/messages/MessagesMixer.h @@ -22,7 +22,6 @@ class MessagesMixer : public ThreadedAssignment { Q_OBJECT public: MessagesMixer(NLPacket& packet); - ~MessagesMixer(); public slots: void run(); diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 2e8b3bbb09..ef8ecb534c 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -36,7 +36,7 @@ void MessagesClient::init() { } } -void MessagesClient::handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode) { +void MessagesClient::decodeMessagesPacket(QSharedPointer packetList, QString& channel, QString& message, QUuid& senderID) { QByteArray packetData = packetList->getMessage(); QBuffer packet{ &packetData }; packet.open(QIODevice::ReadOnly); @@ -44,38 +44,60 @@ void MessagesClient::handleMessagesPacket(QSharedPointer packetLis quint16 channelLength; packet.read(reinterpret_cast(&channelLength), sizeof(channelLength)); auto channelData = packet.read(channelLength); - QString channel = QString::fromUtf8(channelData); + channel = QString::fromUtf8(channelData); quint16 messageLength; packet.read(reinterpret_cast(&messageLength), sizeof(messageLength)); auto messageData = packet.read(messageLength); - QString message = QString::fromUtf8(messageData); + message = QString::fromUtf8(messageData); - emit messageReceived(channel, message, senderNode->getUUID()); + QByteArray bytesSenderID = packet.read(NUM_BYTES_RFC4122_UUID); + if (bytesSenderID.length() == NUM_BYTES_RFC4122_UUID) { + senderID = QUuid::fromRfc4122(bytesSenderID); + } else { + QUuid emptyUUID; + senderID = emptyUUID; // packet was missing UUID use default instead + } } -void MessagesClient::sendMessage(const QString& channel, const QString& message) { +std::unique_ptr MessagesClient::encodeMessagesPacket(QString channel, QString message, QUuid senderID) { + auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); + + auto channelUtf8 = channel.toUtf8(); + quint16 channelLength = channelUtf8.length(); + packetList->writePrimitive(channelLength); + packetList->write(channelUtf8); + + auto messageUtf8 = message.toUtf8(); + quint16 messageLength = messageUtf8.length(); + packetList->writePrimitive(messageLength); + packetList->write(messageUtf8); + + packetList->write(senderID.toRfc4122()); + + return packetList; +} + + +void MessagesClient::handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode) { + QString channel, message; + QUuid senderID; + decodeMessagesPacket(packetList, channel, message, senderID); + emit messageReceived(channel, message, senderID); +} + +void MessagesClient::sendMessage(QString channel, QString message) { auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); if (messagesMixer) { - auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); - - auto channelUtf8 = channel.toUtf8(); - quint16 channelLength = channelUtf8.length(); - packetList->writePrimitive(channelLength); - packetList->write(channelUtf8); - - auto messageUtf8 = message.toUtf8(); - quint16 messageLength = messageUtf8.length(); - packetList->writePrimitive(messageLength); - packetList->write(messageUtf8); - + QUuid senderID = nodeList->getSessionUUID(); + auto packetList = encodeMessagesPacket(channel, message, senderID); nodeList->sendPacketList(std::move(packetList), *messagesMixer); } } -void MessagesClient::subscribe(const QString& channel) { +void MessagesClient::subscribe(QString channel) { _subscribedChannels << channel; auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); @@ -87,7 +109,7 @@ void MessagesClient::subscribe(const QString& channel) { } } -void MessagesClient::unsubscribe(const QString& channel) { +void MessagesClient::unsubscribe(QString channel) { _subscribedChannels.remove(channel); auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index a1ae4cb5ba..d5d94f62c4 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -28,12 +28,16 @@ public: Q_INVOKABLE void init(); - Q_INVOKABLE void sendMessage(const QString& channel, const QString& message); - Q_INVOKABLE void subscribe(const QString& channel); - Q_INVOKABLE void unsubscribe(const QString& channel); + Q_INVOKABLE void sendMessage(QString channel, QString message); + Q_INVOKABLE void subscribe(QString channel); + Q_INVOKABLE void unsubscribe(QString channel); + + static void decodeMessagesPacket(QSharedPointer packetList, QString& channel, QString& message, QUuid& senderID); + static std::unique_ptr encodeMessagesPacket(QString channel, QString message, QUuid senderID); + signals: - void messageReceived(const QString& channel, const QString& message, const QUuid& senderUUID); + void messageReceived(QString channel, QString message, QUuid senderUUID); private slots: void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode);