first cut at messages-mixer

This commit is contained in:
Brad Hefta-Gaub 2015-11-16 15:26:17 -08:00
parent 8dcf245b74
commit d42a1a721f
22 changed files with 959 additions and 7 deletions

View file

@ -55,6 +55,7 @@ Agent::Agent(NLPacket& packet) :
{ PacketType::OctreeStats, PacketType::EntityData, PacketType::EntityErase },
this, "handleOctreePacket");
packetReceiver.registerListener(PacketType::Jurisdiction, this, "handleJurisdictionPacket");
packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagePacket");
}
void Agent::handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
@ -93,7 +94,15 @@ void Agent::handleJurisdictionPacket(QSharedPointer<NLPacket> packet, SharedNode
DependencyManager::get<EntityScriptingInterface>()->getJurisdictionListener()->
queueReceivedPacket(packet, senderNode);
}
}
}
void Agent::handleMessagesPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
auto packetType = packet->getType();
if (packetType == PacketType::MessagesData) {
qDebug() << "got a messages packet";
}
}
void Agent::handleAudioPacket(QSharedPointer<NLPacket> packet) {
_receivedAudioStream.parseData(*packet);

View file

@ -58,6 +58,7 @@ private slots:
void handleAudioPacket(QSharedPointer<NLPacket> packet);
void handleOctreePacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleJurisdictionPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleMessagesPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void sendPingRequests();
void processAgentAvatarAndAudio(float deltaTime);

View file

@ -17,6 +17,7 @@
#include "avatars/AvatarMixer.h"
#include "entities/EntityServer.h"
#include "assets/AssetServer.h"
#include "messages/MessagesMixer.h"
ThreadedAssignment* AssignmentFactory::unpackAssignment(NLPacket& packet) {
@ -36,6 +37,8 @@ ThreadedAssignment* AssignmentFactory::unpackAssignment(NLPacket& packet) {
return new EntityServer(packet);
case Assignment::AssetServerType:
return new AssetServer(packet);
case Assignment::MessagesMixerType:
return new MessagesMixer(packet);
default:
return NULL;
}

View file

@ -0,0 +1,535 @@
//
// MessagesMixer.cpp
// assignment-client/src/messages
//
// Created by Brad hefta-Gaub on 11/16/2015.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include <cfloat>
#include <random>
#include <QtCore/QCoreApplication>
#include <QtCore/QDateTime>
#include <QtCore/QJsonObject>
#include <QtCore/QTimer>
#include <QtCore/QThread>
#include <LogHandler.h>
#include <NodeList.h>
#include <udt/PacketHeaders.h>
#include <SharedUtil.h>
#include <UUID.h>
#include <TryLocker.h>
#include "MessagesMixerClientData.h"
#include "MessagesMixer.h"
const QString MESSAGES_MIXER_LOGGING_NAME = "messages-mixer";
const int MESSAGES_MIXER_BROADCAST_FRAMES_PER_SECOND = 60;
const unsigned int MESSAGES_DATA_SEND_INTERVAL_MSECS = (1.0f / (float) MESSAGES_MIXER_BROADCAST_FRAMES_PER_SECOND) * 1000;
MessagesMixer::MessagesMixer(NLPacket& packet) :
ThreadedAssignment(packet),
_broadcastThread(),
_lastFrameTimestamp(QDateTime::currentMSecsSinceEpoch()),
_trailingSleepRatio(1.0f),
_performanceThrottlingRatio(0.0f),
_sumListeners(0),
_numStatFrames(0),
_sumBillboardPackets(0),
_sumIdentityPackets(0)
{
// make sure we hear about node kills so we can tell the other nodes
connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagesDataPacket");
}
MessagesMixer::~MessagesMixer() {
if (_broadcastTimer) {
_broadcastTimer->deleteLater();
}
_broadcastThread.quit();
_broadcastThread.wait();
}
// An 80% chance of sending a identity packet within a 5 second interval.
// assuming 60 htz update rate.
const float BILLBOARD_AND_IDENTITY_SEND_PROBABILITY = 1.0f / 187.0f;
void MessagesMixer::broadcastMessagesData() {
qDebug() << "MessagesMixer::broadcastMessagesData()...";
int idleTime = QDateTime::currentMSecsSinceEpoch() - _lastFrameTimestamp;
++_numStatFrames;
const float STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.10f;
const float BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.20f;
const float RATIO_BACK_OFF = 0.02f;
const int TRAILING_AVERAGE_FRAMES = 100;
int framesSinceCutoffEvent = TRAILING_AVERAGE_FRAMES;
const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES;
const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO;
// NOTE: The following code calculates the _performanceThrottlingRatio based on how much the messages-mixer was
// able to sleep. This will eventually be used to ask for an additional messages-mixer to help out. Currently the value
// is unused as it is assumed this should not be hit before the messages-mixer hits the desired bandwidth limit per client.
// It is reported in the domain-server stats for the messages-mixer.
_trailingSleepRatio = (PREVIOUS_FRAMES_RATIO * _trailingSleepRatio)
+ (idleTime * CURRENT_FRAME_RATIO / (float) MESSAGES_DATA_SEND_INTERVAL_MSECS);
float lastCutoffRatio = _performanceThrottlingRatio;
bool hasRatioChanged = false;
if (framesSinceCutoffEvent >= TRAILING_AVERAGE_FRAMES) {
if (_trailingSleepRatio <= STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD) {
// we're struggling - change our performance throttling ratio
_performanceThrottlingRatio = _performanceThrottlingRatio + (0.5f * (1.0f - _performanceThrottlingRatio));
qDebug() << "Mixer is struggling, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
<< lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true;
} else if (_trailingSleepRatio >= BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD && _performanceThrottlingRatio != 0) {
// we've recovered and can back off the performance throttling
_performanceThrottlingRatio = _performanceThrottlingRatio - RATIO_BACK_OFF;
if (_performanceThrottlingRatio < 0) {
_performanceThrottlingRatio = 0;
}
qDebug() << "Mixer is recovering, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
<< lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true;
}
if (hasRatioChanged) {
framesSinceCutoffEvent = 0;
}
}
if (!hasRatioChanged) {
++framesSinceCutoffEvent;
}
auto nodeList = DependencyManager::get<NodeList>();
// setup for distributed random floating point values
std::random_device randomDevice;
std::mt19937 generator(randomDevice());
std::uniform_real_distribution<float> distribution;
qDebug() << "MessagesMixer::broadcastMessagesData()... calling nodeList->eachMatchingNode()";
nodeList->eachMatchingNode(
[&](const SharedNodePointer& node)->bool {
if (!node->getLinkedData()) {
return false;
}
if (node->getType() != NodeType::Agent) {
return false;
}
if (!node->getActiveSocket()) {
return false;
}
return true;
},
[&](const SharedNodePointer& node) {
MessagesMixerClientData* nodeData = reinterpret_cast<MessagesMixerClientData*>(node->getLinkedData());
MutexTryLocker lock(nodeData->getMutex());
if (!lock.isLocked()) {
return;
}
++_sumListeners;
AvatarData& avatar = nodeData->getAvatar();
glm::vec3 myPosition = avatar.getPosition();
// reset the internal state for correct random number distribution
distribution.reset();
// reset the max distance for this frame
float maxAvatarDistanceThisFrame = 0.0f;
// reset the number of sent avatars
nodeData->resetNumAvatarsSentLastFrame();
// keep a counter of the number of considered avatars
int numOtherAvatars = 0;
// keep track of outbound data rate specifically for avatar data
int numAvatarDataBytes = 0;
// keep track of the number of other avatars held back in this frame
int numAvatarsHeldBack = 0;
// keep track of the number of other avatar frames skipped
int numAvatarsWithSkippedFrames = 0;
// use the data rate specifically for avatar data for FRD adjustment checks
float avatarDataRateLastSecond = nodeData->getOutboundAvatarDataKbps();
// Check if it is time to adjust what we send this client based on the observed
// bandwidth to this node. We do this once a second, which is also the window for
// the bandwidth reported by node->getOutboundBandwidth();
if (nodeData->getNumFramesSinceFRDAdjustment() > MESSAGES_MIXER_BROADCAST_FRAMES_PER_SECOND) {
const float FRD_ADJUSTMENT_ACCEPTABLE_RATIO = 0.8f;
const float HYSTERISIS_GAP = (1 - FRD_ADJUSTMENT_ACCEPTABLE_RATIO);
const float HYSTERISIS_MIDDLE_PERCENTAGE = (1 - (HYSTERISIS_GAP * 0.5f));
// get the current full rate distance so we can work with it
float currentFullRateDistance = nodeData->getFullRateDistance();
if (avatarDataRateLastSecond > _maxKbpsPerNode) {
// is the FRD greater than the farthest avatar?
// if so, before we calculate anything, set it to that distance
currentFullRateDistance = std::min(currentFullRateDistance, nodeData->getMaxAvatarDistance());
// we're adjusting the full rate distance to target a bandwidth in the middle
// of the hysterisis gap
currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond;
nodeData->setFullRateDistance(currentFullRateDistance);
nodeData->resetNumFramesSinceFRDAdjustment();
} else if (currentFullRateDistance < nodeData->getMaxAvatarDistance()
&& avatarDataRateLastSecond < _maxKbpsPerNode * FRD_ADJUSTMENT_ACCEPTABLE_RATIO) {
// we are constrained AND we've recovered to below the acceptable ratio
// lets adjust the full rate distance to target a bandwidth in the middle of the hyterisis gap
currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond;
nodeData->setFullRateDistance(currentFullRateDistance);
nodeData->resetNumFramesSinceFRDAdjustment();
}
} else {
nodeData->incrementNumFramesSinceFRDAdjustment();
}
// setup a PacketList for the avatarPackets
auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData);
// this is an AGENT we have received head data from
// send back a packet with other active node data to this node
nodeList->eachMatchingNode(
[&](const SharedNodePointer& otherNode)->bool {
if (!otherNode->getLinkedData()) {
return false;
}
if (otherNode->getUUID() == node->getUUID()) {
return false;
}
return true;
},
[&](const SharedNodePointer& otherNode) {
++numOtherAvatars;
MessagesMixerClientData* otherNodeData = reinterpret_cast<MessagesMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
if (!lock.isLocked()) {
return;
}
// make sure we send out identity and billboard packets to and from new arrivals.
bool forceSend = !otherNodeData->checkAndSetHasReceivedFirstPacketsFrom(node->getUUID());
// we will also force a send of billboard or identity packet
// if either has changed in the last frame
if (otherNodeData->getBillboardChangeTimestamp() > 0
&& (forceSend
|| otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp
|| distribution(generator) < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) {
QByteArray rfcUUID = otherNode->getUUID().toRfc4122();
QByteArray billboard = otherNodeData->getAvatar().getBillboard();
auto billboardPacket = NLPacket::create(PacketType::AvatarBillboard, rfcUUID.size() + billboard.size());
billboardPacket->write(rfcUUID);
billboardPacket->write(billboard);
nodeList->sendPacket(std::move(billboardPacket), *node);
++_sumBillboardPackets;
}
if (otherNodeData->getIdentityChangeTimestamp() > 0
&& (forceSend
|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp
|| distribution(generator) < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) {
QByteArray individualData = otherNodeData->getAvatar().identityByteArray();
auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size());
individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122());
identityPacket->write(individualData);
nodeList->sendPacket(std::move(identityPacket), *node);
++_sumIdentityPackets;
}
AvatarData& otherAvatar = otherNodeData->getAvatar();
// Decide whether to send this avatar's data based on it's distance from us
// The full rate distance is the distance at which EVERY update will be sent for this avatar
// at twice the full rate distance, there will be a 50% chance of sending this avatar's update
glm::vec3 otherPosition = otherAvatar.getPosition();
float distanceToAvatar = glm::length(myPosition - otherPosition);
// potentially update the max full rate distance for this frame
maxAvatarDistanceThisFrame = std::max(maxAvatarDistanceThisFrame, distanceToAvatar);
if (distanceToAvatar != 0.0f
&& distribution(generator) > (nodeData->getFullRateDistance() / distanceToAvatar)) {
return;
}
AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(otherNode->getUUID());
AvatarDataSequenceNumber lastSeqFromSender = otherNodeData->getLastReceivedSequenceNumber();
if (lastSeqToReceiver > lastSeqFromSender && lastSeqToReceiver != UINT16_MAX) {
// we got out out of order packets from the sender, track it
otherNodeData->incrementNumOutOfOrderSends();
}
// make sure we haven't already sent this data from this sender to this receiver
// or that somehow we haven't sent
if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) {
++numAvatarsHeldBack;
return;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
++numAvatarsWithSkippedFrames;
}
// we're going to send this avatar
// increment the number of avatars sent to this reciever
nodeData->incrementNumAvatarsSentLastFrame();
// set the last sent sequence number for this sender on the receiver
nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(),
otherNodeData->getLastReceivedSequenceNumber());
// start a new segment in the PacketList for this avatar
avatarPacketList->startSegment();
numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122());
numAvatarDataBytes +=
avatarPacketList->write(otherAvatar.toByteArray(false, distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO));
avatarPacketList->endSegment();
});
// close the current packet so that we're always sending something
avatarPacketList->closeCurrentPacket(true);
// send the avatar data PacketList
nodeList->sendPacketList(std::move(avatarPacketList), *node);
// record the bytes sent for other avatar data in the MessagesMixerClientData
nodeData->recordSentAvatarData(numAvatarDataBytes);
// record the number of avatars held back this frame
nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack);
nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames);
if (numOtherAvatars == 0) {
// update the full rate distance to FLOAT_MAX since we didn't have any other avatars to send
nodeData->setMaxAvatarDistance(FLT_MAX);
} else {
nodeData->setMaxAvatarDistance(maxAvatarDistanceThisFrame);
}
}
);
qDebug() << "MessagesMixer::broadcastMessagesData()... calling nodeList->eachMatchingNode() for encode...";
// We're done encoding this version of the otherAvatars. Update their "lastSent" joint-states so
// that we can notice differences, next time around.
nodeList->eachMatchingNode(
[&](const SharedNodePointer& otherNode)->bool {
if (!otherNode->getLinkedData()) {
return false;
}
if (otherNode->getType() != NodeType::Agent) {
return false;
}
if (!otherNode->getActiveSocket()) {
return false;
}
return true;
},
[&](const SharedNodePointer& otherNode) {
MessagesMixerClientData* otherNodeData = reinterpret_cast<MessagesMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
if (!lock.isLocked()) {
return;
}
AvatarData& otherAvatar = otherNodeData->getAvatar();
otherAvatar.doneEncoding(false);
});
_lastFrameTimestamp = QDateTime::currentMSecsSinceEpoch();
}
void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
if (killedNode->getType() == NodeType::Agent
&& killedNode->getLinkedData()) {
auto nodeList = DependencyManager::get<NodeList>();
// this was an avatar we were sending to other people
// send a kill packet for it to our other nodes
auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID);
killPacket->write(killedNode->getUUID().toRfc4122());
nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent);
// we also want to remove sequence number data for this avatar on our other avatars
// so invoke the appropriate method on the MessagesMixerClientData for other avatars
nodeList->eachMatchingNode(
[&](const SharedNodePointer& node)->bool {
if (!node->getLinkedData()) {
return false;
}
if (node->getUUID() == killedNode->getUUID()) {
return false;
}
return true;
},
[&](const SharedNodePointer& node) {
QMetaObject::invokeMethod(node->getLinkedData(),
"removeLastBroadcastSequenceNumber",
Qt::AutoConnection,
Q_ARG(const QUuid&, QUuid(killedNode->getUUID())));
}
);
}
}
void MessagesMixer::handleMessagesDataPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->updateNodeWithDataFromPacket(packet, senderNode);
}
void MessagesMixer::sendStatsPacket() {
QJsonObject statsObject;
statsObject["average_listeners_last_second"] = (float) _sumListeners / (float) _numStatFrames;
statsObject["trailing_sleep_percentage"] = _trailingSleepRatio * 100;
statsObject["performance_throttling_ratio"] = _performanceThrottlingRatio;
QJsonObject messagesObject;
auto nodeList = DependencyManager::get<NodeList>();
// add stats for each listerner
nodeList->eachNode([&](const SharedNodePointer& node) {
QJsonObject messagesStats;
const QString NODE_OUTBOUND_KBPS_STAT_KEY = "outbound_kbps";
const QString NODE_INBOUND_KBPS_STAT_KEY = "inbound_kbps";
// add the key to ask the domain-server for a username replacement, if it has it
messagesStats[USERNAME_UUID_REPLACEMENT_STATS_KEY] = uuidStringWithoutCurlyBraces(node->getUUID());
messagesStats[NODE_OUTBOUND_KBPS_STAT_KEY] = node->getOutboundBandwidth();
messagesStats[NODE_INBOUND_KBPS_STAT_KEY] = node->getInboundBandwidth();
MessagesMixerClientData* clientData = static_cast<MessagesMixerClientData*>(node->getLinkedData());
if (clientData) {
MutexTryLocker lock(clientData->getMutex());
if (lock.isLocked()) {
clientData->loadJSONStats(messagesStats);
// add the diff between the full outbound bandwidth and the measured bandwidth for AvatarData send only
messagesStats["delta_full_vs_avatar_data_kbps"] =
messagesStats[NODE_OUTBOUND_KBPS_STAT_KEY].toDouble() - messagesStats[OUTBOUND_MESSAGES_DATA_STATS_KEY].toDouble();
}
}
messagesObject[uuidStringWithoutCurlyBraces(node->getUUID())] = messagesStats;
});
statsObject["messages"] = messagesObject;
ThreadedAssignment::addPacketStatsAndSendStatsPacket(statsObject);
_sumListeners = 0;
_numStatFrames = 0;
}
void MessagesMixer::run() {
ThreadedAssignment::commonInit(MESSAGES_MIXER_LOGGING_NAME, NodeType::MessagesMixer);
NodeType_t owningNodeType = DependencyManager::get<NodeList>()->getOwnerType();
qDebug() << "owningNodeType:" << owningNodeType;
auto nodeList = DependencyManager::get<NodeList>();
nodeList->addNodeTypeToInterestSet(NodeType::Agent);
nodeList->linkedDataCreateCallback = [] (Node* node) {
node->setLinkedData(new MessagesMixerClientData());
};
// setup the timer that will be fired on the broadcast thread
_broadcastTimer = new QTimer;
_broadcastTimer->setInterval(MESSAGES_DATA_SEND_INTERVAL_MSECS);
_broadcastTimer->moveToThread(&_broadcastThread);
// connect appropriate signals and slots
connect(_broadcastTimer, &QTimer::timeout, this, &MessagesMixer::broadcastMessagesData, Qt::DirectConnection);
connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start()));
// wait until we have the domain-server settings, otherwise we bail
DomainHandler& domainHandler = nodeList->getDomainHandler();
qDebug() << "Waiting for domain settings from domain-server.";
// block until we get the settingsRequestComplete signal
QEventLoop loop;
connect(&domainHandler, &DomainHandler::settingsReceived, &loop, &QEventLoop::quit);
connect(&domainHandler, &DomainHandler::settingsReceiveFail, &loop, &QEventLoop::quit);
domainHandler.requestDomainSettings();
loop.exec();
if (domainHandler.getSettingsObject().isEmpty()) {
qDebug() << "Failed to retreive settings object from domain-server. Bailing on assignment.";
setFinished(true);
return;
}
// parse the settings to pull out the values we need
parseDomainServerSettings(domainHandler.getSettingsObject());
// start the broadcastThread
_broadcastThread.start();
}
void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
qDebug() << "MessagesMixer::parseDomainServerSettings() domainSettings:" << domainSettings;
const QString MESSAGES_MIXER_SETTINGS_KEY = "messages_mixer";
const QString NODE_SEND_BANDWIDTH_KEY = "max_node_send_bandwidth";
const float DEFAULT_NODE_SEND_BANDWIDTH = 1.0f;
QJsonValue nodeBandwidthValue = domainSettings[MESSAGES_MIXER_SETTINGS_KEY].toObject()[NODE_SEND_BANDWIDTH_KEY];
if (!nodeBandwidthValue.isDouble()) {
qDebug() << NODE_SEND_BANDWIDTH_KEY << "is not a double - will continue with default value";
}
_maxKbpsPerNode = nodeBandwidthValue.toDouble(DEFAULT_NODE_SEND_BANDWIDTH) * KILO_PER_MEGA;
qDebug() << "The maximum send bandwidth per node is" << _maxKbpsPerNode << "kbps.";
}

