more cleanup, improved stats, port throttling

This commit is contained in:
Brad Hefta-Gaub 2017-02-18 12:29:01 -08:00
parent 71af81851e
commit 66a6666b52
8 changed files with 114 additions and 62 deletions

View file

@ -9,6 +9,15 @@
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// WORK ITEMS...
//
// 1) FIXME in AvatarMixerSlave.cpp -- otherNodeData->incrementNumOutOfOrderSends();
// This code appears to be determining if a node sent out of order packets, that logic should not be in
// the broadcast method, but would make more sense in the incoming packet processing section
//
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#include <cfloat>
#include <random>
#include <memory>
@ -116,38 +125,8 @@ void AvatarMixer::start() {
while (!_isFinished) {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// WORK ITEMS...
//
// DONE --- only sleep for remainder
// DONE --- clean up stats, add slave stats
// DONE --- out of view??? is it broken? - verified - it's working
// DONE --- hack to not send face data mostly seems to work...
// DONE --- fix two different versions of toByteArray()
// DONE --- audit the locking and side-effects to node, otherNode, and nodeData
// DONE --- delete dead code from mixer (now that it's in slave)
// DONE --- FIXME on sending identity packets
// DONE --- FIXME _maxKbpsPerNode
// DONE --- FIXME ++_sumListeners;
// DONE --- fix toByteArray() virtual hiding!!!
//
// 1) CPU throttling - now we're calculating it (like audio mixer, how to use it???)
//
// 2) Error in PacketList::writeData - attempted to write a segment to an unordered packet that is larger than the payload size.
// 2b) some kind of a better approach to handling otherAvatar.toByteArray() for content that is larger than MTU
// 3) better stats in the nodes:
// how many avatars are actually "in view" for the avtar in question (even if they are over bandwidth budget)
// 4) FIXME -- otherNodeData->incrementNumOutOfOrderSends();
// 5) average_identity_packets_per_frame???
//
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// calculates last frame duration and sleeps for the remainder of the target amount
auto frameDuration = timeFrame(frameTimestamp);
throttle(frameDuration, frame);
auto frameDuration = timeFrame(frameTimestamp); // calculates last frame duration and sleeps remainder of target amount
throttle(frameDuration, frame); // determines _throttlingRatio for upcoming mix frame
int lockWait, nodeTransform, functor;
@ -184,7 +163,7 @@ void AvatarMixer::start() {
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
auto start = usecTimestampNow();
_slavePool.broadcastAvatarData(cbegin, cend, _lastFrameTimestamp, _maxKbpsPerNode);
_slavePool.broadcastAvatarData(cbegin, cend, _lastFrameTimestamp, _maxKbpsPerNode, _throttlingRatio);
auto end = usecTimestampNow();
_broadcastAvatarDataInner += (end - start);
}, &lockWait, &nodeTransform, &functor);
@ -497,15 +476,28 @@ void AvatarMixer::sendStatsPacket() {
AvatarMixerSlaveStats aggregateStats;
QJsonObject slavesObject;
float secondsSinceLastStats = (float)(start - _lastStatsTime) / (float)USECS_PER_SECOND;
// gather stats
int slaveNumber = 1;
_slavePool.each([&](AvatarMixerSlave& slave) {
QJsonObject slaveObject;
AvatarMixerSlaveStats stats;
slave.harvestStats(stats);
slaveObject["nodesProcessed"] = TIGHT_LOOP_STAT(stats.nodesProcessed);
slaveObject["numPacketsReceived"] = TIGHT_LOOP_STAT(stats.packetsProcessed);
slaveObject["numPacketsSent"] = TIGHT_LOOP_STAT(stats.numPacketsSent);
slaveObject["recevied_1_nodesProcessed"] = TIGHT_LOOP_STAT(stats.nodesProcessed);
slaveObject["received_2_numPacketsReceived"] = TIGHT_LOOP_STAT(stats.packetsProcessed);
slaveObject["sent_1_nodesBroadcastedTo"] = TIGHT_LOOP_STAT(stats.nodesBroadcastedTo);
slaveObject["sent_2_numBytesSent"] = TIGHT_LOOP_STAT(stats.numBytesSent);
slaveObject["sent_3_numPacketsSent"] = TIGHT_LOOP_STAT(stats.numPacketsSent);
slaveObject["sent_4_numIdentityPackets"] = TIGHT_LOOP_STAT(stats.numIdentityPackets);
float averageNodes = ((float)stats.nodesBroadcastedTo / (float)tightLoopFrames);
float averageOutboundAvatarKbps = averageNodes ? ((stats.numBytesSent / secondsSinceLastStats) / BYTES_PER_KILOBIT) / averageNodes : 0.0f;
slaveObject["sent_5_averageOutboundAvatarKbps"] = averageOutboundAvatarKbps;
float averageOthersIncluded = averageNodes ? stats.numOthersIncluded / averageNodes : 0.0f;
slaveObject["sent_6_averageOthersIncluded"] = TIGHT_LOOP_STAT(averageOthersIncluded);
slaveObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT_UINT64(stats.processIncomingPacketsElapsedTime);
slaveObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT_UINT64(stats.ignoreCalculationElapsedTime);
@ -522,10 +514,21 @@ void AvatarMixer::sendStatsPacket() {
QJsonObject slavesAggregatObject;
slavesAggregatObject["nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed);
slavesAggregatObject["numPacketsReceived"] = TIGHT_LOOP_STAT(aggregateStats.packetsProcessed);
slavesAggregatObject["numPacketsSent"] = TIGHT_LOOP_STAT(aggregateStats.numPacketsSent);
slavesAggregatObject["recevied_1_nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed);
slavesAggregatObject["received_2_numPacketsReceived"] = TIGHT_LOOP_STAT(aggregateStats.packetsProcessed);
slavesAggregatObject["sent_1_nodesBroadcastedTo"] = TIGHT_LOOP_STAT(aggregateStats.nodesBroadcastedTo);
slavesAggregatObject["sent_2_numBytesSent"] = TIGHT_LOOP_STAT(aggregateStats.numBytesSent);
slavesAggregatObject["sent_3_numPacketsSent"] = TIGHT_LOOP_STAT(aggregateStats.numPacketsSent);
slavesAggregatObject["sent_4_numIdentityPackets"] = TIGHT_LOOP_STAT(aggregateStats.numIdentityPackets);
float averageNodes = ((float)aggregateStats.nodesBroadcastedTo / (float)tightLoopFrames);
float averageOutboundAvatarKbps = averageNodes ? ((aggregateStats.numBytesSent / secondsSinceLastStats) / BYTES_PER_KILOBIT) / averageNodes : 0.0f;
slavesAggregatObject["sent_5_averageOutboundAvatarKbps"] = averageOutboundAvatarKbps;
float averageOthersIncluded = averageNodes ? aggregateStats.numOthersIncluded / averageNodes : 0.0f;
slavesAggregatObject["sent_6_averageOthersIncluded"] = TIGHT_LOOP_STAT(averageOthersIncluded);
slavesAggregatObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.processIncomingPacketsElapsedTime);
slavesAggregatObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.ignoreCalculationElapsedTime);
slavesAggregatObject["timing_3_toByteArray"] = TIGHT_LOOP_STAT_UINT64(aggregateStats.toByteArrayElapsedTime);
@ -600,6 +603,8 @@ void AvatarMixer::sendStatsPacket() {
auto end = usecTimestampNow();
_sendStatsElapsedTime = (end - start);
_lastStatsTime = start;
}
void AvatarMixer::run() {

View file

@ -104,6 +104,7 @@ private:
quint64 _processEventsElapsedTime { 0 };
quint64 _sendStatsElapsedTime { 0 };
quint64 _queueIncomingPacketElapsedTime { 0 };
quint64 _lastStatsTime { usecTimestampNow() };
RateCounter<> _loopRate; // this is the rate that the main thread tight loop runs

View file

@ -58,7 +58,6 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message) {
return _avatar->parseDataFromBuffer(message.readWithoutCopy(message.getBytesLeftToRead()));
}
// FIXME -- this needs a mutex in new model.
bool AvatarMixerClientData::checkAndSetHasReceivedFirstPacketsFrom(const QUuid& uuid) {
if (_hasReceivedFirstPacketsFrom.find(uuid) == _hasReceivedFirstPacketsFrom.end()) {
_hasReceivedFirstPacketsFrom.insert(uuid);

View file

@ -10,6 +10,7 @@
//
#include <algorithm>
#include <random>
#include <glm/glm.hpp>
#include <glm/gtx/norm.hpp>
@ -37,11 +38,12 @@ void AvatarMixerSlave::configure(ConstIter begin, ConstIter end) {
void AvatarMixerSlave::configureBroadcast(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode) {
float maxKbpsPerNode, float throttlingRatio) {
_begin = begin;
_end = end;
_lastFrameTimestamp = lastFrameTimestamp;
_maxKbpsPerNode = maxKbpsPerNode;
_throttlingRatio = throttlingRatio;
}
void AvatarMixerSlave::harvestStats(AvatarMixerSlaveStats& stats) {
@ -61,8 +63,15 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) {
_stats.processIncomingPacketsElapsedTime += (end - start);
}
#include <random>
#include <TryLocker.h>
void AvatarMixerSlave::sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) {
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size());
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122());
identityPacket->write(individualData);
DependencyManager::get<NodeList>()->sendPacket(std::move(identityPacket), *destinationNode);
_stats.numIdentityPackets++;
}
static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45;
@ -88,10 +97,9 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
std::uniform_real_distribution<float> distribution;
if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
_stats.nodesBroadcastedTo++;
// FIXME -- mixer data
// ++_sumListeners;
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
nodeData->resetInViewStats();
@ -249,11 +257,7 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp
|| distribution(generator) < IDENTITY_SEND_PROBABILITY)) {
QByteArray individualData = otherNodeData->getConstAvatarData()->identityByteArray();
auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size());
individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNodeData->getNodeID().toRfc4122());
identityPacket->write(individualData);
DependencyManager::get<NodeList>()->sendPacket(std::move(identityPacket), *node);
sendIdentityPacket(otherNodeData, node);
}
const AvatarData* otherAvatar = otherNodeData->getConstAvatarData();
@ -339,13 +343,13 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
}
{
numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122());
bool includeThisAvatar = true;
auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID());
QVector<JointData>& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID());
bool distanceAdjust = true;
glm::vec3 viewerPosition = myPosition;
AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray
bool dropFaceTracking = true; // this is a hack for now... always drop face tracking
bool dropFaceTracking = false;
quint64 start = usecTimestampNow();
QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther,
@ -353,10 +357,30 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 end = usecTimestampNow();
_stats.toByteArrayElapsedTime += (end - start);
if (bytes.size() > 1400) {
qDebug() << "WARNING: otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size();
} else {
static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID);
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qDebug() << "WARNING: otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data";
dropFaceTracking = true; // first try dropping the facial data
bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qDebug() << "WARNING: otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData";
bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
}
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qDebug() << "WARNING: otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!";
includeThisAvatar = false;
}
}
if (includeThisAvatar) {
numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122());
numAvatarDataBytes += avatarPacketList->write(bytes);
_stats.numOthersIncluded++;
}
}
@ -376,6 +400,7 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
avatarPacketList->closeCurrentPacket(true);
_stats.numPacketsSent += (int)avatarPacketList->getNumPackets();
_stats.numBytesSent += numAvatarDataBytes;
// send the avatar data PacketList
//qDebug() << "about to call nodeList->sendPacketList() for node:" << node;

