checkpoint with eachNode() changed to nestedEach()

This commit is contained in:
Brad Hefta-Gaub 2017-02-15 22:14:26 -08:00
parent d22f4c1dd7
commit 2d300a6643
2 changed files with 137 additions and 144 deletions

View file

@ -37,8 +37,7 @@ const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45;
const unsigned int AVATAR_DATA_SEND_INTERVAL_MSECS = (1.0f / (float) AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND) * 1000;
AvatarMixer::AvatarMixer(ReceivedMessage& message) :
ThreadedAssignment(message),
_broadcastThread()
ThreadedAssignment(message)
{
// make sure we hear about node kills so we can tell the other nodes
connect(DependencyManager::get<NodeList>().data(), &NodeList::nodeKilled, this, &AvatarMixer::nodeKilled);
@ -66,12 +65,6 @@ void AvatarMixer::queueIncomingPacket(QSharedPointer<ReceivedMessage> message, S
AvatarMixer::~AvatarMixer() {
if (_broadcastTimer) {
_broadcastTimer->deleteLater();
}
_broadcastThread.quit();
_broadcastThread.wait();
}
// An 80% chance of sending a identity packet within a 5 second interval.
@ -135,6 +128,10 @@ void AvatarMixer::start() {
// this is where we need to put the real work...
{
// for now, call the single threaded version
broadcastAvatarData();
/*
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
@ -197,6 +194,16 @@ void AvatarMixer::manageDisplayName(const SharedNodePointer& node) {
}
}
void avatarLoops();
// only send extra avatar data (avatars out of view, ignored) every Nth AvatarData frame
// Extra avatar data will be sent (AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND/EXTRA_AVATAR_DATA_FRAME_RATIO) times
// per second.
// This value should be a power of two for performance purposes, as the mixer performs a modulo operation every frame
// to determine whether the extra data should be sent.
static const int EXTRA_AVATAR_DATA_FRAME_RATIO = 16;
// NOTE: some additional optimizations to consider.
// 1) use the view frustum to cull those avatars that are out of view. Since avatar data doesn't need to be present
// if the avatar is not in view or in the keyhole.
@ -224,13 +231,6 @@ void AvatarMixer::broadcastAvatarData() {
const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES;
const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO;
// only send extra avatar data (avatars out of view, ignored) every Nth AvatarData frame
// Extra avatar data will be sent (AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND/EXTRA_AVATAR_DATA_FRAME_RATIO) times
// per second.
// This value should be a power of two for performance purposes, as the mixer performs a modulo operation every frame
// to determine whether the extra data should be sent.
const int EXTRA_AVATAR_DATA_FRAME_RATIO = 16;
// NOTE: The following code calculates the _performanceThrottlingRatio based on how much the avatar-mixer was
// able to sleep. This will eventually be used to ask for an additional avatar-mixer to help out. Currently the value
// is unused as it is assumed this should not be hit before the avatar-mixer hits the desired bandwidth limit per client.
@ -272,6 +272,35 @@ void AvatarMixer::broadcastAvatarData() {
++framesSinceCutoffEvent;
}
avatarLoops();
_lastFrameTimestamp = p_high_resolution_clock::now();
#ifdef WANT_DEBUG
auto sinceLastDebug = p_high_resolution_clock::now() - _lastDebugMessage;
auto sinceLastDebugUsecs = std::chrono::duration_cast<std::chrono::microseconds>(sinceLastDebug).count();
quint64 DEBUG_INTERVAL = USECS_PER_SECOND * 5;
if (sinceLastDebugUsecs > DEBUG_INTERVAL) {
qDebug() << "broadcast rate:" << _broadcastRate.rate() << "hz";
_lastDebugMessage = p_high_resolution_clock::now();
}
#endif
quint64 endBroadcastAvatarData = usecTimestampNow();
_broadcastAvatarDataElapsedTime += (endBroadcastAvatarData - startBroadcastAvatarData);
}
void avatarLoopsInner(NodeList::const_iterator cbegin, NodeList::const_iterator cend);
void avatarLoops() {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
avatarLoopsInner(cbegin, cend);
});
}
void avatarLoopsInner(NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
auto nodeList = DependencyManager::get<NodeList>();
// setup for distributed random floating point values
@ -279,26 +308,19 @@ void AvatarMixer::broadcastAvatarData() {
std::mt19937 generator(randomDevice());
std::uniform_real_distribution<float> distribution;
nodeList->eachMatchingNode(
[&](const SharedNodePointer& node)->bool {
if (!node->getLinkedData()) {
return false;
}
if (node->getType() != NodeType::Agent) {
return false;
}
if (!node->getActiveSocket()) {
return false;
}
return true;
},
[&](const SharedNodePointer& node) {
std::for_each(cbegin, cend, [&](const SharedNodePointer& node) {
if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
MutexTryLocker lock(nodeData->getMutex());
// FIXME????
if (!lock.isLocked()) {
return;
}
++_sumListeners;
// FIXME -- mixer data
// ++_sumListeners;
nodeData->resetInViewStats();
const AvatarData& avatar = nodeData->getAvatar();
@ -349,6 +371,8 @@ void AvatarMixer::broadcastAvatarData() {
// get the current full rate distance so we can work with it
float currentFullRateDistance = nodeData->getFullRateDistance();
// FIXME -- mixer data
float _maxKbpsPerNode = 5000.0f;
if (avatarDataRateLastSecond > _maxKbpsPerNode) {
// is the FRD greater than the farthest avatar?
@ -379,82 +403,81 @@ void AvatarMixer::broadcastAvatarData() {
// this is an AGENT we have received head data from
// send back a packet with other active node data to this node
nodeList->eachMatchingNode(
[&](const SharedNodePointer& otherNode)->bool {
std::for_each(cbegin, cend, [&](const SharedNodePointer& otherNode) {
bool shouldConsider = false;
quint64 startIgnoreCalculation = usecTimestampNow();
bool shouldConsider = false;
quint64 startIgnoreCalculation = usecTimestampNow();
// make sure we have data for this avatar, that it isn't the same node,
// and isn't an avatar that the viewing node has ignored
// or that has ignored the viewing node
if (!otherNode->getLinkedData()
|| otherNode->getUUID() == node->getUUID()
|| (node->isIgnoringNodeWithID(otherNode->getUUID()) && !getsIgnoredByMe)
|| (otherNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) {
// make sure we have data for this avatar, that it isn't the same node,
// and isn't an avatar that the viewing node has ignored
// or that has ignored the viewing node
if (!otherNode->getLinkedData()
|| otherNode->getUUID() == node->getUUID()
|| (node->isIgnoringNodeWithID(otherNode->getUUID()) && !getsIgnoredByMe)
|| (otherNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) {
shouldConsider = false;
shouldConsider = false;
} else {
AvatarMixerClientData* otherData = reinterpret_cast<AvatarMixerClientData*>(otherNode->getLinkedData());
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
// Check to see if the space bubble is enabled
if (node->isIgnoreRadiusEnabled() || otherNode->isIgnoreRadiusEnabled()) {
// Define the minimum bubble size
static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f);
// Define the scale of the box for the current node
glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f;
// Define the scale of the box for the current other node
glm::vec3 otherNodeBoxScale = (otherData->getPosition() - otherData->getGlobalBoundingBoxCorner()) * 2.0f;
} else {
AvatarMixerClientData* otherData = reinterpret_cast<AvatarMixerClientData*>(otherNode->getLinkedData());
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
// Check to see if the space bubble is enabled
if (node->isIgnoreRadiusEnabled() || otherNode->isIgnoreRadiusEnabled()) {
// Define the minimum bubble size
static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f);
// Define the scale of the box for the current node
glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f;
// Define the scale of the box for the current other node
glm::vec3 otherNodeBoxScale = (otherData->getPosition() - otherData->getGlobalBoundingBoxCorner()) * 2.0f;
// Set up the bounding box for the current node
AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) {
nodeBox.setScaleStayCentered(minBubbleSize);
}
// Set up the bounding box for the current other node
AABox otherNodeBox(otherData->getGlobalBoundingBoxCorner(), otherNodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) {
otherNodeBox.setScaleStayCentered(minBubbleSize);
}
// Quadruple the scale of both bounding boxes
nodeBox.embiggen(4.0f);
otherNodeBox.embiggen(4.0f);
// Perform the collision check between the two bounding boxes
if (nodeBox.touches(otherNodeBox)) {
nodeData->ignoreOther(node, otherNode);
shouldConsider = getsAnyIgnored;
}
// Set up the bounding box for the current node
AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) {
nodeBox.setScaleStayCentered(minBubbleSize);
}
// Not close enough to ignore
if (shouldConsider) {
nodeData->removeFromRadiusIgnoringSet(node, otherNode->getUUID());
// Set up the bounding box for the current other node
AABox otherNodeBox(otherData->getGlobalBoundingBoxCorner(), otherNodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) {
otherNodeBox.setScaleStayCentered(minBubbleSize);
}
// Quadruple the scale of both bounding boxes
nodeBox.embiggen(4.0f);
otherNodeBox.embiggen(4.0f);
shouldConsider = true;
// Perform the collision check between the two bounding boxes
if (nodeBox.touches(otherNodeBox)) {
nodeData->ignoreOther(node, otherNode);
shouldConsider = getsAnyIgnored;
}
}
// Not close enough to ignore
if (shouldConsider) {
nodeData->removeFromRadiusIgnoringSet(node, otherNode->getUUID());
}
shouldConsider = true;
quint64 endIgnoreCalculation = usecTimestampNow();
_ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation);
return shouldConsider;
},
[&](const SharedNodePointer& otherNode) {
// FIXME -- mixer data
//_ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation);
}
if (shouldConsider) {
quint64 startAvatarDataPacking = usecTimestampNow();
++numOtherAvatars;
AvatarMixerClientData* otherNodeData = reinterpret_cast<AvatarMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
// FIXME -- might want to track this lock failed...
if (!lock.isLocked()) {
// FIXME -- might want to track this lock failed...
quint64 endAvatarDataPacking = usecTimestampNow();
_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
// FIXME - mixer data
//_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
return;
}
@ -463,9 +486,17 @@ void AvatarMixer::broadcastAvatarData() {
if (otherNodeData->getIdentityChangeTimestamp().time_since_epoch().count() > 0
&& (forceSend
|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp
//|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp // FIXME - mixer data
|| distribution(generator) < IDENTITY_SEND_PROBABILITY)) {
sendIdentityPacket(otherNodeData, node);
// FIXME --- used to be.../ mixer data dependency
//sendIdentityPacket(otherNodeData, node);
QByteArray individualData = otherNodeData->getAvatar().identityByteArray();
auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size());
individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNodeData->getNodeID().toRfc4122());
identityPacket->write(individualData);
DependencyManager::get<NodeList>()->sendPacket(std::move(identityPacket), *node);
}
const AvatarData& otherAvatar = otherNodeData->getAvatar();
@ -484,7 +515,8 @@ void AvatarMixer::broadcastAvatarData() {
&& distribution(generator) > (nodeData->getFullRateDistance() / distanceToAvatar)) {
quint64 endAvatarDataPacking = usecTimestampNow();
_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
// FIXME - mixer data
//_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
return;
}
@ -502,7 +534,8 @@ void AvatarMixer::broadcastAvatarData() {
++numAvatarsHeldBack;
quint64 endAvatarDataPacking = usecTimestampNow();
_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
// FIXME - mixer data
//_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
return;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
@ -526,7 +559,8 @@ void AvatarMixer::broadcastAvatarData() {
// this throttles the extra data to only be sent every Nth message
if (!isInView && getsOutOfView && (lastSeqToReceiver % EXTRA_AVATAR_DATA_FRAME_RATIO > 0)) {
quint64 endAvatarDataPacking = usecTimestampNow();
_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
// FIXME - mixer data
//_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
return;
}
@ -554,7 +588,9 @@ void AvatarMixer::broadcastAvatarData() {
avatarPacketList->endSegment();
quint64 endAvatarDataPacking = usecTimestampNow();
_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
// FIXME - mixer data
//_avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
}
});
quint64 startPacketSending = usecTimestampNow();
@ -580,30 +616,19 @@ void AvatarMixer::broadcastAvatarData() {
}
quint64 endPacketSending = usecTimestampNow();
_packetSendingElapsedTime += (endPacketSending - startPacketSending);
// FIXME - mixer data
//_packetSendingElapsedTime += (endPacketSending - startPacketSending);
}
);
});
// We're done encoding this version of the otherAvatars. Update their "lastSent" joint-states so
// that we can notice differences, next time around.
//
// FIXME - this seems suspicious, the code seems to consider all avatars, but not all avatars will
// have had their joints sent, so actually we should consider the time since they actually were sent????
nodeList->eachMatchingNode(
[&](const SharedNodePointer& otherNode)->bool {
if (!otherNode->getLinkedData()) {
return false;
}
if (otherNode->getType() != NodeType::Agent) {
return false;
}
if (!otherNode->getActiveSocket()) {
return false;
}
return true;
},
[&](const SharedNodePointer& otherNode) {
std::for_each(cbegin, cend, [&](const SharedNodePointer& otherNode) {
if (otherNode->getLinkedData() && (otherNode->getType() == NodeType::Agent) && otherNode->getActiveSocket()) {
AvatarMixerClientData* otherNodeData = reinterpret_cast<AvatarMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
if (!lock.isLocked()) {
@ -611,24 +636,8 @@ void AvatarMixer::broadcastAvatarData() {
}
AvatarData& otherAvatar = otherNodeData->getAvatar();
otherAvatar.doneEncoding(false);
});
_lastFrameTimestamp = p_high_resolution_clock::now();
#ifdef WANT_DEBUG
auto sinceLastDebug = p_high_resolution_clock::now() - _lastDebugMessage;
auto sinceLastDebugUsecs = std::chrono::duration_cast<std::chrono::microseconds>(sinceLastDebug).count();
quint64 DEBUG_INTERVAL = USECS_PER_SECOND * 5;
if (sinceLastDebugUsecs > DEBUG_INTERVAL) {
qDebug() << "broadcast rate:" << _broadcastRate.rate() << "hz";
_lastDebugMessage = p_high_resolution_clock::now();
}
#endif
quint64 endBroadcastAvatarData = usecTimestampNow();
_broadcastAvatarDataElapsedTime += (endBroadcastAvatarData - startBroadcastAvatarData);
}
});
}
void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
@ -894,18 +903,6 @@ void AvatarMixer::run() {
ThreadedAssignment::commonInit(AVATAR_MIXER_LOGGING_NAME, NodeType::AvatarMixer);
// setup the timer that will be fired on the broadcast thread
_broadcastTimer = new QTimer;
_broadcastTimer->setTimerType(Qt::PreciseTimer);
_broadcastTimer->setInterval(AVATAR_DATA_SEND_INTERVAL_MSECS);
_broadcastTimer->moveToThread(&_broadcastThread);
// connect appropriate signals and slots
connect(_broadcastTimer, &QTimer::timeout, this, &AvatarMixer::broadcastAvatarData, Qt::DirectConnection);
connect(&_broadcastThread, SIGNAL(started()), _broadcastTimer, SLOT(start()));
// start our tight loop...
start();
}
AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node) {
@ -929,8 +926,8 @@ void AvatarMixer::domainSettingsRequestComplete() {
// parse the settings to pull out the values we need
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
// start the broadcastThread
_broadcastThread.start();
// start our tight loop...
start();
}
void AvatarMixer::handlePacketVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID) {

View file

@ -62,8 +62,6 @@ private:
void manageDisplayName(const SharedNodePointer& node);
QThread _broadcastThread;
p_high_resolution_clock::time_point _lastFrameTimestamp;
float _trailingSleepRatio { 1.0f };
@ -79,8 +77,6 @@ private:
float _domainMinimumScale { MIN_AVATAR_SCALE };
float _domainMaximumScale { MAX_AVATAR_SCALE };
QTimer* _broadcastTimer = nullptr;
RateCounter<> _broadcastRate;
p_high_resolution_clock::time_point _lastDebugMessage;
QHash<QString, QPair<int, int>> _sessionDisplayNames;