View file

@ -0,0 +1,58 @@
//
// MessagesMixer.h
// assignment-client/src/messages
//
// Created by Brad hefta-Gaub on 11/16/2015.
// Copyright 2015 High Fidelity, Inc.
//
// The avatar mixer receives head, hand and positional data from all connected
// nodes, and broadcasts that data back to them, every BROADCAST_INTERVAL ms.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_MessagesMixer_h
#define hifi_MessagesMixer_h
#include <ThreadedAssignment.h>
/// Handles assignments of type MessagesMixer - distribution of avatar data to various clients
class MessagesMixer : public ThreadedAssignment {
Q_OBJECT
public:
MessagesMixer(NLPacket& packet);
~MessagesMixer();
public slots:
/// runs the avatar mixer
void run();
void nodeKilled(SharedNodePointer killedNode);
void sendStatsPacket();
private slots:
void handleMessagesDataPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
private:
void broadcastMessagesData();
void parseDomainServerSettings(const QJsonObject& domainSettings);
QThread _broadcastThread;
quint64 _lastFrameTimestamp;
float _trailingSleepRatio;
float _performanceThrottlingRatio;
int _sumListeners;
int _numStatFrames;
int _sumBillboardPackets;
int _sumIdentityPackets;
float _maxKbpsPerNode = 0.0f;
QTimer* _broadcastTimer = nullptr;
};
#endif // hifi_MessagesMixer_h

