some cleanup

This commit is contained in:
Brad Hefta-Gaub 2017-02-17 21:38:30 -08:00
parent c2c843d841
commit 4570b3439d
5 changed files with 42 additions and 44 deletions

View file

@ -120,20 +120,25 @@ void AvatarMixer::start() {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// DO THIS FIRST!!!!!!!!
// WORK ITEMS...
//
// DONE --- 1) only sleep for remainder
// DONE --- 2) clean up stats, add slave stats
// DONE --- 3) out of view??? is it broken? - verified - it's working
// 4) Error in PacketList::writeData - attempted to write a segment to an unordered packet that is larger than the payload size.
// DONE --- 4a) hack to not send face data mostly seems to work...
// DONE --- 5) fix two different versions of toByteArray()
// DONE --- 7) audit the locking and side-effects to node, otherNode, and nodeData
// DONE --- 8) delete dead code from mixer (now that it's in slave)
// DONE --- 10) FIXME on sending identity packets
// DONE --- 12) FIXME _maxKbpsPerNode
// DONE --- 11) FIXME ++_sumListeners;
//
// 4) Error in PacketList::writeData - attempted to write a segment to an unordered packet that is larger than the payload size.
// 4b) some kind of a better approach to handling otherAvatar.toByteArray() for content that is larger than MTU
// 5) fix two different versions of toByteArray()
// 6) throttling??
// 7) audit the locking and side-effects to node, otherNode, and nodeData
// 8) delete dead code from mixer (now that it's in slave)
// 6) CPU throttling??
// 9) better stats in the nodes:
// how many avatars are actually "in view" for the avtar in question (even if they are over bandwidth budget)
// 13) FIXME -- otherNodeData->incrementNumOutOfOrderSends();
//
//
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -157,8 +162,6 @@ void AvatarMixer::start() {
}, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow();
_processQueuedAvatarDataPacketsElapsedTime += (end - start);
//qDebug() << "PROCESS PACKETS... " << "lockWait:" << lockWait << "nodeTransform:" << nodeTransform << "functor:" << functor;
}
// process pending display names... this doesn't currently run on multiple threads, because it
@ -168,23 +171,19 @@ void AvatarMixer::start() {
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
std::for_each(cbegin, cend, [&](const SharedNodePointer& node) {
manageDisplayName(node);
++_sumListeners;
});
}, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow();
_displayNameManagementElapsedTime += (end - start);
//qDebug() << "PROCESS PACKETS... " << "lockWait:" << lockWait << "nodeTransform:" << nodeTransform << "functor:" << functor;
}
// this is where we need to put the real work...
{
// for now, call the single threaded version
//broadcastAvatarData();
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
auto start = usecTimestampNow();
_slavePool.broadcastAvatarData(cbegin, cend);
_slavePool.broadcastAvatarData(cbegin, cend, _lastFrameTimestamp, _maxKbpsPerNode);
auto end = usecTimestampNow();
_broadcastAvatarDataInner += (end - start);
}, &lockWait, &nodeTransform, &functor);
@ -210,6 +209,9 @@ void AvatarMixer::start() {
auto end = usecTimestampNow();
_processEventsElapsedTime += (end - start);
}
_lastFrameTimestamp = frameTimestamp;
}
}
@ -461,7 +463,6 @@ void AvatarMixer::sendStatsPacket() {
QJsonObject statsObject;
//statsObject["average_listeners_last_second"] = (float) _sumListeners / (float) _numStatFrames;
//statsObject["average_identity_packets_per_frame"] = (float) _sumIdentityPackets / (float) _numStatFrames;
statsObject["broadcast_loop_rate"] = _loopRate.rate();
@ -476,6 +477,8 @@ void AvatarMixer::sendStatsPacket() {
#define TIGHT_LOOP_STAT(x) (x > tenTimesPerFrame) ? x / tightLoopFrames : ((float)x / (float)tightLoopFrames);
#define TIGHT_LOOP_STAT_UINT64(x) (x > (quint64)tenTimesPerFrame) ? x / tightLoopFrames : ((float)x / (float)tightLoopFrames);
statsObject["average_listeners_last_second"] = TIGHT_LOOP_STAT(_sumListeners);
QJsonObject singleCoreTasks;
singleCoreTasks["processEvents"] = TIGHT_LOOP_STAT_UINT64(_processEventsElapsedTime);
singleCoreTasks["queueIncomingPacket"] = TIGHT_LOOP_STAT_UINT64(_queueIncomingPacketElapsedTime);

View file

@ -35,6 +35,15 @@ void AvatarMixerSlave::configure(ConstIter begin, ConstIter end) {
_end = end;
}
void AvatarMixerSlave::configureBroadcast(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode) {
_begin = begin;
_end = end;
_lastFrameTimestamp = lastFrameTimestamp;
_maxKbpsPerNode = maxKbpsPerNode;
}
void AvatarMixerSlave::harvestStats(AvatarMixerSlaveStats& stats) {
stats = _stats;
_stats.reset();
@ -71,8 +80,6 @@ const float IDENTITY_SEND_PROBABILITY = 1.0f / 187.0f; // FIXME... this is wrong
void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 start = usecTimestampNow();
//qDebug() << __FUNCTION__ << "node:" << node;
auto nodeList = DependencyManager::get<NodeList>();
// setup for distributed random floating point values
@ -82,13 +89,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
//MutexTryLocker lock(nodeData->getMutex());
// FIXME????
//if (!lock.isLocked()) {
//qDebug() << __FUNCTION__ << "unable to lock... node:" << node << " would BAIL???... line:" << __LINE__;
//return;
//}
// FIXME -- mixer data
// ++_sumListeners;
@ -143,8 +143,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
// get the current full rate distance so we can work with it
float currentFullRateDistance = nodeData->getFullRateDistance();
// FIXME -- mixer data
float _maxKbpsPerNode = 5000.0f;
if (avatarDataRateLastSecond > _maxKbpsPerNode) {
// is the FRD greater than the farthest avatar?
@ -176,7 +174,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
// this is an AGENT we have received head data from
// send back a packet with other active node data to this node
std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) {
//qDebug() << __FUNCTION__ << "inner loop, node:" << node << "otherNode:" << otherNode;
bool shouldConsider = false;
quint64 startIgnoreCalculation = usecTimestampNow();
@ -236,9 +233,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
_stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation);
}
//qDebug() << __FUNCTION__ << "inner loop, node:" << node << "otherNode:" << otherNode << "shouldConsider:" << shouldConsider;
if (shouldConsider) {
quint64 startAvatarDataPacking = usecTimestampNow();
@ -247,24 +241,19 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
const AvatarMixerClientData* otherNodeData = reinterpret_cast<const AvatarMixerClientData*>(otherNode->getLinkedData());
// make sure we send out identity packets to and from new arrivals.
// FIXME this is where our crash was on friday!!
// this is getting called on multiple threads... needs mutex of better solution
bool forceSend = !nodeData->checkAndSetHasReceivedFirstPacketsFrom(otherNode->getUUID());
// FIXME - this clause seems suspicious "... || otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp ..."
if (otherNodeData->getIdentityChangeTimestamp().time_since_epoch().count() > 0
&& (forceSend
//|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp // FIXME - mixer data
|| otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp
|| distribution(generator) < IDENTITY_SEND_PROBABILITY)) {
// FIXME --- used to be.../ mixer data dependency
//sendIdentityPacket(otherNodeData, node);
QByteArray individualData = otherNodeData->getConstAvatarData()->identityByteArray();
auto identityPacket = NLPacket::create(PacketType::AvatarIdentity, individualData.size());
individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNodeData->getNodeID().toRfc4122());
identityPacket->write(individualData);
DependencyManager::get<NodeList>()->sendPacket(std::move(identityPacket), *node);
//qDebug() << __FUNCTION__ << "inner loop, node:" << node << "otherNode:" << otherNode << " sending itentity packet for otherNode to node...";
}
const AvatarData* otherAvatar = otherNodeData->getConstAvatarData();
@ -285,7 +274,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
//qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
shouldConsider = false;
}
@ -308,7 +296,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
//qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
shouldConsider = false;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
@ -334,7 +321,6 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
//qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
shouldConsider = false;
}

View file

@ -62,6 +62,7 @@ public:
using ConstIter = NodeList::const_iterator;
void configure(ConstIter begin, ConstIter end);
void configureBroadcast(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode);
void processIncomingPackets(const SharedNodePointer& node);
void broadcastAvatarData(const SharedNodePointer& node);
@ -73,6 +74,9 @@ private:
ConstIter _begin;
ConstIter _end;
p_high_resolution_clock::time_point _lastFrameTimestamp;
float _maxKbpsPerNode { 0.0f };
AvatarMixerSlaveStats _stats;
};

View file

@ -69,13 +69,17 @@ static AvatarMixerSlave slave;
void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end) {
_function = &AvatarMixerSlave::processIncomingPackets;
_configure = [&](AvatarMixerSlave& slave) { slave.configure(begin, end); };
_configure = [&](AvatarMixerSlave& slave) {
slave.configure(begin, end);
};
run(begin, end);
}
void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end) {
void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end, p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode) {
_function = &AvatarMixerSlave::broadcastAvatarData;
_configure = [&](AvatarMixerSlave& slave) { slave.configure(begin, end); };
_configure = [&](AvatarMixerSlave& slave) {
slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode);
};
run(begin, end);
}

View file

@ -65,7 +65,8 @@ public:
// Jobs the slave pool can do...
void processIncomingPackets(ConstIter begin, ConstIter end);
void broadcastAvatarData(ConstIter begin, ConstIter end);
void broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp, float maxKbpsPerNode);
// iterate over all slaves
void each(std::function<void(AvatarMixerSlave& slave)> functor);