Merge pull request #10714 from highfidelity/replicants

add broadcasting/replication to downstream avatar mixers and audio mixers
This commit is contained in:
Stephen Birarda 2017-06-21 17:55:07 -07:00 committed by GitHub
commit c516dd06e4
38 changed files with 1259 additions and 416 deletions

View file

@ -55,7 +55,8 @@ QVector<AudioMixer::ZoneSettings> AudioMixer::_zoneSettings;
QVector<AudioMixer::ReverbSettings> AudioMixer::_zoneReverbSettings;
AudioMixer::AudioMixer(ReceivedMessage& message) :
ThreadedAssignment(message) {
ThreadedAssignment(message)
{
// Always clear settings first
// This prevents previous assignment settings from sticking around
@ -92,7 +93,21 @@ AudioMixer::AudioMixer(ReceivedMessage& message) :
packetReceiver.registerListener(PacketType::NodeMuteRequest, this, "handleNodeMuteRequestPacket");
packetReceiver.registerListener(PacketType::KillAvatar, this, "handleKillAvatarPacket");
packetReceiver.registerListenerForTypes({
PacketType::ReplicatedMicrophoneAudioNoEcho,
PacketType::ReplicatedMicrophoneAudioWithEcho,
PacketType::ReplicatedInjectAudio,
PacketType::ReplicatedSilentAudioFrame
},
this, "queueReplicatedAudioPacket"
);
connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
if (node->getType() == NodeType::DownstreamAudioMixer) {
node->activatePublicSocket();
}
});
}
void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
@ -103,6 +118,33 @@ void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, Share
getOrCreateClientData(node.data())->queuePacket(message, node);
}
void AudioMixer::queueReplicatedAudioPacket(QSharedPointer<ReceivedMessage> message) {
// make sure we have a replicated node for the original sender of the packet
auto nodeList = DependencyManager::get<NodeList>();
QUuid nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
auto replicatedNode = nodeList->addOrUpdateNode(nodeID, NodeType::Agent,
message->getSenderSockAddr(), message->getSenderSockAddr(),
true, true);
replicatedNode->setLastHeardMicrostamp(usecTimestampNow());
// construct a "fake" audio received message from the byte array and packet list information
auto audioData = message->getMessage().mid(NUM_BYTES_RFC4122_UUID);
PacketType rewrittenType = REPLICATED_PACKET_MAPPING.key(message->getType());
if (rewrittenType == PacketType::Unknown) {
qDebug() << "Cannot unwrap replicated packet type not present in REPLICATED_PACKET_WRAPPING";
}
auto replicatedMessage = QSharedPointer<ReceivedMessage>::create(audioData, rewrittenType,
versionForPacketType(rewrittenType),
message->getSenderSockAddr(), nodeID);
getOrCreateClientData(replicatedNode.data())->queuePacket(replicatedMessage, replicatedNode);
}
void AudioMixer::handleMuteEnvironmentPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
auto nodeList = DependencyManager::get<NodeList>();
@ -347,7 +389,7 @@ void AudioMixer::start() {
auto nodeList = DependencyManager::get<NodeList>();
// prepare the NodeList
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer });
nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); };
// parse out any AudioMixer settings
@ -506,7 +548,7 @@ void AudioMixer::clearDomainSettings() {
_zoneReverbSettings.clear();
}
void AudioMixer::parseSettingsObject(const QJsonObject &settingsObject) {
void AudioMixer::parseSettingsObject(const QJsonObject& settingsObject) {
qDebug() << "AVX2 Support:" << (cpuSupportsAVX2() ? "enabled" : "disabled");
if (settingsObject.contains(AUDIO_THREADING_GROUP_KEY)) {

View file

@ -63,6 +63,7 @@ private slots:
void handleKillAvatarPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
void queueAudioPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
void queueReplicatedAudioPacket(QSharedPointer<ReceivedMessage> packet);
void removeHRTFsForFinishedInjector(const QUuid& streamID);
void start();

View file

@ -68,9 +68,21 @@ void AudioMixerClientData::processPackets() {
case PacketType::MicrophoneAudioWithEcho:
case PacketType::InjectAudio:
case PacketType::AudioStreamStats:
case PacketType::SilentAudioFrame: {
case PacketType::SilentAudioFrame:
case PacketType::ReplicatedMicrophoneAudioNoEcho:
case PacketType::ReplicatedMicrophoneAudioWithEcho:
case PacketType::ReplicatedInjectAudio:
case PacketType::ReplicatedSilentAudioFrame: {
if (node->isUpstream() && !_hasSetupCodecForUpstreamNode) {
setupCodecForReplicatedAgent(packet);
}
QMutexLocker lock(&getMutex());
parseData(*packet);
optionallyReplicatePacket(*packet, *node);
break;
}
case PacketType::NegotiateAudioFormat:
@ -97,6 +109,59 @@ void AudioMixerClientData::processPackets() {
assert(_packetQueue.empty());
}
bool isReplicatedPacket(PacketType packetType) {
return packetType == PacketType::ReplicatedMicrophoneAudioNoEcho
|| packetType == PacketType::ReplicatedMicrophoneAudioWithEcho
|| packetType == PacketType::ReplicatedInjectAudio
|| packetType == PacketType::ReplicatedSilentAudioFrame;
}
void AudioMixerClientData::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) {
// first, make sure that this is a packet from a node we are supposed to replicate
if (node.isReplicated()) {
// now make sure it's a packet type that we want to replicate
// first check if it is an original type that we should replicate
PacketType mirroredType = REPLICATED_PACKET_MAPPING.value(message.getType());
if (mirroredType == PacketType::Unknown) {
// if it wasn't check if it is a replicated type that we should re-replicate
if (REPLICATED_PACKET_MAPPING.key(message.getType()) != PacketType::Unknown) {
mirroredType = message.getType();
} else {
qDebug() << "Packet passed to optionallyReplicatePacket was not a replicatable type - returning";
return;
}
}
std::unique_ptr<NLPacket> packet;
auto nodeList = DependencyManager::get<NodeList>();
// enumerate the downstream audio mixers and send them the replicated version of this packet
nodeList->unsafeEachNode([&](const SharedNodePointer& downstreamNode) {
if (downstreamNode->getType() == NodeType::DownstreamAudioMixer) {
// construct the packet only once, if we have any downstream audio mixers to send to
if (!packet) {
// construct an NLPacket to send to the replicant that has the contents of the received packet
packet = NLPacket::create(mirroredType);
if (!isReplicatedPacket(message.getType())) {
// since this packet will be non-sourced, we add the replicated node's ID here
packet->write(node.getUUID().toRfc4122());
}
packet->write(message.getMessage());
}
nodeList->sendUnreliablePacket(*packet, *downstreamNode);
}
});
}
}
void AudioMixerClientData::negotiateAudioFormat(ReceivedMessage& message, const SharedNodePointer& node) {
quint8 numberOfCodecs;
message.readPrimitive(&numberOfCodecs);
@ -188,8 +253,11 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
bool isMicStream = false;
if (packetType == PacketType::MicrophoneAudioWithEcho
|| packetType == PacketType::ReplicatedMicrophoneAudioWithEcho
|| packetType == PacketType::MicrophoneAudioNoEcho
|| packetType == PacketType::SilentAudioFrame) {
|| packetType == PacketType::ReplicatedMicrophoneAudioNoEcho
|| packetType == PacketType::SilentAudioFrame
|| packetType == PacketType::ReplicatedSilentAudioFrame) {
QWriteLocker writeLocker { &_streamsLock };
@ -209,7 +277,8 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
avatarAudioStream->setupCodec(_codec, _selectedCodecName, AudioConstants::MONO);
qDebug() << "creating new AvatarAudioStream... codec:" << _selectedCodecName;
connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec, this, &AudioMixerClientData::handleMismatchAudioFormat);
connect(avatarAudioStream, &InboundAudioStream::mismatchedAudioCodec,
this, &AudioMixerClientData::handleMismatchAudioFormat);
auto emplaced = _audioStreams.emplace(
QUuid(),
@ -224,10 +293,12 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
writeLocker.unlock();
isMicStream = true;
} else if (packetType == PacketType::InjectAudio) {
} else if (packetType == PacketType::InjectAudio
|| packetType == PacketType::ReplicatedInjectAudio) {
// this is injected audio
// grab the stream identifier for this injected audio
message.seek(sizeof(quint16));
QUuid streamIdentifier = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID));
bool isStereo;
@ -608,3 +679,22 @@ bool AudioMixerClientData::shouldIgnore(const SharedNodePointer self, const Shar
return shouldIgnore;
}
void AudioMixerClientData::setupCodecForReplicatedAgent(QSharedPointer<ReceivedMessage> message) {
// hop past the sequence number that leads the packet
message->seek(sizeof(quint16));
// pull the codec string from the packet
auto codecString = message->readString();
qDebug() << "Manually setting codec for replicated agent" << uuidStringWithoutCurlyBraces(getNodeID())
<< "-" << codecString;
const std::pair<QString, CodecPluginPointer> codec = AudioMixer::negotiateCodec({ codecString });
setupCodec(codec.second, codec.first);
_hasSetupCodecForUpstreamNode = true;
// seek back to the beginning of the message so other readers are in the right place
message->seek(0);
}

View file

@ -26,7 +26,6 @@
#include "PositionalAudioStream.h"
#include "AvatarAudioStream.h"
class AudioMixerClientData : public NodeData {
Q_OBJECT
public:
@ -108,6 +107,8 @@ public:
bool getRequestsDomainListData() { return _requestsDomainListData; }
void setRequestsDomainListData(bool requesting) { _requestsDomainListData = requesting; }
void setupCodecForReplicatedAgent(QSharedPointer<ReceivedMessage> message);
signals:
void injectorStreamFinished(const QUuid& streamIdentifier);
@ -124,6 +125,8 @@ private:
QReadWriteLock _streamsLock;
AudioStreamMap _audioStreams; // microphone stream from avatar is stored under key of null UUID
void optionallyReplicatePacket(ReceivedMessage& packet, const Node& node);
using IgnoreZone = AABox;
class IgnoreZoneMemo {
public:
@ -181,6 +184,8 @@ private:
bool _shouldMuteClient { false };
bool _requestsDomainListData { false };
bool _hasSetupCodecForUpstreamNode { false };
};
#endif // hifi_AudioMixerClientData_h

View file

@ -74,6 +74,10 @@ void AudioMixerSlave::mix(const SharedNodePointer& node) {
return;
}
if (node->isUpstream()) {
return;
}
// check that the stream is valid
auto avatarStream = data->getAvatarAudioStream();
if (avatarStream == nullptr) {

View file

@ -54,8 +54,108 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
packetReceiver.registerListener(PacketType::RadiusIgnoreRequest, this, "handleRadiusIgnoreRequestPacket");
packetReceiver.registerListener(PacketType::RequestsDomainListData, this, "handleRequestsDomainListDataPacket");
packetReceiver.registerListenerForTypes({
PacketType::ReplicatedAvatarIdentity,
PacketType::ReplicatedKillAvatar
}, this, "handleReplicatedPacket");
packetReceiver.registerListener(PacketType::ReplicatedBulkAvatarData, this, "handleReplicatedBulkAvatarPacket");
auto nodeList = DependencyManager::get<NodeList>();
connect(nodeList.data(), &NodeList::packetVersionMismatch, this, &AvatarMixer::handlePacketVersionMismatch);
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
if (node->getType() == NodeType::DownstreamAvatarMixer) {
getOrCreateClientData(node);
node->activatePublicSocket();
}
});
}
SharedNodePointer addOrUpdateReplicatedNode(const QUuid& nodeID, const HifiSockAddr& senderSockAddr) {
auto replicatedNode = DependencyManager::get<NodeList>()->addOrUpdateNode(nodeID, NodeType::Agent,
senderSockAddr,
senderSockAddr,
true, true);
replicatedNode->setLastHeardMicrostamp(usecTimestampNow());
return replicatedNode;
}
void AvatarMixer::handleReplicatedPacket(QSharedPointer<ReceivedMessage> message) {
auto nodeList = DependencyManager::get<NodeList>();
auto nodeID = QUuid::fromRfc4122(message->peek(NUM_BYTES_RFC4122_UUID));
auto replicatedNode = addOrUpdateReplicatedNode(nodeID, message->getSenderSockAddr());
if (message->getType() == PacketType::ReplicatedAvatarIdentity) {
handleAvatarIdentityPacket(message, replicatedNode);
} else if (message->getType() == PacketType::ReplicatedKillAvatar) {
handleKillAvatarPacket(message, replicatedNode);
}
}
void AvatarMixer::handleReplicatedBulkAvatarPacket(QSharedPointer<ReceivedMessage> message) {
while (message->getBytesLeftToRead()) {
// first, grab the node ID for this replicated avatar
auto nodeID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
// make sure we have an upstream replicated node that matches
auto replicatedNode = addOrUpdateReplicatedNode(nodeID, message->getSenderSockAddr());
// grab the size of the avatar byte array so we know how much to read
quint16 avatarByteArraySize;
message->readPrimitive(&avatarByteArraySize);
// read the avatar byte array
auto avatarByteArray = message->read(avatarByteArraySize);
// construct a "fake" avatar data received message from the byte array and packet list information
auto replicatedMessage = QSharedPointer<ReceivedMessage>::create(avatarByteArray, PacketType::AvatarData,
versionForPacketType(PacketType::AvatarData),
message->getSenderSockAddr(), nodeID);
// queue up the replicated avatar data with the client data for the replicated node
auto start = usecTimestampNow();
getOrCreateClientData(replicatedNode)->queuePacket(replicatedMessage, replicatedNode);
auto end = usecTimestampNow();
_queueIncomingPacketElapsedTime += (end - start);
}
}
void AvatarMixer::optionallyReplicatePacket(ReceivedMessage& message, const Node& node) {
// first, make sure that this is a packet from a node we are supposed to replicate
if (node.isReplicated()) {
// check if this is a packet type we replicate
// which means it must be a packet type present in REPLICATED_PACKET_MAPPING or must be the
// replicated version of one of those packet types
PacketType replicatedType = REPLICATED_PACKET_MAPPING.value(message.getType());
if (replicatedType == PacketType::Unknown) {
if (REPLICATED_PACKET_MAPPING.key(message.getType()) != PacketType::Unknown) {
replicatedType = message.getType();
} else {
qDebug() << __FUNCTION__ << "called without replicatable packet type - returning";
return;
}
}
std::unique_ptr<NLPacket> packet;
auto nodeList = DependencyManager::get<NodeList>();
nodeList->eachMatchingNode([&](const SharedNodePointer& node) {
return node->getType() == NodeType::DownstreamAvatarMixer;
}, [&](const SharedNodePointer& node) {
if (!packet) {
// construct an NLPacket to send to the replicant that has the contents of the received packet
packet = NLPacket::create(replicatedType, message.getSize());
packet->write(message.getMessage());
}
nodeList->sendUnreliablePacket(*packet, *node);
});
}
}
void AvatarMixer::queueIncomingPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
@ -70,12 +170,14 @@ AvatarMixer::~AvatarMixer() {
}
void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) {
QByteArray individualData = nodeData->getAvatar().identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122());
auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true);
identityPackets->write(individualData);
DependencyManager::get<NodeList>()->sendPacketList(std::move(identityPackets), *destinationNode);
++_sumIdentityPackets;
if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) {
QByteArray individualData = nodeData->getAvatar().identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122());
auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true);
identityPackets->write(individualData);
DependencyManager::get<NodeList>()->sendPacketList(std::move(identityPackets), *destinationNode);
++_sumIdentityPackets;
}
}
std::chrono::microseconds AvatarMixer::timeFrame(p_high_resolution_clock::time_point& timestamp) {
@ -132,7 +234,10 @@ void AvatarMixer::start() {
auto start = usecTimestampNow();
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
std::for_each(cbegin, cend, [&](const SharedNodePointer& node) {
manageIdentityData(node);
if (node->getType() == NodeType::Agent && !node->isUpstream()) {
manageIdentityData(node);
}
++_sumListeners;
});
}, &lockWait, &nodeTransform, &functor);
@ -310,13 +415,38 @@ void AvatarMixer::nodeKilled(SharedNodePointer killedNode) {
}
}
std::unique_ptr<NLPacket> killPacket;
std::unique_ptr<NLPacket> replicatedKillPacket;
// this was an avatar we were sending to other people
// send a kill packet for it to our other nodes
auto killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason));
killPacket->write(killedNode->getUUID().toRfc4122());
killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected);
nodeList->eachMatchingNode([&](const SharedNodePointer& node) {
// we relay avatar kill packets to agents that are not upstream
// and downstream avatar mixers, if the node that was just killed was being replicated
return (node->getType() == NodeType::Agent && !node->isUpstream())
|| (killedNode->isReplicated() && node->getType() == NodeType::DownstreamAvatarMixer);
}, [&](const SharedNodePointer& node) {
if (node->getType() == NodeType::Agent) {
if (!killPacket) {
killPacket = NLPacket::create(PacketType::KillAvatar, NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason));
killPacket->write(killedNode->getUUID().toRfc4122());
killPacket->writePrimitive(KillAvatarReason::AvatarDisconnected);
}
nodeList->sendUnreliablePacket(*killPacket, *node);
} else {
// send a replicated kill packet to the downstream avatar mixer
if (!replicatedKillPacket) {
replicatedKillPacket = NLPacket::create(PacketType::ReplicatedKillAvatar,
NUM_BYTES_RFC4122_UUID + sizeof(KillAvatarReason));
replicatedKillPacket->write(killedNode->getUUID().toRfc4122());
replicatedKillPacket->writePrimitive(KillAvatarReason::AvatarDisconnected);
}
nodeList->sendUnreliablePacket(*replicatedKillPacket, *node);
}
});
nodeList->broadcastToNodes(std::move(killPacket), NodeSet() << NodeType::Agent);
// we also want to remove sequence number data for this avatar on our other avatars
// so invoke the appropriate method on the AvatarMixerClientData for other avatars
@ -429,12 +559,11 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer<ReceivedMessage> mes
AvatarData& avatar = nodeData->getAvatar();
// parse the identity packet and update the change timestamp if appropriate
AvatarData::Identity identity;
AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity);
bool identityChanged = false;
bool displayNameChanged = false;
bool skeletonModelUrlChanged = false;
avatar.processAvatarIdentity(identity, identityChanged, displayNameChanged, skeletonModelUrlChanged);
avatar.processAvatarIdentity(message->getMessage(), identityChanged, displayNameChanged, skeletonModelUrlChanged);
if (identityChanged) {
QMutexLocker nodeDataLocker(&nodeData->getMutex());
nodeData->flagIdentityChange();
@ -451,11 +580,13 @@ void AvatarMixer::handleAvatarIdentityPacket(QSharedPointer<ReceivedMessage> mes
_handleAvatarIdentityPacketElapsedTime += (end - start);
}
void AvatarMixer::handleKillAvatarPacket(QSharedPointer<ReceivedMessage> message) {
void AvatarMixer::handleKillAvatarPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
auto start = usecTimestampNow();
DependencyManager::get<NodeList>()->processKillNode(*message);
auto end = usecTimestampNow();
_handleKillAvatarPacketElapsedTime += (end - start);
optionallyReplicatePacket(*message, *node);
}
void AvatarMixer::handleNodeIgnoreRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode) {
@ -707,7 +838,6 @@ void AvatarMixer::run() {
connect(&domainHandler, &DomainHandler::settingsReceiveFail, this, &AvatarMixer::domainSettingsRequestFailed);
ThreadedAssignment::commonInit(AVATAR_MIXER_LOGGING_NAME, NodeType::AvatarMixer);
}
AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node) {
@ -726,7 +856,7 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node
void AvatarMixer::domainSettingsRequestComplete() {
auto nodeList = DependencyManager::get<NodeList>();
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::EntityScriptServer });
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer });
// parse the settings to pull out the values we need
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
@ -803,12 +933,13 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
static const QString AVATAR_WHITELIST_OPTION = "avatar_whitelist";
_avatarWhitelist = domainSettings[AVATARS_SETTINGS_KEY].toObject()[AVATAR_WHITELIST_OPTION].toString(AVATAR_WHITELIST_DEFAULT).split(',', QString::KeepEmptyParts);
static const QString REPLACEMENT_AVATAR_OPTION = "replacement_avatar";
static const QString REPLACEMENT_AVATAR_OPTION = "replacement_avatar";
_replacementAvatar = domainSettings[AVATARS_SETTINGS_KEY].toObject()[REPLACEMENT_AVATAR_OPTION].toString(REPLACEMENT_AVATAR_DEFAULT);
if ((_avatarWhitelist.count() == 1) && _avatarWhitelist[0].isEmpty()) {
_avatarWhitelist.clear(); // KeepEmptyParts above will parse "," as ["", ""] (which is ok), but "" as [""] (which is not ok).
}
if (_avatarWhitelist.isEmpty()) {
qCDebug(avatars) << "All avatars are allowed.";
} else {

View file

@ -42,10 +42,12 @@ private slots:
void handleAdjustAvatarSorting(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleViewFrustumPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleAvatarIdentityPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleKillAvatarPacket(QSharedPointer<ReceivedMessage> message);
void handleKillAvatarPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleNodeIgnoreRequestPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleRadiusIgnoreRequestPacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer sendingNode);
void handleRequestsDomainListDataPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer senderNode);
void handleReplicatedPacket(QSharedPointer<ReceivedMessage> message);
void handleReplicatedBulkAvatarPacket(QSharedPointer<ReceivedMessage> message);
void domainSettingsRequestComplete();
void handlePacketVersionMismatch(PacketType type, const HifiSockAddr& senderSockAddr, const QUuid& senderUUID);
void start();
@ -66,6 +68,8 @@ private:
QStringList _avatarWhitelist { };
QString _replacementAvatar { REPLACEMENT_AVATAR_DEFAULT };
void optionallyReplicatePacket(ReceivedMessage& message, const Node& node);
p_high_resolution_clock::time_point _lastFrameTimestamp;
// FIXME - new throttling - use these values somehow

