Checkpoint trait-level flow control

This commit is contained in:
Roxanne Skelly 2018-12-21 15:48:58 -08:00
parent 1977180ecf
commit 3a282045ff
7 changed files with 68 additions and 4 deletions

View file

@ -56,6 +56,7 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket");
packetReceiver.registerListener(PacketType::AvatarIdentityRequest, this, "handleAvatarIdentityRequestPacket");
packetReceiver.registerListener(PacketType::SetAvatarTraits, this, "queueIncomingPacket");
packetReceiver.registerListener(PacketType::BulkAvatarTraitsAck, this, "queueIncomingPacket");
packetReceiver.registerListenerForTypes({
PacketType::ReplicatedAvatarIdentity,

View file

@ -68,6 +68,9 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData
case PacketType::SetAvatarTraits:
processSetTraitsMessage(*packet, slaveSharedData, *node);
break;
case PacketType::BulkAvatarTraitsAck:
processBulkAvatarTraitsAckMessage(*packet);
break;
default:
Q_UNREACHABLE();
}
@ -179,6 +182,29 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
}
}
void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) {
// Look up the avatar/trait data associated with this ack and update the 'last ack' list
// with it.
AvatarTraits::TraitMessageSequence seq;
message.readPrimitive(&seq);
auto& sentAvatarTraitVersions = _pendingTraitVersions.find(seq);
if (sentAvatarTraitVersions != _pendingTraitVersions.end()) {
for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) {
auto& nodeId = nodeTraitVersions.first;
auto& versions = nodeTraitVersions.second;
auto simpleReceivedIt = versions.simpleCBegin();
while (simpleReceivedIt != versions.simpleCEnd()) {
auto traitType = static_cast<AvatarTraits::TraitType>(std::distance(versions.simpleCBegin(),
simpleReceivedIt));
_ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt;
}
}
_pendingTraitVersions.erase(sentAvatarTraitVersions);
}
}
void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData &slaveSharedData, Node& sendingNode,
AvatarTraits::TraitVersion traitVersion) {
const auto& whitelist = slaveSharedData.skeletonURLWhitelist;

View file

@ -42,6 +42,7 @@ public:
AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID);
virtual ~AvatarMixerClientData() {}
using HRCTime = p_high_resolution_clock::time_point;
using NodeTraitVersions = std::unordered_map<Node::LocalID, AvatarTraits::TraitVersions>;
int parseData(ReceivedMessage& message) override;
AvatarData& getAvatar() { return *_avatar; }
@ -124,6 +125,7 @@ public:
int processPackets(const SlaveSharedData& slaveSharedData); // returns number of packets processed
void processSetTraitsMessage(ReceivedMessage& message, const SlaveSharedData& slaveSharedData, Node& sendingNode);
void processBulkAvatarTraitsAckMessage(ReceivedMessage& message);
void checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData, Node& sendingNode,
AvatarTraits::TraitVersion traitVersion);
@ -138,7 +140,14 @@ public:
void setLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar, TraitsCheckTimestamp sendPoint)
{ _lastSentTraitsTimestamps[otherAvatar] = sendPoint; }
AvatarTraits::TraitMessageSequence getTraitsMessageSequence() const { return _currentTraitsMessageSequence; }
AvatarTraits::TraitMessageSequence nextTraitsMessageSequence() { return ++_currentTraitsMessageSequence; }
AvatarTraits::TraitVersions& getPendingTraitVersions(AvatarTraits::TraitMessageSequence seq, Node::LocalID otherId) {
return _pendingTraitVersions[seq][otherId];
}
AvatarTraits::TraitVersions& getLastSentTraitVersions(Node::LocalID otherAvatar) { return _sentTraitVersions[otherAvatar]; }
AvatarTraits::TraitVersions& getLastAckedTraitVersions(Node::LocalID otherAvatar) { return _ackedTraitVersions[otherAvatar]; }
void resetSentTraitData(Node::LocalID nodeID);
@ -183,8 +192,13 @@ private:
AvatarTraits::TraitVersions _lastReceivedTraitVersions;
TraitsCheckTimestamp _lastReceivedTraitsChange;
AvatarTraits::TraitMessageSequence _currentTraitsMessageSequence{ 0 };
std::unordered_map<AvatarTraits::TraitMessageSequence, NodeTraitVersions> _pendingTraitVersions;
std::unordered_map<Node::LocalID, TraitsCheckTimestamp> _lastSentTraitsTimestamps;
std::unordered_map<Node::LocalID, AvatarTraits::TraitVersions> _sentTraitVersions;
NodeTraitVersions _sentTraitVersions;
NodeTraitVersions _ackedTraitVersions;
std::atomic_bool _isIgnoreRadiusEnabled { false };
};

