From 2e457e2212ef0e7dec1c6719adcfb06442d5326a Mon Sep 17 00:00:00 2001 From: Roxanne Skelly Date: Tue, 1 Jan 2019 20:50:46 -0800 Subject: [PATCH] Checkpoint trait flow control --- .../src/avatars/AvatarMixerClientData.cpp | 69 +++++++++++-------- .../src/avatars/AvatarMixerSlave.cpp | 24 ++++++- 2 files changed, 64 insertions(+), 29 deletions(-) diff --git a/assignment-client/src/avatars/AvatarMixerClientData.cpp b/assignment-client/src/avatars/AvatarMixerClientData.cpp index a20d6504de..b2b09df6d2 100644 --- a/assignment-client/src/avatars/AvatarMixerClientData.cpp +++ b/assignment-client/src/avatars/AvatarMixerClientData.cpp @@ -19,9 +19,7 @@ #include "AvatarMixerSlave.h" -AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : - NodeData(nodeID, nodeLocalID) -{ +AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : NodeData(nodeID, nodeLocalID) { // in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID _avatar->setID(nodeID); } @@ -82,12 +80,11 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData } int AvatarMixerClientData::parseData(ReceivedMessage& message) { - // pull the sequence number from the data first uint16_t sequenceNumber; message.readPrimitive(&sequenceNumber); - + if (sequenceNumber < _lastReceivedSequenceNumber && _lastReceivedSequenceNumber != UINT16_MAX) { incrementNumOutOfOrderSends(); } @@ -98,7 +95,8 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message) { } void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, - const SlaveSharedData& slaveSharedData, Node& sendingNode) { + const SlaveSharedData& slaveSharedData, + Node& sendingNode) { // pull the trait version from the message AvatarTraits::TraitVersion packetTraitVersion; message.readPrimitive(&packetTraitVersion); @@ -137,7 +135,7 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, AvatarTraits::TraitInstanceID instanceID = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID)); if (message.getBytesLeftToRead() == 0) { - qWarning () << "Received an instanced trait with no size from" << message.getSenderSockAddr(); + qWarning() << "Received an instanced trait with no size from" << message.getSenderSockAddr(); break; } @@ -145,7 +143,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, message.readPrimitive(&traitSize); if (traitSize < -1 || traitSize > message.getBytesLeftToRead()) { - qWarning() << "Refusing to process instanced trait of size" << traitSize << "from" << message.getSenderSockAddr(); + qWarning() << "Refusing to process instanced trait of size" << traitSize << "from" + << message.getSenderSockAddr(); break; } @@ -171,7 +170,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, message.seek(message.getPosition() + traitSize); } } else { - qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from" << message.getSenderSockAddr(); + qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from" + << message.getSenderSockAddr(); break; } } @@ -183,42 +183,56 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message, } void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) { - // Look up the avatar/trait data associated with this ack and update the 'last ack' list // with it. AvatarTraits::TraitMessageSequence seq; message.readPrimitive(&seq); - auto& sentAvatarTraitVersions = _pendingTraitVersions.find(seq); + auto sentAvatarTraitVersions = _pendingTraitVersions.find(seq); if (sentAvatarTraitVersions != _pendingTraitVersions.end()) { // Note, this is not a simple move of the pending traits // to the acked traits. Instead, it's a copy where existing - // trait versions in the acked hash are retained for traits not - // included in the pending hash + // trait versions in the acked hash are retained for traits not + // included in the pending hash for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) { auto& nodeId = nodeTraitVersions.first; auto& versions = nodeTraitVersions.second; auto simpleReceivedIt = versions.simpleCBegin(); while (simpleReceivedIt != versions.simpleCEnd()) { - auto traitType = static_cast(std::distance(versions.simpleCBegin(), - simpleReceivedIt)); + auto traitType = static_cast(std::distance(versions.simpleCBegin(), simpleReceivedIt)); _ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt; simpleReceivedIt++; } + + // enumerate the sent instanced trait versions + auto instancedSentIt = versions.instancedCBegin(); + while (instancedSentIt != versions.instancedCEnd()) { + auto traitType = instancedSentIt->traitType; + // get or create the sent trait versions for this trait type + auto& sentIDValuePairs = versions.getInstanceIDValuePairs(traitType); + + // enumerate each sent instance + for (auto& sentInstance : instancedSentIt->instances) { + auto instanceID = sentInstance.id; + const auto sentVersion = sentInstance.value; + _ackedTraitVersions[nodeId].instanceInsert(traitType, instanceID, sentVersion); + } + instancedSentIt++; + } } _pendingTraitVersions.erase(sentAvatarTraitVersions); - } - else { + } else { // This can happen either the BulkAvatarTraits was sent with no simple traits, // or if the avatar mixer restarts while there are pending // BulkAvatarTraits messages in-flight. if (seq > getTraitsMessageSequence()) { - qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from " << message.getSenderSockAddr(); + qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from " + << message.getSenderSockAddr(); } } } - -void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData &slaveSharedData, Node& sendingNode, +void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, + Node& sendingNode, AvatarTraits::TraitVersion traitVersion) { const auto& whitelist = slaveSharedData.skeletonURLWhitelist; @@ -244,8 +258,8 @@ void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedDa // make sure we're not unecessarily overriding the default avatar with the default avatar if (_avatar->getWireSafeSkeletonModelURL() != slaveSharedData.skeletonReplacementURL) { // we need to change this avatar's skeleton URL, and send them a traits packet informing them of the change - qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() - << "to replacement" << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); + qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() << "to replacement" + << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID(); _avatar->setSkeletonModelURL(slaveSharedData.skeletonReplacementURL); auto packet = NLPacket::create(PacketType::SetAvatarTraits, -1, true); @@ -327,7 +341,7 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) { _currentViewFrustums.clear(); auto sourceBuffer = reinterpret_cast(message.constData()); - + uint8_t numFrustums = 0; memcpy(&numFrustums, sourceBuffer, sizeof(numFrustums)); sourceBuffer += sizeof(numFrustums); @@ -342,9 +356,7 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) { bool AvatarMixerClientData::otherAvatarInView(const AABox& otherAvatarBox) { return std::any_of(std::begin(_currentViewFrustums), std::end(_currentViewFrustums), - [&](const ConicalViewFrustum& viewFrustum) { - return viewFrustum.intersects(otherAvatarBox); - }); + [&](const ConicalViewFrustum& viewFrustum) { return viewFrustum.intersects(otherAvatarBox); }); } void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { @@ -355,14 +367,15 @@ void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const { jsonObject["total_num_out_of_order_sends"] = _numOutOfOrderSends; jsonObject[OUTBOUND_AVATAR_DATA_STATS_KEY] = getOutboundAvatarDataKbps(); - jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float) BYTES_PER_KILOBIT; + jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float)BYTES_PER_KILOBIT; jsonObject["av_data_receive_rate"] = _avatar->getReceiveRate(); jsonObject["recent_other_av_in_view"] = _recentOtherAvatarsInView; jsonObject["recent_other_av_out_of_view"] = _recentOtherAvatarsOutOfView; } -AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar) const { +AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint( + Node::LocalID otherAvatar) const { auto it = _lastSentTraitsTimestamps.find(otherAvatar); if (it != _lastSentTraitsTimestamps.end()) { diff --git a/assignment-client/src/avatars/AvatarMixerSlave.cpp b/assignment-client/src/avatars/AvatarMixerSlave.cpp index 1e84da7e55..5ed10dad98 100644 --- a/assignment-client/src/avatars/AvatarMixerSlave.cpp +++ b/assignment-client/src/avatars/AvatarMixerSlave.cpp @@ -150,6 +150,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis // get or create the sent trait versions for this trait type auto& sentIDValuePairs = lastSentVersions.getInstanceIDValuePairs(traitType); + auto& ackIDValuePairs = lastAckedVersions.getInstanceIDValuePairs(traitType); // enumerate each received instance for (auto& receivedInstance : instancedReceivedIt->instances) { @@ -167,7 +168,16 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis { return sentInstance.id == instanceID; }); - + // look for existing acked version for this instance + auto ackedInstanceIt = std::find_if(ackIDValuePairs.begin(), ackIDValuePairs.end(), + [instanceID](auto& ackInstance) { return ackInstance.id == instanceID; }); + + // if we have a sent version, then we must have an acked instance of the same trait with the same + // version to go on, otherwise we drop the received trait + if (sentInstanceIt != sentIDValuePairs.end() && + (ackedInstanceIt == ackIDValuePairs.end() || sentInstanceIt->value != ackedInstanceIt->value)) { + continue; + } if (!isDeleted && (sentInstanceIt == sentIDValuePairs.end() || receivedVersion > sentInstanceIt->value)) { addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); @@ -179,6 +189,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis } else { sentIDValuePairs.emplace_back(instanceID, receivedVersion); } + + auto& pendingTraitVersions = + listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), + otherNodeLocalID); + pendingTraitVersions.instanceInsert(traitType, instanceID, receivedVersion); + } else if (isDeleted && sentInstanceIt != sentIDValuePairs.end() && absoluteReceivedVersion > sentInstanceIt->value) { addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten); @@ -187,6 +203,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis // update the last sent version for this trait instance to the absolute value of the deleted version sentInstanceIt->value = absoluteReceivedVersion; + + auto& pendingTraitVersions = + listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), + otherNodeLocalID); + pendingTraitVersions.instanceInsert(traitType, instanceID, absoluteReceivedVersion); + } }