From 12f206e2f04bdac6f3c980069f4e6037e51b611b Mon Sep 17 00:00:00 2001 From: Brad Hefta-Gaub Date: Mon, 16 Nov 2015 17:00:03 -0800 Subject: [PATCH] more work on messages --- .../src/messages/MessagesMixer.cpp | 364 ++---------------- .../src/messages/MessagesMixer.h | 2 +- interface/src/Application.cpp | 11 + libraries/networking/src/MessagesClient.cpp | 25 +- libraries/networking/src/MessagesClient.h | 2 +- 5 files changed, 47 insertions(+), 357 deletions(-) diff --git a/assignment-client/src/messages/MessagesMixer.cpp b/assignment-client/src/messages/MessagesMixer.cpp index 70b0c1b2cf..6177850532 100644 --- a/assignment-client/src/messages/MessagesMixer.cpp +++ b/assignment-client/src/messages/MessagesMixer.cpp @@ -48,7 +48,7 @@ MessagesMixer::MessagesMixer(NLPacket& packet) : connect(DependencyManager::get().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled); auto& packetReceiver = DependencyManager::get()->getPacketReceiver(); - packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagesDataPacket"); + packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagesPacket"); } MessagesMixer::~MessagesMixer() { @@ -64,341 +64,13 @@ MessagesMixer::~MessagesMixer() { // assuming 60 htz update rate. const float BILLBOARD_AND_IDENTITY_SEND_PROBABILITY = 1.0f / 187.0f; -void MessagesMixer::broadcastMessagesData() { - qDebug() << "MessagesMixer::broadcastMessagesData()..."; - - int idleTime = QDateTime::currentMSecsSinceEpoch() - _lastFrameTimestamp; - - ++_numStatFrames; - - const float STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.10f; - const float BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.20f; - - const float RATIO_BACK_OFF = 0.02f; - - const int TRAILING_AVERAGE_FRAMES = 100; - int framesSinceCutoffEvent = TRAILING_AVERAGE_FRAMES; - - const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES; - const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO; - - // NOTE: The following code calculates the _performanceThrottlingRatio based on how much the messages-mixer was - // able to sleep. This will eventually be used to ask for an additional messages-mixer to help out. Currently the value - // is unused as it is assumed this should not be hit before the messages-mixer hits the desired bandwidth limit per client. - // It is reported in the domain-server stats for the messages-mixer. - - _trailingSleepRatio = (PREVIOUS_FRAMES_RATIO * _trailingSleepRatio) - + (idleTime * CURRENT_FRAME_RATIO / (float) MESSAGES_DATA_SEND_INTERVAL_MSECS); - - float lastCutoffRatio = _performanceThrottlingRatio; - bool hasRatioChanged = false; - - if (framesSinceCutoffEvent >= TRAILING_AVERAGE_FRAMES) { - if (_trailingSleepRatio <= STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD) { - // we're struggling - change our performance throttling ratio - _performanceThrottlingRatio = _performanceThrottlingRatio + (0.5f * (1.0f - _performanceThrottlingRatio)); - - qDebug() << "Mixer is struggling, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was" - << lastCutoffRatio << "and is now" << _performanceThrottlingRatio; - hasRatioChanged = true; - } else if (_trailingSleepRatio >= BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD && _performanceThrottlingRatio != 0) { - // we've recovered and can back off the performance throttling - _performanceThrottlingRatio = _performanceThrottlingRatio - RATIO_BACK_OFF; - - if (_performanceThrottlingRatio < 0) { - _performanceThrottlingRatio = 0; - } - - qDebug() << "Mixer is recovering, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was" - << lastCutoffRatio << "and is now" << _performanceThrottlingRatio; - hasRatioChanged = true; - } - - if (hasRatioChanged) { - framesSinceCutoffEvent = 0; - } - } - - if (!hasRatioChanged) { - ++framesSinceCutoffEvent; - } - - auto nodeList = DependencyManager::get(); - - // setup for distributed random floating point values - std::random_device randomDevice; - std::mt19937 generator(randomDevice()); - std::uniform_real_distribution distribution; - - qDebug() << "MessagesMixer::broadcastMessagesData()... calling nodeList->eachMatchingNode()"; - - nodeList->eachMatchingNode( - [&](const SharedNodePointer& node)->bool { - if (!node->getLinkedData()) { - return false; - } - if (node->getType() != NodeType::Agent) { - return false; - } - if (!node->getActiveSocket()) { - return false; - } - return true; - }, - [&](const SharedNodePointer& node) { - MessagesMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); - MutexTryLocker lock(nodeData->getMutex()); - if (!lock.isLocked()) { - return; - } - ++_sumListeners; - - AvatarData& avatar = nodeData->getAvatar(); - glm::vec3 myPosition = avatar.getPosition(); - - // reset the internal state for correct random number distribution - distribution.reset(); - - // reset the max distance for this frame - float maxAvatarDistanceThisFrame = 0.0f; - - // reset the number of sent avatars - nodeData->resetNumAvatarsSentLastFrame(); - - // keep a counter of the number of considered avatars - int numOtherAvatars = 0; - - // keep track of outbound data rate specifically for avatar data - int numAvatarDataBytes = 0; - - // keep track of the number of other avatars held back in this frame - int numAvatarsHeldBack = 0; - - // keep track of the number of other avatar frames skipped - int numAvatarsWithSkippedFrames = 0; - - // use the data rate specifically for avatar data for FRD adjustment checks - float avatarDataRateLastSecond = nodeData->getOutboundAvatarDataKbps(); - - // Check if it is time to adjust what we send this client based on the observed - // bandwidth to this node. We do this once a second, which is also the window for - // the bandwidth reported by node->getOutboundBandwidth(); - if (nodeData->getNumFramesSinceFRDAdjustment() > MESSAGES_MIXER_BROADCAST_FRAMES_PER_SECOND) { - - const float FRD_ADJUSTMENT_ACCEPTABLE_RATIO = 0.8f; - const float HYSTERISIS_GAP = (1 - FRD_ADJUSTMENT_ACCEPTABLE_RATIO); - const float HYSTERISIS_MIDDLE_PERCENTAGE = (1 - (HYSTERISIS_GAP * 0.5f)); - - // get the current full rate distance so we can work with it - float currentFullRateDistance = nodeData->getFullRateDistance(); - - if (avatarDataRateLastSecond > _maxKbpsPerNode) { - - // is the FRD greater than the farthest avatar? - // if so, before we calculate anything, set it to that distance - currentFullRateDistance = std::min(currentFullRateDistance, nodeData->getMaxAvatarDistance()); - - // we're adjusting the full rate distance to target a bandwidth in the middle - // of the hysterisis gap - currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond; - - nodeData->setFullRateDistance(currentFullRateDistance); - nodeData->resetNumFramesSinceFRDAdjustment(); - } else if (currentFullRateDistance < nodeData->getMaxAvatarDistance() - && avatarDataRateLastSecond < _maxKbpsPerNode * FRD_ADJUSTMENT_ACCEPTABLE_RATIO) { - // we are constrained AND we've recovered to below the acceptable ratio - // lets adjust the full rate distance to target a bandwidth in the middle of the hyterisis gap - currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond; - - nodeData->setFullRateDistance(currentFullRateDistance); - nodeData->resetNumFramesSinceFRDAdjustment(); - } - } else { - nodeData->incrementNumFramesSinceFRDAdjustment(); - } - - // setup a PacketList for the avatarPackets - auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData); - - // this is an AGENT we have received head data from - // send back a packet with other active node data to this node - nodeList->eachMatchingNode( - [&](const SharedNodePointer& otherNode)->bool { - if (!otherNode->getLinkedData()) { - return false; - } - if (otherNode->getUUID() == node->getUUID()) { - return false; - } - - return true; - }, - [&](const SharedNodePointer& otherNode) { - ++numOtherAvatars; - - MessagesMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - MutexTryLocker lock(otherNodeData->getMutex()); - if (!lock.isLocked()) { - return; - } - - // make sure we send out identity and billboard packets to and from new arrivals. - bool forceSend = !otherNodeData->checkAndSetHasReceivedFirstPacketsFrom(node->getUUID()); - - // we will also force a send of billboard or identity packet - // if either has changed in the last frame - if (otherNodeData->getBillboardChangeTimestamp() > 0 - && (forceSend - || otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp - || distribution(generator) < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { - - QByteArray rfcUUID = otherNode->getUUID().toRfc4122(); - QByteArray billboard = otherNodeData->getAvatar().getBillboard(); - - auto billboardPacket = NLPacket::create(PacketType::AvatarBillboard, rfcUUID.size() + billboard.size()); - billboardPacket->write(rfcUUID); - billboardPacket->write(billboard); - - nodeList->sendPacket(std::move(billboardPacket), *node); - - ++_sumBillboardPackets; - } - - if (otherNodeData->getIdentityChangeTimestamp() > 0 - && (forceSend - || otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp - || distribution(generator) < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { - - QByteArray individualData = otherNodeData->getAvatar().identityByteArray(); - - auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size()); - - individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122()); - - identityPacket->write(individualData); - - nodeList->sendPacket(std::move(identityPacket), *node); - - ++_sumIdentityPackets; - } - - AvatarData& otherAvatar = otherNodeData->getAvatar(); - // Decide whether to send this avatar's data based on it's distance from us - - // The full rate distance is the distance at which EVERY update will be sent for this avatar - // at twice the full rate distance, there will be a 50% chance of sending this avatar's update - glm::vec3 otherPosition = otherAvatar.getPosition(); - float distanceToAvatar = glm::length(myPosition - otherPosition); - - // potentially update the max full rate distance for this frame - maxAvatarDistanceThisFrame = std::max(maxAvatarDistanceThisFrame, distanceToAvatar); - - if (distanceToAvatar != 0.0f - && distribution(generator) > (nodeData->getFullRateDistance() / distanceToAvatar)) { - return; - } - - AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(otherNode->getUUID()); - AvatarDataSequenceNumber lastSeqFromSender = otherNodeData->getLastReceivedSequenceNumber(); - - if (lastSeqToReceiver > lastSeqFromSender && lastSeqToReceiver != UINT16_MAX) { - // we got out out of order packets from the sender, track it - otherNodeData->incrementNumOutOfOrderSends(); - } - - // make sure we haven't already sent this data from this sender to this receiver - // or that somehow we haven't sent - if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) { - ++numAvatarsHeldBack; - return; - } else if (lastSeqFromSender - lastSeqToReceiver > 1) { - // this is a skip - we still send the packet but capture the presence of the skip so we see it happening - ++numAvatarsWithSkippedFrames; - } - - // we're going to send this avatar - - // increment the number of avatars sent to this reciever - nodeData->incrementNumAvatarsSentLastFrame(); - - // set the last sent sequence number for this sender on the receiver - nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(), - otherNodeData->getLastReceivedSequenceNumber()); - - // start a new segment in the PacketList for this avatar - avatarPacketList->startSegment(); - - numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122()); - numAvatarDataBytes += - avatarPacketList->write(otherAvatar.toByteArray(false, distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO)); - - avatarPacketList->endSegment(); - }); - - // close the current packet so that we're always sending something - avatarPacketList->closeCurrentPacket(true); - - // send the avatar data PacketList - nodeList->sendPacketList(std::move(avatarPacketList), *node); - - // record the bytes sent for other avatar data in the MessagesMixerClientData - nodeData->recordSentAvatarData(numAvatarDataBytes); - - // record the number of avatars held back this frame - nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack); - nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames); - - if (numOtherAvatars == 0) { - // update the full rate distance to FLOAT_MAX since we didn't have any other avatars to send - nodeData->setMaxAvatarDistance(FLT_MAX); - } else { - nodeData->setMaxAvatarDistance(maxAvatarDistanceThisFrame); - } - } - ); - - qDebug() << "MessagesMixer::broadcastMessagesData()... calling nodeList->eachMatchingNode() for encode..."; - - // We're done encoding this version of the otherAvatars. Update their "lastSent" joint-states so - // that we can notice differences, next time around. - nodeList->eachMatchingNode( - [&](const SharedNodePointer& otherNode)->bool { - if (!otherNode->getLinkedData()) { - return false; - } - if (otherNode->getType() != NodeType::Agent) { - return false; - } - if (!otherNode->getActiveSocket()) { - return false; - } - return true; - }, - [&](const SharedNodePointer& otherNode) { - MessagesMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); - MutexTryLocker lock(otherNodeData->getMutex()); - if (!lock.isLocked()) { - return; - } - AvatarData& otherAvatar = otherNodeData->getAvatar(); - otherAvatar.doneEncoding(false); - }); - - _lastFrameTimestamp = QDateTime::currentMSecsSinceEpoch(); -} - void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { + qDebug() << "MessagesMixer::nodeKilled()... node:" << killedNode->getUUID(); + if (killedNode->getType() == NodeType::Agent && killedNode->getLinkedData()) { auto nodeList = DependencyManager::get(); - // this was an avatar we were sending to other people - // send a kill packet for it to our other nodes - auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID); - killPacket->write(killedNode->getUUID().toRfc4122()); - - nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); - // we also want to remove sequence number data for this avatar on our other avatars // so invoke the appropriate method on the MessagesMixerClientData for other avatars nodeList->eachMatchingNode( @@ -414,18 +86,32 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) { return true; }, [&](const SharedNodePointer& node) { - QMetaObject::invokeMethod(node->getLinkedData(), - "removeLastBroadcastSequenceNumber", - Qt::AutoConnection, - Q_ARG(const QUuid&, QUuid(killedNode->getUUID()))); + qDebug() << "eachMatchingNode()... node:" << node->getUUID(); } ); } } -void MessagesMixer::handleMessagesDataPacket(QSharedPointer packet, SharedNodePointer senderNode) { +void MessagesMixer::handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode) { + qDebug() << "MessagesMixer::handleMessagesPacket()... senderNode:" << senderNode->getUUID(); + auto nodeList = DependencyManager::get(); - nodeList->updateNodeWithDataFromPacket(packet, senderNode); + //nodeList->updateNodeWithDataFromPacket(packet, senderNode); + + QByteArray data = packetList->getMessage(); + auto packetType = packetList->getType(); + + if (packetType == PacketType::MessagesData) { + QString message = QString::fromUtf8(data); + qDebug() << "got a messages packet:" << message; + + // this was an avatar we were sending to other people + // send a kill packet for it to our other nodes + //auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID); + //killPacket->write(killedNode->getUUID().toRfc4122()); + //nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent); + + } } void MessagesMixer::sendStatsPacket() { @@ -484,6 +170,7 @@ void MessagesMixer::run() { node->setLinkedData(new MessagesMixerClientData()); }; + /* // setup the timer that will be fired on the broadcast thread _broadcastTimer = new QTimer; _broadcastTimer->setInterval(MESSAGES_DATA_SEND_INTERVAL_MSECS); @@ -492,6 +179,7 @@ void MessagesMixer::run() { // connect appropriate signals and slots connect(_broadcastTimer, &QTimer::timeout, this, &MessagesMixer::broadcastMessagesData, Qt::DirectConnection); connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start())); + */ // wait until we have the domain-server settings, otherwise we bail DomainHandler& domainHandler = nodeList->getDomainHandler(); @@ -516,7 +204,7 @@ void MessagesMixer::run() { parseDomainServerSettings(domainHandler.getSettingsObject()); // start the broadcastThread - _broadcastThread.start(); + //_broadcastThread.start(); } void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) { diff --git a/assignment-client/src/messages/MessagesMixer.h b/assignment-client/src/messages/MessagesMixer.h index d96a20dd18..0d390b83a4 100644 --- a/assignment-client/src/messages/MessagesMixer.h +++ b/assignment-client/src/messages/MessagesMixer.h @@ -32,7 +32,7 @@ public slots: void sendStatsPacket(); private slots: - void handleMessagesDataPacket(QSharedPointer packet, SharedNodePointer senderNode); + void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); private: void broadcastMessagesData(); diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 96b8ab74a8..692b372ae4 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -70,6 +70,7 @@ #include #include #include +#include #include #include #include @@ -339,6 +340,7 @@ bool setupEssentials(int& argc, char** argv) { DependencyManager::set(); DependencyManager::set(); DependencyManager::set(); + DependencyManager::set(); DependencyManager::set(); DependencyManager::set(); return true; @@ -484,6 +486,14 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer) : connect(assetThread, &QThread::started, assetClient.data(), &AssetClient::init); assetThread->start(); + // Setup MessagesClient + auto messagesClient = DependencyManager::get(); + QThread* messagesThread = new QThread; + messagesThread->setObjectName("Messages Client Thread"); + messagesClient->moveToThread(messagesThread); + connect(messagesThread, &QThread::started, messagesClient.data(), &MessagesClient::init); + messagesThread->start(); + const DomainHandler& domainHandler = nodeList->getDomainHandler(); connect(&domainHandler, SIGNAL(hostnameChanged(const QString&)), SLOT(domainChanged(const QString&))); @@ -4019,6 +4029,7 @@ void Application::registerScriptEngineWithApplicationServices(ScriptEngine* scri scriptEngine->registerFunction("HMD", "getHUDLookAtPosition3D", HMDScriptingInterface::getHUDLookAtPosition3D, 0); scriptEngine->registerGlobalObject("Scene", DependencyManager::get().data()); + scriptEngine->registerGlobalObject("Messages", DependencyManager::get().data()); scriptEngine->registerGlobalObject("ScriptDiscoveryService", this->getRunningScriptsWidget()); diff --git a/libraries/networking/src/MessagesClient.cpp b/libraries/networking/src/MessagesClient.cpp index d3d44b6fdc..7517bb6535 100644 --- a/libraries/networking/src/MessagesClient.cpp +++ b/libraries/networking/src/MessagesClient.cpp @@ -74,34 +74,25 @@ bool haveMessagesMixer() { return true; } -void MessagesClient::handleMessagesPacket(QSharedPointer packet, SharedNodePointer senderNode) { - auto packetType = packet->getType(); +void MessagesClient::handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode) { + QByteArray data = packetList->getMessage(); + auto packetType = packetList->getType(); if (packetType == PacketType::MessagesData) { - qDebug() << "got a messages packet"; + QString message = QString::fromUtf8(data); + qDebug() << "got a messages packet:" << message; } } void MessagesClient::sendMessage(const QString& channel, const QString& message) { + qDebug() << "MessagesClient::sendMessage() channel:" << channel << "message:" << message; auto nodeList = DependencyManager::get(); SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer); if (messagesMixer) { auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true); - - #if 0 - auto messageID = ++_currentID; - packetList->writePrimitive(messageID); - - packetList->writePrimitive(static_cast(extension.length())); - packetList->write(extension.toLatin1().constData(), extension.length()); - - uint64_t size = data.length(); - packetList->writePrimitive(size); - packetList->write(data.constData(), size); - - nodeList->sendPacketList(std::move(packetList), *assetServer); - #endif + packetList->write(message.toUtf8()); + nodeList->sendPacketList(std::move(packetList), *messagesMixer); } } diff --git a/libraries/networking/src/MessagesClient.h b/libraries/networking/src/MessagesClient.h index f1f13bfe20..121e6041b1 100644 --- a/libraries/networking/src/MessagesClient.h +++ b/libraries/networking/src/MessagesClient.h @@ -31,7 +31,7 @@ public: Q_INVOKABLE void sendMessage(const QString& channel, const QString& message); private slots: - void handleMessagesPacket(QSharedPointer packet, SharedNodePointer senderNode); + void handleMessagesPacket(QSharedPointer packetList, SharedNodePointer senderNode); void handleNodeKilled(SharedNodePointer node); private: