build out bulk avatar data replication

This commit is contained in:
Stephen Birarda 2017-06-13 13:42:19 -07:00
parent 98abb23783
commit dc94f83591
6 changed files with 278 additions and 191 deletions

View file

@ -81,13 +81,11 @@ static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45;
void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 start = usecTimestampNow();
if (node->getLinkedData()) {
if (node->getType() == NodeType::Agent && node->getActiveSocket() && !node->isUpstream()) {
if (node->getType() == NodeType::Agent && node->getLinkedData() && node->getActiveSocket() && !node->isUpstream()) {
broadcastAvatarDataToAgent(node);
} else if (node->getType() == NodeType::DownstreamAvatarMixer) {
broadcastAvatarDataToDownstreamMixer(node);
}
}
quint64 end = usecTimestampNow();
_stats.jobElapsedTime += (end - start);
@ -174,12 +172,11 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
std::unordered_map<AvatarSharedPointer, SharedNodePointer> avatarDataToNodes;
std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) {
// make sure this is an agent that we have avatar data for before considering it for inclusion
if (otherNode->getType() == NodeType::Agent
&& otherNode->getLinkedData()) {
const AvatarMixerClientData* otherNodeData = reinterpret_cast<const AvatarMixerClientData*>(otherNode->getLinkedData());
// theoretically it's possible for a Node to be in the NodeList (and therefore end up here),
// but not have yet sent data that's linked to the node. Check for that case and don't
// consider those nodes.
if (otherNodeData) {
AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer();
avatarList << otherAvatar;
avatarDataToNodes[otherAvatar] = otherNode;
@ -190,7 +187,6 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
ViewFrustum cameraView = nodeData->getViewFrustom();
std::priority_queue<AvatarPriority> sortedAvatars;
AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars,
[&](AvatarSharedPointer avatar)->uint64_t{
auto avatarNode = avatarDataToNodes[avatar];
assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map
@ -426,6 +422,92 @@ void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node)
}
void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node) {
_stats.nodesBroadcastedTo++;
// setup a PacketList for the replicated bulk avatar data
auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData);
int numAvatarDataBytes = 0;
std::for_each(_begin, _end, [&](const SharedNodePointer& agentNode) {
// collect agents that we have avatar data for
if (agentNode->getType() == NodeType::Agent && agentNode->getLinkedData()) {
const AvatarMixerClientData* agentNodeData = reinterpret_cast<const AvatarMixerClientData*>(agentNode->getLinkedData());
AvatarSharedPointer otherAvatar = agentNodeData->getAvatarSharedPointer();
quint64 startAvatarDataPacking = usecTimestampNow();
// we cannot send a downstream avatar mixer any updates that expect them to have previous state for this avatar
// since we have no idea if they're online and receiving our packets
// so we always send a full update for this avatar
quint64 start = usecTimestampNow();
AvatarDataPacket::HasFlags flagsOut;
QByteArray avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, {},
flagsOut, false, false,
glm::vec3(0), nullptr);
quint64 end = usecTimestampNow();
_stats.toByteArrayElapsedTime += (end - start);
// figure out how large our avatar byte array can be to fit in the packet list
// given that we need it and the avatar UUID and the size of the byte array (16 bit)
// to fit in a segment of the packet list
auto maxAvatarByteArraySize = avatarPacketList->getMaxSegmentSize();
maxAvatarByteArraySize -= NUM_BYTES_RFC4122_UUID;
maxAvatarByteArraySize -= sizeof(quint16);
if (avatarByteArray.size() > maxAvatarByteArraySize) {
qCWarning(avatars) << "Replicated avatar data too large for" << otherAvatar->getSessionUUID()
<< "-" << avatarByteArray.size() << "bytes";
avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, {},
flagsOut, true, false,
glm::vec3(0), nullptr);
if (avatarByteArray.size() > maxAvatarByteArraySize) {
qCWarning(avatars) << "Replicated avatar data without facial data still too large for"
<< otherAvatar->getSessionUUID() << "-" << avatarByteArray.size() << "bytes";
avatarByteArray = otherAvatar->toByteArray(AvatarData::MinimumData, 0, {},
flagsOut, true, false,
glm::vec3(0), nullptr);
}
}
if (avatarByteArray.size() <= maxAvatarByteArraySize) {
// start a new segment in the packet list for this avatar
avatarPacketList->startSegment();
numAvatarDataBytes += avatarPacketList->write(agentNode->getUUID().toRfc4122());
numAvatarDataBytes += avatarPacketList->writePrimitive((quint16) avatarByteArray.size());
numAvatarDataBytes += avatarPacketList->write(avatarByteArray);
avatarPacketList->endSegment();
} else {
qCWarning(avatars) << "Could not fit minimum data avatar for" << otherAvatar->getSessionUUID()
<< "to packet list -" << avatarByteArray.size() << "bytes";
}
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
}
});
quint64 startPacketSending = usecTimestampNow();
// close the current packet so that we're always sending something
avatarPacketList->closeCurrentPacket(true);
_stats.numPacketsSent += (int)avatarPacketList->getNumPackets();
_stats.numBytesSent += numAvatarDataBytes;
// send the replicated bulk avatar data
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(avatarPacketList), node->getPublicSocket());
quint64 endPacketSending = usecTimestampNow();
_stats.packetSendingElapsedTime += (endPacketSending - startPacketSending);
}

