more work on messages

This commit is contained in:
Brad Hefta-Gaub 2015-11-16 17:00:03 -08:00
parent 40e69f6946
commit 12f206e2f0
5 changed files with 47 additions and 357 deletions

View file

@ -48,7 +48,7 @@ MessagesMixer::MessagesMixer(NLPacket& packet) :
connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &MessagesMixer::nodeKilled);
auto& packetReceiver = DependencyManager::get<NodeList>()->getPacketReceiver();
packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagesDataPacket");
packetReceiver.registerListener(PacketType::MessagesData, this, "handleMessagesPacket");
}
MessagesMixer::~MessagesMixer() {
@ -64,341 +64,13 @@ MessagesMixer::~MessagesMixer() {
// assuming 60 htz update rate.
const float BILLBOARD_AND_IDENTITY_SEND_PROBABILITY = 1.0f / 187.0f;
void MessagesMixer::broadcastMessagesData() {
qDebug() << "MessagesMixer::broadcastMessagesData()...";
int idleTime = QDateTime::currentMSecsSinceEpoch() - _lastFrameTimestamp;
++_numStatFrames;
const float STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.10f;
const float BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.20f;
const float RATIO_BACK_OFF = 0.02f;
const int TRAILING_AVERAGE_FRAMES = 100;
int framesSinceCutoffEvent = TRAILING_AVERAGE_FRAMES;
const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES;
const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO;
// NOTE: The following code calculates the _performanceThrottlingRatio based on how much the messages-mixer was
// able to sleep. This will eventually be used to ask for an additional messages-mixer to help out. Currently the value
// is unused as it is assumed this should not be hit before the messages-mixer hits the desired bandwidth limit per client.
// It is reported in the domain-server stats for the messages-mixer.
_trailingSleepRatio = (PREVIOUS_FRAMES_RATIO * _trailingSleepRatio)
+ (idleTime * CURRENT_FRAME_RATIO / (float) MESSAGES_DATA_SEND_INTERVAL_MSECS);
float lastCutoffRatio = _performanceThrottlingRatio;
bool hasRatioChanged = false;
if (framesSinceCutoffEvent >= TRAILING_AVERAGE_FRAMES) {
if (_trailingSleepRatio <= STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD) {
// we're struggling - change our performance throttling ratio
_performanceThrottlingRatio = _performanceThrottlingRatio + (0.5f * (1.0f - _performanceThrottlingRatio));
qDebug() << "Mixer is struggling, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
<< lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true;
} else if (_trailingSleepRatio >= BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD && _performanceThrottlingRatio != 0) {
// we've recovered and can back off the performance throttling
_performanceThrottlingRatio = _performanceThrottlingRatio - RATIO_BACK_OFF;
if (_performanceThrottlingRatio < 0) {
_performanceThrottlingRatio = 0;
}
qDebug() << "Mixer is recovering, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
<< lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true;
}
if (hasRatioChanged) {
framesSinceCutoffEvent = 0;
}
}
if (!hasRatioChanged) {
++framesSinceCutoffEvent;
}
auto nodeList = DependencyManager::get<NodeList>();
// setup for distributed random floating point values
std::random_device randomDevice;
std::mt19937 generator(randomDevice());
std::uniform_real_distribution<float> distribution;
qDebug() << "MessagesMixer::broadcastMessagesData()... calling nodeList->eachMatchingNode()";
nodeList->eachMatchingNode(
[&](const SharedNodePointer& node)->bool {
if (!node->getLinkedData()) {
return false;
}
if (node->getType() != NodeType::Agent) {
return false;
}
if (!node->getActiveSocket()) {
return false;
}
return true;
},
[&](const SharedNodePointer& node) {
MessagesMixerClientData* nodeData = reinterpret_cast<MessagesMixerClientData*>(node->getLinkedData());
MutexTryLocker lock(nodeData->getMutex());
if (!lock.isLocked()) {
return;
}
++_sumListeners;
AvatarData& avatar = nodeData->getAvatar();
glm::vec3 myPosition = avatar.getPosition();
// reset the internal state for correct random number distribution
distribution.reset();
// reset the max distance for this frame
float maxAvatarDistanceThisFrame = 0.0f;
// reset the number of sent avatars
nodeData->resetNumAvatarsSentLastFrame();
// keep a counter of the number of considered avatars
int numOtherAvatars = 0;
// keep track of outbound data rate specifically for avatar data
int numAvatarDataBytes = 0;
// keep track of the number of other avatars held back in this frame
int numAvatarsHeldBack = 0;
// keep track of the number of other avatar frames skipped
int numAvatarsWithSkippedFrames = 0;
// use the data rate specifically for avatar data for FRD adjustment checks
float avatarDataRateLastSecond = nodeData->getOutboundAvatarDataKbps();
// Check if it is time to adjust what we send this client based on the observed
// bandwidth to this node. We do this once a second, which is also the window for
// the bandwidth reported by node->getOutboundBandwidth();
if (nodeData->getNumFramesSinceFRDAdjustment() > MESSAGES_MIXER_BROADCAST_FRAMES_PER_SECOND) {
const float FRD_ADJUSTMENT_ACCEPTABLE_RATIO = 0.8f;
const float HYSTERISIS_GAP = (1 - FRD_ADJUSTMENT_ACCEPTABLE_RATIO);
const float HYSTERISIS_MIDDLE_PERCENTAGE = (1 - (HYSTERISIS_GAP * 0.5f));
// get the current full rate distance so we can work with it
float currentFullRateDistance = nodeData->getFullRateDistance();
if (avatarDataRateLastSecond > _maxKbpsPerNode) {
// is the FRD greater than the farthest avatar?
// if so, before we calculate anything, set it to that distance
currentFullRateDistance = std::min(currentFullRateDistance, nodeData->getMaxAvatarDistance());
// we're adjusting the full rate distance to target a bandwidth in the middle
// of the hysterisis gap
currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond;
nodeData->setFullRateDistance(currentFullRateDistance);
nodeData->resetNumFramesSinceFRDAdjustment();
} else if (currentFullRateDistance < nodeData->getMaxAvatarDistance()
&& avatarDataRateLastSecond < _maxKbpsPerNode * FRD_ADJUSTMENT_ACCEPTABLE_RATIO) {
// we are constrained AND we've recovered to below the acceptable ratio
// lets adjust the full rate distance to target a bandwidth in the middle of the hyterisis gap
currentFullRateDistance *= (_maxKbpsPerNode * HYSTERISIS_MIDDLE_PERCENTAGE) / avatarDataRateLastSecond;
nodeData->setFullRateDistance(currentFullRateDistance);
nodeData->resetNumFramesSinceFRDAdjustment();
}
} else {
nodeData->incrementNumFramesSinceFRDAdjustment();
}
// setup a PacketList for the avatarPackets
auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData);
// this is an AGENT we have received head data from
// send back a packet with other active node data to this node
nodeList->eachMatchingNode(
[&](const SharedNodePointer& otherNode)->bool {
if (!otherNode->getLinkedData()) {
return false;
}
if (otherNode->getUUID() == node->getUUID()) {
return false;
}
return true;
},
[&](const SharedNodePointer& otherNode) {
++numOtherAvatars;
MessagesMixerClientData* otherNodeData = reinterpret_cast<MessagesMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
if (!lock.isLocked()) {
return;
}
// make sure we send out identity and billboard packets to and from new arrivals.
bool forceSend = !otherNodeData->checkAndSetHasReceivedFirstPacketsFrom(node->getUUID());
// we will also force a send of billboard or identity packet
// if either has changed in the last frame
if (otherNodeData->getBillboardChangeTimestamp() > 0
&& (forceSend
|| otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp
|| distribution(generator) < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) {
QByteArray rfcUUID = otherNode->getUUID().toRfc4122();
QByteArray billboard = otherNodeData->getAvatar().getBillboard();
auto billboardPacket = NLPacket::create(PacketType::AvatarBillboard, rfcUUID.size() + billboard.size());
billboardPacket->write(rfcUUID);
billboardPacket->write(billboard);
nodeList->sendPacket(std::move(billboardPacket), *node);
++_sumBillboardPackets;
}
if (otherNodeData->getIdentityChangeTimestamp() > 0
&& (forceSend
|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp
|| distribution(generator) < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) {
QByteArray individualData = otherNodeData->getAvatar().identityByteArray();
auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size());
individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122());
identityPacket->write(individualData);
nodeList->sendPacket(std::move(identityPacket), *node);
++_sumIdentityPackets;
}
AvatarData& otherAvatar = otherNodeData->getAvatar();
// Decide whether to send this avatar's data based on it's distance from us
// The full rate distance is the distance at which EVERY update will be sent for this avatar
// at twice the full rate distance, there will be a 50% chance of sending this avatar's update
glm::vec3 otherPosition = otherAvatar.getPosition();
float distanceToAvatar = glm::length(myPosition - otherPosition);
// potentially update the max full rate distance for this frame
maxAvatarDistanceThisFrame = std::max(maxAvatarDistanceThisFrame, distanceToAvatar);
if (distanceToAvatar != 0.0f
&& distribution(generator) > (nodeData->getFullRateDistance() / distanceToAvatar)) {
return;
}
AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(otherNode->getUUID());
AvatarDataSequenceNumber lastSeqFromSender = otherNodeData->getLastReceivedSequenceNumber();
if (lastSeqToReceiver > lastSeqFromSender && lastSeqToReceiver != UINT16_MAX) {
// we got out out of order packets from the sender, track it
otherNodeData->incrementNumOutOfOrderSends();
}
// make sure we haven't already sent this data from this sender to this receiver
// or that somehow we haven't sent
if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) {
++numAvatarsHeldBack;
return;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
++numAvatarsWithSkippedFrames;
}
// we're going to send this avatar
// increment the number of avatars sent to this reciever
nodeData->incrementNumAvatarsSentLastFrame();
// set the last sent sequence number for this sender on the receiver
nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(),
otherNodeData->getLastReceivedSequenceNumber());
// start a new segment in the PacketList for this avatar
avatarPacketList->startSegment();
numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122());
numAvatarDataBytes +=
avatarPacketList->write(otherAvatar.toByteArray(false, distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO));
avatarPacketList->endSegment();
});
// close the current packet so that we're always sending something
avatarPacketList->closeCurrentPacket(true);
// send the avatar data PacketList
nodeList->sendPacketList(std::move(avatarPacketList), *node);
// record the bytes sent for other avatar data in the MessagesMixerClientData
nodeData->recordSentAvatarData(numAvatarDataBytes);
// record the number of avatars held back this frame
nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack);
nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames);
if (numOtherAvatars == 0) {
// update the full rate distance to FLOAT_MAX since we didn't have any other avatars to send
nodeData->setMaxAvatarDistance(FLT_MAX);
} else {
nodeData->setMaxAvatarDistance(maxAvatarDistanceThisFrame);
}
}
);
qDebug() << "MessagesMixer::broadcastMessagesData()... calling nodeList->eachMatchingNode() for encode...";
// We're done encoding this version of the otherAvatars. Update their "lastSent" joint-states so
// that we can notice differences, next time around.
nodeList->eachMatchingNode(
[&](const SharedNodePointer& otherNode)->bool {
if (!otherNode->getLinkedData()) {
return false;
}
if (otherNode->getType() != NodeType::Agent) {
return false;
}
if (!otherNode->getActiveSocket()) {
return false;
}
return true;
},
[&](const SharedNodePointer& otherNode) {
MessagesMixerClientData* otherNodeData = reinterpret_cast<MessagesMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
if (!lock.isLocked()) {
return;
}
AvatarData& otherAvatar = otherNodeData->getAvatar();
otherAvatar.doneEncoding(false);
});
_lastFrameTimestamp = QDateTime::currentMSecsSinceEpoch();
}
void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
qDebug() << "MessagesMixer::nodeKilled()... node:" << killedNode->getUUID();
if (killedNode->getType() == NodeType::Agent
&& killedNode->getLinkedData()) {
auto nodeList = DependencyManager::get<NodeList>();
// this was an avatar we were sending to other people
// send a kill packet for it to our other nodes
auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID);
killPacket->write(killedNode->getUUID().toRfc4122());
nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent);
// we also want to remove sequence number data for this avatar on our other avatars
// so invoke the appropriate method on the MessagesMixerClientData for other avatars
nodeList->eachMatchingNode(
@ -414,18 +86,32 @@ void MessagesMixer::nodeKilled(SharedNodePointer killedNode) {
return true;
},
[&](const SharedNodePointer& node) {
QMetaObject::invokeMethod(node->getLinkedData(),
"removeLastBroadcastSequenceNumber",
Qt::AutoConnection,
Q_ARG(const QUuid&, QUuid(killedNode->getUUID())));
qDebug() << "eachMatchingNode()... node:" << node->getUUID();
}
);
}
}
void MessagesMixer::handleMessagesDataPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
void MessagesMixer::handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
qDebug() << "MessagesMixer::handleMessagesPacket()... senderNode:" << senderNode->getUUID();
auto nodeList = DependencyManager::get<NodeList>();
nodeList->updateNodeWithDataFromPacket(packet, senderNode);
//nodeList->updateNodeWithDataFromPacket(packet, senderNode);
QByteArray data = packetList->getMessage();
auto packetType = packetList->getType();
if (packetType == PacketType::MessagesData) {
QString message = QString::fromUtf8(data);
qDebug() << "got a messages packet:" << message;
// this was an avatar we were sending to other people
// send a kill packet for it to our other nodes
//auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID);
//killPacket->write(killedNode->getUUID().toRfc4122());
//nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent);
}
}
void MessagesMixer::sendStatsPacket() {
@ -484,6 +170,7 @@ void MessagesMixer::run() {
node->setLinkedData(new MessagesMixerClientData());
};
/*
// setup the timer that will be fired on the broadcast thread
_broadcastTimer = new QTimer;
_broadcastTimer->setInterval(MESSAGES_DATA_SEND_INTERVAL_MSECS);
@ -492,6 +179,7 @@ void MessagesMixer::run() {
// connect appropriate signals and slots
connect(_broadcastTimer, &QTimer::timeout, this, &MessagesMixer::broadcastMessagesData, Qt::DirectConnection);
connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start()));
*/
// wait until we have the domain-server settings, otherwise we bail
DomainHandler& domainHandler = nodeList->getDomainHandler();
@ -516,7 +204,7 @@ void MessagesMixer::run() {
parseDomainServerSettings(domainHandler.getSettingsObject());
// start the broadcastThread
_broadcastThread.start();
//_broadcastThread.start();
}
void MessagesMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {

View file

@ -32,7 +32,7 @@ public slots:
void sendStatsPacket();
private slots:
void handleMessagesDataPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode);
private:
void broadcastMessagesData();

View file

@ -70,6 +70,7 @@
#include <LogHandler.h>
#include <MainWindow.h>
#include <MessageDialog.h>
#include <MessagesClient.h>
#include <ModelEntityItem.h>
#include <NetworkAccessManager.h>
#include <NetworkingConstants.h>
@ -339,6 +340,7 @@ bool setupEssentials(int& argc, char** argv) {
DependencyManager::set<PathUtils>();
DependencyManager::set<InterfaceActionFactory>();
DependencyManager::set<AssetClient>();
DependencyManager::set<MessagesClient>();
DependencyManager::set<UserInputMapper>();
DependencyManager::set<controller::ScriptingInterface, ControllerScriptingInterface>();
return true;
@ -484,6 +486,14 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer) :
connect(assetThread, &QThread::started, assetClient.data(), &AssetClient::init);
assetThread->start();
// Setup MessagesClient
auto messagesClient = DependencyManager::get<MessagesClient>();
QThread* messagesThread = new QThread;
messagesThread->setObjectName("Messages Client Thread");
messagesClient->moveToThread(messagesThread);
connect(messagesThread, &QThread::started, messagesClient.data(), &MessagesClient::init);
messagesThread->start();
const DomainHandler& domainHandler = nodeList->getDomainHandler();
connect(&domainHandler, SIGNAL(hostnameChanged(const QString&)), SLOT(domainChanged(const QString&)));
@ -4019,6 +4029,7 @@ void Application::registerScriptEngineWithApplicationServices(ScriptEngine* scri
scriptEngine->registerFunction("HMD", "getHUDLookAtPosition3D", HMDScriptingInterface::getHUDLookAtPosition3D, 0);
scriptEngine->registerGlobalObject("Scene", DependencyManager::get<SceneScriptingInterface>().data());
scriptEngine->registerGlobalObject("Messages", DependencyManager::get<MessagesClient>().data());
scriptEngine->registerGlobalObject("ScriptDiscoveryService", this->getRunningScriptsWidget());

View file

@ -74,34 +74,25 @@ bool haveMessagesMixer() {
return true;
}
void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode) {
auto packetType = packet->getType();
void MessagesClient::handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode) {
QByteArray data = packetList->getMessage();
auto packetType = packetList->getType();
if (packetType == PacketType::MessagesData) {
qDebug() << "got a messages packet";
QString message = QString::fromUtf8(data);
qDebug() << "got a messages packet:" << message;
}
}
void MessagesClient::sendMessage(const QString& channel, const QString& message) {
qDebug() << "MessagesClient::sendMessage() channel:" << channel << "message:" << message;
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer messagesMixer = nodeList->soloNodeOfType(NodeType::MessagesMixer);
if (messagesMixer) {
auto packetList = NLPacketList::create(PacketType::MessagesData, QByteArray(), true, true);
#if 0
auto messageID = ++_currentID;
packetList->writePrimitive(messageID);
packetList->writePrimitive(static_cast<uint8_t>(extension.length()));
packetList->write(extension.toLatin1().constData(), extension.length());
uint64_t size = data.length();
packetList->writePrimitive(size);
packetList->write(data.constData(), size);
nodeList->sendPacketList(std::move(packetList), *assetServer);
#endif
packetList->write(message.toUtf8());
nodeList->sendPacketList(std::move(packetList), *messagesMixer);
}
}

View file

@ -31,7 +31,7 @@ public:
Q_INVOKABLE void sendMessage(const QString& channel, const QString& message);
private slots:
void handleMessagesPacket(QSharedPointer<NLPacket> packet, SharedNodePointer senderNode);
void handleMessagesPacket(QSharedPointer<NLPacketList> packetList, SharedNodePointer senderNode);
void handleNodeKilled(SharedNodePointer node);
private: