more work on multi-core

This commit is contained in:
Brad Hefta-Gaub 2017-02-16 13:25:51 -08:00
parent 0a48ea75a7
commit d532b3a4b8
5 changed files with 91 additions and 37 deletions

View file

@ -134,17 +134,22 @@ void AvatarMixer::start() {
// calculates last frame duration and sleeps for the remainder of the target amount
auto frameDuration = timeFrame(frameTimestamp);
int lockWait, nodeTransform, functor;
// Allow nodes to process any pending/queued packets across our worker threads
{
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
auto end = usecTimestampNow();
_processQueuedAvatarDataPacketsLockWaitElapsedTime += (end - start);
_slavePool.processIncomingPackets(cbegin, cend);
});
}, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow();
_processQueuedAvatarDataPacketsElapsedTime += (end - start);
//qDebug() << "PROCESS PACKETS... " << "lockWait:" << lockWait << "nodeTransform:" << nodeTransform << "functor:" << functor;
}
// process pending display names... this doesn't currently run on multiple threads, because it
@ -155,9 +160,11 @@ void AvatarMixer::start() {
std::for_each(cbegin, cend, [&](const SharedNodePointer& node) {
manageDisplayName(node);
});
});
}, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow();
_displayNameManagementElapsedTime += (end - start);
//qDebug() << "PROCESS PACKETS... " << "lockWait:" << lockWait << "nodeTransform:" << nodeTransform << "functor:" << functor;
}
// this is where we need to put the real work...
@ -167,10 +174,17 @@ void AvatarMixer::start() {
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
auto start = usecTimestampNow();
_slavePool.anotherJob(cbegin, cend);
});
auto end = usecTimestampNow();
_broadcastAvatarDataInner += (end - start);
}, &lockWait, &nodeTransform, &functor);
auto end = usecTimestampNow();
_broadcastAvatarDataElapsedTime += (end - start);
_broadcastAvatarDataLockWait += lockWait;
_broadcastAvatarDataNodeTransform += nodeTransform;
_broadcastAvatarDataNodeFunctor += functor;
}
@ -239,9 +253,6 @@ static const int EXTRA_AVATAR_DATA_FRAME_RATIO = 16;
// this "throttle" logic is the old approach. need to consider some
// reasonable throttle approach in new multi-core design
void AvatarMixer::broadcastAvatarData() {
quint64 startBroadcastAvatarData = usecTimestampNow();
_broadcastRate.increment();
int idleTime = AVATAR_DATA_SEND_INTERVAL_MSECS;
if (_lastFrameTimestamp.time_since_epoch().count() > 0) {
@ -317,9 +328,6 @@ void AvatarMixer::broadcastAvatarData() {
_lastDebugMessage = p_high_resolution_clock::now();
}
#endif
quint64 endBroadcastAvatarData = usecTimestampNow();
_broadcastAvatarDataElapsedTime += (endBroadcastAvatarData - startBroadcastAvatarData);
}
void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
@ -473,7 +481,12 @@ void AvatarMixer::sendStatsPacket() {
statsObject["timing_average_y_processEvents"] = TIGHT_LOOP_STAT(_processEventsElapsedTime);
statsObject["timing_average_y_queueIncomingPacket"] = TIGHT_LOOP_STAT(_queueIncomingPacketElapsedTime);
statsObject["timing_average_z_broadcastAvatarData"] = TIGHT_LOOP_STAT(_broadcastAvatarDataElapsedTime);
statsObject["timing_average_a1_broadcastAvatarData"] = TIGHT_LOOP_STAT(_broadcastAvatarDataElapsedTime);
statsObject["timing_average_a2_innnerBroadcastAvatarData"] = TIGHT_LOOP_STAT(_broadcastAvatarDataInner);
statsObject["timing_average_a3_broadcastAvatarDataLockWait"] = TIGHT_LOOP_STAT(_broadcastAvatarDataLockWait);
statsObject["timing_average_a4_broadcastAvatarDataNodeTransform"] = TIGHT_LOOP_STAT(_broadcastAvatarDataNodeTransform);
statsObject["timing_average_a5_broadcastAvatarDataNodeFunctor"] = TIGHT_LOOP_STAT(_broadcastAvatarDataNodeFunctor);
statsObject["timing_average_z_displayNameManagement"] = TIGHT_LOOP_STAT(_displayNameManagementElapsedTime);
statsObject["timing_average_z_handleAvatarDataPacket"] = TIGHT_LOOP_STAT(_handleAvatarDataPacketElapsedTime);
statsObject["timing_average_z_handleAvatarIdentityPacket"] = TIGHT_LOOP_STAT(_handleAvatarIdentityPacketElapsedTime);
@ -499,10 +512,13 @@ void AvatarMixer::sendStatsPacket() {
slave.harvestStats(stats);
slaveObject["nodesProcessed"] = TIGHT_LOOP_STAT(stats.nodesProcessed);
slaveObject["packetsProcessed"] = TIGHT_LOOP_STAT(stats.packetsProcessed);
slaveObject["processIncomingPackets"] = TIGHT_LOOP_STAT(stats.processIncomingPacketsElapsedTime);
slaveObject["ignoreCalculation"] = TIGHT_LOOP_STAT(stats.ignoreCalculationElapsedTime);
slaveObject["avatarDataPacking"] = TIGHT_LOOP_STAT(stats.avatarDataPackingElapsedTime);
slaveObject["packetSending"] = TIGHT_LOOP_STAT(stats.packetSendingElapsedTime);
slaveObject["timing_1_processIncomingPackets"] = TIGHT_LOOP_STAT(stats.processIncomingPacketsElapsedTime);
slaveObject["timing_2_ignoreCalculation"] = TIGHT_LOOP_STAT(stats.ignoreCalculationElapsedTime);
slaveObject["timing_3_avatarDataPacking"] = TIGHT_LOOP_STAT(stats.avatarDataPackingElapsedTime);
slaveObject["timing_4_packetSending"] = TIGHT_LOOP_STAT(stats.packetSendingElapsedTime);
slaveObject["timing_5_jobElapsedTime"] = TIGHT_LOOP_STAT(stats.jobElapsedTime);
slavesObject[QString::number(slaveNumber)] = slaveObject;
slaveNumber++;
@ -512,12 +528,13 @@ void AvatarMixer::sendStatsPacket() {
statsObject["timing_slaves"] = slavesObject;
// broadcastAvatarDataElapsed timing details...
statsObject["timing_aggregate_nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed);
statsObject["timing_aggregate_packetsProcessed"] = TIGHT_LOOP_STAT(aggregateStats.packetsProcessed);
statsObject["timing_aggregate_processIncomingPackets"] = TIGHT_LOOP_STAT(aggregateStats.processIncomingPacketsElapsedTime);
statsObject["timing_aggregate_ignoreCalculation"] = TIGHT_LOOP_STAT(aggregateStats.ignoreCalculationElapsedTime);
statsObject["timing_aggregate_avatarDataPacking"] = TIGHT_LOOP_STAT(aggregateStats.avatarDataPackingElapsedTime);
statsObject["timing_aggregate_packetSending"] = TIGHT_LOOP_STAT(aggregateStats.packetSendingElapsedTime);
statsObject["aggregate_nodesProcessed"] = TIGHT_LOOP_STAT(aggregateStats.nodesProcessed);
statsObject["aggregate_packetsProcessed"] = TIGHT_LOOP_STAT(aggregateStats.packetsProcessed);
statsObject["timing_aggregate_1_processIncomingPackets"] = TIGHT_LOOP_STAT(aggregateStats.processIncomingPacketsElapsedTime);
statsObject["timing_aggregate_2_ignoreCalculation"] = TIGHT_LOOP_STAT(aggregateStats.ignoreCalculationElapsedTime);
statsObject["timing_aggregate_3_avatarDataPacking"] = TIGHT_LOOP_STAT(aggregateStats.avatarDataPackingElapsedTime);
statsObject["timing_aggregate_4_packetSending"] = TIGHT_LOOP_STAT(aggregateStats.packetSendingElapsedTime);
statsObject["timing_aggregate_4_jobElapsedTime"] = TIGHT_LOOP_STAT(aggregateStats.jobElapsedTime);
_handleViewFrustumPacketElapsedTime = 0;
_handleAvatarDataPacketElapsedTime = 0;
@ -571,6 +588,11 @@ void AvatarMixer::sendStatsPacket() {
_numTightLoopFrames = 0;
_broadcastAvatarDataElapsedTime = 0;
_broadcastAvatarDataInner = 0;
_broadcastAvatarDataLockWait = 0;
_broadcastAvatarDataNodeTransform = 0;
_broadcastAvatarDataNodeFunctor = 0;
_displayNameManagementElapsedTime = 0;
_ignoreCalculationElapsedTime = 0;
_avatarDataPackingElapsedTime = 0;

View file

@ -83,12 +83,18 @@ private:
p_high_resolution_clock::time_point _lastDebugMessage;
QHash<QString, QPair<int, int>> _sessionDisplayNames;
quint64 _broadcastAvatarDataElapsedTime { 0 }; // total time spent in broadcastAvatarData since last stats window
quint64 _displayNameManagementElapsedTime { 0 }; // total time spent in broadcastAvatarData/display name management... since last stats window
quint64 _ignoreCalculationElapsedTime { 0 };
quint64 _avatarDataPackingElapsedTime { 0 };
quint64 _packetSendingElapsedTime { 0 };
quint64 _broadcastAvatarDataElapsedTime { 0 }; // total time spent in broadcastAvatarData since last stats window
quint64 _broadcastAvatarDataInner { 0 };
quint64 _broadcastAvatarDataLockWait { 0 };
quint64 _broadcastAvatarDataNodeTransform { 0 };
quint64 _broadcastAvatarDataNodeFunctor { 0 };
quint64 _handleViewFrustumPacketElapsedTime { 0 };
quint64 _handleAvatarDataPacketElapsedTime { 0 };

View file

@ -70,6 +70,8 @@ static const int EXTRA_AVATAR_DATA_FRAME_RATIO = 16;
const float IDENTITY_SEND_PROBABILITY = 1.0f / 187.0f; // FIXME... this is wrong for 45hz
void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
quint64 start = usecTimestampNow();
//qDebug() << __FUNCTION__ << "node:" << node;
auto nodeList = DependencyManager::get<NodeList>();
@ -81,13 +83,13 @@ void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) {
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
MutexTryLocker lock(nodeData->getMutex());
//MutexTryLocker lock(nodeData->getMutex());
// FIXME????
if (!lock.isLocked()) {
//if (!lock.isLocked()) {
//qDebug() << __FUNCTION__ << "unable to lock... node:" << node << " would BAIL???... line:" << __LINE__;
//return;
}
//}
// FIXME -- mixer data
// ++_sumListeners;
@ -244,13 +246,13 @@ void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
++numOtherAvatars;
AvatarMixerClientData* otherNodeData = reinterpret_cast<AvatarMixerClientData*>(otherNode->getLinkedData());
MutexTryLocker lock(otherNodeData->getMutex());
//MutexTryLocker lock(otherNodeData->getMutex());
// FIXME -- might want to track this lock failed...
if (!lock.isLocked()) {
//if (!lock.isLocked()) {
//qDebug() << __FUNCTION__ << "inner loop, node:" << node << "otherNode:" << otherNode << " failed to lock... would BAIL??... line:" << __LINE__;
//return;
}
//}
// make sure we send out identity packets to and from new arrivals.
bool forceSend = !otherNodeData->checkAndSetHasReceivedFirstPacketsFrom(node->getUUID());
@ -289,7 +291,7 @@ void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
//qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
shouldConsider = false;
}
@ -309,7 +311,7 @@ void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
//qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
shouldConsider = false;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
@ -335,7 +337,7 @@ void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
//qDebug() << __FUNCTION__ << "inner loop, node:" << node->getUUID() << "otherNode:" << otherNode->getUUID() << " BAILING... line:" << __LINE__;
shouldConsider = false;
}
@ -397,5 +399,8 @@ void AvatarMixerSlave::anotherJob(const SharedNodePointer& node) {
quint64 endPacketSending = usecTimestampNow();
_stats.packetSendingElapsedTime += (endPacketSending - startPacketSending);
}
quint64 end = usecTimestampNow();
_stats.jobElapsedTime += (end - start);
}

View file

@ -30,6 +30,7 @@ public:
quint64 ignoreCalculationElapsedTime { 0 };
quint64 avatarDataPackingElapsedTime { 0 };
quint64 packetSendingElapsedTime { 0 };
quint64 jobElapsedTime { 0 };
void reset() {
nodesProcessed = 0;
@ -38,6 +39,7 @@ public:
ignoreCalculationElapsedTime = 0;
avatarDataPackingElapsedTime = 0;
packetSendingElapsedTime = 0;
jobElapsedTime = 0;
}
AvatarMixerSlaveStats& operator+=(const AvatarMixerSlaveStats& rhs) {
@ -47,6 +49,7 @@ public:
ignoreCalculationElapsedTime += rhs.ignoreCalculationElapsedTime;
avatarDataPackingElapsedTime += rhs.avatarDataPackingElapsedTime;
packetSendingElapsedTime += rhs.packetSendingElapsedTime;
jobElapsedTime += rhs.jobElapsedTime;
return *this;
}

View file

@ -182,15 +182,33 @@ public:
// This allows multiple threads (i.e. a thread pool) to share a lock
// without deadlocking when a dying node attempts to acquire a write lock
template<typename NestedNodeLambda>
void nestedEach(NestedNodeLambda functor) {
QReadLocker readLock(&_nodeMutex);
void nestedEach(NestedNodeLambda functor,
int* lockWaitOut = nullptr,
int* nodeTransformOut = nullptr,
int* functorOut = nullptr) {
auto start = usecTimestampNow();
{
QReadLocker readLock(&_nodeMutex);
auto endLock = usecTimestampNow();
if (lockWaitOut) {
*lockWaitOut = (endLock - start);
}
std::vector<SharedNodePointer> nodes(_nodeHash.size());
std::transform(_nodeHash.cbegin(), _nodeHash.cend(), nodes.begin(), [](const NodeHash::value_type& it) {
return it.second;
});
std::vector<SharedNodePointer> nodes(_nodeHash.size());
std::transform(_nodeHash.cbegin(), _nodeHash.cend(), nodes.begin(), [](const NodeHash::value_type& it) {
return it.second;
});
auto endTransform = usecTimestampNow();
if (nodeTransformOut) {
*nodeTransformOut = (endTransform - endLock);
}
functor(nodes.cbegin(), nodes.cend());
functor(nodes.cbegin(), nodes.cend());
auto endFunctor = usecTimestampNow();
if (functorOut) {
*functorOut = (endFunctor - endTransform);
}
}
}
template<typename NodeLambda>