View file

@ -24,6 +24,8 @@ public:
PacketVersion getVersion() const { return _packetVersion; }
const QUuid& getSourceID() const { return _sourceID; }
qint64 getMaxSegmentSize() const override { return NLPacket::maxPayloadSize(_packetType, _isOrdered); }
private:
NLPacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false,
bool isOrdered = false);

View file

@ -42,7 +42,7 @@ const QSet<PacketType> NON_SOURCED_PACKETS = QSet<PacketType>()
<< PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement
<< PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho
<< PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame
<< PacketType::ReplicatedAvatarIdentity << PacketType::ReplicatedKillAvatar;
<< PacketType::ReplicatedAvatarIdentity << PacketType::ReplicatedKillAvatar << PacketType::ReplicatedBulkAvatarData;
PacketVersion versionForPacketType(PacketType packetType) {
switch (packetType) {

View file

@ -121,7 +121,7 @@ public:
ReplicatedSilentAudioFrame,
ReplicatedAvatarIdentity,
ReplicatedKillAvatar,
ReplicatedBulkAvatarData,
NUM_PACKET_TYPE
};
};

View file

@ -36,8 +36,8 @@ std::unique_ptr<PacketList> PacketList::fromReceivedPackets(std::list<std::uniqu
PacketList::PacketList(PacketType packetType, QByteArray extendedHeader, bool isReliable, bool isOrdered) :
_packetType(packetType),
_isReliable(isReliable),
_isOrdered(isOrdered),
_isReliable(isReliable),
_extendedHeader(extendedHeader)
{
Q_ASSERT_X(!(!_isReliable && _isOrdered), "PacketList", "Unreliable ordered PacketLists are not currently supported");
@ -46,8 +46,8 @@ PacketList::PacketList(PacketType packetType, QByteArray extendedHeader, bool is
PacketList::PacketList(PacketList&& other) :
_packetType(other._packetType),
_packets(std::move(other._packets)),
_isReliable(other._isReliable),
_isOrdered(other._isOrdered),
_isReliable(other._isReliable),
_extendedHeader(std::move(other._extendedHeader))
{
}

View file

@ -49,6 +49,8 @@ public:
void startSegment();
void endSegment();
virtual qint64 getMaxSegmentSize() const { return Packet::maxPayloadSize(_isOrdered); }
HifiSockAddr getSenderSockAddr() const;
void closeCurrentPacket(bool shouldSendEmpty = false);
@ -75,6 +77,8 @@ protected:
PacketType _packetType;
std::list<std::unique_ptr<Packet>> _packets;
bool _isOrdered = false;
private:
friend class ::LimitedNodeList;
friend class PacketQueue;
@ -93,7 +97,6 @@ private:
Packet::MessageNumber _messageNumber;
bool _isReliable = false;
bool _isOrdered = false;
std::unique_ptr<Packet> _currentPacket;