Merge pull request #14633 from roxanneskelly/Case20377

Trait-level flow control
This commit is contained in:
John Conklin II 2019-01-09 15:57:13 -08:00 committed by GitHub
commit 2757590158
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 192 additions and 28 deletions

View file

@ -56,6 +56,7 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket");
packetReceiver.registerListener(PacketType::AvatarIdentityRequest, this, "handleAvatarIdentityRequestPacket");
packetReceiver.registerListener(PacketType::SetAvatarTraits, this, "queueIncomingPacket");
packetReceiver.registerListener(PacketType::BulkAvatarTraitsAck, this, "queueIncomingPacket");
packetReceiver.registerListenerForTypes({
PacketType::ReplicatedAvatarIdentity,

View file

@ -19,9 +19,8 @@
#include "AvatarMixerSlave.h"
AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) :
NodeData(nodeID, nodeLocalID)
{
AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) :
NodeData(nodeID, nodeLocalID) {
// in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID
_avatar->setID(nodeID);
}
@ -68,6 +67,9 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData
case PacketType::SetAvatarTraits:
processSetTraitsMessage(*packet, slaveSharedData, *node);
break;
case PacketType::BulkAvatarTraitsAck:
processBulkAvatarTraitsAckMessage(*packet);
break;
default:
Q_UNREACHABLE();
}
@ -79,12 +81,11 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData
}
int AvatarMixerClientData::parseData(ReceivedMessage& message) {
// pull the sequence number from the data first
uint16_t sequenceNumber;
message.readPrimitive(&sequenceNumber);
if (sequenceNumber < _lastReceivedSequenceNumber && _lastReceivedSequenceNumber != UINT16_MAX) {
incrementNumOutOfOrderSends();
}
@ -95,7 +96,8 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message) {
}
void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
const SlaveSharedData& slaveSharedData, Node& sendingNode) {
const SlaveSharedData& slaveSharedData,
Node& sendingNode) {
// pull the trait version from the message
AvatarTraits::TraitVersion packetTraitVersion;
message.readPrimitive(&packetTraitVersion);
@ -134,7 +136,7 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
AvatarTraits::TraitInstanceID instanceID = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID));
if (message.getBytesLeftToRead() == 0) {
qWarning () << "Received an instanced trait with no size from" << message.getSenderSockAddr();
qWarning() << "Received an instanced trait with no size from" << message.getSenderSockAddr();
break;
}
@ -142,7 +144,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
message.readPrimitive(&traitSize);
if (traitSize < -1 || traitSize > message.getBytesLeftToRead()) {
qWarning() << "Refusing to process instanced trait of size" << traitSize << "from" << message.getSenderSockAddr();
qWarning() << "Refusing to process instanced trait of size" << traitSize << "from"
<< message.getSenderSockAddr();
break;
}
@ -169,7 +172,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
message.seek(message.getPosition() + traitSize);
}
} else {
qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from" << message.getSenderSockAddr();
qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from"
<< message.getSenderSockAddr();
break;
}
}
@ -180,7 +184,61 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
}
}
void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData &slaveSharedData, Node& sendingNode,
void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) {
// Avatar Traits flow control marks each outgoing avatar traits packet with a
// sequence number. The mixer caches the traits sent in the traits packet.
// Until an ack with the sequence number comes back, all updates to _traits
// in that packet_ are ignored. Updates to traits not in that packet will
// be sent.
// Look up the avatar/trait data associated with this ack and update the 'last ack' list
// with it.
AvatarTraits::TraitMessageSequence seq;
message.readPrimitive(&seq);
auto sentAvatarTraitVersions = _perNodePendingTraitVersions.find(seq);
if (sentAvatarTraitVersions != _perNodePendingTraitVersions.end()) {
for (auto& perNodeTraitVersions : sentAvatarTraitVersions->second) {
auto& nodeId = perNodeTraitVersions.first;
auto& traitVersions = perNodeTraitVersions.second;
// For each trait that was sent in the traits packet,
// update the 'acked' trait version. Traits not
// sent in the traits packet keep their version.
// process simple traits
auto simpleReceivedIt = traitVersions.simpleCBegin();
while (simpleReceivedIt != traitVersions.simpleCEnd()) {
auto traitType = static_cast<AvatarTraits::TraitType>(std::distance(traitVersions.simpleCBegin(), simpleReceivedIt));
_perNodeAckedTraitVersions[nodeId][traitType] = *simpleReceivedIt;
simpleReceivedIt++;
}
// process instanced traits
auto instancedSentIt = traitVersions.instancedCBegin();
while (instancedSentIt != traitVersions.instancedCEnd()) {
auto traitType = instancedSentIt->traitType;
for (auto& sentInstance : instancedSentIt->instances) {
auto instanceID = sentInstance.id;
const auto sentVersion = sentInstance.value;
_perNodeAckedTraitVersions[nodeId].instanceInsert(traitType, instanceID, sentVersion);
}
instancedSentIt++;
}
}
_perNodePendingTraitVersions.erase(sentAvatarTraitVersions);
} else {
// This can happen either the BulkAvatarTraits was sent with no simple traits,
// or if the avatar mixer restarts while there are pending
// BulkAvatarTraits messages in-flight.
if (seq > getTraitsMessageSequence()) {
qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from "
<< message.getSenderSockAddr();
}
}
}
void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData,
Node& sendingNode,
AvatarTraits::TraitVersion traitVersion) {
const auto& whitelist = slaveSharedData.skeletonURLWhitelist;
@ -282,14 +340,14 @@ void AvatarMixerClientData::removeFromRadiusIgnoringSet(const QUuid& other) {
void AvatarMixerClientData::resetSentTraitData(Node::LocalID nodeLocalID) {
_lastSentTraitsTimestamps[nodeLocalID] = TraitsCheckTimestamp();
_sentTraitVersions[nodeLocalID].reset();
_perNodeSentTraitVersions[nodeLocalID].reset();
}
void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) {
_currentViewFrustums.clear();
auto sourceBuffer = reinterpret_cast<const unsigned char*>(message.constData());
uint8_t numFrustums = 0;
memcpy(&numFrustums, sourceBuffer, sizeof(numFrustums));
sourceBuffer += sizeof(numFrustums);
@ -338,5 +396,5 @@ void AvatarMixerClientData::cleanupKilledNode(const QUuid&, Node::LocalID nodeLo
removeLastBroadcastSequenceNumber(nodeLocalID);
removeLastBroadcastTime(nodeLocalID);
_lastSentTraitsTimestamps.erase(nodeLocalID);
_sentTraitVersions.erase(nodeLocalID);
_perNodeSentTraitVersions.erase(nodeLocalID);
}

View file

@ -42,6 +42,7 @@ public:
AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID);
virtual ~AvatarMixerClientData() {}
using HRCTime = p_high_resolution_clock::time_point;
using PerNodeTraitVersions = std::unordered_map<Node::LocalID, AvatarTraits::TraitVersions>;
int parseData(ReceivedMessage& message) override;
AvatarData& getAvatar() { return *_avatar; }
@ -124,6 +125,7 @@ public:
int processPackets(const SlaveSharedData& slaveSharedData); // returns number of packets processed
void processSetTraitsMessage(ReceivedMessage& message, const SlaveSharedData& slaveSharedData, Node& sendingNode);
void processBulkAvatarTraitsAckMessage(ReceivedMessage& message);
void checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, Node& sendingNode,
AvatarTraits::TraitVersion traitVersion);
@ -138,7 +140,14 @@ public:
void setLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar, TraitsCheckTimestamp sendPoint)
{ _lastSentTraitsTimestamps[otherAvatar] = sendPoint; }
AvatarTraits::TraitVersions& getLastSentTraitVersions(Node::LocalID otherAvatar) { return _sentTraitVersions[otherAvatar]; }
AvatarTraits::TraitMessageSequence getTraitsMessageSequence() const { return _currentTraitsMessageSequence; }
AvatarTraits::TraitMessageSequence nextTraitsMessageSequence() { return ++_currentTraitsMessageSequence; }
AvatarTraits::TraitVersions& getPendingTraitVersions(AvatarTraits::TraitMessageSequence seq, Node::LocalID otherId) {
return _perNodePendingTraitVersions[seq][otherId];
}
AvatarTraits::TraitVersions& getLastSentTraitVersions(Node::LocalID otherAvatar) { return _perNodeSentTraitVersions[otherAvatar]; }
AvatarTraits::TraitVersions& getLastAckedTraitVersions(Node::LocalID otherAvatar) { return _perNodeAckedTraitVersions[otherAvatar]; }
void resetSentTraitData(Node::LocalID nodeID);
@ -183,8 +192,27 @@ private:
AvatarTraits::TraitVersions _lastReceivedTraitVersions;
TraitsCheckTimestamp _lastReceivedTraitsChange;
AvatarTraits::TraitMessageSequence _currentTraitsMessageSequence{ 0 };
// Cache of trait versions sent in a given packet (indexed by sequence number)
// When an ack is received, the sequence number in the ack is used to look up
// the sent trait versions and they are copied to _perNodeAckedTraitVersions.
// We remember the data in _perNodePendingTraitVersions instead of requiring
// the client to return all of the versions for each trait it received in a given packet,
// reducing the size of the ack packet.
std::unordered_map<AvatarTraits::TraitMessageSequence, PerNodeTraitVersions> _perNodePendingTraitVersions;
// Versions of traits that have been acked, which will be compared to incoming
// trait updates. Incoming updates going to a given node will be ignored if
// the ack for the previous packet (containing those versions) has not been
// received.
PerNodeTraitVersions _perNodeAckedTraitVersions;
std::unordered_map<Node::LocalID, TraitsCheckTimestamp> _lastSentTraitsTimestamps;
std::unordered_map<Node::LocalID, AvatarTraits::TraitVersions> _sentTraitVersions;
// cache of traits sent to a node which are compared to incoming traits to
// prevent sending traits that have already been sent.
PerNodeTraitVersions _perNodeSentTraitVersions;
std::atomic_bool _isIgnoreRadiusEnabled { false };
};