View file

@ -12,13 +12,19 @@
#ifndef hifi_AvatarMixerSlave_h
#define hifi_AvatarMixerSlave_h
class AvatarMixerClientData;
class AvatarMixerSlaveStats {
public:
int nodesProcessed { 0 };
int packetsProcessed { 0 };
quint64 processIncomingPacketsElapsedTime { 0 };
int nodesBroadcastedTo { 0 };
int numPacketsSent { 0 };
int numBytesSent { 0 };
int numIdentityPackets { 0 };
int numOthersIncluded { 0 };
quint64 ignoreCalculationElapsedTime { 0 };
quint64 avatarDataPackingElapsedTime { 0 };
quint64 packetSendingElapsedTime { 0 };
@ -29,16 +35,22 @@ public:
// receiving job stats
nodesProcessed = 0;
packetsProcessed = 0;
numPacketsSent = 0;
processIncomingPacketsElapsedTime = 0;
// sending job stats
nodesBroadcastedTo = 0;
numPacketsSent = 0;
numBytesSent = 0;
numIdentityPackets = 0;
numOthersIncluded = 0;
ignoreCalculationElapsedTime = 0;
avatarDataPackingElapsedTime = 0;
packetSendingElapsedTime = 0;
toByteArrayElapsedTime = 0;
jobElapsedTime = 0;
//qDebug() << "reset!!! " << "_stats.numBytesSent:" << numBytesSent << "_stats.nodesBroadcastedTo:" << nodesBroadcastedTo;
}
AvatarMixerSlaveStats& operator+=(const AvatarMixerSlaveStats& rhs) {
@ -46,7 +58,11 @@ public:
packetsProcessed += rhs.packetsProcessed;
processIncomingPacketsElapsedTime += rhs.processIncomingPacketsElapsedTime;
nodesBroadcastedTo += rhs.nodesBroadcastedTo;
numPacketsSent += rhs.numPacketsSent;
numBytesSent += rhs.numBytesSent;
numIdentityPackets += rhs.numIdentityPackets;
numOthersIncluded += rhs.numOthersIncluded;
ignoreCalculationElapsedTime += rhs.ignoreCalculationElapsedTime;
avatarDataPackingElapsedTime += rhs.avatarDataPackingElapsedTime;
packetSendingElapsedTime += rhs.packetSendingElapsedTime;
@ -62,7 +78,9 @@ public:
using ConstIter = NodeList::const_iterator;
void configure(ConstIter begin, ConstIter end);
void configureBroadcast(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode);
void configureBroadcast(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio);
void processIncomingPackets(const SharedNodePointer& node);
void broadcastAvatarData(const SharedNodePointer& node);
@ -70,12 +88,15 @@ public:
void harvestStats(AvatarMixerSlaveStats& stats);
private:
void sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode);
// frame state
ConstIter _begin;
ConstIter _end;
p_high_resolution_clock::time_point _lastFrameTimestamp;
float _maxKbpsPerNode { 0.0f };
float _throttlingRatio { 0.0f };
AvatarMixerSlaveStats _stats;
};

View file

@ -75,10 +75,12 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end
run(begin, end);
}
void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode) {
void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio) {
_function = &AvatarMixerSlave::broadcastAvatarData;
_configure = [&](AvatarMixerSlave& slave) {
slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode);
slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio);
};
run(begin, end);
}

View file

@ -66,7 +66,7 @@ public:
// Jobs the slave pool can do...
void processIncomingPackets(ConstIter begin, ConstIter end);
void broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode);
p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode, float throttlingRatio);
// iterate over all slaves
void each(std::function<void(AvatarMixerSlave& slave)> functor);

View file

@ -370,7 +370,6 @@ public:
virtual QByteArray toByteArrayStateful(AvatarDataDetail dataDetail);
// FIXME
virtual QByteArray toByteArray(AvatarDataDetail dataDetail, quint64 lastSentTime, const QVector<JointData>& lastSentJointData,
AvatarDataPacket::HasFlags& hasFlagsOut, bool dropFaceTracking, bool distanceAdjust, glm::vec3 viewerPosition,
QVector<JointData>* sentJointDataOut, AvatarDataRate* outboundDataRateOut = nullptr) const;