View file

@ -0,0 +1,55 @@
//
// MessagesMixerClientData.cpp
// assignment-client/src/messages
//
// Created by Brad hefta-Gaub on 11/16/2015.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include <udt/PacketHeaders.h>
#include "MessagesMixerClientData.h"
int MessagesMixerClientData::parseData(NLPacket& packet) {
// pull the sequence number from the data first
packet.readPrimitive(&_lastReceivedSequenceNumber);
// compute the offset to the data payload
return _avatar.parseDataFromBuffer(packet.readWithoutCopy(packet.bytesLeftToRead()));
}
bool MessagesMixerClientData::checkAndSetHasReceivedFirstPacketsFrom(const QUuid& uuid) {
if (_hasReceivedFirstPacketsFrom.find(uuid) == _hasReceivedFirstPacketsFrom.end()) {
_hasReceivedFirstPacketsFrom.insert(uuid);
return false;
}
return true;
}
uint16_t MessagesMixerClientData::getLastBroadcastSequenceNumber(const QUuid& nodeUUID) const {
// return the matching PacketSequenceNumber, or the default if we don't have it
auto nodeMatch = _lastBroadcastSequenceNumbers.find(nodeUUID);
if (nodeMatch != _lastBroadcastSequenceNumbers.end()) {
return nodeMatch->second;
} else {
return 0;
}
}
void MessagesMixerClientData::loadJSONStats(QJsonObject& jsonObject) const {
jsonObject["display_name"] = _avatar.getDisplayName();
jsonObject["full_rate_distance"] = _fullRateDistance;
jsonObject["max_av_distance"] = _maxAvatarDistance;
jsonObject["num_avs_sent_last_frame"] = _numAvatarsSentLastFrame;
jsonObject["avg_other_av_starves_per_second"] = getAvgNumOtherAvatarStarvesPerSecond();
jsonObject["avg_other_av_skips_per_second"] = getAvgNumOtherAvatarSkipsPerSecond();
jsonObject["total_num_out_of_order_sends"] = _numOutOfOrderSends;
jsonObject[OUTBOUND_MESSAGES_DATA_STATS_KEY] = getOutboundAvatarDataKbps();
jsonObject[INBOUND_MESSAGES_DATA_STATS_KEY] = _avatar.getAverageBytesReceivedPerSecond() / (float) BYTES_PER_KILOBIT;
jsonObject["av_data_receive_rate"] = _avatar.getReceiveRate();
}

