From 6b61ec569ca5203e5a3ede40ca12c56dd98f2fb4 Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Tue, 17 Nov 2015 09:47:50 -0800 Subject: [PATCH] more work on channels --- .../src/messages/MessagesMixer.cpp | 135 +++++++----------- .../src/messages/MessagesMixer.h | 21 ++- .../src/messages/MessagesMixerClientData.cpp | 55 ------- .../src/messages/MessagesMixerClientData.h | 105 -------------- libraries/networking/src/MessagesClient.cpp | 12 +- libraries/networking/src/udt/PacketHeaders.h | 4 +- 6 files changed, 73 insertions(+), 259 deletions(-) delete mode 100644 assignment-client/src/messages/MessagesMixerClientData.cpp delete mode 100644 assignment-client/src/messages/MessagesMixerClientData.h diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index d9cc772eb2..16b24088e2 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -25,7 +26,6 @@ #include #include -#include "MessagesMixerClientData.h" #include "MessagesMixer.h" const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer"; @@ -35,7 +35,6 @@ const unsigned int MESSAGES_DATA_SEND_INTERVAL_MSECS = (1.0f / (float) MESSAGES_ MessagesMixer::MessagesMixer(NLPacket& packet) : ThreadedAssignment(packet), - _broadcastThread(), _lastFrameTimestamp(QDateTime::currentMSecsSinceEpoch()), _trailingSleepRatio(1.0f), _performanceThrottlingRatio(0.0f), @@ -48,17 +47,12 @@ MessagesMixer::MessagesMixer(NLPacket& packet) : connect(DependencyManager::get().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); - //packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagesPacket"); - packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessagesPacketList"); + packetReceiver.registerMessageListener(PacketType::MessagesData, this, "handleMessages"); + packetReceiver.registerMessageListener(PacketType::MessagesSubscribe, this, "handleMessagesSubscribe"); + packetReceiver.registerMessageListener(PacketType::MessagesUnsubscribe, this, "handleMessagesUnsubscribe"); } MessagesMixer::~MessagesMixer() { - if (_broadcastTimer) { - _broadcastTimer->deleteLater(); - } - - _broadcastThread.quit(); - _broadcastThread.wait(); } // An 80% chance of sending a identity packet within a 5 second interval. @@ -93,52 +87,55 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { } } -void MessagesMixer::handleMessagesPacketList(QSharedPointer packetList, SharedNodePointer senderNode) { - qDebug() << "MessagesMixer::handleMessagesPacketList()... senderNode:" << senderNode->getUUID(); +void MessagesMixer::handleMessages(QSharedPointer packetList, SharedNodePointer senderNode) { + Q_ASSERT(packetList->getType() == PacketType::MessagesData); + qDebug() << "MessagesMixer::handleMessages()... senderNode:" << senderNode->getUUID(); + + QByteArray packetData = packetList->getMessage(); + QBuffer packet{ &packetData }; + packet.open(QIODevice::ReadOnly); + + quint16 channelLength; + packet.read(reinterpret_cast(&channelLength), sizeof(channelLength)); + auto channelData = packet.read(channelLength); + QString channel = QString::fromUtf8(channelData); + + quint16 messageLength; + packet.read(reinterpret_cast(&messageLength), sizeof(messageLength)); + auto messageData = packet.read(messageLength); + QString message = QString::fromUtf8(messageData); + + auto nodeList = DependencyManager::get(); - //nodeList->updateNodeWithDataFromPacket(packet, senderNode); - QByteArray data = packetList->getMessage(); - auto packetType = packetList->getType(); + qDebug() << "got a messages:" << message << "on channel:" << channel << "from node:" << senderNode->getUUID(); - if (packetType == PacketType::MessagesData) { - QString message = QString::fromUtf8(data); - qDebug() << "got a messages packet:" << message; + // this was an avatar we were sending to other people + // send a kill packet for it to our other nodes + //auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID); + //killPacket->write(killedNode->getUUID().toRfc4122()); + //nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); +} - // this was an avatar we were sending to other people - // send a kill packet for it to our other nodes - //auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID); - //killPacket->write(killedNode->getUUID().toRfc4122()); - //nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); +void MessagesMixer::handleMessagesSubscribe(QSharedPointer packetList, SharedNodePointer senderNode) { + Q_ASSERT(packetList->getType() == PacketType::MessagesSubscribe); + QString channel = QString::fromUtf8(packetList->getMessage()); + qDebug() << "Node [" << senderNode->getUUID() << "] subscribed to channel:" << channel; + _channelSubscribers[channel] << senderNode->getUUID(); +} +void MessagesMixer::handleMessagesUnsubscribe(QSharedPointer packetList, SharedNodePointer senderNode) { + Q_ASSERT(packetList->getType() == PacketType::MessagesUnsubscribe); + QString channel = QString::fromUtf8(packetList->getMessage()); + qDebug() << "Node [" << senderNode->getUUID() << "] unsubscribed from channel:" << channel; + + if (_channelSubscribers.contains(channel)) { + _channelSubscribers[channel].remove(senderNode->getUUID()); } } -void MessagesMixer::handleMessagesPacket(QSharedPointer packet, SharedNodePointer sendingNode) { - qDebug() << "MessagesMixer::handleMessagesPacket()... senderNode:" << sendingNode->getUUID(); - - /* - auto nodeList = DependencyManager::get(); - //nodeList->updateNodeWithDataFromPacket(packet, senderNode); - - QByteArray data = packetList->getMessage(); - auto packetType = packetList->getType(); - - if (packetType == PacketType::MessagesData) { - QString message = QString::fromUtf8(data); - qDebug() << "got a messages packet:" << message; - - // this was an avatar we were sending to other people - // send a kill packet for it to our other nodes - //auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID); - //killPacket->write(killedNode->getUUID().toRfc4122()); - //nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); - - } - */ -} - +// FIXME - make these stats relevant void MessagesMixer::sendStatsPacket() { QJsonObject statsObject; statsObject["average_listeners_last_second"] = (float) _sumListeners / (float) _numStatFrames; @@ -160,18 +157,6 @@ void MessagesMixer::sendStatsPacket() { messagesStats[NODE_OUTBOUND_KBPS_STAT_KEY] = node->getOutboundBandwidth(); messagesStats[NODE_INBOUND_KBPS_STAT_KEY] = node->getInboundBandwidth(); - MessagesMixerClientData* clientData = static_cast(node->getLinkedData()); - if (clientData) { - MutexTryLocker lock(clientData->getMutex()); - if (lock.isLocked()) { - clientData->loadJSONStats(messagesStats); - - // add the diff between the full outbound bandwidth and the measured bandwidth for AvatarData send only - messagesStats["delta_full_vs_avatar_data_kbps"] = - messagesStats[NODE_OUTBOUND_KBPS_STAT_KEY].toDouble() - messagesStats[OUTBOUND_MESSAGES_DATA_STATS_KEY].toDouble(); - } - } - messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats; }); @@ -192,27 +177,15 @@ void MessagesMixer::run() { nodeList->addNodeTypeToInterestSet(NodeType::Agent); nodeList->linkedDataCreateCallback = [] (Node* node) { - node->setLinkedData(new MessagesMixerClientData()); + // no need to link data }; - /* - // setup the timer that will be fired on the broadcast thread - _broadcastTimer = new QTimer; - _broadcastTimer->setInterval(MESSAGES_DATA_SEND_INTERVAL_MSECS); - _broadcastTimer->moveToThread(&_broadcastThread); - - // connect appropriate signals and slots - connect(_broadcastTimer, &QTimer::timeout, this, &MessagesMixer::broadcastMessagesData, Qt::DirectConnection); - connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start())); - */ - // wait until we have the domain-server settings, otherwise we bail DomainHandler& domainHandler = nodeList->getDomainHandler(); qDebug() << "Waiting for domain settings from domain-server."; // block until we get the settingsRequestComplete signal - QEventLoop loop; connect(&domainHandler, &DomainHandler::settingsReceived, &loop, &QEventLoop::quit); connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit); @@ -227,22 +200,16 @@ void MessagesMixer::run() { // parse the settings to pull out the values we need parseDomainServerSettings(domainHandler.getSettingsObject()); - - // start the broadcastThread - //_broadcastThread.start(); } void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { qDebug() << "MessagesMixer::parseDomainServerSettings() domainSettings:" << domainSettings; const QString MESSAGES_MIXER_SETTINGS_KEY = "messages_mixer"; - const QString NODE_SEND_BANDWIDTH_KEY = "max_node_send_bandwidth"; - const float DEFAULT_NODE_SEND_BANDWIDTH = 1.0f; - QJsonValue nodeBandwidthValue = domainSettings[MESSAGES_MIXER_SETTINGS_KEY].toObject()[NODE_SEND_BANDWIDTH_KEY]; - if (!nodeBandwidthValue.isDouble()) { - qDebug() << NODE_SEND_BANDWIDTH_KEY << "is not a double - will continue with default value"; - } - - _maxKbpsPerNode = nodeBandwidthValue.toDouble(DEFAULT_NODE_SEND_BANDWIDTH) * KILO_PER_MEGA; - qDebug() << "The maximum send bandwidth per node is" << _maxKbpsPerNode << "kbps."; + // TODO - if we want options, parse them here... + // + // QJsonValue nodeBandwidthValue = domainSettings[MESSAGES_MIXER_SETTINGS_KEY].toObject()[NODE_SEND_BANDWIDTH_KEY]; + // if (!nodeBandwidthValue.isDouble()) { + // qDebug() << NODE_SEND_BANDWIDTH_KEY << "is not a double - will continue with default value"; + // } } diff --git a/assignment-client/src/messages/MessagesMixer.h b/assignment-client/src/messages/MessagesMixer.h index 057796bc54..cd15ea7d2a 100644 --- a/assignment-client/src/messages/MessagesMixer.h +++ b/assignment-client/src/messages/MessagesMixer.h @@ -23,23 +23,22 @@ class MessagesMixer : public ThreadedAssignment { public: MessagesMixer(NLPacket& packet); ~MessagesMixer(); -public slots: - /// runs the avatar mixer - void run(); +public slots: + void run(); void nodeKilled(SharedNodePointer killedNode); - void sendStatsPacket(); private slots: - void handleMessagesPacketList(QSharedPointer packetList, SharedNodePointer senderNode); - void handleMessagesPacket(QSharedPointer packet, SharedNodePointer sendingNode); + void handleMessages(QSharedPointer packetList, SharedNodePointer senderNode); + void handleMessagesSubscribe(QSharedPointer packetList, SharedNodePointer senderNode); + void handleMessagesUnsubscribe(QSharedPointer packetList, SharedNodePointer senderNode); private: - void broadcastMessagesData(); void parseDomainServerSettings(const QJsonObject& domainSettings); - - QThread _broadcastThread; + + + QHash> _channelSubscribers; quint64 _lastFrameTimestamp; @@ -50,10 +49,6 @@ private: int _numStatFrames; int _sumBillboardPackets; int _sumIdentityPackets; - - float _maxKbpsPerNode = 0.0f; - - QTimer* _broadcastTimer = nullptr; }; #endif // hifi_MessagesMixer_h diff --git a/assignment-client/src/messages/MessagesMixerClientData.cpp b/assignment-client/src/messages/MessagesMixerClientData.cpp deleted file mode 100644 index 6aa8f39c22..0000000000 --- a/assignment-client/src/messages/MessagesMixerClientData.cpp +++ /dev/null @@ -1,55 +0,0 @@ -// -// MessagesMixerClientData.cpp -// assignment-client/src/messages -// -// Created by Brad hefta-Gaub on 11/16/2015. -// Copyright 2015 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include - -#include "MessagesMixerClientData.h" - -int MessagesMixerClientData::parseData(NLPacket& packet) { - // pull the sequence number from the data first - packet.readPrimitive(&_lastReceivedSequenceNumber); - - // compute the offset to the data payload - return _avatar.parseDataFromBuffer(packet.readWithoutCopy(packet.bytesLeftToRead())); -} - -bool MessagesMixerClientData::checkAndSetHasReceivedFirstPacketsFrom(const QUuid& uuid) { - if (_hasReceivedFirstPacketsFrom.find(uuid) == _hasReceivedFirstPacketsFrom.end()) { - _hasReceivedFirstPacketsFrom.insert(uuid); - return false; - } - return true; -} - -uint16_t MessagesMixerClientData::getLastBroadcastSequenceNumber(const QUuid& nodeUUID) const { - // return the matching PacketSequenceNumber, or the default if we don't have it - auto nodeMatch = _lastBroadcastSequenceNumbers.find(nodeUUID); - if (nodeMatch != _lastBroadcastSequenceNumbers.end()) { - return nodeMatch->second; - } else { - return 0; - } -} - -void MessagesMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { - jsonObject["display_name"] = _avatar.getDisplayName(); - jsonObject["full_rate_distance"] = _fullRateDistance; - jsonObject["max_av_distance"] = _maxAvatarDistance; - jsonObject["num_avs_sent_last_frame"] = _numAvatarsSentLastFrame; - jsonObject["avg_other_av_starves_per_second"] = getAvgNumOtherAvatarStarvesPerSecond(); - jsonObject["avg_other_av_skips_per_second"] = getAvgNumOtherAvatarSkipsPerSecond(); - jsonObject["total_num_out_of_order_sends"] = _numOutOfOrderSends; - - jsonObject[OUTBOUND_MESSAGES_DATA_STATS_KEY] = getOutboundAvatarDataKbps(); - jsonObject[INBOUND_MESSAGES_DATA_STATS_KEY] = _avatar.getAverageBytesReceivedPerSecond() / (float) BYTES_PER_KILOBIT; - - jsonObject["av_data_receive_rate"] = _avatar.getReceiveRate(); -} diff --git a/assignment-client/src/messages/MessagesMixerClientData.h b/assignment-client/src/messages/MessagesMixerClientData.h deleted file mode 100644 index 1667df431f..0000000000 --- a/assignment-client/src/messages/MessagesMixerClientData.h +++ /dev/null @@ -1,105 +0,0 @@ -// -// MessagesMixerClientData.h -// assignment-client/src/messages -// -// Created by Brad hefta-Gaub on 11/16/2015. -// Copyright 2015 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#ifndef hifi_MessagesMixerClientData_h -#define hifi_MessagesMixerClientData_h - -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include -#include -#include - -const QString OUTBOUND_MESSAGES_DATA_STATS_KEY = "outbound_av_data_kbps"; -const QString INBOUND_MESSAGES_DATA_STATS_KEY = "inbound_av_data_kbps"; - -class MessagesMixerClientData : public NodeData { - Q_OBJECT -public: - int parseData(NLPacket& packet); - AvatarData& getAvatar() { return _avatar; } - - bool checkAndSetHasReceivedFirstPacketsFrom(const QUuid& uuid); - - uint16_t getLastBroadcastSequenceNumber(const QUuid& nodeUUID) const; - void setLastBroadcastSequenceNumber(const QUuid& nodeUUID, uint16_t sequenceNumber) - { _lastBroadcastSequenceNumbers[nodeUUID] = sequenceNumber; } - Q_INVOKABLE void removeLastBroadcastSequenceNumber(const QUuid& nodeUUID) { _lastBroadcastSequenceNumbers.erase(nodeUUID); } - - uint16_t getLastReceivedSequenceNumber() const { return _lastReceivedSequenceNumber; } - - quint64 getBillboardChangeTimestamp() const { return _billboardChangeTimestamp; } - void setBillboardChangeTimestamp(quint64 billboardChangeTimestamp) { _billboardChangeTimestamp = billboardChangeTimestamp; } - - quint64 getIdentityChangeTimestamp() const { return _identityChangeTimestamp; } - void setIdentityChangeTimestamp(quint64 identityChangeTimestamp) { _identityChangeTimestamp = identityChangeTimestamp; } - - void setFullRateDistance(float fullRateDistance) { _fullRateDistance = fullRateDistance; } - float getFullRateDistance() const { return _fullRateDistance; } - - void setMaxAvatarDistance(float maxAvatarDistance) { _maxAvatarDistance = maxAvatarDistance; } - float getMaxAvatarDistance() const { return _maxAvatarDistance; } - - void resetNumAvatarsSentLastFrame() { _numAvatarsSentLastFrame = 0; } - void incrementNumAvatarsSentLastFrame() { ++_numAvatarsSentLastFrame; } - int getNumAvatarsSentLastFrame() const { return _numAvatarsSentLastFrame; } - - void recordNumOtherAvatarStarves(int numAvatarsHeldBack) { _otherAvatarStarves.updateAverage((float) numAvatarsHeldBack); } - float getAvgNumOtherAvatarStarvesPerSecond() const { return _otherAvatarStarves.getAverageSampleValuePerSecond(); } - - void recordNumOtherAvatarSkips(int numOtherAvatarSkips) { _otherAvatarSkips.updateAverage((float) numOtherAvatarSkips); } - float getAvgNumOtherAvatarSkipsPerSecond() const { return _otherAvatarSkips.getAverageSampleValuePerSecond(); } - - void incrementNumOutOfOrderSends() { ++_numOutOfOrderSends; } - - int getNumFramesSinceFRDAdjustment() const { return _numFramesSinceAdjustment; } - void incrementNumFramesSinceFRDAdjustment() { ++_numFramesSinceAdjustment; } - void resetNumFramesSinceFRDAdjustment() { _numFramesSinceAdjustment = 0; } - - void recordSentAvatarData(int numBytes) { _avgOtherAvatarDataRate.updateAverage((float) numBytes); } - - float getOutboundAvatarDataKbps() const - { return _avgOtherAvatarDataRate.getAverageSampleValuePerSecond() / (float) BYTES_PER_KILOBIT; } - - void loadJSONStats(QJsonObject& jsonObject) const; -private: - AvatarData _avatar; - - uint16_t _lastReceivedSequenceNumber { 0 }; - std::unordered_map _lastBroadcastSequenceNumbers; - std::unordered_set _hasReceivedFirstPacketsFrom; - - quint64 _billboardChangeTimestamp = 0; - quint64 _identityChangeTimestamp = 0; - - float _fullRateDistance = FLT_MAX; - float _maxAvatarDistance = FLT_MAX; - - int _numAvatarsSentLastFrame = 0; - int _numFramesSinceAdjustment = 0; - - SimpleMovingAverage _otherAvatarStarves; - SimpleMovingAverage _otherAvatarSkips; - int _numOutOfOrderSends = 0; - - SimpleMovingAverage _avgOtherAvatarDataRate; -}; - -#endif // hifi_MessagesMixerClientData_h diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index 7517bb6535..cee7206c31 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -91,7 +91,17 @@ void MessagesClient::sendMessage(const QString& channel, const QString& message) if (messagesMixer) { auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); - packetList->write(message.toUtf8()); + + auto channelUtf8 = channel.toUtf8(); + quint16 channelLength = channelUtf8.length(); + packetList->writePrimitive(channelLength); + packetList->write(channelUtf8); + + auto messageUtf8 = message.toUtf8(); + quint16 messageLength = messageUtf8.length(); + packetList->writePrimitive(messageLength); + packetList->write(messageUtf8); + nodeList->sendPacketList(std::move(packetList), *messagesMixer); } } diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index d1287d4a08..e0a847dcc6 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -88,7 +88,9 @@ public: AssetGetInfoReply, DomainDisconnectRequest, DomainServerRemovedNode, - MessagesData + MessagesData, + MessagesSubscribe, + MessagesUnsubscribe }; };