View file

@ -65,15 +65,32 @@ void AvatarMixerSlave::processIncomingPackets(const SharedNodePointer& node) {
_stats.processIncomingPacketsElapsedTime += (end - start);
}
int AvatarMixerSlave::sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) {
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true);
identityPackets->write(individualData);
DependencyManager::get<NodeList>()->sendPacketList(std::move(identityPackets), *destinationNode);
_stats.numIdentityPackets++;
return individualData.size();
if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) {
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
auto identityPackets = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true);
identityPackets->write(individualData);
DependencyManager::get<NodeList>()->sendPacketList(std::move(identityPackets), *destinationNode);
_stats.numIdentityPackets++;
return individualData.size();
} else {
return 0;
}
}
int AvatarMixerSlave::sendReplicatedIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) {
if (destinationNode->getType() == NodeType::DownstreamAvatarMixer) {
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
auto identityPacket = NLPacket::create(PacketType::ReplicatedAvatarIdentity);
identityPacket->write(individualData);
DependencyManager::get<NodeList>()->sendUnreliablePacket(*identityPacket, *destinationNode);
_stats.numIdentityPackets++;
return individualData.size();
} else {
return 0;
}
}
static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45;
@ -81,6 +98,18 @@ static const int AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND = 45;
void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
quint64 start = usecTimestampNow();
if (node->getType() == NodeType::Agent && node->getLinkedData() && node->getActiveSocket() && !node->isUpstream()) {
broadcastAvatarDataToAgent(node);
} else if (node->getType() == NodeType::DownstreamAvatarMixer) {
broadcastAvatarDataToDownstreamMixer(node);
}
quint64 end = usecTimestampNow();
_stats.jobElapsedTime += (end - start);
}
void AvatarMixerSlave::broadcastAvatarDataToAgent(const SharedNodePointer& node) {
auto nodeList = DependencyManager::get<NodeList>();
// setup for distributed random floating point values
@ -88,308 +117,432 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
std::mt19937 generator(randomDevice());
std::uniform_real_distribution<float> distribution;
if (node->getLinkedData() && (node->getType() == NodeType::Agent) && node->getActiveSocket()) {
_stats.nodesBroadcastedTo++;
_stats.nodesBroadcastedTo++;
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
nodeData->resetInViewStats();
nodeData->resetInViewStats();
const AvatarData& avatar = nodeData->getAvatar();
glm::vec3 myPosition = avatar.getClientGlobalPosition();
const AvatarData& avatar = nodeData->getAvatar();
glm::vec3 myPosition = avatar.getClientGlobalPosition();
// reset the internal state for correct random number distribution
distribution.reset();
// reset the internal state for correct random number distribution
distribution.reset();
// reset the number of sent avatars
nodeData->resetNumAvatarsSentLastFrame();
// reset the number of sent avatars
nodeData->resetNumAvatarsSentLastFrame();
// keep a counter of the number of considered avatars
int numOtherAvatars = 0;
// keep a counter of the number of considered avatars
int numOtherAvatars = 0;
// keep track of outbound data rate specifically for avatar data
int numAvatarDataBytes = 0;
int identityBytesSent = 0;
// keep track of outbound data rate specifically for avatar data
int numAvatarDataBytes = 0;
int identityBytesSent = 0;
// max number of avatarBytes per frame
auto maxAvatarBytesPerFrame = (_maxKbpsPerNode * BYTES_PER_KILOBIT) / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND;
// max number of avatarBytes per frame
auto maxAvatarBytesPerFrame = (_maxKbpsPerNode * BYTES_PER_KILOBIT) / AVATAR_MIXER_BROADCAST_FRAMES_PER_SECOND;
// FIXME - find a way to not send the sessionID for every avatar
int minimumBytesPerAvatar = AvatarDataPacket::AVATAR_HAS_FLAGS_SIZE + NUM_BYTES_RFC4122_UUID;
// FIXME - find a way to not send the sessionID for every avatar
int minimumBytesPerAvatar = AvatarDataPacket::AVATAR_HAS_FLAGS_SIZE + NUM_BYTES_RFC4122_UUID;
int overBudgetAvatars = 0;
int overBudgetAvatars = 0;
// keep track of the number of other avatars held back in this frame
int numAvatarsHeldBack = 0;
// keep track of the number of other avatars held back in this frame
int numAvatarsHeldBack = 0;
// keep track of the number of other avatar frames skipped
int numAvatarsWithSkippedFrames = 0;
// keep track of the number of other avatar frames skipped
int numAvatarsWithSkippedFrames = 0;
// When this is true, the AvatarMixer will send Avatar data to a client
// about avatars they've ignored or that are out of view
bool PALIsOpen = nodeData->getRequestsDomainListData();
// When this is true, the AvatarMixer will send Avatar data to a client
// about avatars they've ignored or that are out of view
bool PALIsOpen = nodeData->getRequestsDomainListData();
// When this is true, the AvatarMixer will send Avatar data to a client about avatars that have ignored them
bool getsAnyIgnored = PALIsOpen && node->getCanKick();
// When this is true, the AvatarMixer will send Avatar data to a client about avatars that have ignored them
bool getsAnyIgnored = PALIsOpen && node->getCanKick();
if (PALIsOpen) {
// Increase minimumBytesPerAvatar if the PAL is open
minimumBytesPerAvatar += sizeof(AvatarDataPacket::AvatarGlobalPosition) +
sizeof(AvatarDataPacket::AudioLoudness);
}
if (PALIsOpen) {
// Increase minimumBytesPerAvatar if the PAL is open
minimumBytesPerAvatar += sizeof(AvatarDataPacket::AvatarGlobalPosition) +
sizeof(AvatarDataPacket::AudioLoudness);
}
// setup a PacketList for the avatarPackets
auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData);
// setup a PacketList for the avatarPackets
auto avatarPacketList = NLPacketList::create(PacketType::BulkAvatarData);
// Define the minimum bubble size
static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f);
// Define the scale of the box for the current node
glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f;
// Set up the bounding box for the current node
AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) {
nodeBox.setScaleStayCentered(minBubbleSize);
}
// Quadruple the scale of both bounding boxes
nodeBox.embiggen(4.0f);
// Define the minimum bubble size
static const glm::vec3 minBubbleSize = glm::vec3(0.3f, 1.3f, 0.3f);
// Define the scale of the box for the current node
glm::vec3 nodeBoxScale = (nodeData->getPosition() - nodeData->getGlobalBoundingBoxCorner()) * 2.0f;
// Set up the bounding box for the current node
AABox nodeBox(nodeData->getGlobalBoundingBoxCorner(), nodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(nodeBoxScale, minBubbleSize))) {
nodeBox.setScaleStayCentered(minBubbleSize);
}
// Quadruple the scale of both bounding boxes
nodeBox.embiggen(4.0f);
// setup list of AvatarData as well as maps to map betweeen the AvatarData and the original nodes
// for calling the AvatarData::sortAvatars() function and getting our sorted list of client nodes
QList<AvatarSharedPointer> avatarList;
std::unordered_map<AvatarSharedPointer, SharedNodePointer> avatarDataToNodes;
// setup list of AvatarData as well as maps to map betweeen the AvatarData and the original nodes
// for calling the AvatarData::sortAvatars() function and getting our sorted list of client nodes
QList<AvatarSharedPointer> avatarList;
std::unordered_map<AvatarSharedPointer, SharedNodePointer> avatarDataToNodes;
std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) {
std::for_each(_begin, _end, [&](const SharedNodePointer& otherNode) {
// make sure this is an agent that we have avatar data for before considering it for inclusion
if (otherNode->getType() == NodeType::Agent
&& otherNode->getLinkedData()) {
const AvatarMixerClientData* otherNodeData = reinterpret_cast<const AvatarMixerClientData*>(otherNode->getLinkedData());
// theoretically it's possible for a Node to be in the NodeList (and therefore end up here),
// but not have yet sent data that's linked to the node. Check for that case and don't
// consider those nodes.
if (otherNodeData) {
AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer();
avatarList << otherAvatar;
avatarDataToNodes[otherAvatar] = otherNode;
AvatarSharedPointer otherAvatar = otherNodeData->getAvatarSharedPointer();
avatarList << otherAvatar;
avatarDataToNodes[otherAvatar] = otherNode;
}
});
AvatarSharedPointer thisAvatar = nodeData->getAvatarSharedPointer();
ViewFrustum cameraView = nodeData->getViewFrustom();
std::priority_queue<AvatarPriority> sortedAvatars;
AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars,
[&](AvatarSharedPointer avatar)->uint64_t {
auto avatarNode = avatarDataToNodes[avatar];
assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map
return nodeData->getLastBroadcastTime(avatarNode->getUUID());
}, [&](AvatarSharedPointer avatar)->float{
glm::vec3 nodeBoxHalfScale = (avatar->getPosition() - avatar->getGlobalBoundingBoxCorner());
return glm::max(nodeBoxHalfScale.x, glm::max(nodeBoxHalfScale.y, nodeBoxHalfScale.z));
}, [&](AvatarSharedPointer avatar)->bool {
if (avatar == thisAvatar) {
return true; // ignore ourselves...
}
bool shouldIgnore = false;
// We will also ignore other nodes for a couple of different reasons:
// 1) ignore bubbles and ignore specific node
// 2) the node hasn't really updated it's frame data recently, this can
// happen if for example the avatar is connected on a desktop and sending
// updates at ~30hz. So every 3 frames we skip a frame.
auto avatarNode = avatarDataToNodes[avatar];
assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map
const AvatarMixerClientData* avatarNodeData = reinterpret_cast<const AvatarMixerClientData*>(avatarNode->getLinkedData());
assert(avatarNodeData); // we can't have gotten here without avatarNode having valid data
quint64 startIgnoreCalculation = usecTimestampNow();
// make sure we have data for this avatar, that it isn't the same node,
// and isn't an avatar that the viewing node has ignored
// or that has ignored the viewing node
if (!avatarNode->getLinkedData()
|| avatarNode->getUUID() == node->getUUID()
|| (node->isIgnoringNodeWithID(avatarNode->getUUID()) && !PALIsOpen)
|| (avatarNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) {
shouldIgnore = true;
} else {
// Check to see if the space bubble is enabled
// Don't bother with these checks if the other avatar has their bubble enabled and we're gettingAnyIgnored
if (node->isIgnoreRadiusEnabled() || (avatarNode->isIgnoreRadiusEnabled() && !getsAnyIgnored)) {
// Define the scale of the box for the current other node
glm::vec3 otherNodeBoxScale = (avatarNodeData->getPosition() - avatarNodeData->getGlobalBoundingBoxCorner()) * 2.0f;
// Set up the bounding box for the current other node
AABox otherNodeBox(avatarNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) {
otherNodeBox.setScaleStayCentered(minBubbleSize);
}
// Quadruple the scale of both bounding boxes
otherNodeBox.embiggen(4.0f);
// Perform the collision check between the two bounding boxes
if (nodeBox.touches(otherNodeBox)) {
nodeData->ignoreOther(node, avatarNode);
shouldIgnore = !getsAnyIgnored;
}
}
});
// Not close enough to ignore
if (!shouldIgnore) {
nodeData->removeFromRadiusIgnoringSet(node, avatarNode->getUUID());
}
}
quint64 endIgnoreCalculation = usecTimestampNow();
_stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation);
AvatarSharedPointer thisAvatar = nodeData->getAvatarSharedPointer();
ViewFrustum cameraView = nodeData->getViewFrustom();
std::priority_queue<AvatarPriority> sortedAvatars;
AvatarData::sortAvatars(avatarList, cameraView, sortedAvatars,
if (!shouldIgnore) {
AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(avatarNode->getUUID());
AvatarDataSequenceNumber lastSeqFromSender = avatarNodeData->getLastReceivedSequenceNumber();
[&](AvatarSharedPointer avatar)->uint64_t{
auto avatarNode = avatarDataToNodes[avatar];
assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map
return nodeData->getLastBroadcastTime(avatarNode->getUUID());
},
// FIXME - This code does appear to be working. But it seems brittle.
// It supports determining if the frame of data for this "other"
// avatar has already been sent to the reciever. This has been
// verified to work on a desktop display that renders at 60hz and
// therefore sends to mixer at 30hz. Each second you'd expect to
// have 15 (45hz-30hz) duplicate frames. In this case, the stat
// avg_other_av_skips_per_second does report 15.
//
// make sure we haven't already sent this data from this sender to this receiver
// or that somehow we haven't sent
if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) {
++numAvatarsHeldBack;
shouldIgnore = true;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
++numAvatarsWithSkippedFrames;
}
}
return shouldIgnore;
});
[&](AvatarSharedPointer avatar)->float{
glm::vec3 nodeBoxHalfScale = (avatar->getPosition() - avatar->getGlobalBoundingBoxCorner());
return glm::max(nodeBoxHalfScale.x, glm::max(nodeBoxHalfScale.y, nodeBoxHalfScale.z));
},
// loop through our sorted avatars and allocate our bandwidth to them accordingly
int avatarRank = 0;
[&](AvatarSharedPointer avatar)->bool{
if (avatar == thisAvatar) {
return true; // ignore ourselves...
}
// this is overly conservative, because it includes some avatars we might not consider
int remainingAvatars = (int)sortedAvatars.size();
bool shouldIgnore = false;
while (!sortedAvatars.empty()) {
AvatarPriority sortData = sortedAvatars.top();
sortedAvatars.pop();
const auto& avatarData = sortData.avatar;
avatarRank++;
remainingAvatars--;
// We will also ignore other nodes for a couple of different reasons:
// 1) ignore bubbles and ignore specific node
// 2) the node hasn't really updated it's frame data recently, this can
// happen if for example the avatar is connected on a desktop and sending
// updates at ~30hz. So every 3 frames we skip a frame.
auto avatarNode = avatarDataToNodes[avatar];
auto otherNode = avatarDataToNodes[avatarData];
assert(otherNode); // we can't have gotten here without the avatarData being a valid key in the map
assert(avatarNode); // we can't have gotten here without the avatarData being a valid key in the map
// NOTE: Here's where we determine if we are over budget and drop to bare minimum data
int minimRemainingAvatarBytes = minimumBytesPerAvatar * remainingAvatars;
bool overBudget = (identityBytesSent + numAvatarDataBytes + minimRemainingAvatarBytes) > maxAvatarBytesPerFrame;
const AvatarMixerClientData* avatarNodeData = reinterpret_cast<const AvatarMixerClientData*>(avatarNode->getLinkedData());
assert(avatarNodeData); // we can't have gotten here without avatarNode having valid data
quint64 startIgnoreCalculation = usecTimestampNow();
quint64 startAvatarDataPacking = usecTimestampNow();
// make sure we have data for this avatar, that it isn't the same node,
// and isn't an avatar that the viewing node has ignored
// or that has ignored the viewing node
if (!avatarNode->getLinkedData()
|| avatarNode->getUUID() == node->getUUID()
|| (node->isIgnoringNodeWithID(avatarNode->getUUID()) && !PALIsOpen)
|| (avatarNode->isIgnoringNodeWithID(node->getUUID()) && !getsAnyIgnored)) {
shouldIgnore = true;
} else {
++numOtherAvatars;
// Check to see if the space bubble is enabled
// Don't bother with these checks if the other avatar has their bubble enabled and we're gettingAnyIgnored
if (node->isIgnoreRadiusEnabled() || (avatarNode->isIgnoreRadiusEnabled() && !getsAnyIgnored)) {
const AvatarMixerClientData* otherNodeData = reinterpret_cast<const AvatarMixerClientData*>(otherNode->getLinkedData());
// Define the scale of the box for the current other node
glm::vec3 otherNodeBoxScale = (avatarNodeData->getPosition() - avatarNodeData->getGlobalBoundingBoxCorner()) * 2.0f;
// Set up the bounding box for the current other node
AABox otherNodeBox(avatarNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale);
// Clamp the size of the bounding box to a minimum scale
if (glm::any(glm::lessThan(otherNodeBoxScale, minBubbleSize))) {
otherNodeBox.setScaleStayCentered(minBubbleSize);
}
// Quadruple the scale of both bounding boxes
otherNodeBox.embiggen(4.0f);
// If the time that the mixer sent AVATAR DATA about Avatar B to Avatar A is BEFORE OR EQUAL TO
// the time that Avatar B flagged an IDENTITY DATA change, send IDENTITY DATA about Avatar B to Avatar A.
if (nodeData->getLastBroadcastTime(otherNode->getUUID()) <= otherNodeData->getIdentityChangeTimestamp()) {
identityBytesSent += sendIdentityPacket(otherNodeData, node);
}
// Perform the collision check between the two bounding boxes
if (nodeBox.touches(otherNodeBox)) {
nodeData->ignoreOther(node, avatarNode);
shouldIgnore = !getsAnyIgnored;
}
}
// Not close enough to ignore
if (!shouldIgnore) {
nodeData->removeFromRadiusIgnoringSet(node, avatarNode->getUUID());
}
}
quint64 endIgnoreCalculation = usecTimestampNow();
_stats.ignoreCalculationElapsedTime += (endIgnoreCalculation - startIgnoreCalculation);
const AvatarData* otherAvatar = otherNodeData->getConstAvatarData();
glm::vec3 otherPosition = otherAvatar->getClientGlobalPosition();
if (!shouldIgnore) {
AvatarDataSequenceNumber lastSeqToReceiver = nodeData->getLastBroadcastSequenceNumber(avatarNode->getUUID());
AvatarDataSequenceNumber lastSeqFromSender = avatarNodeData->getLastReceivedSequenceNumber();
// determine if avatar is in view, to determine how much data to include...
glm::vec3 otherNodeBoxScale = (otherPosition - otherNodeData->getGlobalBoundingBoxCorner()) * 2.0f;
AABox otherNodeBox(otherNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale);
bool isInView = nodeData->otherAvatarInView(otherNodeBox);
// FIXME - This code does appear to be working. But it seems brittle.
// It supports determining if the frame of data for this "other"
// avatar has already been sent to the reciever. This has been
// verified to work on a desktop display that renders at 60hz and
// therefore sends to mixer at 30hz. Each second you'd expect to
// have 15 (45hz-30hz) duplicate frames. In this case, the stat
// avg_other_av_skips_per_second does report 15.
//
// make sure we haven't already sent this data from this sender to this receiver
// or that somehow we haven't sent
if (lastSeqToReceiver == lastSeqFromSender && lastSeqToReceiver != 0) {
++numAvatarsHeldBack;
shouldIgnore = true;
} else if (lastSeqFromSender - lastSeqToReceiver > 1) {
// this is a skip - we still send the packet but capture the presence of the skip so we see it happening
++numAvatarsWithSkippedFrames;
}
}
return shouldIgnore;
});
// start a new segment in the PacketList for this avatar
avatarPacketList->startSegment();
// loop through our sorted avatars and allocate our bandwidth to them accordingly
int avatarRank = 0;
AvatarData::AvatarDataDetail detail;
// this is overly conservative, because it includes some avatars we might not consider
int remainingAvatars = (int)sortedAvatars.size();
if (overBudget) {
overBudgetAvatars++;
_stats.overBudgetAvatars++;
detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::NoData;
} else if (!isInView) {
detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::MinimumData;
nodeData->incrementAvatarOutOfView();
} else {
detail = distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO
? AvatarData::SendAllData : AvatarData::CullSmallData;
nodeData->incrementAvatarInView();
}
while (!sortedAvatars.empty()) {
AvatarPriority sortData = sortedAvatars.top();
sortedAvatars.pop();
const auto& avatarData = sortData.avatar;
avatarRank++;
remainingAvatars--;
bool includeThisAvatar = true;
auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID());
QVector<JointData>& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID());
bool distanceAdjust = true;
glm::vec3 viewerPosition = myPosition;
AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray
bool dropFaceTracking = false;
auto otherNode = avatarDataToNodes[avatarData];
assert(otherNode); // we can't have gotten here without the avatarData being a valid key in the map
quint64 start = usecTimestampNow();
QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
quint64 end = usecTimestampNow();
_stats.toByteArrayElapsedTime += (end - start);
// NOTE: Here's where we determine if we are over budget and drop to bare minimum data
int minimRemainingAvatarBytes = minimumBytesPerAvatar * remainingAvatars;
bool overBudget = (identityBytesSent + numAvatarDataBytes + minimRemainingAvatarBytes) > maxAvatarBytesPerFrame;
static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID);
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qCWarning(avatars) << "otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data";
dropFaceTracking = true; // first try dropping the facial data
bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qCWarning(avatars) << "otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData";
bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
}
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qCWarning(avatars) << "otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!";
includeThisAvatar = false;
}
}
if (includeThisAvatar) {
numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122());
numAvatarDataBytes += avatarPacketList->write(bytes);
if (detail != AvatarData::NoData) {
_stats.numOthersIncluded++;
// increment the number of avatars sent to this reciever
nodeData->incrementNumAvatarsSentLastFrame();
// set the last sent sequence number for this sender on the receiver
nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(),
otherNodeData->getLastReceivedSequenceNumber());
// remember the last time we sent details about this other node to the receiver
nodeData->setLastBroadcastTime(otherNode->getUUID(), start);
}
}
avatarPacketList->endSegment();
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
};
quint64 startPacketSending = usecTimestampNow();
// close the current packet so that we're always sending something
avatarPacketList->closeCurrentPacket(true);
_stats.numPacketsSent += (int)avatarPacketList->getNumPackets();
_stats.numBytesSent += numAvatarDataBytes;
// send the avatar data PacketList
nodeList->sendPacketList(std::move(avatarPacketList), *node);
// record the bytes sent for other avatar data in the AvatarMixerClientData
nodeData->recordSentAvatarData(numAvatarDataBytes);
// record the number of avatars held back this frame
nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack);
nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames);
quint64 endPacketSending = usecTimestampNow();
_stats.packetSendingElapsedTime += (endPacketSending - startPacketSending);
}
uint64_t REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US = 5 * 1000 * 1000;
void AvatarMixerSlave::broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node) {
_stats.downstreamMixersBroadcastedTo++;
AvatarMixerClientData* nodeData = reinterpret_cast<AvatarMixerClientData*>(node->getLinkedData());
if (!nodeData) {
return;
}
// setup a PacketList for the replicated bulk avatar data
auto avatarPacketList = NLPacketList::create(PacketType::ReplicatedBulkAvatarData);
int numAvatarDataBytes = 0;
// reset the number of sent avatars
nodeData->resetNumAvatarsSentLastFrame();
std::for_each(_begin, _end, [&](const SharedNodePointer& agentNode) {
// collect agents that we have avatar data for that we are supposed to replicate
if (agentNode->getType() == NodeType::Agent && agentNode->getLinkedData() && agentNode->isReplicated()) {
const AvatarMixerClientData* agentNodeData = reinterpret_cast<const AvatarMixerClientData*>(agentNode->getLinkedData());
AvatarSharedPointer otherAvatar = agentNodeData->getAvatarSharedPointer();
quint64 startAvatarDataPacking = usecTimestampNow();
++numOtherAvatars;
const AvatarMixerClientData* otherNodeData = reinterpret_cast<const AvatarMixerClientData*>(otherNode->getLinkedData());
// If the time that the mixer sent AVATAR DATA about Avatar B to Avatar A is BEFORE OR EQUAL TO
// the time that Avatar B flagged an IDENTITY DATA change, send IDENTITY DATA about Avatar B to Avatar A.
if (nodeData->getLastBroadcastTime(otherNode->getUUID()) <= otherNodeData->getIdentityChangeTimestamp()) {
identityBytesSent += sendIdentityPacket(otherNodeData, node);
}
const AvatarData* otherAvatar = otherNodeData->getConstAvatarData();
glm::vec3 otherPosition = otherAvatar->getClientGlobalPosition();
// determine if avatar is in view, to determine how much data to include...
glm::vec3 otherNodeBoxScale = (otherPosition - otherNodeData->getGlobalBoundingBoxCorner()) * 2.0f;
AABox otherNodeBox(otherNodeData->getGlobalBoundingBoxCorner(), otherNodeBoxScale);
bool isInView = nodeData->otherAvatarInView(otherNodeBox);
// start a new segment in the PacketList for this avatar
avatarPacketList->startSegment();
AvatarData::AvatarDataDetail detail;
if (overBudget) {
overBudgetAvatars++;
_stats.overBudgetAvatars++;
detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::NoData;
} else if (!isInView) {
detail = PALIsOpen ? AvatarData::PALMinimum : AvatarData::MinimumData;
nodeData->incrementAvatarOutOfView();
} else {
detail = distribution(generator) < AVATAR_SEND_FULL_UPDATE_RATIO
? AvatarData::SendAllData : AvatarData::CullSmallData;
nodeData->incrementAvatarInView();
}
bool includeThisAvatar = true;
auto lastEncodeForOther = nodeData->getLastOtherAvatarEncodeTime(otherNode->getUUID());
QVector<JointData>& lastSentJointsForOther = nodeData->getLastOtherAvatarSentJoints(otherNode->getUUID());
bool distanceAdjust = true;
glm::vec3 viewerPosition = myPosition;
AvatarDataPacket::HasFlags hasFlagsOut; // the result of the toByteArray
bool dropFaceTracking = false;
// we cannot send a downstream avatar mixer any updates that expect them to have previous state for this avatar
// since we have no idea if they're online and receiving our packets
// so we always send a full update for this avatar
quint64 start = usecTimestampNow();
QByteArray bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
AvatarDataPacket::HasFlags flagsOut;
QVector<JointData> emptyLastJointSendData { otherAvatar->getJointCount() };
QByteArray avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, emptyLastJointSendData,
flagsOut, false, false, glm::vec3(0), nullptr);
quint64 end = usecTimestampNow();
_stats.toByteArrayElapsedTime += (end - start);
static const int MAX_ALLOWED_AVATAR_DATA = (1400 - NUM_BYTES_RFC4122_UUID);
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qCWarning(avatars) << "otherAvatar.toByteArray() resulted in very large buffer:" << bytes.size() << "... attempt to drop facial data";
auto lastBroadcastTime = nodeData->getLastBroadcastTime(agentNode->getUUID());
if (lastBroadcastTime <= agentNodeData->getIdentityChangeTimestamp()
|| (start - lastBroadcastTime) >= REBROADCAST_IDENTITY_TO_DOWNSTREAM_EVERY_US) {
sendReplicatedIdentityPacket(agentNodeData, node);
nodeData->setLastBroadcastTime(agentNode->getUUID(), start);
}
dropFaceTracking = true; // first try dropping the facial data
bytes = otherAvatar->toByteArray(detail, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
// figure out how large our avatar byte array can be to fit in the packet list
// given that we need it and the avatar UUID and the size of the byte array (16 bit)
// to fit in a segment of the packet list
auto maxAvatarByteArraySize = avatarPacketList->getMaxSegmentSize();
maxAvatarByteArraySize -= NUM_BYTES_RFC4122_UUID;
maxAvatarByteArraySize -= sizeof(quint16);
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qCWarning(avatars) << "otherAvatar.toByteArray() without facial data resulted in very large buffer:" << bytes.size() << "... reduce to MinimumData";
bytes = otherAvatar->toByteArray(AvatarData::MinimumData, lastEncodeForOther, lastSentJointsForOther,
hasFlagsOut, dropFaceTracking, distanceAdjust, viewerPosition, &lastSentJointsForOther);
}
auto sequenceNumberSize = sizeof(agentNodeData->getLastReceivedSequenceNumber());
maxAvatarByteArraySize -= sequenceNumberSize;
if (bytes.size() > MAX_ALLOWED_AVATAR_DATA) {
qCWarning(avatars) << "otherAvatar.toByteArray() MinimumData resulted in very large buffer:" << bytes.size() << "... FAIL!!";
includeThisAvatar = false;
if (avatarByteArray.size() > maxAvatarByteArraySize) {
qCWarning(avatars) << "Replicated avatar data too large for" << otherAvatar->getSessionUUID()
<< "-" << avatarByteArray.size() << "bytes";
avatarByteArray = otherAvatar->toByteArray(AvatarData::SendAllData, 0, emptyLastJointSendData,
flagsOut, true, false, glm::vec3(0), nullptr);
if (avatarByteArray.size() > maxAvatarByteArraySize) {
qCWarning(avatars) << "Replicated avatar data without facial data still too large for"
<< otherAvatar->getSessionUUID() << "-" << avatarByteArray.size() << "bytes";
avatarByteArray = otherAvatar->toByteArray(AvatarData::MinimumData, 0, emptyLastJointSendData,
flagsOut, true, false, glm::vec3(0), nullptr);
}
}
if (includeThisAvatar) {
numAvatarDataBytes += avatarPacketList->write(otherNode->getUUID().toRfc4122());
numAvatarDataBytes += avatarPacketList->write(bytes);
if (avatarByteArray.size() <= maxAvatarByteArraySize) {
// increment the number of avatars sent to this reciever
nodeData->incrementNumAvatarsSentLastFrame();
if (detail != AvatarData::NoData) {
_stats.numOthersIncluded++;
// set the last sent sequence number for this sender on the receiver
nodeData->setLastBroadcastSequenceNumber(agentNode->getUUID(),
agentNodeData->getLastReceivedSequenceNumber());
// increment the number of avatars sent to this reciever
nodeData->incrementNumAvatarsSentLastFrame();
// increment the number of avatars sent to this reciever
nodeData->incrementNumAvatarsSentLastFrame();
// set the last sent sequence number for this sender on the receiver
nodeData->setLastBroadcastSequenceNumber(otherNode->getUUID(),
otherNodeData->getLastReceivedSequenceNumber());
// start a new segment in the packet list for this avatar
avatarPacketList->startSegment();
// remember the last time we sent details about this other node to the receiver
nodeData->setLastBroadcastTime(otherNode->getUUID(), start);
}
// write the node's UUID, the size of the replicated avatar data,
// the sequence number of the replicated avatar data, and the replicated avatar data
numAvatarDataBytes += avatarPacketList->write(agentNode->getUUID().toRfc4122());
numAvatarDataBytes += avatarPacketList->writePrimitive((quint16) (avatarByteArray.size() + sequenceNumberSize));
numAvatarDataBytes += avatarPacketList->writePrimitive(agentNodeData->getLastReceivedSequenceNumber());
numAvatarDataBytes += avatarPacketList->write(avatarByteArray);
avatarPacketList->endSegment();
} else {
qCWarning(avatars) << "Could not fit minimum data avatar for" << otherAvatar->getSessionUUID()
<< "to packet list -" << avatarByteArray.size() << "bytes";
}
avatarPacketList->endSegment();
quint64 endAvatarDataPacking = usecTimestampNow();
_stats.avatarDataPackingElapsedTime += (endAvatarDataPacking - startAvatarDataPacking);
};
}
});
if (avatarPacketList->getNumPackets() > 0) {
quint64 startPacketSending = usecTimestampNow();
// close the current packet so that we're always sending something
@ -398,21 +551,15 @@ void AvatarMixerSlave::broadcastAvatarData(const SharedNodePointer& node) {
_stats.numPacketsSent += (int)avatarPacketList->getNumPackets();
_stats.numBytesSent += numAvatarDataBytes;
// send the avatar data PacketList
nodeList->sendPacketList(std::move(avatarPacketList), *node);
// send the replicated bulk avatar data
auto nodeList = DependencyManager::get<NodeList>();
nodeList->sendPacketList(std::move(avatarPacketList), node->getPublicSocket());
// record the bytes sent for other avatar data in the AvatarMixerClientData
nodeData->recordSentAvatarData(numAvatarDataBytes);
// record the number of avatars held back this frame
nodeData->recordNumOtherAvatarStarves(numAvatarsHeldBack);
nodeData->recordNumOtherAvatarSkips(numAvatarsWithSkippedFrames);
quint64 endPacketSending = usecTimestampNow();
_stats.packetSendingElapsedTime += (endPacketSending - startPacketSending);
}
quint64 end = usecTimestampNow();
_stats.jobElapsedTime += (end - start);
}

View file

@ -21,6 +21,7 @@ public:
quint64 processIncomingPacketsElapsedTime { 0 };
int nodesBroadcastedTo { 0 };
int downstreamMixersBroadcastedTo { 0 };
int numPacketsSent { 0 };
int numBytesSent { 0 };
int numIdentityPackets { 0 };
@ -41,6 +42,7 @@ public:
// sending job stats
nodesBroadcastedTo = 0;
downstreamMixersBroadcastedTo = 0;
numPacketsSent = 0;
numBytesSent = 0;
numIdentityPackets = 0;
@ -60,6 +62,7 @@ public:
processIncomingPacketsElapsedTime += rhs.processIncomingPacketsElapsedTime;
nodesBroadcastedTo += rhs.nodesBroadcastedTo;
downstreamMixersBroadcastedTo += rhs.downstreamMixersBroadcastedTo;
numPacketsSent += rhs.numPacketsSent;
numBytesSent += rhs.numBytesSent;
numIdentityPackets += rhs.numIdentityPackets;
@ -92,6 +95,10 @@ public:
private:
int sendIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode);
int sendReplicatedIdentityPacket(const AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode);
void broadcastAvatarDataToAgent(const SharedNodePointer& node);
void broadcastAvatarDataToDownstreamMixer(const SharedNodePointer& node);
// frame state
ConstIter _begin;

