From 3a282045ff60d766bc96a5856b32bf147b37ba5d Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Fri, 21 Dec 2018 15:48:58 -0800 Subject: [PATCH 1/9] Checkpoint trait-level flow control --- assignment-client/src/avatars/AvatarMixer.cpp | 1 + .../src/avatars/AvatarMixerClientData.cpp | 26 +++++++++++++++++++ .../src/avatars/AvatarMixerClientData.h | 16 +++++++++++- .../src/avatars/AvatarMixerSlave.cpp | 12 +++++++-- libraries/avatars/src/AvatarHashMap.cpp | 11 ++++++++ libraries/avatars/src/AvatarTraits.h | 4 +++ libraries/networking/src/udt/PacketHeaders.h | 2 +- 7 files changed, 68 insertions(+), 4 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index 53fc13e5cf..6b2b60d3fb 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -56,6 +56,7 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) : packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket"); packetReceiver.registerListener(PacketType::AvatarIdentityRequest, this, "handleAvatarIdentityRequestPacket"); packetReceiver.registerListener(PacketType::SetAvatarTraits, this, "queueIncomingPacket"); + packetReceiver.registerListener(PacketType::BulkAvatarTraitsAck, this, "queueIncomingPacket"); packetReceiver.registerListenerForTypes({ PacketType::ReplicatedAvatarIdentity, diff --git a/assignment-client/src/avatars/AvatarMixerClientData.cpp b/assignment-client/src/avatars/AvatarMixerClientData.cpp index 5e36d8beaf..076f9ea862 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.cpp +++ b/assignment-client/src/avatars/AvatarMixerClientData.cpp @@ -68,6 +68,9 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData case PacketType::SetAvatarTraits: processSetTraitsMessage(*packet, slaveSharedData, *node); break; + case PacketType::BulkAvatarTraitsAck: + processBulkAvatarTraitsAckMessage(*packet); + break; default: Q_UNREACHABLE(); } @@ -179,6 +182,29 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, } } +void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) { + + // Look up the avatar/trait data associated with this ack and update the 'last ack' list + // with it. + AvatarTraits::TraitMessageSequence seq; + message.readPrimitive(&seq); + auto& sentAvatarTraitVersions = _pendingTraitVersions.find(seq); + if (sentAvatarTraitVersions != _pendingTraitVersions.end()) { + for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) { + auto& nodeId = nodeTraitVersions.first; + auto& versions = nodeTraitVersions.second; + auto simpleReceivedIt = versions.simpleCBegin(); + while (simpleReceivedIt != versions.simpleCEnd()) { + auto traitType = static_cast(std::distance(versions.simpleCBegin(), + simpleReceivedIt)); + _ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt; + } + } + _pendingTraitVersions.erase(sentAvatarTraitVersions); + } +} + + void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData &slaveSharedData, Node& sendingNode, AvatarTraits::TraitVersion traitVersion) { const auto& whitelist = slaveSharedData.skeletonURLWhitelist; diff --git a/assignment-client/src/avatars/AvatarMixerClientData.h b/assignment-client/src/avatars/AvatarMixerClientData.h index 8a86af384a..f26ce8504b 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.h +++ b/assignment-client/src/avatars/AvatarMixerClientData.h @@ -42,6 +42,7 @@ public: AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID); virtual ~AvatarMixerClientData() {} using HRCTime = p_high_resolution_clock::time_point; + using NodeTraitVersions = std::unordered_map; int parseData(ReceivedMessage& message) override; AvatarData& getAvatar() { return *_avatar; } @@ -124,6 +125,7 @@ public: int processPackets(const SlaveSharedData& slaveSharedData); // returns number of packets processed void processSetTraitsMessage(ReceivedMessage& message, const SlaveSharedData& slaveSharedData, Node& sendingNode); + void processBulkAvatarTraitsAckMessage(ReceivedMessage& message); void checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, Node& sendingNode, AvatarTraits::TraitVersion traitVersion); @@ -138,7 +140,14 @@ public: void setLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar, TraitsCheckTimestamp sendPoint) { _lastSentTraitsTimestamps[otherAvatar] = sendPoint; } + AvatarTraits::TraitMessageSequence getTraitsMessageSequence() const { return _currentTraitsMessageSequence; } + AvatarTraits::TraitMessageSequence nextTraitsMessageSequence() { return ++_currentTraitsMessageSequence; } + AvatarTraits::TraitVersions& getPendingTraitVersions(AvatarTraits::TraitMessageSequence seq, Node::LocalID otherId) { + return _pendingTraitVersions[seq][otherId]; + } + AvatarTraits::TraitVersions& getLastSentTraitVersions(Node::LocalID otherAvatar) { return _sentTraitVersions[otherAvatar]; } + AvatarTraits::TraitVersions& getLastAckedTraitVersions(Node::LocalID otherAvatar) { return _ackedTraitVersions[otherAvatar]; } void resetSentTraitData(Node::LocalID nodeID); @@ -183,8 +192,13 @@ private: AvatarTraits::TraitVersions _lastReceivedTraitVersions; TraitsCheckTimestamp _lastReceivedTraitsChange; + AvatarTraits::TraitMessageSequence _currentTraitsMessageSequence{ 0 }; + + std::unordered_map _pendingTraitVersions; + std::unordered_map _lastSentTraitsTimestamps; - std::unordered_map _sentTraitVersions; + NodeTraitVersions _sentTraitVersions; + NodeTraitVersions _ackedTraitVersions; std::atomic_bool _isIgnoreRadiusEnabled { false }; }; diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index cd9d164ef7..b0cda85bec 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -103,6 +103,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis // compare trait versions so we can see what exactly needs to go out auto& lastSentVersions = listeningNodeData->getLastSentTraitVersions(otherNodeLocalID); + auto& lastAckedVersions = listeningNodeData->getLastAckedTraitVersions(otherNodeLocalID); const auto& lastReceivedVersions = sendingNodeData->getLastReceivedTraitVersions(); auto simpleReceivedIt = lastReceivedVersions.simpleCBegin(); @@ -112,13 +113,18 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis auto lastReceivedVersion = *simpleReceivedIt; auto& lastSentVersionRef = lastSentVersions[traitType]; + auto& lastAckedVersionRef = lastAckedVersions[traitType]; - if (lastReceivedVersions[traitType] > lastSentVersionRef) { + // hold sending more traits until we've been acked that the last one we sent was received + if (lastAckedVersionRef == lastSentVersionRef && lastReceivedVersions[traitType] > lastSentVersionRef) { // there is an update to this trait, add it to the traits packet bytesWritten += sendingAvatar->packTrait(traitType, traitsPacketList, lastReceivedVersion); - // update the last sent version lastSentVersionRef = lastReceivedVersion; + // Remember which versions we sent in this particular packet + // so we can verify when it's acked. + auto& pendingTraitVersions = listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), otherNodeLocalID); + pendingTraitVersions[traitType] = lastReceivedVersion; } ++simpleReceivedIt; @@ -419,6 +425,8 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) int remainingAvatars = (int)sortedAvatars.size(); auto traitsPacketList = NLPacketList::create(PacketType::BulkAvatarTraits, QByteArray(), true, true); + traitsPacketList->writePrimitive(nodeData->nextTraitsMessageSequence()); + auto avatarPacket = NLPacket::create(PacketType::BulkAvatarData); const int avatarPacketCapacity = avatarPacket->getPayloadCapacity(); int avatarSpaceAvailable = avatarPacketCapacity; diff --git a/libraries/avatars/src/AvatarHashMap.cpp b/libraries/avatars/src/AvatarHashMap.cpp index 41ca950b3b..ff902945bc 100644 --- a/libraries/avatars/src/AvatarHashMap.cpp +++ b/libraries/avatars/src/AvatarHashMap.cpp @@ -328,6 +328,17 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer } void AvatarHashMap::processBulkAvatarTraits(QSharedPointer message, SharedNodePointer sendingNode) { + AvatarTraits::TraitMessageSequence seq; + + message->readPrimitive(&seq); + + // we have a mixer to send to, setup our set traits packet + auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true); + traitsAckPacket->writePrimitive(seq); + auto nodeList = DependencyManager::get(); + SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer); + nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); + while (message->getBytesLeftToRead()) { // read the avatar ID to figure out which avatar this is for diff --git a/libraries/avatars/src/AvatarTraits.h b/libraries/avatars/src/AvatarTraits.h index 5e28515d12..966ae6bdde 100644 --- a/libraries/avatars/src/AvatarTraits.h +++ b/libraries/avatars/src/AvatarTraits.h @@ -41,6 +41,10 @@ namespace AvatarTraits { const TraitWireSize DELETED_TRAIT_SIZE = -1; const TraitWireSize MAXIMUM_TRAIT_SIZE = INT16_MAX; + using TraitMessageSequence = int64_t; + const TraitMessageSequence FIRST_TRAIT_SEQUENCE = 0; + const TraitMessageSequence MAX_TRAIT_SEQUENCE = INT64_MAX; + inline qint64 packInstancedTraitDelete(TraitType traitType, TraitInstanceID instanceID, ExtendedIODevice& destination, TraitVersion traitVersion = NULL_TRAIT_VERSION) { qint64 bytesWritten = 0; diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index b46eb3e9e4..508d46def8 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -133,7 +133,7 @@ public: EntityQueryInitialResultsComplete, BulkAvatarTraits, AudioSoloRequest, - + BulkAvatarTraitsAck, NUM_PACKET_TYPE }; From da70271acfccdfd32d8d61b39978f8743b993065 Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Wed, 26 Dec 2018 13:46:08 -0800 Subject: [PATCH 2/9] Checkpoint #2 Case20377 - Add Ack for BulkAvatarTraits --- .../src/avatars/AvatarMixerClientData.cpp | 13 ++++++++ .../src/avatars/AvatarMixerSlave.cpp | 33 ++++++++++++++----- .../src/avatars/AvatarMixerSlave.h | 5 +++ .../networking/src/udt/PacketHeaders.cpp | 2 ++ libraries/networking/src/udt/PacketHeaders.h | 3 +- 5 files changed, 46 insertions(+), 10 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerClientData.cpp b/assignment-client/src/avatars/AvatarMixerClientData.cpp index 076f9ea862..a20d6504de 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.cpp +++ b/assignment-client/src/avatars/AvatarMixerClientData.cpp @@ -190,6 +190,10 @@ void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& m message.readPrimitive(&seq); auto& sentAvatarTraitVersions = _pendingTraitVersions.find(seq); if (sentAvatarTraitVersions != _pendingTraitVersions.end()) { + // Note, this is not a simple move of the pending traits + // to the acked traits. Instead, it's a copy where existing + // trait versions in the acked hash are retained for traits not + // included in the pending hash for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) { auto& nodeId = nodeTraitVersions.first; auto& versions = nodeTraitVersions.second; @@ -198,10 +202,19 @@ void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& m auto traitType = static_cast(std::distance(versions.simpleCBegin(), simpleReceivedIt)); _ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt; + simpleReceivedIt++; } } _pendingTraitVersions.erase(sentAvatarTraitVersions); } + else { + // This can happen either the BulkAvatarTraits was sent with no simple traits, + // or if the avatar mixer restarts while there are pending + // BulkAvatarTraits messages in-flight. + if (seq > getTraitsMessageSequence()) { + qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from " << message.getSenderSockAddr(); + } + } } diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index b0cda85bec..1e84da7e55 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -80,6 +80,21 @@ int AvatarMixerSlave::sendIdentityPacket(NLPacketList& packetList, const AvatarM } } +qint64 AvatarMixerSlave::addTraitsNodeHeader(AvatarMixerClientData* listeningNodeData, + const AvatarMixerClientData* sendingNodeData, + NLPacketList& traitsPacketList, + qint64 bytesWritten) { + if (bytesWritten == 0) { + + if (traitsPacketList.getNumPackets() == 0) { + bytesWritten += traitsPacketList.writePrimitive(listeningNodeData->nextTraitsMessageSequence()); + } + // add the avatar ID to mark the beginning of traits for this avatar + bytesWritten += traitsPacketList.write(sendingNodeData->getNodeID().toRfc4122()); + } + return bytesWritten; +} + qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* listeningNodeData, const AvatarMixerClientData* sendingNodeData, NLPacketList& traitsPacketList) { @@ -96,9 +111,6 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis if (timeOfLastTraitsChange > timeOfLastTraitsSent) { // there is definitely new traits data to send - // add the avatar ID to mark the beginning of traits for this avatar - bytesWritten += traitsPacketList.write(sendingNodeData->getNodeID().toRfc4122()); - auto sendingAvatar = sendingNodeData->getAvatarSharedPointer(); // compare trait versions so we can see what exactly needs to go out @@ -116,7 +128,8 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis auto& lastAckedVersionRef = lastAckedVersions[traitType]; // hold sending more traits until we've been acked that the last one we sent was received - if (lastAckedVersionRef == lastSentVersionRef && lastReceivedVersions[traitType] > lastSentVersionRef) { + if (lastSentVersionRef == lastAckedVersionRef && lastReceivedVersions[traitType] > lastSentVersionRef) { + addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); // there is an update to this trait, add it to the traits packet bytesWritten += sendingAvatar->packTrait(traitType, traitsPacketList, lastReceivedVersion); // update the last sent version @@ -156,6 +169,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis }); if (!isDeleted && (sentInstanceIt == sentIDValuePairs.end() || receivedVersion > sentInstanceIt->value)) { + addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); // this instance version exists and has never been sent or is newer so we need to send it bytesWritten += sendingAvatar->packTraitInstance(traitType, instanceID, traitsPacketList, receivedVersion); @@ -166,6 +180,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis sentIDValuePairs.emplace_back(instanceID, receivedVersion); } } else if (isDeleted && sentInstanceIt != sentIDValuePairs.end() && absoluteReceivedVersion > sentInstanceIt->value) { + addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); // this instance version was deleted and we haven't sent the delete to this client yet bytesWritten += AvatarTraits::packInstancedTraitDelete(traitType, instanceID, traitsPacketList, absoluteReceivedVersion); @@ -177,10 +192,10 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis ++instancedReceivedIt; } - - // write a null trait type to mark the end of trait data for this avatar - bytesWritten += traitsPacketList.writePrimitive(AvatarTraits::NullTrait); - + if (bytesWritten) { + // write a null trait type to mark the end of trait data for this avatar + bytesWritten += traitsPacketList.writePrimitive(AvatarTraits::NullTrait); + } // since we send all traits for this other avatar, update the time of last traits sent // to match the time of last traits change listeningNodeData->setLastOtherAvatarTraitsSendPoint(otherNodeLocalID, timeOfLastTraitsChange); @@ -425,7 +440,6 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) int remainingAvatars = (int)sortedAvatars.size(); auto traitsPacketList = NLPacketList::create(PacketType::BulkAvatarTraits, QByteArray(), true, true); - traitsPacketList->writePrimitive(nodeData->nextTraitsMessageSequence()); auto avatarPacket = NLPacket::create(PacketType::BulkAvatarData); const int avatarPacketCapacity = avatarPacket->getPayloadCapacity(); @@ -558,6 +572,7 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) if (traitsPacketList->getNumPackets() >= 1) { // send the traits packet list + nodeList->sendPacketList(std::move(traitsPacketList), *destinationNode); } diff --git a/assignment-client/src/avatars/AvatarMixerSlave.h b/assignment-client/src/avatars/AvatarMixerSlave.h index 2ef90af38e..56c49b5b2e 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.h +++ b/assignment-client/src/avatars/AvatarMixerSlave.h @@ -104,6 +104,11 @@ private: int sendIdentityPacket(NLPacketList& packet, const AvatarMixerClientData* nodeData, const Node& destinationNode); int sendReplicatedIdentityPacket(const Node& agentNode, const AvatarMixerClientData* nodeData, const Node& destinationNode); + qint64 addTraitsNodeHeader(AvatarMixerClientData* listeningNodeData, + const AvatarMixerClientData* sendingNodeData, + NLPacketList& traitsPacketList, + qint64 bytesWritten); + qint64 addChangedTraitsToBulkPacket(AvatarMixerClientData* listeningNodeData, const AvatarMixerClientData* sendingNodeData, NLPacketList& traitsPacketList); diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index fad38d565b..5275c2c78e 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -96,6 +96,8 @@ PacketVersion versionForPacketType(PacketType packetType) { return 22; case PacketType::EntityQueryInitialResultsComplete: return static_cast(EntityVersion::ParticleSpin); + case PacketType::BulkAvatarTraitsAck: + return static_cast(AvatarMixerPacketVersion::AvatarTraitsAck); default: return 22; } diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index 508d46def8..5da3acdef9 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -299,7 +299,8 @@ enum class AvatarMixerPacketVersion : PacketVersion { MigrateSkeletonURLToTraits, MigrateAvatarEntitiesToTraits, FarGrabJointsRedux, - JointTransScaled + JointTransScaled, + AvatarTraitsAck }; enum class DomainConnectRequestVersion : PacketVersion { From 2e457e2212ef0e7dec1c6719adcfb06442d5326a Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Tue, 1 Jan 2019 20:50:46 -0800 Subject: [PATCH 3/9] Checkpoint trait flow control --- .../src/avatars/AvatarMixerClientData.cpp | 69 +++++++++++-------- .../src/avatars/AvatarMixerSlave.cpp | 24 ++++++- 2 files changed, 64 insertions(+), 29 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerClientData.cpp b/assignment-client/src/avatars/AvatarMixerClientData.cpp index a20d6504de..b2b09df6d2 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.cpp +++ b/assignment-client/src/avatars/AvatarMixerClientData.cpp @@ -19,9 +19,7 @@ #include "AvatarMixerSlave.h" -AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : - NodeData(nodeID, nodeLocalID) -{ +AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : NodeData(nodeID, nodeLocalID) { // in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID _avatar->setID(nodeID); } @@ -82,12 +80,11 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData } int AvatarMixerClientData::parseData(ReceivedMessage& message) { - // pull the sequence number from the data first uint16_t sequenceNumber; message.readPrimitive(&sequenceNumber); - + if (sequenceNumber < _lastReceivedSequenceNumber && _lastReceivedSequenceNumber != UINT16_MAX) { incrementNumOutOfOrderSends(); } @@ -98,7 +95,8 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message) { } void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, - const SlaveSharedData& slaveSharedData, Node& sendingNode) { + const SlaveSharedData& slaveSharedData, + Node& sendingNode) { // pull the trait version from the message AvatarTraits::TraitVersion packetTraitVersion; message.readPrimitive(&packetTraitVersion); @@ -137,7 +135,7 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, AvatarTraits::TraitInstanceID instanceID = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID)); if (message.getBytesLeftToRead() == 0) { - qWarning () << "Received an instanced trait with no size from" << message.getSenderSockAddr(); + qWarning() << "Received an instanced trait with no size from" << message.getSenderSockAddr(); break; } @@ -145,7 +143,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, message.readPrimitive(&traitSize); if (traitSize < -1 || traitSize > message.getBytesLeftToRead()) { - qWarning() << "Refusing to process instanced trait of size" << traitSize << "from" << message.getSenderSockAddr(); + qWarning() << "Refusing to process instanced trait of size" << traitSize << "from" + << message.getSenderSockAddr(); break; } @@ -171,7 +170,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, message.seek(message.getPosition() + traitSize); } } else { - qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from" << message.getSenderSockAddr(); + qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from" + << message.getSenderSockAddr(); break; } } @@ -183,42 +183,56 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, } void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) { - // Look up the avatar/trait data associated with this ack and update the 'last ack' list // with it. AvatarTraits::TraitMessageSequence seq; message.readPrimitive(&seq); - auto& sentAvatarTraitVersions = _pendingTraitVersions.find(seq); + auto sentAvatarTraitVersions = _pendingTraitVersions.find(seq); if (sentAvatarTraitVersions != _pendingTraitVersions.end()) { // Note, this is not a simple move of the pending traits // to the acked traits. Instead, it's a copy where existing - // trait versions in the acked hash are retained for traits not - // included in the pending hash + // trait versions in the acked hash are retained for traits not + // included in the pending hash for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) { auto& nodeId = nodeTraitVersions.first; auto& versions = nodeTraitVersions.second; auto simpleReceivedIt = versions.simpleCBegin(); while (simpleReceivedIt != versions.simpleCEnd()) { - auto traitType = static_cast(std::distance(versions.simpleCBegin(), - simpleReceivedIt)); + auto traitType = static_cast(std::distance(versions.simpleCBegin(), simpleReceivedIt)); _ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt; simpleReceivedIt++; } + + // enumerate the sent instanced trait versions + auto instancedSentIt = versions.instancedCBegin(); + while (instancedSentIt != versions.instancedCEnd()) { + auto traitType = instancedSentIt->traitType; + // get or create the sent trait versions for this trait type + auto& sentIDValuePairs = versions.getInstanceIDValuePairs(traitType); + + // enumerate each sent instance + for (auto& sentInstance : instancedSentIt->instances) { + auto instanceID = sentInstance.id; + const auto sentVersion = sentInstance.value; + _ackedTraitVersions[nodeId].instanceInsert(traitType, instanceID, sentVersion); + } + instancedSentIt++; + } } _pendingTraitVersions.erase(sentAvatarTraitVersions); - } - else { + } else { // This can happen either the BulkAvatarTraits was sent with no simple traits, // or if the avatar mixer restarts while there are pending // BulkAvatarTraits messages in-flight. if (seq > getTraitsMessageSequence()) { - qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from " << message.getSenderSockAddr(); + qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from " + << message.getSenderSockAddr(); } } } - -void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData &slaveSharedData, Node& sendingNode, +void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, + Node& sendingNode, AvatarTraits::TraitVersion traitVersion) { const auto& whitelist = slaveSharedData.skeletonURLWhitelist; @@ -244,8 +258,8 @@ void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedDa // make sure we're not unecessarily overriding the default avatar with the default avatar if (_avatar->getWireSafeSkeletonModelURL() != slaveSharedData.skeletonReplacementURL) { // we need to change this avatar's skeleton URL, and send them a traits packet informing them of the change - qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() - << "to replacement" << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); + qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() << "to replacement" + << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); _avatar->setSkeletonModelURL(slaveSharedData.skeletonReplacementURL); auto packet = NLPacket::create(PacketType::SetAvatarTraits, -1, true); @@ -327,7 +341,7 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) { _currentViewFrustums.clear(); auto sourceBuffer = reinterpret_cast(message.constData()); - + uint8_t numFrustums = 0; memcpy(&numFrustums, sourceBuffer, sizeof(numFrustums)); sourceBuffer += sizeof(numFrustums); @@ -342,9 +356,7 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) { bool AvatarMixerClientData::otherAvatarInView(const AABox& otherAvatarBox) { return std::any_of(std::begin(_currentViewFrustums), std::end(_currentViewFrustums), - [&](const ConicalViewFrustum& viewFrustum) { - return viewFrustum.intersects(otherAvatarBox); - }); + [&](const ConicalViewFrustum& viewFrustum) { return viewFrustum.intersects(otherAvatarBox); }); } void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { @@ -355,14 +367,15 @@ void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { jsonObject["total_num_out_of_order_sends"] = _numOutOfOrderSends; jsonObject[OUTBOUND_AVATAR_DATA_STATS_KEY] = getOutboundAvatarDataKbps(); - jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float) BYTES_PER_KILOBIT; + jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float)BYTES_PER_KILOBIT; jsonObject["av_data_receive_rate"] = _avatar->getReceiveRate(); jsonObject["recent_other_av_in_view"] = _recentOtherAvatarsInView; jsonObject["recent_other_av_out_of_view"] = _recentOtherAvatarsOutOfView; } -AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar) const { +AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint( + Node::LocalID otherAvatar) const { auto it = _lastSentTraitsTimestamps.find(otherAvatar); if (it != _lastSentTraitsTimestamps.end()) { diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 1e84da7e55..5ed10dad98 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -150,6 +150,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis // get or create the sent trait versions for this trait type auto& sentIDValuePairs = lastSentVersions.getInstanceIDValuePairs(traitType); + auto& ackIDValuePairs = lastAckedVersions.getInstanceIDValuePairs(traitType); // enumerate each received instance for (auto& receivedInstance : instancedReceivedIt->instances) { @@ -167,7 +168,16 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis { return sentInstance.id == instanceID; }); - + // look for existing acked version for this instance + auto ackedInstanceIt = std::find_if(ackIDValuePairs.begin(), ackIDValuePairs.end(), + [instanceID](auto& ackInstance) { return ackInstance.id == instanceID; }); + + // if we have a sent version, then we must have an acked instance of the same trait with the same + // version to go on, otherwise we drop the received trait + if (sentInstanceIt != sentIDValuePairs.end() && + (ackedInstanceIt == ackIDValuePairs.end() || sentInstanceIt->value != ackedInstanceIt->value)) { + continue; + } if (!isDeleted && (sentInstanceIt == sentIDValuePairs.end() || receivedVersion > sentInstanceIt->value)) { addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); @@ -179,6 +189,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis } else { sentIDValuePairs.emplace_back(instanceID, receivedVersion); } + + auto& pendingTraitVersions = + listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), + otherNodeLocalID); + pendingTraitVersions.instanceInsert(traitType, instanceID, receivedVersion); + } else if (isDeleted && sentInstanceIt != sentIDValuePairs.end() && absoluteReceivedVersion > sentInstanceIt->value) { addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); @@ -187,6 +203,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis // update the last sent version for this trait instance to the absolute value of the deleted version sentInstanceIt->value = absoluteReceivedVersion; + + auto& pendingTraitVersions = + listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), + otherNodeLocalID); + pendingTraitVersions.instanceInsert(traitType, instanceID, absoluteReceivedVersion); + } } From 3d8a323fae7dcb97e672d27c6be1d4e957eb6207 Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Wed, 2 Jan 2019 14:17:06 -0800 Subject: [PATCH 4/9] Remove unused variable --- .../src/avatars/AvatarMixerClientData.cpp | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerClientData.cpp b/assignment-client/src/avatars/AvatarMixerClientData.cpp index b2b09df6d2..37f71103d2 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.cpp +++ b/assignment-client/src/avatars/AvatarMixerClientData.cpp @@ -19,7 +19,8 @@ #include "AvatarMixerSlave.h" -AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : NodeData(nodeID, nodeLocalID) { +AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : + NodeData(nodeID, nodeLocalID) { // in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID _avatar->setID(nodeID); } @@ -208,7 +209,6 @@ void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& m while (instancedSentIt != versions.instancedCEnd()) { auto traitType = instancedSentIt->traitType; // get or create the sent trait versions for this trait type - auto& sentIDValuePairs = versions.getInstanceIDValuePairs(traitType); // enumerate each sent instance for (auto& sentInstance : instancedSentIt->instances) { @@ -258,8 +258,8 @@ void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedDa // make sure we're not unecessarily overriding the default avatar with the default avatar if (_avatar->getWireSafeSkeletonModelURL() != slaveSharedData.skeletonReplacementURL) { // we need to change this avatar's skeleton URL, and send them a traits packet informing them of the change - qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() << "to replacement" - << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); + qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() + << "to replacement" << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); _avatar->setSkeletonModelURL(slaveSharedData.skeletonReplacementURL); auto packet = NLPacket::create(PacketType::SetAvatarTraits, -1, true); @@ -356,7 +356,9 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) { bool AvatarMixerClientData::otherAvatarInView(const AABox& otherAvatarBox) { return std::any_of(std::begin(_currentViewFrustums), std::end(_currentViewFrustums), - [&](const ConicalViewFrustum& viewFrustum) { return viewFrustum.intersects(otherAvatarBox); }); + [&](const ConicalViewFrustum& viewFrustum) { + return viewFrustum.intersects(otherAvatarBox); + }); } void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { @@ -367,15 +369,14 @@ void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { jsonObject["total_num_out_of_order_sends"] = _numOutOfOrderSends; jsonObject[OUTBOUND_AVATAR_DATA_STATS_KEY] = getOutboundAvatarDataKbps(); - jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float)BYTES_PER_KILOBIT; + jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float) BYTES_PER_KILOBIT; jsonObject["av_data_receive_rate"] = _avatar->getReceiveRate(); jsonObject["recent_other_av_in_view"] = _recentOtherAvatarsInView; jsonObject["recent_other_av_out_of_view"] = _recentOtherAvatarsOutOfView; } -AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint( - Node::LocalID otherAvatar) const { +AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar) const { auto it = _lastSentTraitsTimestamps.find(otherAvatar); if (it != _lastSentTraitsTimestamps.end()) { From 443b54d931a1850ae1714a68ed267ba03138fcae Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Thu, 3 Jan 2019 11:25:39 -0800 Subject: [PATCH 5/9] comment and naming cleanup --- .../src/avatars/AvatarMixerClientData.cpp | 49 ++++++++++--------- .../src/avatars/AvatarMixerClientData.h | 14 +++--- .../src/avatars/AvatarMixerSlave.cpp | 10 +++- libraries/avatars/src/AvatarHashMap.cpp | 8 +-- 4 files changed, 47 insertions(+), 34 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerClientData.cpp b/assignment-client/src/avatars/AvatarMixerClientData.cpp index 37f71103d2..1da592b640 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.cpp +++ b/assignment-client/src/avatars/AvatarMixerClientData.cpp @@ -184,42 +184,47 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, } void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) { + // Avatar Traits flow control marks each outgoing avatar traits packet with a + // sequence number. The mixer caches the traits sent in the traits packet. + // Until an ack with the sequence number comes back, all updates to _traits + // in that packet_ are ignored. Updates to traits not in that packet will + // be sent. + // Look up the avatar/trait data associated with this ack and update the 'last ack' list // with it. AvatarTraits::TraitMessageSequence seq; message.readPrimitive(&seq); - auto sentAvatarTraitVersions = _pendingTraitVersions.find(seq); - if (sentAvatarTraitVersions != _pendingTraitVersions.end()) { - // Note, this is not a simple move of the pending traits - // to the acked traits. Instead, it's a copy where existing - // trait versions in the acked hash are retained for traits not - // included in the pending hash - for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) { - auto& nodeId = nodeTraitVersions.first; - auto& versions = nodeTraitVersions.second; - auto simpleReceivedIt = versions.simpleCBegin(); - while (simpleReceivedIt != versions.simpleCEnd()) { - auto traitType = static_cast(std::distance(versions.simpleCBegin(), simpleReceivedIt)); - _ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt; + auto sentAvatarTraitVersions = _perNodePendingTraitVersions.find(seq); + if (sentAvatarTraitVersions != _perNodePendingTraitVersions.end()) { + for (auto& perNodeTraitVersions : sentAvatarTraitVersions->second) { + auto& nodeId = perNodeTraitVersions.first; + auto& traitVersions = perNodeTraitVersions.second; + // For each trait that was sent in the traits packet, + // update the 'acked' trait version. Traits not + // sent in the traits packet keep their version. + + // process simple traits + auto simpleReceivedIt = traitVersions.simpleCBegin(); + while (simpleReceivedIt != traitVersions.simpleCEnd()) { + auto traitType = static_cast(std::distance(traitVersions.simpleCBegin(), simpleReceivedIt)); + _perNodeAckedTraitVersions[nodeId][traitType] = *simpleReceivedIt; simpleReceivedIt++; } - // enumerate the sent instanced trait versions - auto instancedSentIt = versions.instancedCBegin(); - while (instancedSentIt != versions.instancedCEnd()) { + // process instanced traits + auto instancedSentIt = traitVersions.instancedCBegin(); + while (instancedSentIt != traitVersions.instancedCEnd()) { auto traitType = instancedSentIt->traitType; - // get or create the sent trait versions for this trait type - // enumerate each sent instance for (auto& sentInstance : instancedSentIt->instances) { auto instanceID = sentInstance.id; const auto sentVersion = sentInstance.value; - _ackedTraitVersions[nodeId].instanceInsert(traitType, instanceID, sentVersion); + _perNodeAckedTraitVersions[nodeId].instanceInsert(traitType, instanceID, sentVersion); } instancedSentIt++; } } - _pendingTraitVersions.erase(sentAvatarTraitVersions); + _perNodePendingTraitVersions.erase(sentAvatarTraitVersions); } else { // This can happen either the BulkAvatarTraits was sent with no simple traits, // or if the avatar mixer restarts while there are pending @@ -334,7 +339,7 @@ void AvatarMixerClientData::removeFromRadiusIgnoringSet(const QUuid& other) { void AvatarMixerClientData::resetSentTraitData(Node::LocalID nodeLocalID) { _lastSentTraitsTimestamps[nodeLocalID] = TraitsCheckTimestamp(); - _sentTraitVersions[nodeLocalID].reset(); + _perNodeSentTraitVersions[nodeLocalID].reset(); } void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) { @@ -390,5 +395,5 @@ void AvatarMixerClientData::cleanupKilledNode(const QUuid&, Node::LocalID nodeLo removeLastBroadcastSequenceNumber(nodeLocalID); removeLastBroadcastTime(nodeLocalID); _lastSentTraitsTimestamps.erase(nodeLocalID); - _sentTraitVersions.erase(nodeLocalID); + _perNodeSentTraitVersions.erase(nodeLocalID); } diff --git a/assignment-client/src/avatars/AvatarMixerClientData.h b/assignment-client/src/avatars/AvatarMixerClientData.h index f26ce8504b..16054e3458 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.h +++ b/assignment-client/src/avatars/AvatarMixerClientData.h @@ -42,7 +42,7 @@ public: AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID); virtual ~AvatarMixerClientData() {} using HRCTime = p_high_resolution_clock::time_point; - using NodeTraitVersions = std::unordered_map; + using PerNodeTraitVersions = std::unordered_map; int parseData(ReceivedMessage& message) override; AvatarData& getAvatar() { return *_avatar; } @@ -143,11 +143,11 @@ public: AvatarTraits::TraitMessageSequence getTraitsMessageSequence() const { return _currentTraitsMessageSequence; } AvatarTraits::TraitMessageSequence nextTraitsMessageSequence() { return ++_currentTraitsMessageSequence; } AvatarTraits::TraitVersions& getPendingTraitVersions(AvatarTraits::TraitMessageSequence seq, Node::LocalID otherId) { - return _pendingTraitVersions[seq][otherId]; + return _perNodePendingTraitVersions[seq][otherId]; } - AvatarTraits::TraitVersions& getLastSentTraitVersions(Node::LocalID otherAvatar) { return _sentTraitVersions[otherAvatar]; } - AvatarTraits::TraitVersions& getLastAckedTraitVersions(Node::LocalID otherAvatar) { return _ackedTraitVersions[otherAvatar]; } + AvatarTraits::TraitVersions& getLastSentTraitVersions(Node::LocalID otherAvatar) { return _perNodeSentTraitVersions[otherAvatar]; } + AvatarTraits::TraitVersions& getLastAckedTraitVersions(Node::LocalID otherAvatar) { return _perNodeAckedTraitVersions[otherAvatar]; } void resetSentTraitData(Node::LocalID nodeID); @@ -194,11 +194,11 @@ private: AvatarTraits::TraitMessageSequence _currentTraitsMessageSequence{ 0 }; - std::unordered_map _pendingTraitVersions; + std::unordered_map _perNodePendingTraitVersions; std::unordered_map _lastSentTraitsTimestamps; - NodeTraitVersions _sentTraitVersions; - NodeTraitVersions _ackedTraitVersions; + PerNodeTraitVersions _perNodeSentTraitVersions; + PerNodeTraitVersions _perNodeAckedTraitVersions; std::atomic_bool _isIgnoreRadiusEnabled { false }; }; diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 5ed10dad98..5f24ae1b64 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -85,11 +85,11 @@ qint64 AvatarMixerSlave::addTraitsNodeHeader(AvatarMixerClientData* listeningNod NLPacketList& traitsPacketList, qint64 bytesWritten) { if (bytesWritten == 0) { - if (traitsPacketList.getNumPackets() == 0) { + // This is the beginning of the traits packet, write out the sequence number. bytesWritten += traitsPacketList.writePrimitive(listeningNodeData->nextTraitsMessageSequence()); } - // add the avatar ID to mark the beginning of traits for this avatar + // This is the beginning of the traits for a node, write out the node id bytesWritten += traitsPacketList.write(sendingNodeData->getNodeID().toRfc4122()); } return bytesWritten; @@ -99,6 +99,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis const AvatarMixerClientData* sendingNodeData, NLPacketList& traitsPacketList) { + // Avatar Traits flow control marks each outgoing avatar traits packet with a + // sequence number. The mixer caches the traits sent in the traits packet. + // Until an ack with the sequence number comes back, all updates to _traits + // in that packet_ are ignored. Updates to traits not in that packet will + // be sent. + auto otherNodeLocalID = sendingNodeData->getNodeLocalID(); // Perform a simple check with two server clock time points diff --git a/libraries/avatars/src/AvatarHashMap.cpp b/libraries/avatars/src/AvatarHashMap.cpp index ff902945bc..6a67ef6638 100644 --- a/libraries/avatars/src/AvatarHashMap.cpp +++ b/libraries/avatars/src/AvatarHashMap.cpp @@ -332,13 +332,15 @@ void AvatarHashMap::processBulkAvatarTraits(QSharedPointer mess message->readPrimitive(&seq); - // we have a mixer to send to, setup our set traits packet auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true); traitsAckPacket->writePrimitive(seq); auto nodeList = DependencyManager::get(); SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer); - nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); - + if (!avatarMixer.isNull()) { + // we have a mixer to send to, acknowledge that we received these + // traits. + nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); + } while (message->getBytesLeftToRead()) { // read the avatar ID to figure out which avatar this is for From 6d791a80c28ebd22cd9f596647fed282164fd161 Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Fri, 4 Jan 2019 19:04:01 -0800 Subject: [PATCH 6/9] Bump protocol version for BulkAvatarTraits and add some guard code around packet parsing. --- interface/src/Application.cpp | 2 +- libraries/avatars/src/AvatarHashMap.cpp | 29 ++++++++++++------- .../networking/src/udt/PacketHeaders.cpp | 1 + 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/interface/src/Application.cpp b/interface/src/Application.cpp index 6d870b58d6..04adc376e6 100644 --- a/interface/src/Application.cpp +++ b/interface/src/Application.cpp @@ -2057,7 +2057,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer, bo properties["avatar_ping"] = avatarMixerNode ? avatarMixerNode->getPingMs() : -1; properties["asset_ping"] = assetServerNode ? assetServerNode->getPingMs() : -1; properties["messages_ping"] = messagesMixerNode ? messagesMixerNode->getPingMs() : -1; - properties["atp_in_kbps"] = messagesMixerNode ? assetServerNode->getInboundKbps() : 0.0f; + properties["atp_in_kbps"] = assetServerNode ? assetServerNode->getInboundKbps() : 0.0f; auto loadingRequests = ResourceCache::getLoadingRequests(); diff --git a/libraries/avatars/src/AvatarHashMap.cpp b/libraries/avatars/src/AvatarHashMap.cpp index 6a67ef6638..b3add74f9c 100644 --- a/libraries/avatars/src/AvatarHashMap.cpp +++ b/libraries/avatars/src/AvatarHashMap.cpp @@ -330,19 +330,25 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer void AvatarHashMap::processBulkAvatarTraits(QSharedPointer message, SharedNodePointer sendingNode) { AvatarTraits::TraitMessageSequence seq; - message->readPrimitive(&seq); + if (message->getBytesLeftToRead() > sizeof(AvatarTraits::TraitMessageSequence)) { + message->readPrimitive(&seq); - auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true); - traitsAckPacket->writePrimitive(seq); - auto nodeList = DependencyManager::get(); - SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer); - if (!avatarMixer.isNull()) { - // we have a mixer to send to, acknowledge that we received these - // traits. - nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); + auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true); + traitsAckPacket->writePrimitive(seq); + auto nodeList = DependencyManager::get(); + SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer); + if (!avatarMixer.isNull()) { + // we have a mixer to send to, acknowledge that we received these + // traits. + nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); + } + } + else { + qWarning() << "No BulkAvatarTraits packet sequence number."; + return; } - while (message->getBytesLeftToRead()) { + while (message->getBytesLeftToRead() >= NUM_BYTES_RFC4122_UUID + sizeof(AvatarTraits::TraitType)) { // read the avatar ID to figure out which avatar this is for auto avatarID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); @@ -407,6 +413,9 @@ void AvatarHashMap::processBulkAvatarTraits(QSharedPointer mess message->readPrimitive(&traitType); } } + if (message->getBytesLeftToRead() > 0) { + qWarning() << "Leftover bytes in BulkAvatarTraits message"; + } } void AvatarHashMap::processKillAvatar(QSharedPointer message, SharedNodePointer sendingNode) { diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index 45aa0c2b22..a94d45efc9 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -97,6 +97,7 @@ PacketVersion versionForPacketType(PacketType packetType) { case PacketType::EntityQueryInitialResultsComplete: return static_cast(EntityVersion::ParticleSpin); case PacketType::BulkAvatarTraitsAck: + case PacketType::BulkAvatarTraits: return static_cast(AvatarMixerPacketVersion::AvatarTraitsAck); default: return 22; From 9e887585fae85fe957fe5f8f39a128858e2294b5 Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Sat, 5 Jan 2019 13:49:33 -0800 Subject: [PATCH 7/9] Bad write length calculation was causing faults. --- assignment-client/src/avatars/AvatarMixerSlave.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 5f24ae1b64..6ad5a4dbf1 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -135,7 +135,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis // hold sending more traits until we've been acked that the last one we sent was received if (lastSentVersionRef == lastAckedVersionRef && lastReceivedVersions[traitType] > lastSentVersionRef) { - addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); + bytesWritten += addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); // there is an update to this trait, add it to the traits packet bytesWritten += sendingAvatar->packTrait(traitType, traitsPacketList, lastReceivedVersion); // update the last sent version @@ -185,7 +185,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis continue; } if (!isDeleted && (sentInstanceIt == sentIDValuePairs.end() || receivedVersion > sentInstanceIt->value)) { - addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); + bytesWritten += addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); // this instance version exists and has never been sent or is newer so we need to send it bytesWritten += sendingAvatar->packTraitInstance(traitType, instanceID, traitsPacketList, receivedVersion); @@ -202,7 +202,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis pendingTraitVersions.instanceInsert(traitType, instanceID, receivedVersion); } else if (isDeleted && sentInstanceIt != sentIDValuePairs.end() && absoluteReceivedVersion > sentInstanceIt->value) { - addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); + bytesWritten += addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); // this instance version was deleted and we haven't sent the delete to this client yet bytesWritten += AvatarTraits::packInstancedTraitDelete(traitType, instanceID, traitsPacketList, absoluteReceivedVersion); From 0a76554d572bb193d4b89461ab4957d4f9849f44 Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Sat, 5 Jan 2019 14:04:26 -0800 Subject: [PATCH 8/9] Remove debugging code --- libraries/avatars/src/AvatarHashMap.cpp | 29 +++++++++---------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/libraries/avatars/src/AvatarHashMap.cpp b/libraries/avatars/src/AvatarHashMap.cpp index b3add74f9c..6a67ef6638 100644 --- a/libraries/avatars/src/AvatarHashMap.cpp +++ b/libraries/avatars/src/AvatarHashMap.cpp @@ -330,25 +330,19 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer void AvatarHashMap::processBulkAvatarTraits(QSharedPointer message, SharedNodePointer sendingNode) { AvatarTraits::TraitMessageSequence seq; - if (message->getBytesLeftToRead() > sizeof(AvatarTraits::TraitMessageSequence)) { - message->readPrimitive(&seq); + message->readPrimitive(&seq); - auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true); - traitsAckPacket->writePrimitive(seq); - auto nodeList = DependencyManager::get(); - SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer); - if (!avatarMixer.isNull()) { - // we have a mixer to send to, acknowledge that we received these - // traits. - nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); - } - } - else { - qWarning() << "No BulkAvatarTraits packet sequence number."; - return; + auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true); + traitsAckPacket->writePrimitive(seq); + auto nodeList = DependencyManager::get(); + SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer); + if (!avatarMixer.isNull()) { + // we have a mixer to send to, acknowledge that we received these + // traits. + nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer); } - while (message->getBytesLeftToRead() >= NUM_BYTES_RFC4122_UUID + sizeof(AvatarTraits::TraitType)) { + while (message->getBytesLeftToRead()) { // read the avatar ID to figure out which avatar this is for auto avatarID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); @@ -413,9 +407,6 @@ void AvatarHashMap::processBulkAvatarTraits(QSharedPointer mess message->readPrimitive(&traitType); } } - if (message->getBytesLeftToRead() > 0) { - qWarning() << "Leftover bytes in BulkAvatarTraits message"; - } } void AvatarHashMap::processKillAvatar(QSharedPointer message, SharedNodePointer sendingNode) { From 0297d337d57cdb52075b337732a864961ff68162 Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Tue, 8 Jan 2019 10:50:16 -0800 Subject: [PATCH 9/9] Add some comments as per CR request --- .../src/avatars/AvatarMixerClientData.h | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerClientData.h b/assignment-client/src/avatars/AvatarMixerClientData.h index 16054e3458..d1d6dd4d69 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.h +++ b/assignment-client/src/avatars/AvatarMixerClientData.h @@ -194,12 +194,26 @@ private: AvatarTraits::TraitMessageSequence _currentTraitsMessageSequence{ 0 }; + // Cache of trait versions sent in a given packet (indexed by sequence number) + // When an ack is received, the sequence number in the ack is used to look up + // the sent trait versions and they are copied to _perNodeAckedTraitVersions. + // We remember the data in _perNodePendingTraitVersions instead of requiring + // the client to return all of the versions for each trait it received in a given packet, + // reducing the size of the ack packet. std::unordered_map _perNodePendingTraitVersions; - std::unordered_map _lastSentTraitsTimestamps; - PerNodeTraitVersions _perNodeSentTraitVersions; + // Versions of traits that have been acked, which will be compared to incoming + // trait updates. Incoming updates going to a given node will be ignored if + // the ack for the previous packet (containing those versions) has not been + // received. PerNodeTraitVersions _perNodeAckedTraitVersions; + std::unordered_map _lastSentTraitsTimestamps; + + // cache of traits sent to a node which are compared to incoming traits to + // prevent sending traits that have already been sent. + PerNodeTraitVersions _perNodeSentTraitVersions; + std::atomic_bool _isIgnoreRadiusEnabled { false }; };