View file

@ -0,0 +1,105 @@
//
// MessagesMixerClientData.h
// assignment-client/src/messages
//
// Created by Brad hefta-Gaub on 11/16/2015.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_MessagesMixerClientData_h
#define hifi_MessagesMixerClientData_h
#include <algorithm>
#include <cfloat>
#include <unordered_map>
#include <unordered_set>
#include <QtCore/QJsonObject>
#include <QtCore/QUrl>
#include <AvatarData.h>
#include <NodeData.h>
#include <NumericalConstants.h>
#include <udt/PacketHeaders.h>
#include <SimpleMovingAverage.h>
#include <UUIDHasher.h>
const QString OUTBOUND_MESSAGES_DATA_STATS_KEY = "outbound_av_data_kbps";
const QString INBOUND_MESSAGES_DATA_STATS_KEY = "inbound_av_data_kbps";
class MessagesMixerClientData : public NodeData {
Q_OBJECT
public:
int parseData(NLPacket& packet);
AvatarData& getAvatar() { return _avatar; }
bool checkAndSetHasReceivedFirstPacketsFrom(const QUuid& uuid);
uint16_t getLastBroadcastSequenceNumber(const QUuid& nodeUUID) const;
void setLastBroadcastSequenceNumber(const QUuid& nodeUUID, uint16_t sequenceNumber)
{ _lastBroadcastSequenceNumbers[nodeUUID] = sequenceNumber; }
Q_INVOKABLE void removeLastBroadcastSequenceNumber(const QUuid& nodeUUID) { _lastBroadcastSequenceNumbers.erase(nodeUUID); }
uint16_t getLastReceivedSequenceNumber() const { return _lastReceivedSequenceNumber; }
quint64 getBillboardChangeTimestamp() const { return _billboardChangeTimestamp; }
void setBillboardChangeTimestamp(quint64 billboardChangeTimestamp) { _billboardChangeTimestamp = billboardChangeTimestamp; }
quint64 getIdentityChangeTimestamp() const { return _identityChangeTimestamp; }
void setIdentityChangeTimestamp(quint64 identityChangeTimestamp) { _identityChangeTimestamp = identityChangeTimestamp; }
void setFullRateDistance(float fullRateDistance) { _fullRateDistance = fullRateDistance; }
float getFullRateDistance() const { return _fullRateDistance; }
void setMaxAvatarDistance(float maxAvatarDistance) { _maxAvatarDistance = maxAvatarDistance; }
float getMaxAvatarDistance() const { return _maxAvatarDistance; }
void resetNumAvatarsSentLastFrame() { _numAvatarsSentLastFrame = 0; }
void incrementNumAvatarsSentLastFrame() { ++_numAvatarsSentLastFrame; }
int getNumAvatarsSentLastFrame() const { return _numAvatarsSentLastFrame; }
void recordNumOtherAvatarStarves(int numAvatarsHeldBack) { _otherAvatarStarves.updateAverage((float) numAvatarsHeldBack); }
float getAvgNumOtherAvatarStarvesPerSecond() const { return _otherAvatarStarves.getAverageSampleValuePerSecond(); }
void recordNumOtherAvatarSkips(int numOtherAvatarSkips) { _otherAvatarSkips.updateAverage((float) numOtherAvatarSkips); }
float getAvgNumOtherAvatarSkipsPerSecond() const { return _otherAvatarSkips.getAverageSampleValuePerSecond(); }
void incrementNumOutOfOrderSends() { ++_numOutOfOrderSends; }
int getNumFramesSinceFRDAdjustment() const { return _numFramesSinceAdjustment; }
void incrementNumFramesSinceFRDAdjustment() { ++_numFramesSinceAdjustment; }
void resetNumFramesSinceFRDAdjustment() { _numFramesSinceAdjustment = 0; }
void recordSentAvatarData(int numBytes) { _avgOtherAvatarDataRate.updateAverage((float) numBytes); }
float getOutboundAvatarDataKbps() const
{ return _avgOtherAvatarDataRate.getAverageSampleValuePerSecond() / (float) BYTES_PER_KILOBIT; }
void loadJSONStats(QJsonObject& jsonObject) const;
private:
AvatarData _avatar;
uint16_t _lastReceivedSequenceNumber { 0 };
std::unordered_map<QUuid, uint16_t> _lastBroadcastSequenceNumbers;
std::unordered_set<QUuid> _hasReceivedFirstPacketsFrom;
quint64 _billboardChangeTimestamp = 0;
quint64 _identityChangeTimestamp = 0;
float _fullRateDistance = FLT_MAX;
float _maxAvatarDistance = FLT_MAX;
int _numAvatarsSentLastFrame = 0;
int _numFramesSinceAdjustment = 0;
SimpleMovingAverage _otherAvatarStarves;
SimpleMovingAverage _otherAvatarSkips;
int _numOutOfOrderSends = 0;
SimpleMovingAverage _avgOtherAvatarDataRate;
};
#endif // hifi_MessagesMixerClientData_h