View file

@ -76,8 +76,8 @@ void AvatarMixerSlavePool::processIncomingPackets(ConstIter begin, ConstIter end
}
void AvatarMixerSlavePool::broadcastAvatarData(ConstIter begin, ConstIter end,
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio) {
p_high_resolution_clock::time_point lastFrameTimestamp,
float maxKbpsPerNode, float throttlingRatio) {
_function = &AvatarMixerSlave::broadcastAvatarData;
_configure = [&](AvatarMixerSlave& slave) {
slave.configureBroadcast(begin, end, lastFrameTimestamp, maxKbpsPerNode, throttlingRatio);

View file

@ -1335,6 +1335,66 @@
"advanced": true
}
]
},
{
"name": "broadcasting",
"label": "Broadcasting",
"settings": [
{
"name": "users",
"label": "Broadcasted Users",
"type": "table",
"can_add_new_rows": true,
"help": "Users that are broadcasted to downstream servers",
"numbered": false,
"columns": [
{
"name": "user",
"label": "User",
"can_set": true
}
]
},
{
"name": "downstream_servers",
"label": "Receiving Servers",
"assignment-types": [0,1],
"type": "table",
"can_add_new_rows": true,
"help": "Servers that receive data for broadcasted users",
"numbered": false,
"columns": [
{
"name": "address",
"label": "Address",
"can_set": true
},
{
"name": "port",
"label": "Port",
"can_set": true
},
{
"name": "server_type",
"label": "Server Type",
"type": "select",
"placeholder": "Audio Mixer",
"default": "Audio Mixer",
"can_set": true,
"options": [
{
"value": "Audio Mixer",
"label": "Audio Mixer"
},
{
"value": "Avatar Mixer",
"label": "Avatar Mixer"
}
]
}
]
}
]
}
]
}