View file

@ -103,6 +103,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
// compare trait versions so we can see what exactly needs to go out
auto& lastSentVersions = listeningNodeData->getLastSentTraitVersions(otherNodeLocalID);
auto& lastAckedVersions = listeningNodeData->getLastAckedTraitVersions(otherNodeLocalID);
const auto& lastReceivedVersions = sendingNodeData->getLastReceivedTraitVersions();
auto simpleReceivedIt = lastReceivedVersions.simpleCBegin();
@ -112,13 +113,18 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
auto lastReceivedVersion = *simpleReceivedIt;
auto& lastSentVersionRef = lastSentVersions[traitType];
auto& lastAckedVersionRef = lastAckedVersions[traitType];
if (lastReceivedVersions[traitType] > lastSentVersionRef) {
// hold sending more traits until we've been acked that the last one we sent was received
if (lastAckedVersionRef == lastSentVersionRef && lastReceivedVersions[traitType] > lastSentVersionRef) {
// there is an update to this trait, add it to the traits packet
bytesWritten += sendingAvatar->packTrait(traitType, traitsPacketList, lastReceivedVersion);
// update the last sent version
lastSentVersionRef = lastReceivedVersion;
// Remember which versions we sent in this particular packet
// so we can verify when it's acked.
auto& pendingTraitVersions = listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(), otherNodeLocalID);
pendingTraitVersions[traitType] = lastReceivedVersion;
}
++simpleReceivedIt;
@ -419,6 +425,8 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
int remainingAvatars = (int)sortedAvatars.size();
auto traitsPacketList = NLPacketList::create(PacketType::BulkAvatarTraits, QByteArray(), true, true);
traitsPacketList->writePrimitive(nodeData->nextTraitsMessageSequence());
auto avatarPacket = NLPacket::create(PacketType::BulkAvatarData);
const int avatarPacketCapacity = avatarPacket->getPayloadCapacity();
int avatarSpaceAvailable = avatarPacketCapacity;

View file

@ -328,6 +328,17 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer<ReceivedMessage>
}
void AvatarHashMap::processBulkAvatarTraits(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
AvatarTraits::TraitMessageSequence seq;
message->readPrimitive(&seq);
// we have a mixer to send to, setup our set traits packet
auto traitsAckPacket = NLPacket::create(PacketType::BulkAvatarTraitsAck, sizeof(AvatarTraits::TraitMessageSequence), true);
traitsAckPacket->writePrimitive(seq);
auto nodeList = DependencyManager::get<LimitedNodeList>();
SharedNodePointer avatarMixer = nodeList->soloNodeOfType(NodeType::AvatarMixer);
nodeList->sendPacket(std::move(traitsAckPacket), *avatarMixer);
while (message->getBytesLeftToRead()) {
// read the avatar ID to figure out which avatar this is for

View file

@ -41,6 +41,10 @@ namespace AvatarTraits {
const TraitWireSize DELETED_TRAIT_SIZE = -1;
const TraitWireSize MAXIMUM_TRAIT_SIZE = INT16_MAX;
using TraitMessageSequence = int64_t;
const TraitMessageSequence FIRST_TRAIT_SEQUENCE = 0;
const TraitMessageSequence MAX_TRAIT_SEQUENCE = INT64_MAX;
inline qint64 packInstancedTraitDelete(TraitType traitType, TraitInstanceID instanceID, ExtendedIODevice& destination,
TraitVersion traitVersion = NULL_TRAIT_VERSION) {
qint64 bytesWritten = 0;

View file

@ -133,7 +133,7 @@ public:
EntityQueryInitialResultsComplete,
BulkAvatarTraits,
AudioSoloRequest,
BulkAvatarTraitsAck,
NUM_PACKET_TYPE
};