View file

@ -80,10 +80,31 @@ int AvatarMixerSlave::sendIdentityPacket(NLPacketList& packetList, const AvatarM
}
}
qint64 AvatarMixerSlave::addTraitsNodeHeader(AvatarMixerClientData* listeningNodeData,
const AvatarMixerClientData* sendingNodeData,
NLPacketList& traitsPacketList,
qint64 bytesWritten) {
if (bytesWritten == 0) {
if (traitsPacketList.getNumPackets() == 0) {
// This is the beginning of the traits packet, write out the sequence number.
bytesWritten += traitsPacketList.writePrimitive(listeningNodeData->nextTraitsMessageSequence());
}
// This is the beginning of the traits for a node, write out the node id
bytesWritten += traitsPacketList.write(sendingNodeData->getNodeID().toRfc4122());
}
return bytesWritten;
}
qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* listeningNodeData,
const AvatarMixerClientData* sendingNodeData,
NLPacketList& traitsPacketList) {
// Avatar Traits flow control marks each outgoing avatar traits packet with a
// sequence number. The mixer caches the traits sent in the traits packet.
// Until an ack with the sequence number comes back, all updates to _traits
// in that packet_ are ignored. Updates to traits not in that packet will
// be sent.
auto otherNodeLocalID = sendingNodeData->getNodeLocalID();
// Perform a simple check with two server clock time points
@ -96,13 +117,11 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
if (timeOfLastTraitsChange > timeOfLastTraitsSent) {
// there is definitely new traits data to send
// add the avatar ID to mark the beginning of traits for this avatar
bytesWritten += traitsPacketList.write(sendingNodeData->getNodeID().toRfc4122());
auto sendingAvatar = sendingNodeData->getAvatarSharedPointer();
// compare trait versions so we can see what exactly needs to go out
auto& lastSentVersions = listeningNodeData->getLastSentTraitVersions(otherNodeLocalID);
auto& lastAckedVersions = listeningNodeData->getLastAckedTraitVersions(otherNodeLocalID);
const auto& lastReceivedVersions = sendingNodeData->getLastReceivedTraitVersions();
auto simpleReceivedIt = lastReceivedVersions.simpleCBegin();
@ -112,13 +131,19 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
auto lastReceivedVersion = *simpleReceivedIt;
auto& lastSentVersionRef = lastSentVersions[traitType];
auto& lastAckedVersionRef = lastAckedVersions[traitType];
if (lastReceivedVersions[traitType] > lastSentVersionRef) {
// hold sending more traits until we've been acked that the last one we sent was received
if (lastSentVersionRef == lastAckedVersionRef && lastReceivedVersions[traitType] > lastSentVersionRef) {
bytesWritten += addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten);
// there is an update to this trait, add it to the traits packet
bytesWritten += sendingAvatar->packTrait(traitType, traitsPacketList, lastReceivedVersion);
// update the last sent version
lastSentVersionRef = lastReceivedVersion;
// Remember which versions we sent in this particular packet
// so we can verify when it's acked.
auto& pendingTraitVersions = listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), otherNodeLocalID);
pendingTraitVersions[traitType] = lastReceivedVersion;
}
++simpleReceivedIt;
@ -131,6 +156,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
// get or create the sent trait versions for this trait type
auto& sentIDValuePairs = lastSentVersions.getInstanceIDValuePairs(traitType);
auto& ackIDValuePairs = lastAckedVersions.getInstanceIDValuePairs(traitType);
// enumerate each received instance
for (auto& receivedInstance : instancedReceivedIt->instances) {
@ -148,8 +174,18 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
{
return sentInstance.id == instanceID;
});
// look for existing acked version for this instance
auto ackedInstanceIt = std::find_if(ackIDValuePairs.begin(), ackIDValuePairs.end(),
[instanceID](auto& ackInstance) { return ackInstance.id == instanceID; });
// if we have a sent version, then we must have an acked instance of the same trait with the same
// version to go on, otherwise we drop the received trait
if (sentInstanceIt != sentIDValuePairs.end() &&
(ackedInstanceIt == ackIDValuePairs.end() || sentInstanceIt->value != ackedInstanceIt->value)) {
continue;
}
if (!isDeleted && (sentInstanceIt == sentIDValuePairs.end() || receivedVersion > sentInstanceIt->value)) {
bytesWritten += addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten);
// this instance version exists and has never been sent or is newer so we need to send it
bytesWritten += sendingAvatar->packTraitInstance(traitType, instanceID, traitsPacketList, receivedVersion);
@ -159,22 +195,35 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
} else {
sentIDValuePairs.emplace_back(instanceID, receivedVersion);
}
auto& pendingTraitVersions =
listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(),
otherNodeLocalID);
pendingTraitVersions.instanceInsert(traitType, instanceID, receivedVersion);
} else if (isDeleted && sentInstanceIt != sentIDValuePairs.end() && absoluteReceivedVersion > sentInstanceIt->value) {
bytesWritten += addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten);
// this instance version was deleted and we haven't sent the delete to this client yet
bytesWritten += AvatarTraits::packInstancedTraitDelete(traitType, instanceID, traitsPacketList, absoluteReceivedVersion);
// update the last sent version for this trait instance to the absolute value of the deleted version
sentInstanceIt->value = absoluteReceivedVersion;
auto& pendingTraitVersions =
listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(),
otherNodeLocalID);
pendingTraitVersions.instanceInsert(traitType, instanceID, absoluteReceivedVersion);
}
}
++instancedReceivedIt;
}
// write a null trait type to mark the end of trait data for this avatar
bytesWritten += traitsPacketList.writePrimitive(AvatarTraits::NullTrait);
if (bytesWritten) {
// write a null trait type to mark the end of trait data for this avatar
bytesWritten += traitsPacketList.writePrimitive(AvatarTraits::NullTrait);
}
// since we send all traits for this other avatar, update the time of last traits sent
// to match the time of last traits change
listeningNodeData->setLastOtherAvatarTraitsSendPoint(otherNodeLocalID, timeOfLastTraitsChange);
@ -419,6 +468,7 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
int remainingAvatars = (int)sortedAvatars.size();
auto traitsPacketList = NLPacketList::create(PacketType::BulkAvatarTraits, QByteArray(), true, true);
auto avatarPacket = NLPacket::create(PacketType::BulkAvatarData);
const int avatarPacketCapacity = avatarPacket->getPayloadCapacity();
int avatarSpaceAvailable = avatarPacketCapacity;
@ -550,6 +600,7 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
if (traitsPacketList->getNumPackets() >= 1) {
// send the traits packet list
nodeList->sendPacketList(std::move(traitsPacketList), *destinationNode);
}

