Checkpoint trait flow control

This commit is contained in:
Roxanne Skelly 2019-01-01 20:50:46 -08:00
parent c9e6d2711d
commit 2e457e2212
2 changed files with 64 additions and 29 deletions

View file

@ -19,9 +19,7 @@
#include "AvatarMixerSlave.h"
AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) :
NodeData(nodeID, nodeLocalID)
{
AvatarMixerClientData::AvatarMixerClientData(const QUuid& nodeID, Node::LocalID nodeLocalID) : NodeData(nodeID, nodeLocalID) {
// in case somebody calls getSessionUUID on the AvatarData instance, make sure it has the right ID
_avatar->setID(nodeID);
}
@ -82,12 +80,11 @@ int AvatarMixerClientData::processPackets(const SlaveSharedData& slaveSharedData
}
int AvatarMixerClientData::parseData(ReceivedMessage& message) {
// pull the sequence number from the data first
uint16_t sequenceNumber;
message.readPrimitive(&sequenceNumber);
if (sequenceNumber < _lastReceivedSequenceNumber && _lastReceivedSequenceNumber != UINT16_MAX) {
incrementNumOutOfOrderSends();
}
@ -98,7 +95,8 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message) {
}
void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
const SlaveSharedData& slaveSharedData, Node& sendingNode) {
const SlaveSharedData& slaveSharedData,
Node& sendingNode) {
// pull the trait version from the message
AvatarTraits::TraitVersion packetTraitVersion;
message.readPrimitive(&packetTraitVersion);
@ -137,7 +135,7 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
AvatarTraits::TraitInstanceID instanceID = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID));
if (message.getBytesLeftToRead() == 0) {
qWarning () << "Received an instanced trait with no size from" << message.getSenderSockAddr();
qWarning() << "Received an instanced trait with no size from" << message.getSenderSockAddr();
break;
}
@ -145,7 +143,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
message.readPrimitive(&traitSize);
if (traitSize < -1 || traitSize > message.getBytesLeftToRead()) {
qWarning() << "Refusing to process instanced trait of size" << traitSize << "from" << message.getSenderSockAddr();
qWarning() << "Refusing to process instanced trait of size" << traitSize << "from"
<< message.getSenderSockAddr();
break;
}
@ -171,7 +170,8 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
message.seek(message.getPosition() + traitSize);
}
} else {
qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from" << message.getSenderSockAddr();
qWarning() << "Refusing to process traits packet with instanced trait of unprocessable type from"
<< message.getSenderSockAddr();
break;
}
}
@ -183,42 +183,56 @@ void AvatarMixerClientData::processSetTraitsMessage(ReceivedMessage& message,
}
void AvatarMixerClientData::processBulkAvatarTraitsAckMessage(ReceivedMessage& message) {
// Look up the avatar/trait data associated with this ack and update the 'last ack' list
// with it.
AvatarTraits::TraitMessageSequence seq;
message.readPrimitive(&seq);
auto& sentAvatarTraitVersions = _pendingTraitVersions.find(seq);
auto sentAvatarTraitVersions = _pendingTraitVersions.find(seq);
if (sentAvatarTraitVersions != _pendingTraitVersions.end()) {
// Note, this is not a simple move of the pending traits
// to the acked traits. Instead, it's a copy where existing
// trait versions in the acked hash are retained for traits not
// included in the pending hash
// trait versions in the acked hash are retained for traits not
// included in the pending hash
for (auto& nodeTraitVersions : sentAvatarTraitVersions->second) {
auto& nodeId = nodeTraitVersions.first;
auto& versions = nodeTraitVersions.second;
auto simpleReceivedIt = versions.simpleCBegin();
while (simpleReceivedIt != versions.simpleCEnd()) {
auto traitType = static_cast<AvatarTraits::TraitType>(std::distance(versions.simpleCBegin(),
simpleReceivedIt));
auto traitType = static_cast<AvatarTraits::TraitType>(std::distance(versions.simpleCBegin(), simpleReceivedIt));
_ackedTraitVersions[nodeId][traitType] = *simpleReceivedIt;
simpleReceivedIt++;
}
// enumerate the sent instanced trait versions
auto instancedSentIt = versions.instancedCBegin();
while (instancedSentIt != versions.instancedCEnd()) {
auto traitType = instancedSentIt->traitType;
// get or create the sent trait versions for this trait type
auto& sentIDValuePairs = versions.getInstanceIDValuePairs(traitType);
// enumerate each sent instance
for (auto& sentInstance : instancedSentIt->instances) {
auto instanceID = sentInstance.id;
const auto sentVersion = sentInstance.value;
_ackedTraitVersions[nodeId].instanceInsert(traitType, instanceID, sentVersion);
}
instancedSentIt++;
}
}
_pendingTraitVersions.erase(sentAvatarTraitVersions);
}
else {
} else {
// This can happen either the BulkAvatarTraits was sent with no simple traits,
// or if the avatar mixer restarts while there are pending
// BulkAvatarTraits messages in-flight.
if (seq > getTraitsMessageSequence()) {
qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from " << message.getSenderSockAddr();
qWarning() << "Received BulkAvatarTraitsAck with future seq (potential avatar mixer restart) " << seq << " from "
<< message.getSenderSockAddr();
}
}
}
void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData &slaveSharedData, Node& sendingNode,
void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedData& slaveSharedData,
Node& sendingNode,
AvatarTraits::TraitVersion traitVersion) {
const auto& whitelist = slaveSharedData.skeletonURLWhitelist;
@ -244,8 +258,8 @@ void AvatarMixerClientData::checkSkeletonURLAgainstWhitelist(const SlaveSharedDa
// make sure we're not unecessarily overriding the default avatar with the default avatar
if (_avatar->getWireSafeSkeletonModelURL() != slaveSharedData.skeletonReplacementURL) {
// we need to change this avatar's skeleton URL, and send them a traits packet informing them of the change
qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL()
<< "to replacement" << slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID();
qDebug() << "Overwriting avatar URL" << _avatar->getWireSafeSkeletonModelURL() << "to replacement"
<< slaveSharedData.skeletonReplacementURL << "for" << sendingNode.getUUID();
_avatar->setSkeletonModelURL(slaveSharedData.skeletonReplacementURL);
auto packet = NLPacket::create(PacketType::SetAvatarTraits, -1, true);
@ -327,7 +341,7 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) {
_currentViewFrustums.clear();
auto sourceBuffer = reinterpret_cast<const unsigned char*>(message.constData());
uint8_t numFrustums = 0;
memcpy(&numFrustums, sourceBuffer, sizeof(numFrustums));
sourceBuffer += sizeof(numFrustums);
@ -342,9 +356,7 @@ void AvatarMixerClientData::readViewFrustumPacket(const QByteArray& message) {
bool AvatarMixerClientData::otherAvatarInView(const AABox& otherAvatarBox) {
return std::any_of(std::begin(_currentViewFrustums), std::end(_currentViewFrustums),
[&](const ConicalViewFrustum& viewFrustum) {
return viewFrustum.intersects(otherAvatarBox);
});
[&](const ConicalViewFrustum& viewFrustum) { return viewFrustum.intersects(otherAvatarBox); });
}
void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const {
@ -355,14 +367,15 @@ void AvatarMixerClientData::loadJSONStats(QJsonObject& jsonObject) const {
jsonObject["total_num_out_of_order_sends"] = _numOutOfOrderSends;
jsonObject[OUTBOUND_AVATAR_DATA_STATS_KEY] = getOutboundAvatarDataKbps();
jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float) BYTES_PER_KILOBIT;
jsonObject[INBOUND_AVATAR_DATA_STATS_KEY] = _avatar->getAverageBytesReceivedPerSecond() / (float)BYTES_PER_KILOBIT;
jsonObject["av_data_receive_rate"] = _avatar->getReceiveRate();
jsonObject["recent_other_av_in_view"] = _recentOtherAvatarsInView;
jsonObject["recent_other_av_out_of_view"] = _recentOtherAvatarsOutOfView;
}
AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint(Node::LocalID otherAvatar) const {
AvatarMixerClientData::TraitsCheckTimestamp AvatarMixerClientData::getLastOtherAvatarTraitsSendPoint(
Node::LocalID otherAvatar) const {
auto it = _lastSentTraitsTimestamps.find(otherAvatar);
if (it != _lastSentTraitsTimestamps.end()) {

View file

@ -150,6 +150,7 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
// get or create the sent trait versions for this trait type
auto& sentIDValuePairs = lastSentVersions.getInstanceIDValuePairs(traitType);
auto& ackIDValuePairs = lastAckedVersions.getInstanceIDValuePairs(traitType);
// enumerate each received instance
for (auto& receivedInstance : instancedReceivedIt->instances) {
@ -167,7 +168,16 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
{
return sentInstance.id == instanceID;
});
// look for existing acked version for this instance
auto ackedInstanceIt = std::find_if(ackIDValuePairs.begin(), ackIDValuePairs.end(),
[instanceID](auto& ackInstance) { return ackInstance.id == instanceID; });
// if we have a sent version, then we must have an acked instance of the same trait with the same
// version to go on, otherwise we drop the received trait
if (sentInstanceIt != sentIDValuePairs.end() &&
(ackedInstanceIt == ackIDValuePairs.end() || sentInstanceIt->value != ackedInstanceIt->value)) {
continue;
}
if (!isDeleted && (sentInstanceIt == sentIDValuePairs.end() || receivedVersion > sentInstanceIt->value)) {
addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten);
@ -179,6 +189,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
} else {
sentIDValuePairs.emplace_back(instanceID, receivedVersion);
}
auto& pendingTraitVersions =
listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(),
otherNodeLocalID);
pendingTraitVersions.instanceInsert(traitType, instanceID, receivedVersion);
} else if (isDeleted && sentInstanceIt != sentIDValuePairs.end() && absoluteReceivedVersion > sentInstanceIt->value) {
addTraitsNodeHeader(listeningNodeData, sendingNodeData, traitsPacketList, bytesWritten);
@ -187,6 +203,12 @@ qint64 AvatarMixerSlave::addChangedTraitsToBulkPacket(AvatarMixerClientData* lis
// update the last sent version for this trait instance to the absolute value of the deleted version
sentInstanceIt->value = absoluteReceivedVersion;
auto& pendingTraitVersions =
listeningNodeData->getPendingTraitVersions(listeningNodeData->getTraitsMessageSequence(),
otherNodeLocalID);
pendingTraitVersions.instanceInsert(traitType, instanceID, absoluteReceivedVersion);
}
}