View file

@ -549,6 +549,22 @@
"advanced": true
}
]
},
{
"name": "messages_mixer",
"label": "Messages Mixer",
"assignment-types": [4],
"settings": [
{
"name": "max_node_send_bandwidth",
"type": "double",
"label": "Per-Node Bandwidth",
"help": "Desired maximum send bandwidth (in Megabits per second) to each node",
"placeholder": 1.0,
"default": 1.0,
"advanced": true
}
]
}
]
}

View file

@ -48,7 +48,8 @@ QUuid DomainGatekeeper::assignmentUUIDForPendingAssignment(const QUuid& tempUUID
const NodeSet STATICALLY_ASSIGNED_NODES = NodeSet() << NodeType::AudioMixer
<< NodeType::AvatarMixer << NodeType::EntityServer
<< NodeType::AssetServer;
<< NodeType::AssetServer
<< NodeType::MessagesMixer;
void DomainGatekeeper::processConnectRequestPacket(QSharedPointer<NLPacket> packet) {
if (packet->getPayloadSize() == 0) {
@ -66,7 +67,7 @@ void DomainGatekeeper::processConnectRequestPacket(QSharedPointer<NLPacket> pack
}
static const NodeSet VALID_NODE_TYPES {
NodeType::AudioMixer, NodeType::AvatarMixer, NodeType::AssetServer, NodeType::EntityServer, NodeType::Agent
NodeType::AudioMixer, NodeType::AvatarMixer, NodeType::AssetServer, NodeType::EntityServer, NodeType::Agent, NodeType::MessagesMixer
};
if (!VALID_NODE_TYPES.contains(nodeConnection.nodeType)) {

View file

@ -553,7 +553,6 @@ void DomainServer::populateDefaultStaticAssignmentsExcludingTypes(const QSet<Ass
defaultedType != Assignment::AllTypes;
defaultedType = static_cast<Assignment::Type>(static_cast<int>(defaultedType) + 1)) {
if (!excludedTypes.contains(defaultedType)
&& defaultedType != Assignment::UNUSED_0
&& defaultedType != Assignment::UNUSED_1
&& defaultedType != Assignment::AgentType) {

View file

@ -131,7 +131,8 @@ void DomainServerSettingsManager::setupConfigMap(const QStringList& argumentList
appSettings.setValue(JSON_SETTINGS_VERSION_KEY, _descriptionVersion);
}
QVariant DomainServerSettingsManager::valueOrDefaultValueForKeyPath(const QString &keyPath) {
QVariant DomainServerSettingsManager::valueOrDefaultValueForKeyPath(const QString& keyPath) {
qDebug() << "DomainServerSettingsManager::valueOrDefaultValueForKeyPath() keyPath:" << keyPath;
const QVariant* foundValue = valueForKeyPath(_configMap.getMergedConfig(), keyPath);
if (foundValue) {
@ -226,6 +227,8 @@ bool DomainServerSettingsManager::handleAuthenticatedHTTPRequest(HTTPConnection
}
QJsonObject DomainServerSettingsManager::responseObjectForType(const QString& typeValue, bool isAuthenticated) {
qDebug() << "DomainServerSettingsManager::responseObjectForType() typeValue:" << typeValue;
QJsonObject responseObject;
if (!typeValue.isEmpty() || isAuthenticated) {

View file

@ -30,6 +30,8 @@ Assignment::Type Assignment::typeForNodeType(NodeType_t nodeType) {
return Assignment::EntityServerType;
case NodeType::AssetServer:
return Assignment::AssetServerType;
case NodeType::MessagesMixer:
return Assignment::MessagesMixerType;
default:
return Assignment::AllTypes;
}
@ -131,6 +133,8 @@ const char* Assignment::getTypeName() const {
return "asset-server";
case Assignment::EntityServerType:
return "entity-server";
case Assignment::MessagesMixerType:
return "messages-mixer";
default:
return "unknown";
}

View file

@ -30,7 +30,7 @@ public:
AvatarMixerType = 1,
AgentType = 2,
AssetServerType = 3,
UNUSED_0 = 4,
MessagesMixerType = 4,
UNUSED_1 = 5,
EntityServerType = 6,
AllTypes = 7

View file

@ -242,6 +242,8 @@ void DomainHandler::requestDomainSettings() {
Assignment::Type assignmentType = Assignment::typeForNodeType(DependencyManager::get<NodeList>()->getOwnerType());
qCDebug(networking) << "Requesting settings from domain server for assignmentType:" << assignmentType;
auto packet = NLPacket::create(PacketType::DomainSettingsRequest, sizeof(assignmentType), true, false);
packet->writePrimitive(assignmentType);

View file

@ -0,0 +1,113 @@
//
// MessagesClient.cpp
// libraries/networking/src
//
// Created by Brad hefta-Gaub on 11/16/2015.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "MessagesClient.h"
#include <cstdint>
#include <QtCore/QBuffer>
#include <QtCore/QStandardPaths>
#include <QtCore/QThread>
#include <QtNetwork/QNetworkDiskCache>
#include "AssetRequest.h"
#include "AssetUpload.h"
#include "AssetUtils.h"
#include "NetworkAccessManager.h"
#include "NetworkLogging.h"
#include "NodeList.h"
#include "PacketReceiver.h"
#include "ResourceCache.h"
MessagesClient::MessagesClient() {
setCustomDeleter([](Dependency* dependency){
static_cast<MessagesClient*>(dependency)->deleteLater();
});
auto nodeList = DependencyManager::get<NodeList>();
auto& packetReceiver = nodeList->getPacketReceiver();
packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagePacket");
connect(nodeList.data(), &LimitedNodeList::nodeKilled, this, &MessagesClient::handleNodeKilled);
}
void MessagesClient::init() {
if (QThread::currentThread() != thread()) {
QMetaObject::invokeMethod(this, "init", Qt::BlockingQueuedConnection);
}
// Setup disk cache if not already
QNetworkAccessManager& networkAccessManager = NetworkAccessManager::getInstance();
if (!networkAccessManager.cache()) {
QString cachePath = QStandardPaths::writableLocation(QStandardPaths::DataLocation);
cachePath = !cachePath.isEmpty() ? cachePath : "interfaceCache";
QNetworkDiskCache* cache = new QNetworkDiskCache();
cache->setMaximumCacheSize(MAXIMUM_CACHE_SIZE);
cache->setCacheDirectory(cachePath);
networkAccessManager.setCache(cache);
qCDebug(asset_client) << "MessagesClient disk cache setup at" << cachePath
<< "(size:" << MAXIMUM_CACHE_SIZE / BYTES_PER_GIGABYTES << "GB)";
}
}
bool haveMessagesMixer() {
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
if (!messagesMixer) {
qCWarning(messages_client) << "Could not complete MessagesClient operation "
<< "since you are not currently connected to a messages-mixer.";
return false;
}
return true;
}
void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
auto packetType = packet->getType();
if (packetType == PacketType::MessagesData) {
qDebug() << "got a messages packet";
}
}
void MessagesClient::sendMessage(const QString& channel, const QString& message) {
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
if (messagesMixer) {
auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true);
#if 0
auto messageID = ++_currentID;
packetList->writePrimitive(messageID);
packetList->writePrimitive(static_cast<uint8_t>(extension.length()));
packetList->write(extension.toLatin1().constData(), extension.length());
uint64_t size = data.length();
packetList->writePrimitive(size);
packetList->write(data.constData(), size);
nodeList->sendPacketList(std::move(packetList), *assetServer);
#endif
}
}
void MessagesClient::handleNodeKilled(SharedNodePointer node) {
if (node->getType() != NodeType::MessagesMixer) {
return;
}
}

View file

@ -0,0 +1,40 @@
//
// MessagesClient.h
// libraries/networking/src
//
// Created by Brad hefta-Gaub on 11/16/2015.
// Copyright 2015 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_MessagesClient_h
#define hifi_MessagesClient_h
#include <QString>
#include <DependencyManager.h>
#include "LimitedNodeList.h"
#include "NLPacket.h"
#include "Node.h"
class MessagesClient : public QObject, public Dependency {
Q_OBJECT
public:
MessagesClient();
Q_INVOKABLE void init();
Q_INVOKABLE void sendMessage(const QString& channel, const QString& message);
private slots:
void handleMessagesPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleNodeKilled(SharedNodePointer node);
private:
};
#endif

View file

@ -13,3 +13,4 @@
Q_LOGGING_CATEGORY(networking, "hifi.networking")
Q_LOGGING_CATEGORY(asset_client, "hifi.networking.asset_client")
Q_LOGGING_CATEGORY(messages_client, "hifi.networking.messages_client")

View file

@ -16,5 +16,6 @@
Q_DECLARE_LOGGING_CATEGORY(networking)
Q_DECLARE_LOGGING_CATEGORY(asset_client)
Q_DECLARE_LOGGING_CATEGORY(messages_client)
#endif // hifi_NetworkLogging_h

View file

@ -32,6 +32,7 @@ void NodeType::init() {
TypeNameHash.insert(NodeType::Agent, "Agent");
TypeNameHash.insert(NodeType::AudioMixer, "Audio Mixer");
TypeNameHash.insert(NodeType::AvatarMixer, "Avatar Mixer");
TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer");
TypeNameHash.insert(NodeType::AssetServer, "Asset Server");
TypeNameHash.insert(NodeType::Unassigned, "Unassigned");
}

View file

@ -23,6 +23,7 @@ namespace NodeType {
const NodeType_t AudioMixer = 'M';
const NodeType_t AvatarMixer = 'W';
const NodeType_t AssetServer = 'A';
const NodeType_t MessagesMixer = 'm';
const NodeType_t Unassigned = 1;
void init();

View file

@ -119,9 +119,12 @@ void ThreadedAssignment::stopSendingStats() {
}
void ThreadedAssignment::checkInWithDomainServerOrExit() {
qDebug() << "ThreadedAssignment::checkInWithDomainServerOrExit()....";
if (DependencyManager::get<NodeList>()->getNumNoReplyDomainCheckIns() == MAX_SILENT_DOMAIN_SERVER_CHECK_INS) {
qDebug() << "ThreadedAssignment::checkInWithDomainServerOrExit().... getNumNoReplyDomainCheckIns() == MAX_SILENT_DOMAIN_SERVER_CHECK_INS";
setFinished(true);
} else {
qDebug() << "ThreadedAssignment::checkInWithDomainServerOrExit().... calling DependencyManager::get<NodeList>()->sendDomainServerCheckIn()";
DependencyManager::get<NodeList>()->sendDomainServerCheckIn();
}
}

View file

@ -79,7 +79,8 @@ enum class PacketType : uint8_t {
AssetUpload,
AssetUploadReply,
AssetGetInfo,
AssetGetInfoReply
AssetGetInfoReply,
MessagesData
};
const int NUM_BYTES_MD5_HASH = 16;