View file

@ -39,7 +39,8 @@ var Settings = {
ACCESS_TOKEN_SELECTOR: '[name="metaverse.access_token"]',
PLACES_TABLE_ID: 'places-table',
FORM_ID: 'settings-form',
INVALID_ROW_CLASS: 'invalid-input'
INVALID_ROW_CLASS: 'invalid-input',
DATA_ROW_INDEX: 'data-row-index'
};
var viewHelpers = {
@ -223,10 +224,10 @@ $(document).ready(function(){
// set focus to the first input in the new row
$target.closest('table').find('tr.inputs input:first').focus();
}
var tableRows = sibling.parent();
var tableBody = tableRows.parent();
// if theres no more siblings, we should jump to a new row
if (sibling.next().length == 0 && tableRows.nextAll().length == 1) {
tableBody.find("." + Settings.ADD_ROW_BUTTON_CLASS).click();
@ -1005,7 +1006,7 @@ function saveSettings() {
var password = formJSON["security"]["http_password"];
var verify_password = formJSON["security"]["verify_http_password"];
// if they've only emptied out the default password field, we should go ahead and acknowledge
// if they've only emptied out the default password field, we should go ahead and acknowledge
// the verify password field
if (password != undefined && verify_password == undefined) {
verify_password = "";
@ -1158,8 +1159,9 @@ function makeTable(setting, keypath, setting_value) {
}
html += "<tr class='" + Settings.DATA_ROW_CLASS + "' " +
(isCategorized ? ("data-category='" + categoryValue + "'") : "") + " " +
(isArray ? "" : "name='" + keypath + "." + rowIndexOrName + "'") + ">";
(isCategorized ? ("data-category='" + categoryValue + "'") : "") + " " +
(isArray ? "" : "name='" + keypath + "." + rowIndexOrName + "'") +
(isArray ? Settings.DATA_ROW_INDEX + "='" + (row_num - 1) + "'" : "" ) + ">";
if (setting.numbered === true) {
html += "<td class='numbered'>" + row_num + "</td>"
@ -1289,6 +1291,17 @@ function makeTableHiddenInputs(setting, initialValues, categoryValue) {
"<input type='checkbox' style='display: none;' class='form-control table-checkbox' " +
"name='" + col.name + "'" + (defaultValue ? " checked" : "") + "/>" +
"</td>";
} else if (col.type === "select") {
html += "<td class='" + Settings.DATA_COL_CLASS + "'name='" + col.name + "'>"
html += "<select style='display: none;' class='form-control' data-hidden-input='" + col.name + "'>'"
for (var i in col.options) {
var option = col.options[i];
html += "<option value='" + option.value + "' " + (option.value == defaultValue ? 'selected' : '') + ">" + option.label + "</option>";
}
html += "</select>";
html += "<input type='hidden' class='table-dropdown form-control trigger-change' name='" + col.name + "' value='" + defaultValue + "'></td>";
} else {
html +=
"<td " + (col.hidden ? "style='display: none;'" : "") + " class='" + Settings.DATA_COL_CLASS + "' " +
@ -1411,6 +1424,21 @@ function addTableRow(row) {
var setting_name = table.attr("name");
row.addClass(Settings.DATA_ROW_CLASS + " " + Settings.NEW_ROW_CLASS);
// if this is an array, add the row index (which is the index of the last row + 1)
// as a data attribute to the row
var row_index = 0;
if (isArray) {
var previous_row = row.siblings('.' + Settings.DATA_ROW_CLASS + ':last');
if (previous_row.length > 0) {
row_index = parseInt(previous_row.attr(Settings.DATA_ROW_INDEX), 10) + 1;
} else {
row_index = 0;
}
row.attr(Settings.DATA_ROW_INDEX, row_index);
}
var focusChanged = false;
_.each(row.children(), function(element) {
@ -1440,19 +1468,23 @@ function addTableRow(row) {
input.show();
var isCheckbox = input.hasClass("table-checkbox");
var isDropdown = input.hasClass("table-dropdown");
if (isArray) {
var row_index = row.siblings('.' + Settings.DATA_ROW_CLASS).length
var key = $(element).attr('name');
// are there multiple columns or just one?
// with multiple we have an array of Objects, with one we have an array of whatever the value type is
var num_columns = row.children('.' + Settings.DATA_COL_CLASS).length
var newName = setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : "");
if (isCheckbox) {
input.attr("name", setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : ""))
} else {
input.attr("name", setting_name + "[" + row_index + "]" + (num_columns > 1 ? "." + key : ""))
input.attr("name", newName);
if (isDropdown) {
// default values for hidden inputs inside child selects gets cleared so we need to remind it
var selectElement = $(element).children("select");
selectElement.attr("data-hidden-input", newName);
$(element).children("input").val(selectElement.val());
}
} else {
// because the name of the setting in question requires the key
@ -1468,6 +1500,12 @@ function addTableRow(row) {
focusChanged = true;
}
// if we are adding a dropdown, we should go ahead and make its select
// element is visible
if (isDropdown) {
$(element).children("select").attr("style", "");
}
if (isCheckbox) {
$(input).find("input").attr("data-changed", "true");
} else {

View file

@ -117,6 +117,10 @@ DomainServer::DomainServer(int argc, char* argv[]) :
// if permissions are updated, relay the changes to the Node datastructures
connect(&_settingsManager, &DomainServerSettingsManager::updateNodePermissions,
&_gatekeeper, &DomainGatekeeper::updateNodePermissions);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateReplicatedNodes);
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
this, &DomainServer::updateDownstreamNodes);
setupGroupCacheRefresh();
@ -129,6 +133,9 @@ DomainServer::DomainServer(int argc, char* argv[]) :
setupNodeListAndAssignments();
updateReplicatedNodes();
updateDownstreamNodes();
if (_type != NonMetaverse) {
// if we have a metaverse domain, we'll use an access token for API calls
resetAccountManagerAccessToken();
@ -958,6 +965,11 @@ void DomainServer::handleConnectedNode(SharedNodePointer newNode) {
emit userConnected();
}
if (shouldReplicateNode(*newNode)) {
qDebug() << "Setting node to replicated: " << newNode->getUUID();
newNode->setIsReplicated(true);
}
// send out this node to our other connected nodes
broadcastNewNode(newNode);
}
@ -2215,6 +2227,131 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer&
_unfulfilledAssignments.enqueue(assignment);
}
static const QString BROADCASTING_SETTINGS_KEY = "broadcasting";
void DomainServer::updateDownstreamNodes() {
auto settings = _settingsManager.getSettingsMap();
if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
auto nodeList = DependencyManager::get<LimitedNodeList>();
std::vector<HifiSockAddr> downstreamNodesInSettings;
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
if (replicationSettings.contains("downstream_servers")) {
auto serversSettings = replicationSettings.value("downstream_servers").toList();
std::vector<HifiSockAddr> knownDownstreamNodes;
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) {
knownDownstreamNodes.push_back(otherNode->getPublicSocket());
}
});
for (auto& server : serversSettings) {
auto downstreamServer = server.toMap();
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
static const QString DOWNSTREAM_SERVER_PORT = "port";
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
// make sure we have the settings we need for this downstream server
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) {
auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString());
auto downstreamNodeType = NodeType::downstreamType(nodeType);
// read the address and port and construct a HifiSockAddr from them
HifiSockAddr downstreamServerAddr {
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
};
downstreamNodesInSettings.push_back(downstreamServerAddr);
bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(),
downstreamServerAddr) != knownDownstreamNodes.cend();
if (!knownNode) {
// manually add the downstream node to our node list
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType,
downstreamServerAddr, downstreamServerAddr);
node->setIsForcedNeverSilent(true);
qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr;
// manually activate the public socket for the downstream node
node->activatePublicSocket();
}
}
}
}
// enumerate the nodes to determine which are no longer downstream for this domain
// collect them in a vector to separately remove them with handleKillNode (since eachNode has a read lock and
// we cannot recursively take the write lock required by handleKillNode)
std::vector<SharedNodePointer> nodesToKill;
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
if (NodeType::isDownstream(otherNode->getType())) {
bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(),
otherNode->getPublicSocket()) != downstreamNodesInSettings.cend();
if (!nodeInSettings) {
qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket();
nodesToKill.push_back(otherNode);
}
}
});
for (auto& node : nodesToKill) {
handleKillNode(node);
}
}
}
void DomainServer::updateReplicatedNodes() {
// Make sure we have downstream nodes in our list
auto settings = _settingsManager.getSettingsMap();
static const QString REPLICATED_USERS_KEY = "users";
_replicatedUsernames.clear();
if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
if (replicationSettings.contains(REPLICATED_USERS_KEY)) {
auto usersSettings = replicationSettings.value(REPLICATED_USERS_KEY).toList();
for (auto& username : usersSettings) {
_replicatedUsernames.push_back(username.toString().toLower());
}
}
}
auto nodeList = DependencyManager::get<LimitedNodeList>();
nodeList->eachMatchingNode([this](const SharedNodePointer& otherNode) -> bool {
return otherNode->getType() == NodeType::Agent;
}, [this](const SharedNodePointer& otherNode) {
auto shouldReplicate = shouldReplicateNode(*otherNode);
auto isReplicated = otherNode->isReplicated();
if (isReplicated && !shouldReplicate) {
qDebug() << "Setting node to NOT be replicated:"
<< otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID();
} else if (!isReplicated && shouldReplicate) {
qDebug() << "Setting node to replicated:"
<< otherNode->getPermissions().getVerifiedUserName() << otherNode->getUUID();
}
otherNode->setIsReplicated(shouldReplicate);
}
);
}
bool DomainServer::shouldReplicateNode(const Node& node) {
if (node.getType() == NodeType::Agent) {
QString verifiedUsername = node.getPermissions().getVerifiedUserName();
// Both the verified username and usernames in _replicatedUsernames are lowercase, so
// comparisons here are case-insensitive.
auto it = find(_replicatedUsernames.cbegin(), _replicatedUsernames.cend(), verifiedUsername);
return it != _replicatedUsernames.end();
} else {
return false;
}
};
void DomainServer::nodeAdded(SharedNodePointer node) {
// we don't use updateNodeWithData, so add the DomainServerNodeData to the node here
node->setLinkedData(std::unique_ptr<DomainServerNodeData> { new DomainServerNodeData() });

View file

@ -102,6 +102,9 @@ private slots:
void handleOctreeFileReplacement(QByteArray octreeFile);
void updateReplicatedNodes();
void updateDownstreamNodes();
signals:
void iceServerChanged();
void userConnected();
@ -161,12 +164,16 @@ private:
QJsonObject jsonForSocket(const HifiSockAddr& socket);
QJsonObject jsonObjectForNode(const SharedNodePointer& node);
bool shouldReplicateNode(const Node& node);
void setupGroupCacheRefresh();
QString pathForRedirect(QString path = QString()) const;
SubnetList _acSubnetWhitelist;
std::vector<QString> _replicatedUsernames;
DomainGatekeeper _gatekeeper;
HTTPManager _httpManager;

View file

@ -991,6 +991,7 @@ bool DomainServerSettingsManager::handleAuthenticatedHTTPRequest(HTTPConnection
unpackPermissions();
apiRefreshGroupInformation();
emit updateNodePermissions();
emit settingsUpdated();
}
return true;
@ -1196,13 +1197,14 @@ QJsonObject DomainServerSettingsManager::settingDescriptionFromGroup(const QJson
bool DomainServerSettingsManager::recurseJSONObjectAndOverwriteSettings(const QJsonObject& postedObject) {
static const QString SECURITY_ROOT_KEY = "security";
static const QString AC_SUBNET_WHITELIST_KEY = "ac_subnet_whitelist";
static const QString BROADCASTING_KEY = "broadcasting";
auto& settingsVariant = _configMap.getConfig();
bool needRestart = false;
// Iterate on the setting groups
foreach(const QString& rootKey, postedObject.keys()) {
QJsonValue rootValue = postedObject[rootKey];
const QJsonValue& rootValue = postedObject[rootKey];
if (!settingsVariant.contains(rootKey)) {
// we don't have a map below this key yet, so set it up now
@ -1247,7 +1249,7 @@ bool DomainServerSettingsManager::recurseJSONObjectAndOverwriteSettings(const QJ
if (!matchingDescriptionObject.isEmpty()) {
updateSetting(rootKey, rootValue, *thisMap, matchingDescriptionObject);
if (rootKey != SECURITY_ROOT_KEY) {
if (rootKey != SECURITY_ROOT_KEY && rootKey != BROADCASTING_KEY) {
needRestart = true;
}
} else {
@ -1261,9 +1263,10 @@ bool DomainServerSettingsManager::recurseJSONObjectAndOverwriteSettings(const QJ
// if we matched the setting then update the value
if (!matchingDescriptionObject.isEmpty()) {
QJsonValue settingValue = rootValue.toObject()[settingKey];
const QJsonValue& settingValue = rootValue.toObject()[settingKey];
updateSetting(settingKey, settingValue, *thisMap, matchingDescriptionObject);
if (rootKey != SECURITY_ROOT_KEY || settingKey == AC_SUBNET_WHITELIST_KEY) {
if ((rootKey != SECURITY_ROOT_KEY && rootKey != BROADCASTING_KEY)
|| settingKey == AC_SUBNET_WHITELIST_KEY) {
needRestart = true;
}
} else {

View file

@ -108,6 +108,7 @@ public:
signals:
void updateNodePermissions();
void settingsUpdated();
public slots:
void apiGetGroupIDJSONCallback(QNetworkReply& requestReply);

View file

@ -127,6 +127,7 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
// parse the info after the seq number and before the audio data (the stream properties)
int prePropertyPosition = message.getPosition();
int propertyBytes = parseStreamProperties(message.getType(), message.readWithoutCopy(message.getBytesLeftToRead()), networkFrames);
message.seek(prePropertyPosition + propertyBytes);
// handle this packet based on its arrival status.
@ -147,7 +148,8 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
}
case SequenceNumberStats::OnTime: {
// Packet is on time; parse its data to the ringbuffer
if (message.getType() == PacketType::SilentAudioFrame) {
if (message.getType() == PacketType::SilentAudioFrame
|| message.getType() == PacketType::ReplicatedSilentAudioFrame) {
// If we recieved a SilentAudioFrame from our sender, we might want to drop
// some of the samples in order to catch up to our desired jitter buffer size.
writeDroppableSilentFrames(networkFrames);
@ -168,7 +170,10 @@ int InboundAudioStream::parseData(ReceivedMessage& message) {
// inform others of the mismatch
auto sendingNode = DependencyManager::get<NodeList>()->nodeWithUUID(message.getSourceID());
emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket);
if (sendingNode) {
emit mismatchedAudioCodec(sendingNode, _selectedCodecName, codecInPacket);
}
}
}
break;

View file

@ -1473,27 +1473,6 @@ QStringList AvatarData::getJointNames() const {
return _jointNames;
}
void AvatarData::parseAvatarIdentityPacket(const QByteArray& data, Identity& identityOut) {
QDataStream packetStream(data);
packetStream >> identityOut.uuid
>> identityOut.skeletonModelURL
>> identityOut.attachmentData
>> identityOut.displayName
>> identityOut.sessionDisplayName
>> identityOut.avatarEntityData
>> identityOut.sequenceId;
#ifdef WANT_DEBUG
qCDebug(avatars) << __FUNCTION__
<< "identityOut.uuid:" << identityOut.uuid
<< "identityOut.skeletonModelURL:" << identityOut.skeletonModelURL
<< "identityOut.displayName:" << identityOut.displayName
<< "identityOut.sessionDisplayName:" << identityOut.sessionDisplayName;
#endif
}
glm::quat AvatarData::getOrientationOutbound() const {
return (getLocalOrientation());
}
@ -1504,47 +1483,82 @@ QUrl AvatarData::cannonicalSkeletonModelURL(const QUrl& emptyURL) const {
return _skeletonModelURL.scheme() == "file" ? emptyURL : _skeletonModelURL;
}
void AvatarData::processAvatarIdentity(const Identity& identity, bool& identityChanged, bool& displayNameChanged, bool& skeletonModelUrlChanged) {
void AvatarData::processAvatarIdentity(const QByteArray& identityData, bool& identityChanged,
bool& displayNameChanged, bool& skeletonModelUrlChanged) {
if (identity.sequenceId < _identitySequenceId) {
qCDebug(avatars) << "Ignoring older identity packet for avatar" << getSessionUUID()
<< "_identitySequenceId (" << _identitySequenceId << ") is greater than" << identity.sequenceId;
return;
QDataStream packetStream(identityData);
QUuid avatarSessionID;
// peek the sequence number, this will tell us if we should be processing this identity packet at all
udt::SequenceNumber::Type incomingSequenceNumberType;
packetStream >> avatarSessionID >> incomingSequenceNumberType;
udt::SequenceNumber incomingSequenceNumber(incomingSequenceNumberType);
if (!_hasProcessedFirstIdentity) {
_lastSequenceNumber = incomingSequenceNumber - 1;
_hasProcessedFirstIdentity = true;
qCDebug(avatars) << "Processing first identity packet for" << avatarSessionID << "-"
<< (udt::SequenceNumber::Type) incomingSequenceNumber;
}
// otherwise, set the identitySequenceId to match the incoming identity
_identitySequenceId = identity.sequenceId;
if (_firstSkeletonCheck || (identity.skeletonModelURL != cannonicalSkeletonModelURL(emptyURL))) {
setSkeletonModelURL(identity.skeletonModelURL);
identityChanged = true;
skeletonModelUrlChanged = true;
if (_firstSkeletonCheck) {
if (incomingSequenceNumber > _lastSequenceNumber) {
Identity identity;
packetStream >> identity.skeletonModelURL
>> identity.attachmentData
>> identity.displayName
>> identity.sessionDisplayName
>> identity.avatarEntityData;
// set the store identity sequence number to match the incoming identity
_lastSequenceNumber = incomingSequenceNumber;
if (_firstSkeletonCheck || (identity.skeletonModelURL != cannonicalSkeletonModelURL(emptyURL))) {
setSkeletonModelURL(identity.skeletonModelURL);
identityChanged = true;
skeletonModelUrlChanged = true;
if (_firstSkeletonCheck) {
displayNameChanged = true;
}
_firstSkeletonCheck = false;
}
if (identity.displayName != _displayName) {
_displayName = identity.displayName;
identityChanged = true;
displayNameChanged = true;
}
_firstSkeletonCheck = false;
}
maybeUpdateSessionDisplayNameFromTransport(identity.sessionDisplayName);
if (identity.displayName != _displayName) {
_displayName = identity.displayName;
identityChanged = true;
displayNameChanged = true;
}
maybeUpdateSessionDisplayNameFromTransport(identity.sessionDisplayName);
if (identity.attachmentData != _attachmentData) {
setAttachmentData(identity.attachmentData);
identityChanged = true;
}
if (identity.attachmentData != _attachmentData) {
setAttachmentData(identity.attachmentData);
identityChanged = true;
}
bool avatarEntityDataChanged = false;
_avatarEntitiesLock.withReadLock([&] {
avatarEntityDataChanged = (identity.avatarEntityData != _avatarEntityData);
});
if (avatarEntityDataChanged) {
setAvatarEntityData(identity.avatarEntityData);
identityChanged = true;
}
bool avatarEntityDataChanged = false;
_avatarEntitiesLock.withReadLock([&] {
avatarEntityDataChanged = (identity.avatarEntityData != _avatarEntityData);
});
if (avatarEntityDataChanged) {
setAvatarEntityData(identity.avatarEntityData);
identityChanged = true;
}
#ifdef WANT_DEBUG
qCDebug(avatars) << __FUNCTION__
<< "identity.uuid:" << identity.uuid
<< "identity.skeletonModelURL:" << identity.skeletonModelURL
<< "identity.displayName:" << identity.displayName
<< "identity.sessionDisplayName:" << identity.sessionDisplayName;
} else {
qCDebug(avatars) << "Refusing to process identity for" << uuidStringWithoutCurlyBraces(avatarSessionID) << "since"
<< (udt::SequenceNumber::Type) _lastSequenceNumber
<< "is >=" << (udt::SequenceNumber::Type) incomingSequenceNumber;
#endif
}
}
QByteArray AvatarData::identityByteArray() const {
@ -1552,14 +1566,17 @@ QByteArray AvatarData::identityByteArray() const {
QDataStream identityStream(&identityData, QIODevice::Append);
const QUrl& urlToSend = cannonicalSkeletonModelURL(emptyURL); // depends on _skeletonModelURL
// when mixers send identity packets to agents, they simply forward along the last incoming sequence number they received
// whereas agents send a fresh outgoing sequence number when identity data has changed
_avatarEntitiesLock.withReadLock([&] {
identityStream << getSessionUUID()
<< urlToSend
<< _attachmentData
<< _displayName
<< getSessionDisplayNameForTransport() // depends on _sessionDisplayName
<< _avatarEntityData
<< _identitySequenceId;
<< (udt::SequenceNumber::Type) _lastSequenceNumber
<< urlToSend
<< _attachmentData
<< _displayName
<< getSessionDisplayNameForTransport() // depends on _sessionDisplayName
<< _avatarEntityData;
});
return identityData;
@ -1735,6 +1752,12 @@ void AvatarData::sendAvatarDataPacket() {
void AvatarData::sendIdentityPacket() {
auto nodeList = DependencyManager::get<NodeList>();
if (_identityDataChanged) {
// if the identity data has changed, push the sequence number forwards
++_lastSequenceNumber;
}
QByteArray identityData = identityByteArray();
auto packetList = NLPacketList::create(PacketType::AvatarIdentity, QByteArray(), true, true);
@ -1745,7 +1768,7 @@ void AvatarData::sendIdentityPacket() {
},
[&](const SharedNodePointer& node) {
nodeList->sendPacketList(std::move(packetList), *node);
});
});
_avatarEntityDataLocallyEdited = false;
_identityDataChanged = false;

View file

@ -52,15 +52,16 @@ typedef unsigned long long quint64;
#include <JointData.h>
#include <NLPacket.h>
#include <Node.h>
#include <RegisteredMetaTypes.h>
#include <SimpleMovingAverage.h>
#include <SpatiallyNestable.h>
#include <NumericalConstants.h>
#include <Packed.h>
#include <ThreadSafeValueCache.h>
#include <RegisteredMetaTypes.h>
#include <SharedUtil.h>
#include <shared/RateCounter.h>
#include <SimpleMovingAverage.h>
#include <SpatiallyNestable.h>
#include <ThreadSafeValueCache.h>
#include <ViewFrustum.h>
#include <shared/RateCounter.h>
#include <udt/SequenceNumber.h>
#include "AABox.h"
#include "HeadData.h"
@ -526,19 +527,17 @@ public:
const HeadData* getHeadData() const { return _headData; }
struct Identity {
QUuid uuid;
QUrl skeletonModelURL;
QVector<AttachmentData> attachmentData;
QString displayName;
QString sessionDisplayName;
AvatarEntityMap avatarEntityData;
quint64 sequenceId;
};
static void parseAvatarIdentityPacket(const QByteArray& data, Identity& identityOut);
// identityChanged returns true if identity has changed, false otherwise.
// identityChanged returns true if identity has changed, false otherwise. Similarly for displayNameChanged and skeletonModelUrlChange.
void processAvatarIdentity(const Identity& identity, bool& identityChanged, bool& displayNameChanged, bool& skeletonModelUrlChanged);
void processAvatarIdentity(const QByteArray& identityData, bool& identityChanged,
bool& displayNameChanged, bool& skeletonModelUrlChanged);
QByteArray identityByteArray() const;
@ -624,10 +623,7 @@ public:
static float _avatarSortCoefficientAge;
bool getIdentityDataChanged() const { return _identityDataChanged; } // has the identity data changed since the last time sendIdentityPacket() was called
void markIdentityDataChanged() {
_identityDataChanged = true;
_identitySequenceId++;
}
void markIdentityDataChanged() { _identityDataChanged = true; }
float getDensity() const { return _density; }
@ -785,7 +781,8 @@ protected:
float _audioAverageLoudness { 0.0f };
bool _identityDataChanged { false };
quint64 _identitySequenceId { 0 };
udt::SequenceNumber _lastSequenceNumber { 0 };
bool _hasProcessedFirstIdentity { false };
float _density;
private:

View file

@ -126,8 +126,14 @@ AvatarSharedPointer AvatarHashMap::parseAvatarData(QSharedPointer<ReceivedMessag
}
void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
AvatarData::Identity identity;
AvatarData::parseAvatarIdentityPacket(message->getMessage(), identity);
// peek the avatar UUID from the incoming packet
QUuid identityUUID = QUuid::fromRfc4122(message->peek(NUM_BYTES_RFC4122_UUID));
if (identityUUID.isNull()) {
qCDebug(avatars) << "Refusing to process identity packet for null avatar ID";
return;
}
// make sure this isn't for an ignored avatar
auto nodeList = DependencyManager::get<NodeList>();
@ -136,21 +142,22 @@ void AvatarHashMap::processAvatarIdentityPacket(QSharedPointer<ReceivedMessage>
{
QReadLocker locker(&_hashLock);
auto me = _avatarHash.find(EMPTY);
if ((me != _avatarHash.end()) && (identity.uuid == me.value()->getSessionUUID())) {
if ((me != _avatarHash.end()) && (identityUUID == me.value()->getSessionUUID())) {
// We add MyAvatar to _avatarHash with an empty UUID. Code relies on this. In order to correctly handle an
// identity packet for ourself (such as when we are assigned a sessionDisplayName by the mixer upon joining),
// we make things match here.
identity.uuid = EMPTY;
identityUUID = EMPTY;
}
}
if (!nodeList->isIgnoringNode(identity.uuid) || nodeList->getRequestsDomainListData()) {
if (!nodeList->isIgnoringNode(identityUUID) || nodeList->getRequestsDomainListData()) {
// mesh URL for a UUID, find avatar in our list
auto avatar = newOrExistingAvatar(identity.uuid, sendingNode);
auto avatar = newOrExistingAvatar(identityUUID, sendingNode);
bool identityChanged = false;
bool displayNameChanged = false;
bool skeletonModelUrlChanged = false;
// In this case, the "sendingNode" is the Avatar Mixer.
avatar->processAvatarIdentity(identity, identityChanged, displayNameChanged, skeletonModelUrlChanged);
avatar->processAvatarIdentity(message->getMessage(), identityChanged, displayNameChanged, skeletonModelUrlChanged);
}
}

View file

@ -446,7 +446,8 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr<NLPacketList> packetList,
return _nodeSocket.writePacketList(std::move(packetList), *activeSocket);
} else {
qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node. Not sending.";
qCDebug(networking) << "LimitedNodeList::sendPacketList called without active socket for node "
<< destinationNode.getUUID() << ". Not sending.";
return ERROR_SENDING_PACKET_BYTES;
}
}
@ -454,7 +455,8 @@ qint64 LimitedNodeList::sendPacketList(std::unique_ptr<NLPacketList> packetList,
qint64 LimitedNodeList::sendPacket(std::unique_ptr<NLPacket> packet, const Node& destinationNode,
const HifiSockAddr& overridenSockAddr) {
if (overridenSockAddr.isNull() && !destinationNode.getActiveSocket()) {
qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node. Not sending.";
qCDebug(networking) << "LimitedNodeList::sendPacket called without active socket for node"
<< destinationNode.getUUID() << ". Not sending.";
return ERROR_SENDING_PACKET_BYTES;
}
@ -568,8 +570,8 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) {
SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
const NodePermissions& permissions,
const QUuid& connectionSecret) {
bool isReplicated, bool isUpstream,
const QUuid& connectionSecret, const NodePermissions& permissions) {
QReadLocker readLocker(&_nodeMutex);
NodeHash::const_iterator it = _nodeHash.find(uuid);
@ -580,11 +582,20 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
matchingNode->setLocalSocket(localSocket);
matchingNode->setPermissions(permissions);
matchingNode->setConnectionSecret(connectionSecret);
matchingNode->setIsReplicated(isReplicated);
matchingNode->setIsUpstream(isUpstream);
return matchingNode;
} else {
// we didn't have this node, so add them
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket, permissions, connectionSecret);
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
newNode->setIsReplicated(isReplicated);
newNode->setIsUpstream(isUpstream);
newNode->setConnectionSecret(connectionSecret);
newNode->setPermissions(permissions);
// move the newly constructed node to the LNL thread
newNode->moveToThread(thread());
if (nodeType == NodeType::AudioMixer) {
LimitedNodeList::flagTimeForConnectionStep(LimitedNodeList::AddedAudioMixer);
@ -742,7 +753,8 @@ void LimitedNodeList::removeSilentNodes() {
SharedNodePointer node = it->second;
node->getMutex().lock();
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) {
if (!node->isForcedNeverSilent()
&& (usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * USECS_PER_MSEC)) {
// call the NodeHash erase to get rid of this node
it = _nodeHash.unsafe_erase(it);

View file

@ -145,8 +145,9 @@ public:
SharedNodePointer addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
const NodePermissions& permissions = DEFAULT_AGENT_PERMISSIONS,
const QUuid& connectionSecret = QUuid());
bool isReplicated = false, bool isUpstream = false,
const QUuid& connectionSecret = QUuid(),
const NodePermissions& permissions = DEFAULT_AGENT_PERMISSIONS);
static bool parseSTUNResponse(udt::BasePacket* packet, QHostAddress& newPublicAddress, uint16_t& newPublicPort);
bool hasCompletedInitialSTUN() const { return _hasCompletedInitialSTUN; }
@ -257,6 +258,16 @@ public:
return SharedNodePointer();
}
// This is unsafe because it does not take a lock
// Must only be called when you know that a read lock on the node mutex is held
// and will be held for the duration of your iteration
template<typename NodeLambda>
void unsafeEachNode(NodeLambda functor) {
for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) {
functor(it->second);
}
}
void putLocalPortIntoSharedMemory(const QString key, QObject* parent, quint16 localPort);
bool getLocalServerPortFromSharedMemory(const QString key, quint16& localPort);
@ -386,6 +397,7 @@ protected:
}
}
private slots:
void flagTimeForConnectionStep(ConnectionStep connectionStep, quint64 timestamp);
void possiblyTimeoutSTUNAddressLookup();

View file

@ -42,6 +42,7 @@ public:
static std::unique_ptr<NLPacket> fromReceivedPacket(std::unique_ptr<char[]> data, qint64 size,
const HifiSockAddr& senderSockAddr);
static std::unique_ptr<NLPacket> fromBase(std::unique_ptr<Packet> packet);
// Provided for convenience, try to limit use

View file

@ -23,6 +23,8 @@ public:
PacketVersion getVersion() const { return _packetVersion; }
const QUuid& getSourceID() const { return _sourceID; }
qint64 getMaxSegmentSize() const override { return NLPacket::maxPayloadSize(_packetType, _isOrdered); }
private:
NLPacketList(PacketType packetType, QByteArray extendedHeader = QByteArray(), bool isReliable = false,

View file

@ -76,6 +76,14 @@ public:
float getOutboundBandwidth() const; // in kbps
float getInboundBandwidth() const; // in kbps
// Typically the LimitedNodeList removes nodes after they are "silent"
// meaning that we have not received any packets (including simple keepalive pings) from them for a set interval.
// The _isForcedNeverSilent flag tells the LimitedNodeList that a Node should never be killed by removeSilentNodes()
// even if its the timestamp of when it was last heard from has never been updated.
bool isForcedNeverSilent() const { return _isForcedNeverSilent; }
void setIsForcedNeverSilent(bool isForcedNeverSilent) { _isForcedNeverSilent = isForcedNeverSilent; }
friend QDataStream& operator<<(QDataStream& out, const NetworkPeer& peer);
friend QDataStream& operator>>(QDataStream& in, NetworkPeer& peer);
public slots:
@ -103,6 +111,8 @@ protected:
QTimer* _pingTimer = NULL;
int _connectionAttempts;
bool _isForcedNeverSilent { false };
};
QDebug operator<<(QDebug debug, const NetworkPeer &peer);

View file

@ -42,6 +42,8 @@ void NodeType::init() {
TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer");
TypeNameHash.insert(NodeType::AssetServer, "Asset Server");
TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server");
TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer");
TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer");
TypeNameHash.insert(NodeType::Unassigned, "Unassigned");
}
@ -50,17 +52,34 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) {
return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME;
}
bool NodeType::isDownstream(NodeType_t nodeType) {
return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer;
}
NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
switch (primaryType) {
case AudioMixer:
return DownstreamAudioMixer;
case AvatarMixer:
return DownstreamAvatarMixer;
default:
return Unassigned;
}
}
NodeType_t NodeType::fromString(QString type) {
return TypeNameHash.key(type, NodeType::Unassigned);
}
Node::Node(const QUuid& uuid, NodeType_t type, const HifiSockAddr& publicSocket,
const HifiSockAddr& localSocket, const NodePermissions& permissions, const QUuid& connectionSecret,
QObject* parent) :
const HifiSockAddr& localSocket, QObject* parent) :
NetworkPeer(uuid, publicSocket, localSocket, parent),
_type(type),
_connectionSecret(connectionSecret),
_pingMs(-1), // "Uninitialized"
_clockSkewUsec(0),
_mutex(),
_clockSkewMovingPercentile(30, 0.8f), // moving 80th percentile of 30 samples
_permissions(permissions)
_clockSkewMovingPercentile(30, 0.8f) // moving 80th percentile of 30 samples
{
// Update socket's object name
setType(_type);
@ -135,6 +154,7 @@ QDataStream& operator<<(QDataStream& out, const Node& node) {
out << node._publicSocket;
out << node._localSocket;
out << node._permissions;
out << node._isReplicated;
return out;
}
@ -144,6 +164,7 @@ QDataStream& operator>>(QDataStream& in, Node& node) {
in >> node._publicSocket;
in >> node._localSocket;
in >> node._permissions;
in >> node._isReplicated;
return in;
}

View file

@ -37,9 +37,9 @@
class Node : public NetworkPeer {
Q_OBJECT
public:
Node(const QUuid& uuid, NodeType_t type,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket,
const NodePermissions& permissions, const QUuid& connectionSecret = QUuid(),
QObject* parent = nullptr);
bool operator==(const Node& otherNode) const { return _uuid == otherNode._uuid; }
@ -48,6 +48,12 @@ public:
char getType() const { return _type; }
void setType(char type);
bool isReplicated() const { return _isReplicated; }
void setIsReplicated(bool isReplicated) { _isReplicated = isReplicated; }
bool isUpstream() const { return _isUpstream; }
void setIsUpstream(bool isUpstream) { _isUpstream = isUpstream; }
const QUuid& getConnectionSecret() const { return _connectionSecret; }
void setConnectionSecret(const QUuid& connectionSecret) { _connectionSecret = connectionSecret; }
@ -89,13 +95,16 @@ private:
QUuid _connectionSecret;
std::unique_ptr<NodeData> _linkedData;
bool _isReplicated { false };
int _pingMs;
qint64 _clockSkewUsec;
QMutex _mutex;
MovingPercentile _clockSkewMovingPercentile;
NodePermissions _permissions;
bool _isUpstream { false };
tbb::concurrent_unordered_set<QUuid, UUIDHasher> _ignoredNodeIDSet;
mutable QReadWriteLock _ignoredNodeIDSetLock;
std::vector<QString> _replicatedUsernames { };
std::atomic_bool _ignoreRadiusEnabled;
};

View file

@ -654,8 +654,9 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
QUuid nodeUUID, connectionUUID;
HifiSockAddr nodePublicSocket, nodeLocalSocket;
NodePermissions permissions;
bool isReplicated;
packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions;
packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions >> isReplicated;
// if the public socket address is 0 then it's reachable at the same IP
// as the domain server
@ -666,7 +667,12 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
packetStream >> connectionUUID;
SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket,
nodeLocalSocket, permissions, connectionUUID);
nodeLocalSocket, isReplicated, false, connectionUUID, permissions);
// nodes that are downstream of our own type are kept alive when we hear about them from the domain server
if (node->getType() == NodeType::downstreamType(_ownerType)) {
node->setLastHeardMicrostamp(usecTimestampNow());
}
}
void NodeList::sendAssignment(Assignment& assignment) {
@ -711,14 +717,20 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) {
}
void NodeList::startNodeHolePunch(const SharedNodePointer& node) {
// connect to the correct signal on this node so we know when to ping it
connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout);
// start the ping timer for this node
node->startPingTimer();
// we don't hole punch to downstream servers, since it is assumed that we have a direct line to them
// we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them
// ping this node immediately
pingPunchForInactiveNode(node);
if (!NodeType::isDownstream(node->getType()) && !node->isUpstream()) {
// connect to the correct signal on this node so we know when to ping it
connect(node.data(), &Node::pingTimerTimeout, this, &NodeList::handleNodePingTimeout);
// start the ping timer for this node
node->startPingTimer();
// ping this node immediately
pingPunchForInactiveNode(node);
}
}
void NodeList::handleNodePingTimeout() {
@ -761,8 +773,11 @@ void NodeList::stopKeepalivePingTimer() {
}
void NodeList::sendKeepAlivePings() {
// send keep-alive ping packets to nodes of types we care about that are not relayed to us from an upstream node
eachMatchingNode([this](const SharedNodePointer& node)->bool {
return _nodeTypesOfInterest.contains(node->getType());
auto type = node->getType();
return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type);
}, [&](const SharedNodePointer& node) {
sendPacket(constructPingPacket(), *node);
});
@ -1120,4 +1135,4 @@ void NodeList::setRequestsDomainListData(bool isRequesting) {
void NodeList::startThread() {
moveToNewNamedThread(this, "NodeList Thread", QThread::TimeCriticalPriority);
}
}

View file

@ -25,10 +25,17 @@ namespace NodeType {
const NodeType_t AssetServer = 'A';
const NodeType_t MessagesMixer = 'm';
const NodeType_t EntityScriptServer = 'S';
const NodeType_t DownstreamAudioMixer = 'a';
const NodeType_t DownstreamAvatarMixer = 'w';
const NodeType_t Unassigned = 1;
void init();
const QString& getNodeTypeName(NodeType_t nodeType);
bool isDownstream(NodeType_t nodeType);
NodeType_t downstreamType(NodeType_t primaryType);
NodeType_t fromString(QString type);
}
typedef QSet<NodeType_t> NodeSet;

View file

@ -42,6 +42,20 @@ ReceivedMessage::ReceivedMessage(NLPacket& packet)
{
}
ReceivedMessage::ReceivedMessage(QByteArray byteArray, PacketType packetType, PacketVersion packetVersion,
const HifiSockAddr& senderSockAddr, QUuid sourceID) :
_data(byteArray),
_headData(_data.mid(0, HEAD_DATA_SIZE)),
_numPackets(1),
_sourceID(sourceID),
_packetType(packetType),
_packetVersion(packetVersion),
_senderSockAddr(senderSockAddr),
_isComplete(true)
{
}
void ReceivedMessage::setFailed() {
_failed = true;
_isComplete = true;

View file

@ -24,6 +24,8 @@ class ReceivedMessage : public QObject {
public:
ReceivedMessage(const NLPacketList& packetList);
ReceivedMessage(NLPacket& packet);
ReceivedMessage(QByteArray byteArray, PacketType packetType, PacketVersion packetVersion,
const HifiSockAddr& senderSockAddr, QUuid sourceID = QUuid());
QByteArray getMessage() const { return _data; }
const char* getRawMessage() const { return _data.constData(); }

View file

@ -10,6 +10,7 @@
//
#include <QtCore/QCoreApplication>
#include <QtCore/QJsonArray>
#include <QtCore/QJsonObject>
#include <QtCore/QThread>
#include <QtCore/QTimer>

View file

@ -18,6 +18,8 @@
#include "Assignment.h"
using DownstreamNodeFoundCallback = std::function<void(Node& downstreamNode)>;
class ThreadedAssignment : public Assignment {
Q_OBJECT
public:
@ -40,6 +42,7 @@ signals:
protected:
void commonInit(const QString& targetName, NodeType_t nodeType);
bool _isFinished;
QTimer _domainServerTimer;
QTimer _statsTimer;

View file

@ -39,7 +39,19 @@ const QSet<PacketType> NON_SOURCED_PACKETS = QSet<PacketType>()
<< PacketType::ICEServerPeerInformation << PacketType::ICEServerQuery << PacketType::ICEServerHeartbeat
<< PacketType::ICEServerHeartbeatACK << PacketType::ICEPing << PacketType::ICEPingReply
<< PacketType::ICEServerHeartbeatDenied << PacketType::AssignmentClientStatus << PacketType::StopNode
<< PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement;
<< PacketType::DomainServerRemovedNode << PacketType::UsernameFromIDReply << PacketType::OctreeFileReplacement
<< PacketType::ReplicatedMicrophoneAudioNoEcho << PacketType::ReplicatedMicrophoneAudioWithEcho
<< PacketType::ReplicatedInjectAudio << PacketType::ReplicatedSilentAudioFrame
<< PacketType::ReplicatedAvatarIdentity << PacketType::ReplicatedKillAvatar << PacketType::ReplicatedBulkAvatarData;
const QHash<PacketType, PacketType> REPLICATED_PACKET_MAPPING {
{ PacketType::MicrophoneAudioNoEcho, PacketType::ReplicatedMicrophoneAudioNoEcho },
{ PacketType::MicrophoneAudioWithEcho, PacketType::ReplicatedMicrophoneAudioWithEcho },
{ PacketType::InjectAudio, PacketType::ReplicatedInjectAudio },
{ PacketType::SilentAudioFrame, PacketType::ReplicatedSilentAudioFrame },
{ PacketType::AvatarIdentity, PacketType::ReplicatedAvatarIdentity },
{ PacketType::KillAvatar, PacketType::ReplicatedKillAvatar },
};
PacketVersion versionForPacketType(PacketType packetType) {
switch (packetType) {
@ -56,7 +68,7 @@ PacketVersion versionForPacketType(PacketType packetType) {
case PacketType::AvatarData:
case PacketType::BulkAvatarData:
case PacketType::KillAvatar:
return static_cast<PacketVersion>(AvatarMixerPacketVersion::MannequinDefaultAvatar);
return static_cast<PacketVersion>(AvatarMixerPacketVersion::AvatarIdentitySequenceFront);
case PacketType::MessagesData:
return static_cast<PacketVersion>(MessageDataVersion::TextOrBinaryData);
case PacketType::ICEServerHeartbeat:
@ -119,7 +131,7 @@ static void ensureProtocolVersionsSignature() {
std::call_once(once, [&] {
QByteArray buffer;
QDataStream stream(&buffer, QIODevice::WriteOnly);
uint8_t numberOfProtocols = static_cast<uint8_t>(PacketType::LAST_PACKET_TYPE) + 1;
uint8_t numberOfProtocols = static_cast<uint8_t>(PacketType::NUM_PACKET_TYPE);
stream << numberOfProtocols;
for (uint8_t packetType = 0; packetType < numberOfProtocols; packetType++) {
uint8_t packetTypeVersion = static_cast<uint8_t>(versionForPacketType(static_cast<PacketType>(packetType)));

View file

@ -115,12 +115,21 @@ public:
AdjustAvatarSorting,
OctreeFileReplacement,
CollisionEventChanges,
LAST_PACKET_TYPE = CollisionEventChanges
ReplicatedMicrophoneAudioNoEcho,
ReplicatedMicrophoneAudioWithEcho,
ReplicatedInjectAudio,
ReplicatedSilentAudioFrame,
ReplicatedAvatarIdentity,
ReplicatedKillAvatar,
ReplicatedBulkAvatarData,
NUM_PACKET_TYPE
};
};
using PacketType = PacketTypeEnum::Value;
extern const QHash<PacketType, PacketType> REPLICATED_PACKET_MAPPING;
const int NUM_BYTES_MD5_HASH = 16;
typedef char PacketVersion;
@ -237,7 +246,8 @@ enum class AvatarMixerPacketVersion : PacketVersion {
StickAndBallDefaultAvatar,
IdentityPacketsIncludeUpdateTime,
AvatarIdentitySequenceId,
MannequinDefaultAvatar
MannequinDefaultAvatar,
AvatarIdentitySequenceFront
};
enum class DomainConnectRequestVersion : PacketVersion {

View file

@ -36,8 +36,8 @@ std::unique_ptr<PacketList> PacketList::fromReceivedPackets(std::list<std::uniqu
PacketList::PacketList(PacketType packetType, QByteArray extendedHeader, bool isReliable, bool isOrdered) :
_packetType(packetType),
_isReliable(isReliable),
_isOrdered(isOrdered),
_isReliable(isReliable),
_extendedHeader(extendedHeader)
{
Q_ASSERT_X(!(!_isReliable && _isOrdered), "PacketList", "Unreliable ordered PacketLists are not currently supported");
@ -46,8 +46,8 @@ PacketList::PacketList(PacketType packetType, QByteArray extendedHeader, bool is
PacketList::PacketList(PacketList&& other) :
_packetType(other._packetType),
_packets(std::move(other._packets)),
_isReliable(other._isReliable),
_isOrdered(other._isOrdered),
_isReliable(other._isReliable),
_extendedHeader(std::move(other._extendedHeader))
{
}

View file

@ -49,6 +49,8 @@ public:
void startSegment();
void endSegment();
virtual qint64 getMaxSegmentSize() const { return Packet::maxPayloadSize(_isOrdered); }
HifiSockAddr getSenderSockAddr() const;
void closeCurrentPacket(bool shouldSendEmpty = false);
@ -74,6 +76,8 @@ protected:
PacketType _packetType;
std::list<std::unique_ptr<Packet>> _packets;
bool _isOrdered = false;
private:
friend class ::LimitedNodeList;
@ -93,7 +97,6 @@ private:
Packet::MessageNumber _messageNumber;
bool _isReliable = false;
bool _isOrdered = false;
std::unique_ptr<Packet> _currentPacket;

View file

@ -35,8 +35,8 @@ public:
explicit SequenceNumber(char* value) { _value = (*reinterpret_cast<int32_t*>(value)) & MAX; }
explicit SequenceNumber(Type value) { _value = (value <= MAX) ? ((value >= 0) ? value : 0) : MAX; }
explicit SequenceNumber(UType value) { _value = (value <= MAX) ? value : MAX; }
explicit operator Type() { return _value; }
explicit operator UType() { return static_cast<UType>(_value); }
explicit operator Type() const { return _value; }
explicit operator UType() const { return static_cast<UType>(_value); }
inline SequenceNumber& operator++() {
_value = (_value + 1) % (MAX + 1);