View file

@ -104,6 +104,11 @@ private:
int sendIdentityPacket(NLPacketList& packet, const AvatarMixerClientData* nodeData, const Node& destinationNode);
int sendReplicatedIdentityPacket(const Node& agentNode, const AvatarMixerClientData* nodeData, const Node& destinationNode);
qint64 addTraitsNodeHeader(AvatarMixerClientData* listeningNodeData,
const AvatarMixerClientData* sendingNodeData,
NLPacketList& traitsPacketList,
qint64 bytesWritten);
qint64 addChangedTraitsToBulkPacket(AvatarMixerClientData* listeningNodeData,
const AvatarMixerClientData* sendingNodeData,
NLPacketList& traitsPacketList);

View file

@ -2060,7 +2060,7 @@ Application::Application(int& argc, char** argv, QElapsedTimer& startupTimer, bo
properties["avatar_ping"] = avatarMixerNode ? avatarMixerNode->getPingMs() : -1;
properties["asset_ping"] = assetServerNode ? assetServerNode->getPingMs() : -1;
properties["messages_ping"] = messagesMixerNode ? messagesMixerNode->getPingMs() : -1;
properties["atp_in_kbps"] = messagesMixerNode ? assetServerNode->getInboundKbps() : 0.0f;
properties["atp_in_kbps"] = assetServerNode ? assetServerNode->getInboundKbps() : 0.0f;
auto loadingRequests = ResourceCache::getLoadingRequests();

View file

@ -328,6 +328,19 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer<ReceivedMessage>
}
void AvatarHashMap::processBulkAvatarTraits(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
AvatarTraits::TraitMessageSequence seq;
message->readPrimitive(&seq);
auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true);
traitsAckPacket->writePrimitive(seq);
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer);
if (!avatarMixer.isNull()) {
// we have a mixer to send to, acknowledge that we received these
// traits.
nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer);
}
while (message->getBytesLeftToRead()) {
// read the avatar ID to figure out which avatar this is for

View file

@ -42,6 +42,10 @@ namespace AvatarTraits {
const TraitWireSize DELETED_TRAIT_SIZE = -1;
const TraitWireSize MAXIMUM_TRAIT_SIZE = INT16_MAX;
using TraitMessageSequence = int64_t;
const TraitMessageSequence FIRST_TRAIT_SEQUENCE = 0;
const TraitMessageSequence MAX_TRAIT_SEQUENCE = INT64_MAX;
inline qint64 packInstancedTraitDelete(TraitType traitType, TraitInstanceID instanceID, ExtendedIODevice& destination,
TraitVersion traitVersion = NULL_TRAIT_VERSION) {
qint64 bytesWritten = 0;

View file

@ -97,6 +97,9 @@ PacketVersion versionForPacketType(PacketType packetType) {
return 22;
case PacketType::EntityQueryInitialResultsComplete:
return static_cast<PacketVersion>(EntityVersion::ParticleSpin);
case PacketType::BulkAvatarTraitsAck:
case PacketType::BulkAvatarTraits:
return static_cast<PacketVersion>(AvatarMixerPacketVersion::AvatarTraitsAck);
default:
return 22;
}

View file

@ -133,7 +133,7 @@ public:
EntityQueryInitialResultsComplete,
BulkAvatarTraits,
AudioSoloRequest,
BulkAvatarTraitsAck,
NUM_PACKET_TYPE
};
@ -310,7 +310,8 @@ enum class AvatarMixerPacketVersion : PacketVersion {
FarGrabJointsRedux,
JointTransScaled,
GrabTraits,
CollisionFlag
CollisionFlag,
AvatarTraitsAck
};
enum class DomainConnectRequestVersion : PacketVersion {