diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 9616c8cb21..176fd51eea 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -20,9 +20,9 @@ #include #include #include +#include #include "AvatarMixerClientData.h" - #include "AvatarMixer.h" const QString AVATAR_MIXER_LOGGING_NAME = "avatar-mixer"; @@ -119,12 +119,25 @@ void AvatarMixer::broadcastAvatarData() { auto nodeList = DependencyManager::get(); - AvatarMixerClientData* nodeData = NULL; - AvatarMixerClientData* otherNodeData = NULL; - - nodeList->eachNode([&](const SharedNodePointer& node) { - if (node->getLinkedData() && node->getType() == NodeType::Agent && node->getActiveSocket() - && (nodeData = reinterpret_cast(node->getLinkedData()))->getMutex().tryLock()) { + nodeList->eachMatchingNode( + [&](const SharedNodePointer& node)->bool { + if (!node->getLinkedData()) { + return false; + } + if (node->getType() != NodeType::Agent) { + return false; + } + if (!node->getActiveSocket()) { + return false; + } + return true; + }, + [&](const SharedNodePointer& node) { + AvatarMixerClientData* nodeData = reinterpret_cast(node->getLinkedData()); + MutexTryLocker lock(nodeData->getMutex()); + if (!lock.isLocked()) { + return; + } ++_sumListeners; // reset packet pointers for this node @@ -132,83 +145,97 @@ void AvatarMixer::broadcastAvatarData() { AvatarData& avatar = nodeData->getAvatar(); glm::vec3 myPosition = avatar.getPosition(); + // TODO use this along with the distance in the calculation of whether to send an update + // about a given otherNode to this node + // FIXME does this mean we should sort the othernodes by distance before iterating + // over them? + float outputBandwidth = node->getOutboundBandwidth(); // this is an AGENT we have received head data from // send back a packet with other active node data to this node - nodeList->eachNode([&](const SharedNodePointer& otherNode) { - if (otherNode->getLinkedData() && otherNode->getUUID() != node->getUUID() - && (otherNodeData = reinterpret_cast(otherNode->getLinkedData()))->getMutex().tryLock()) { - - AvatarMixerClientData* otherNodeData = reinterpret_cast(otherNode->getLinkedData()); + nodeList->eachMatchingNode( + [&](const SharedNodePointer& otherNode)->bool { + if (!otherNode->getLinkedData()) { + return false; + } + if (otherNode->getUUID() == node->getUUID()) { + return false; + } + + // Check throttling value + if (!(_performanceThrottlingRatio == 0 || randFloat() < (1.0f - _performanceThrottlingRatio))) { + return false; + } + return true; + }, + [&](const SharedNodePointer& otherNode) { + AvatarMixerClientData* otherNodeData = otherNodeData = reinterpret_cast(otherNode->getLinkedData()); + MutexTryLocker lock(otherNodeData->getMutex()); + if (!lock.isLocked()) { + return; + } AvatarData& otherAvatar = otherNodeData->getAvatar(); - glm::vec3 otherPosition = otherAvatar.getPosition(); - - float distanceToAvatar = glm::length(myPosition - otherPosition); + // Decide whether to send this avatar's data based on it's distance from us // The full rate distance is the distance at which EVERY update will be sent for this avatar // at a distance of twice the full rate distance, there will be a 50% chance of sending this avatar's update const float FULL_RATE_DISTANCE = 2.0f; - - // Decide whether to send this avatar's data based on it's distance from us - if ((_performanceThrottlingRatio == 0 || randFloat() < (1.0f - _performanceThrottlingRatio)) - && (distanceToAvatar == 0.0f || randFloat() < FULL_RATE_DISTANCE / distanceToAvatar)) { - QByteArray avatarByteArray; - avatarByteArray.append(otherNode->getUUID().toRfc4122()); - avatarByteArray.append(otherAvatar.toByteArray()); - - if (avatarByteArray.size() + mixedAvatarByteArray.size() > MAX_PACKET_SIZE) { - nodeList->writeDatagram(mixedAvatarByteArray, node); - - // reset the packet - mixedAvatarByteArray.resize(numPacketHeaderBytes); - } - - // copy the avatar into the mixedAvatarByteArray packet - mixedAvatarByteArray.append(avatarByteArray); - - // if the receiving avatar has just connected make sure we send out the mesh and billboard - // for this avatar (assuming they exist) - bool forceSend = !nodeData->checkAndSetHasReceivedFirstPackets(); - - // we will also force a send of billboard or identity packet - // if either has changed in the last frame - - if (otherNodeData->getBillboardChangeTimestamp() > 0 - && (forceSend - || otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp - || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { - QByteArray billboardPacket = byteArrayWithPopulatedHeader(PacketTypeAvatarBillboard); - billboardPacket.append(otherNode->getUUID().toRfc4122()); - billboardPacket.append(otherNodeData->getAvatar().getBillboard()); - nodeList->writeDatagram(billboardPacket, node); - - ++_sumBillboardPackets; - } - - if (otherNodeData->getIdentityChangeTimestamp() > 0 - && (forceSend - || otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp - || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { - - QByteArray identityPacket = byteArrayWithPopulatedHeader(PacketTypeAvatarIdentity); - - QByteArray individualData = otherNodeData->getAvatar().identityByteArray(); - individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122()); - identityPacket.append(individualData); - - nodeList->writeDatagram(identityPacket, node); - - ++_sumIdentityPackets; - } + glm::vec3 otherPosition = otherAvatar.getPosition(); + float distanceToAvatar = glm::length(myPosition - otherPosition); + + if (!(distanceToAvatar == 0.0f || randFloat() < FULL_RATE_DISTANCE / distanceToAvatar)) { + return; + } + + QByteArray avatarByteArray; + avatarByteArray.append(otherNode->getUUID().toRfc4122()); + avatarByteArray.append(otherAvatar.toByteArray()); + + if (avatarByteArray.size() + mixedAvatarByteArray.size() > MAX_PACKET_SIZE) { + nodeList->writeDatagram(mixedAvatarByteArray, node); + + // reset the packet + mixedAvatarByteArray.resize(numPacketHeaderBytes); + } + + // copy the avatar into the mixedAvatarByteArray packet + mixedAvatarByteArray.append(avatarByteArray); + + // if the receiving avatar has just connected make sure we send out the mesh and billboard + // for this avatar (assuming they exist) + bool forceSend = !nodeData->checkAndSetHasReceivedFirstPackets(); + + // we will also force a send of billboard or identity packet + // if either has changed in the last frame + + if (otherNodeData->getBillboardChangeTimestamp() > 0 + && (forceSend + || otherNodeData->getBillboardChangeTimestamp() > _lastFrameTimestamp + || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { + QByteArray billboardPacket = byteArrayWithPopulatedHeader(PacketTypeAvatarBillboard); + billboardPacket.append(otherNode->getUUID().toRfc4122()); + billboardPacket.append(otherNodeData->getAvatar().getBillboard()); + nodeList->writeDatagram(billboardPacket, node); + + ++_sumBillboardPackets; + } + + if (otherNodeData->getIdentityChangeTimestamp() > 0 + && (forceSend + || otherNodeData->getIdentityChangeTimestamp() > _lastFrameTimestamp + || randFloat() < BILLBOARD_AND_IDENTITY_SEND_PROBABILITY)) { + + QByteArray identityPacket = byteArrayWithPopulatedHeader(PacketTypeAvatarIdentity); + + QByteArray individualData = otherNodeData->getAvatar().identityByteArray(); + individualData.replace(0, NUM_BYTES_RFC4122_UUID, otherNode->getUUID().toRfc4122()); + identityPacket.append(individualData); + + nodeList->writeDatagram(identityPacket, node); + + ++_sumIdentityPackets; } - - otherNodeData->getMutex().unlock(); - } }); - nodeList->writeDatagram(mixedAvatarByteArray, node); - - nodeData->getMutex().unlock(); - } }); _lastFrameTimestamp = QDateTime::currentMSecsSinceEpoch(); diff --git a/libraries/networking/src/BandwidthRecorder.h b/libraries/networking/src/BandwidthRecorder.h index c22665d2cc..f87d9d4d06 100644 --- a/libraries/networking/src/BandwidthRecorder.h +++ b/libraries/networking/src/BandwidthRecorder.h @@ -17,7 +17,6 @@ #include #include #include "DependencyManager.h" -#include "Node.h" #include "SimpleMovingAverage.h" diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 520dc650ed..98e1ed0572 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -263,8 +263,10 @@ qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, } emit dataSent(destinationNode->getType(), datagram.size()); - - return writeDatagram(datagram, *destinationSockAddr, destinationNode->getConnectionSecret()); + auto bytesWritten = writeDatagram(datagram, *destinationSockAddr, destinationNode->getConnectionSecret()); + // Keep track of per-destination-node bandwidth + destinationNode->recordBytesSent(bytesWritten); + return bytesWritten; } // didn't have a destinationNode to send to, return 0 diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index a071eced31..532e8ffcf4 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -151,7 +151,18 @@ public: functor(it->second); } } - + + template + void eachMatchingNode(PredLambda predicate, NodeLambda functor) { + QReadLocker readLock(&_nodeMutex); + + for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) { + if (predicate(it->second)) { + functor(it->second); + } + } + } + template void eachNodeBreakable(BreakableNodeLambda functor) { QReadLocker readLock(&_nodeMutex); diff --git a/libraries/networking/src/NetworkPeer.cpp b/libraries/networking/src/NetworkPeer.cpp index eaaf57471c..c6026b3a23 100644 --- a/libraries/networking/src/NetworkPeer.cpp +++ b/libraries/networking/src/NetworkPeer.cpp @@ -15,6 +15,7 @@ #include #include "NetworkPeer.h" +#include "BandwidthRecorder.h" NetworkPeer::NetworkPeer() : _uuid(), @@ -96,4 +97,37 @@ QDebug operator<<(QDebug debug, const NetworkPeer &peer) { << "- public:" << peer.getPublicSocket() << "- local:" << peer.getLocalSocket(); return debug; -} \ No newline at end of file +} + + +// FIXME this is a temporary implementation to determine if this is the right approach. +// If so, migrate the BandwidthRecorder into the NetworkPeer class +using BandwidthRecorderPtr = QSharedPointer; +static QHash PEER_BANDWIDTH; + +BandwidthRecorder& getBandwidthRecorder(const QUuid & uuid) { + if (!PEER_BANDWIDTH.count(uuid)) { + PEER_BANDWIDTH.insert(uuid, BandwidthRecorderPtr(new BandwidthRecorder())); + } + return *PEER_BANDWIDTH[uuid].data(); +} + +void NetworkPeer::recordBytesSent(int count) { + auto& bw = getBandwidthRecorder(_uuid); + bw.updateOutboundData(0, count); +} + +void NetworkPeer::recordBytesReceived(int count) { + auto& bw = getBandwidthRecorder(_uuid); + bw.updateInboundData(0, count); +} + +float NetworkPeer::getOutboundBandwidth() { + auto& bw = getBandwidthRecorder(_uuid); + return bw.getAverageOutputKilobitsPerSecond(0); +} + +float NetworkPeer::getInboundBandwidth() { + auto& bw = getBandwidthRecorder(_uuid); + return bw.getAverageInputKilobitsPerSecond(0); +} diff --git a/libraries/networking/src/NetworkPeer.h b/libraries/networking/src/NetworkPeer.h index bb92c54eb8..5bf798d2c5 100644 --- a/libraries/networking/src/NetworkPeer.h +++ b/libraries/networking/src/NetworkPeer.h @@ -54,6 +54,12 @@ public: int getConnectionAttempts() const { return _connectionAttempts; } void incrementConnectionAttempts() { ++_connectionAttempts; } void resetConnectionAttemps() { _connectionAttempts = 0; } + + void recordBytesSent(int count); + void recordBytesReceived(int count); + + float getOutboundBandwidth(); + float getInboundBandwidth(); friend QDataStream& operator<<(QDataStream& out, const NetworkPeer& peer); friend QDataStream& operator>>(QDataStream& in, NetworkPeer& peer); diff --git a/libraries/shared/src/TryLocker.h b/libraries/shared/src/TryLocker.h new file mode 100644 index 0000000000..a5c8077484 --- /dev/null +++ b/libraries/shared/src/TryLocker.h @@ -0,0 +1,28 @@ +// +// TryLocker.h +// libraries/shared/src +// +// Created by Brad Davis on 2015/03/16. +// Copyright 2015 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_TryLocker_h +#define hifi_TryLocker_h + +#include + +class MutexTryLocker { + QMutex& _mutex; + bool _locked{ false }; +public: + MutexTryLocker(QMutex &m) : _mutex(m), _locked(m.tryLock()) {} + ~MutexTryLocker() { if (_locked) _mutex.unlock(); } + bool isLocked() { + return _locked; + } +}; + +#endif // hifi_TryLocker_h