First pass at AvatarMixer replication

This commit is contained in:
Atlante45 2017-06-12 15:25:22 -07:00 committed by Stephen Birarda
parent dadbf445a1
commit d0e8612a65
6 changed files with 93 additions and 3 deletions

View file

@ -54,15 +54,79 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
packetReceiver.registerListener(PacketType::RadiusIgnoreRequest, this, "handleRadiusIgnoreRequestPacket");
packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket");
packetReceiver.registerListenerForTypes({
PacketType::ReplicatedAvatarIdentity,
PacketType::ReplicatedAvatarData,
PacketType::ReplicatedKillAvatar
}, this, "handleReplicatedPackets");
auto nodeList = DependencyManager::get<NodeList>();
connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch);
}
void AvatarMixer::handleReplicatedPackets(QSharedPointer<ReceivedMessage> message) {
auto nodeList = DependencyManager::get<NodeList>();
auto nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
auto replicatedNode = nodeList->nodeWithUUID(nodeID);
if (!replicatedNode) {
QMetaObject::invokeMethod(nodeList.data(), "addOrUpdateNode", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(SharedNodePointer, replicatedNode),
Q_ARG(QUuid, nodeID), Q_ARG(NodeType_t, NodeType::Agent),
Q_ARG(HifiSockAddr, message->getSenderSockAddr()),
Q_ARG(HifiSockAddr, message->getSenderSockAddr()));
}
replicatedNode->setLastHeardMicrostamp(usecTimestampNow());
replicatedNode->setIsUpstream(true);
switch (message->getType()) {
case PacketType::AvatarData:
queueIncomingPacket(message, replicatedNode);
break;
case PacketType::AvatarIdentity:
handleAvatarIdentityPacket(message, replicatedNode);
break;
case PacketType::KillAvatar:
handleKillAvatarPacket(message);
break
}
}
void AvatarMixer::replicatePacket(ReceivedMessage& message) {
PacketType replicatedType;
if (message.getType() == PacketType::AvatarData) {
replicatedType = PacketType::ReplicatedAvatarData;
} else if (message.getType() == PacketType::AvatarIdentity) {
replicatedType = PacketType::ReplicatedAvatarIdentity;
} else if (message.getType() == PacketType::KillAvatar) {
replicatedType = PacketType::ReplicatedKillAvatar;
} else {
Q_UNREACHABLE();
return;
}
auto nodeList = DependencyManager::get<NodeList>();
nodeList->eachMatchingNode([&](const SharedNodePointer& node) {
return node->getType() == NodeType::ReplicantAvatarMixer;
}, [&](const SharedNodePointer& node) {
// construct an NLPacket to send to the replicant that has the contents of the received packet
auto packet = NLPacket::create(replicatedType, message.getSize() + NUM_BYTES_RFC4122_UUID);
packet->write(message.getSourceID().toRfc4122());
packet->write(message.getMessage());
nodeList->sendPacket(std::move(packet), *node);
});
}
void AvatarMixer::queueIncomingPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
auto start = usecTimestampNow();
getOrCreateClientData(node)->queuePacket(message, node);
auto end = usecTimestampNow();
_queueIncomingPacketElapsedTime += (end - start);
replicatePacket(*message);
}
@ -397,6 +461,13 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer<ReceivedMessage> mes
if (nodeData != nullptr) {
AvatarData& avatar = nodeData->getAvatar();
auto data = message->getMessage();
if (message->getType() == PacketType::ReplicatedAvatarData) {
data = data.mid(NUM_BYTES_RFC4122_UUID);
}
// parse the identity packet and update the change timestamp if appropriate
AvatarData::Identity identity;
AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity);
@ -414,6 +485,8 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer<ReceivedMessage> mes
}
auto end = usecTimestampNow();
_handleAvatarIdentityPacketElapsedTime += (end - start);
replicatePacket(*message);
}
void AvatarMixer::handleKillAvatarPacket(QSharedPointer<ReceivedMessage> message) {
@ -421,6 +494,8 @@ void AvatarMixer::handleKillAvatarPacket(QSharedPointer<ReceivedMessage> message
DependencyManager::get<NodeList>()->processKillNode(*message);
auto end = usecTimestampNow();
_handleKillAvatarPacketElapsedTime += (end - start);
replicatePacket(*message);
}
void AvatarMixer::handleNodeIgnoreRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {

View file

@ -46,6 +46,7 @@ private slots:
void handleNodeIgnoreRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleRadiusIgnoreRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
void handleRequestsDomainListDataPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleReplicatedPackets(QSharedPointer<ReceivedMessage> message);
void domainSettingsRequestComplete();
void handlePacketVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID);
void start();
@ -61,6 +62,8 @@ private:
void manageDisplayName(const SharedNodePointer& node);
void replicatePacket(ReceivedMessage& message);
p_high_resolution_clock::time_point _lastFrameTimestamp;
// FIXME - new throttling - use these values somehow

View file

@ -62,6 +62,10 @@ int AvatarMixerClientData::parseData(ReceivedMessage& message) {
// pull the sequence number from the data first
uint16_t sequenceNumber;
if (message.getType() == PacketType::ReplicatedAvatarData) {
message.seek(NUM_BYTES_RFC4122_UUID);
}
message.readPrimitive(&sequenceNumber);
if (sequenceNumber < _lastReceivedSequenceNumber && _lastReceivedSequenceNumber != UINT16_MAX) {

View file

@ -549,6 +549,10 @@ bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) {
}
void LimitedNodeList::processKillNode(ReceivedMessage& message) {
if (message.getType() == PacketType::ReplicatedAvatarData) {
message.seek(NUM_BYTES_RFC4122_UUID);
}
// read the node id
QUuid nodeUUID = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID));

View file

@ -41,7 +41,8 @@ const QSet<PacketType> NON_SOURCED_PACKETS = QSet<PacketType>()
<< PacketType::ICEServerHeartbeatDenied << PacketType::AssignmentClientStatus << PacketType::StopNode
<< PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement
<< PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho
<< PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame;
<< PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame
<< PacketType::ReplicatedAvatarIdentity << PacketType::ReplicatedAvatarData << PacketType::ReplicatedKillAvatar;
PacketVersion versionForPacketType(PacketType packetType) {
switch (packetType) {
@ -121,7 +122,7 @@ static void ensureProtocolVersionsSignature() {
std::call_once(once, [&] {
QByteArray buffer;
QDataStream stream(&buffer, QIODevice::WriteOnly);
uint8_t numberOfProtocols = static_cast<uint8_t>(PacketType::LAST_PACKET_TYPE) + 1;
uint8_t numberOfProtocols = static_cast<uint8_t>(PacketType::NUM_PACKET_TYPE);
stream << numberOfProtocols;
for (uint8_t packetType = 0; packetType < numberOfProtocols; packetType++) {
uint8_t packetTypeVersion = static_cast<uint8_t>(versionForPacketType(static_cast<PacketType>(packetType)));

View file

@ -119,8 +119,11 @@ public:
ReplicatedMicrophoneAudioWithEcho,
ReplicatedInjectAudio,
ReplicatedSilentAudioFrame,
LAST_PACKET_TYPE = ReplicatedSilentAudioFrame,
ReplicatedAvatarIdentity,
ReplicatedAvatarData,
ReplicatedKillAvatar,
NUM_PACKET_TYPE
};
};