mirror of
https://github.com/overte-org/overte.git
synced 2025-08-10 07:53:08 +02:00
Merge pull request #10778 from birarda/feat/upstream-in-nodelist
add upstream nodes to NL and verify replicated packet sender
This commit is contained in:
commit
c47645e075
13 changed files with 210 additions and 60 deletions
|
@ -103,11 +103,6 @@ AudioMixer::AudioMixer(ReceivedMessage& message) :
|
||||||
);
|
);
|
||||||
|
|
||||||
connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
|
connect(nodeList.data(), &NodeList::nodeKilled, this, &AudioMixer::handleNodeKilled);
|
||||||
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
|
|
||||||
if (node->getType() == NodeType::DownstreamAudioMixer) {
|
|
||||||
node->activatePublicSocket();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
|
void AudioMixer::queueAudioPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer node) {
|
||||||
|
@ -389,7 +384,10 @@ void AudioMixer::start() {
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
|
|
||||||
// prepare the NodeList
|
// prepare the NodeList
|
||||||
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAudioMixer, NodeType::EntityScriptServer });
|
nodeList->addSetOfNodeTypesToNodeInterestSet({
|
||||||
|
NodeType::Agent, NodeType::EntityScriptServer,
|
||||||
|
NodeType::UpstreamAudioMixer, NodeType::DownstreamAudioMixer
|
||||||
|
});
|
||||||
nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); };
|
nodeList->linkedDataCreateCallback = [&](Node* node) { getOrCreateClientData(node); };
|
||||||
|
|
||||||
// parse out any AudioMixer settings
|
// parse out any AudioMixer settings
|
||||||
|
|
|
@ -67,7 +67,6 @@ void AudioMixerClientData::processPackets() {
|
||||||
case PacketType::MicrophoneAudioNoEcho:
|
case PacketType::MicrophoneAudioNoEcho:
|
||||||
case PacketType::MicrophoneAudioWithEcho:
|
case PacketType::MicrophoneAudioWithEcho:
|
||||||
case PacketType::InjectAudio:
|
case PacketType::InjectAudio:
|
||||||
case PacketType::AudioStreamStats:
|
|
||||||
case PacketType::SilentAudioFrame:
|
case PacketType::SilentAudioFrame:
|
||||||
case PacketType::ReplicatedMicrophoneAudioNoEcho:
|
case PacketType::ReplicatedMicrophoneAudioNoEcho:
|
||||||
case PacketType::ReplicatedMicrophoneAudioWithEcho:
|
case PacketType::ReplicatedMicrophoneAudioWithEcho:
|
||||||
|
@ -80,11 +79,17 @@ void AudioMixerClientData::processPackets() {
|
||||||
|
|
||||||
QMutexLocker lock(&getMutex());
|
QMutexLocker lock(&getMutex());
|
||||||
parseData(*packet);
|
parseData(*packet);
|
||||||
|
|
||||||
optionallyReplicatePacket(*packet, *node);
|
optionallyReplicatePacket(*packet, *node);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case PacketType::AudioStreamStats: {
|
||||||
|
QMutexLocker lock(&getMutex());
|
||||||
|
parseData(*packet);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
case PacketType::NegotiateAudioFormat:
|
case PacketType::NegotiateAudioFormat:
|
||||||
negotiateAudioFormat(*packet, node);
|
negotiateAudioFormat(*packet, node);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -66,7 +66,6 @@ AvatarMixer::AvatarMixer(ReceivedMessage& message) :
|
||||||
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
|
connect(nodeList.data(), &NodeList::nodeAdded, this, [this](const SharedNodePointer& node) {
|
||||||
if (node->getType() == NodeType::DownstreamAvatarMixer) {
|
if (node->getType() == NodeType::DownstreamAvatarMixer) {
|
||||||
getOrCreateClientData(node);
|
getOrCreateClientData(node);
|
||||||
node->activatePublicSocket();
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -165,10 +164,6 @@ void AvatarMixer::queueIncomingPacket(QSharedPointer<ReceivedMessage> message, S
|
||||||
_queueIncomingPacketElapsedTime += (end - start);
|
_queueIncomingPacketElapsedTime += (end - start);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
AvatarMixer::~AvatarMixer() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) {
|
void AvatarMixer::sendIdentityPacket(AvatarMixerClientData* nodeData, const SharedNodePointer& destinationNode) {
|
||||||
if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) {
|
if (destinationNode->getType() == NodeType::Agent && !destinationNode->isUpstream()) {
|
||||||
QByteArray individualData = nodeData->getAvatar().identityByteArray();
|
QByteArray individualData = nodeData->getAvatar().identityByteArray();
|
||||||
|
@ -862,7 +857,10 @@ AvatarMixerClientData* AvatarMixer::getOrCreateClientData(SharedNodePointer node
|
||||||
|
|
||||||
void AvatarMixer::domainSettingsRequestComplete() {
|
void AvatarMixer::domainSettingsRequestComplete() {
|
||||||
auto nodeList = DependencyManager::get<NodeList>();
|
auto nodeList = DependencyManager::get<NodeList>();
|
||||||
nodeList->addSetOfNodeTypesToNodeInterestSet({ NodeType::Agent, NodeType::DownstreamAvatarMixer, NodeType::EntityScriptServer });
|
nodeList->addSetOfNodeTypesToNodeInterestSet({
|
||||||
|
NodeType::Agent, NodeType::EntityScriptServer,
|
||||||
|
NodeType::UpstreamAvatarMixer, NodeType::DownstreamAvatarMixer
|
||||||
|
});
|
||||||
|
|
||||||
// parse the settings to pull out the values we need
|
// parse the settings to pull out the values we need
|
||||||
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
|
parseDomainServerSettings(nodeList->getDomainHandler().getSettingsObject());
|
||||||
|
|
|
@ -28,7 +28,6 @@ class AvatarMixer : public ThreadedAssignment {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
public:
|
public:
|
||||||
AvatarMixer(ReceivedMessage& message);
|
AvatarMixer(ReceivedMessage& message);
|
||||||
~AvatarMixer();
|
|
||||||
public slots:
|
public slots:
|
||||||
/// runs the avatar mixer
|
/// runs the avatar mixer
|
||||||
void run() override;
|
void run() override;
|
||||||
|
|
|
@ -83,9 +83,9 @@ int AvatarMixerSlave::sendReplicatedIdentityPacket(const AvatarMixerClientData*
|
||||||
if (destinationNode->getType() == NodeType::DownstreamAvatarMixer) {
|
if (destinationNode->getType() == NodeType::DownstreamAvatarMixer) {
|
||||||
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
|
QByteArray individualData = nodeData->getConstAvatarData()->identityByteArray();
|
||||||
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
|
individualData.replace(0, NUM_BYTES_RFC4122_UUID, nodeData->getNodeID().toRfc4122()); // FIXME, this looks suspicious
|
||||||
auto identityPacket = NLPacket::create(PacketType::ReplicatedAvatarIdentity);
|
auto identityPacket = NLPacketList::create(PacketType::ReplicatedAvatarIdentity, QByteArray(), true, true);
|
||||||
identityPacket->write(individualData);
|
identityPacket->write(individualData);
|
||||||
DependencyManager::get<NodeList>()->sendUnreliablePacket(*identityPacket, *destinationNode);
|
DependencyManager::get<NodeList>()->sendPacketList(std::move(identityPacket), *destinationNode);
|
||||||
_stats.numIdentityPackets++;
|
_stats.numIdentityPackets++;
|
||||||
return individualData.size();
|
return individualData.size();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1339,6 +1339,7 @@
|
||||||
{
|
{
|
||||||
"name": "broadcasting",
|
"name": "broadcasting",
|
||||||
"label": "Broadcasting",
|
"label": "Broadcasting",
|
||||||
|
"restart": false,
|
||||||
"settings": [
|
"settings": [
|
||||||
{
|
{
|
||||||
"name": "users",
|
"name": "users",
|
||||||
|
@ -1395,6 +1396,46 @@
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "upstream_servers",
|
||||||
|
"label": "Broadcasting Servers",
|
||||||
|
"assignment-types": [0,1],
|
||||||
|
"type": "table",
|
||||||
|
"advanced": true,
|
||||||
|
"can_add_new_rows": true,
|
||||||
|
"help": "Servers that broadcast data to this domain",
|
||||||
|
"numbered": false,
|
||||||
|
"columns": [
|
||||||
|
{
|
||||||
|
"name": "address",
|
||||||
|
"label": "Address",
|
||||||
|
"can_set": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "port",
|
||||||
|
"label": "Port",
|
||||||
|
"can_set": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "server_type",
|
||||||
|
"label": "Server Type",
|
||||||
|
"type": "select",
|
||||||
|
"placeholder": "Audio Mixer",
|
||||||
|
"default": "Audio Mixer",
|
||||||
|
"can_set": true,
|
||||||
|
"options": [
|
||||||
|
{
|
||||||
|
"value": "Audio Mixer",
|
||||||
|
"label": "Audio Mixer"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value": "Avatar Mixer",
|
||||||
|
"label": "Avatar Mixer"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,6 +121,8 @@ DomainServer::DomainServer(int argc, char* argv[]) :
|
||||||
this, &DomainServer::updateReplicatedNodes);
|
this, &DomainServer::updateReplicatedNodes);
|
||||||
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
|
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
|
||||||
this, &DomainServer::updateDownstreamNodes);
|
this, &DomainServer::updateDownstreamNodes);
|
||||||
|
connect(&_settingsManager, &DomainServerSettingsManager::settingsUpdated,
|
||||||
|
this, &DomainServer::updateUpstreamNodes);
|
||||||
|
|
||||||
setupGroupCacheRefresh();
|
setupGroupCacheRefresh();
|
||||||
|
|
||||||
|
@ -135,6 +137,7 @@ DomainServer::DomainServer(int argc, char* argv[]) :
|
||||||
|
|
||||||
updateReplicatedNodes();
|
updateReplicatedNodes();
|
||||||
updateDownstreamNodes();
|
updateDownstreamNodes();
|
||||||
|
updateUpstreamNodes();
|
||||||
|
|
||||||
if (_type != NonMetaverse) {
|
if (_type != NonMetaverse) {
|
||||||
// if we have a metaverse domain, we'll use an access token for API calls
|
// if we have a metaverse domain, we'll use an access token for API calls
|
||||||
|
@ -2229,53 +2232,84 @@ void DomainServer::refreshStaticAssignmentAndAddToQueue(SharedAssignmentPointer&
|
||||||
|
|
||||||
static const QString BROADCASTING_SETTINGS_KEY = "broadcasting";
|
static const QString BROADCASTING_SETTINGS_KEY = "broadcasting";
|
||||||
|
|
||||||
void DomainServer::updateDownstreamNodes() {
|
struct ReplicationServerInfo {
|
||||||
|
NodeType_t nodeType;
|
||||||
|
HifiSockAddr sockAddr;
|
||||||
|
};
|
||||||
|
|
||||||
|
ReplicationServerInfo serverInformationFromSettings(QVariantMap serverMap, ReplicationServerDirection direction) {
|
||||||
|
static const QString REPLICATION_SERVER_ADDRESS = "address";
|
||||||
|
static const QString REPLICATION_SERVER_PORT = "port";
|
||||||
|
static const QString REPLICATION_SERVER_TYPE = "server_type";
|
||||||
|
|
||||||
|
if (serverMap.contains(REPLICATION_SERVER_ADDRESS) && serverMap.contains(REPLICATION_SERVER_PORT)
|
||||||
|
&& serverMap.contains(REPLICATION_SERVER_TYPE)) {
|
||||||
|
|
||||||
|
auto nodeType = NodeType::fromString(serverMap[REPLICATION_SERVER_TYPE].toString());
|
||||||
|
|
||||||
|
ReplicationServerInfo serverInfo;
|
||||||
|
|
||||||
|
if (direction == Upstream) {
|
||||||
|
serverInfo.nodeType = NodeType::upstreamType(nodeType);
|
||||||
|
} else if (direction == Downstream) {
|
||||||
|
serverInfo.nodeType = NodeType::downstreamType(nodeType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// read the address and port and construct a HifiSockAddr from them
|
||||||
|
serverInfo.sockAddr = {
|
||||||
|
serverMap[REPLICATION_SERVER_ADDRESS].toString(),
|
||||||
|
(quint16) serverMap[REPLICATION_SERVER_PORT].toString().toInt()
|
||||||
|
};
|
||||||
|
|
||||||
|
return serverInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { NodeType::Unassigned, HifiSockAddr() };
|
||||||
|
}
|
||||||
|
|
||||||
|
void DomainServer::updateReplicationNodes(ReplicationServerDirection direction) {
|
||||||
auto settings = _settingsManager.getSettingsMap();
|
auto settings = _settingsManager.getSettingsMap();
|
||||||
|
|
||||||
if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
|
if (settings.contains(BROADCASTING_SETTINGS_KEY)) {
|
||||||
auto nodeList = DependencyManager::get<LimitedNodeList>();
|
auto nodeList = DependencyManager::get<LimitedNodeList>();
|
||||||
std::vector<HifiSockAddr> downstreamNodesInSettings;
|
std::vector<HifiSockAddr> replicationNodesInSettings;
|
||||||
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
|
|
||||||
if (replicationSettings.contains("downstream_servers")) {
|
|
||||||
auto serversSettings = replicationSettings.value("downstream_servers").toList();
|
|
||||||
|
|
||||||
std::vector<HifiSockAddr> knownDownstreamNodes;
|
auto replicationSettings = settings.value(BROADCASTING_SETTINGS_KEY).toMap();
|
||||||
|
|
||||||
|
QString serversKey = direction == Upstream ? "upstream_servers" : "downstream_servers";
|
||||||
|
QString replicationDirection = direction == Upstream ? "upstream" : "downstream";
|
||||||
|
|
||||||
|
if (replicationSettings.contains(serversKey)) {
|
||||||
|
auto serversSettings = replicationSettings.value(serversKey).toList();
|
||||||
|
|
||||||
|
std::vector<HifiSockAddr> knownReplicationNodes;
|
||||||
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
|
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
|
||||||
if (NodeType::isDownstream(otherNode->getType())) {
|
if ((direction == Upstream && NodeType::isUpstream(otherNode->getType()))
|
||||||
knownDownstreamNodes.push_back(otherNode->getPublicSocket());
|
|| (direction == Downstream && NodeType::isDownstream(otherNode->getType()))) {
|
||||||
|
knownReplicationNodes.push_back(otherNode->getPublicSocket());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
for (auto& server : serversSettings) {
|
for (auto& server : serversSettings) {
|
||||||
auto downstreamServer = server.toMap();
|
auto replicationServer = serverInformationFromSettings(server.toMap(), direction);
|
||||||
|
|
||||||
static const QString DOWNSTREAM_SERVER_ADDRESS = "address";
|
if (!replicationServer.sockAddr.isNull() && replicationServer.nodeType != NodeType::Unassigned) {
|
||||||
static const QString DOWNSTREAM_SERVER_PORT = "port";
|
// make sure we have the settings we need for this replication server
|
||||||
static const QString DOWNSTREAM_SERVER_TYPE = "server_type";
|
replicationNodesInSettings.push_back(replicationServer.sockAddr);
|
||||||
|
|
||||||
// make sure we have the settings we need for this downstream server
|
bool knownNode = find(knownReplicationNodes.cbegin(), knownReplicationNodes.cend(),
|
||||||
if (downstreamServer.contains(DOWNSTREAM_SERVER_ADDRESS) && downstreamServer.contains(DOWNSTREAM_SERVER_PORT)) {
|
replicationServer.sockAddr) != knownReplicationNodes.cend();
|
||||||
|
|
||||||
auto nodeType = NodeType::fromString(downstreamServer[DOWNSTREAM_SERVER_TYPE].toString());
|
|
||||||
auto downstreamNodeType = NodeType::downstreamType(nodeType);
|
|
||||||
|
|
||||||
// read the address and port and construct a HifiSockAddr from them
|
|
||||||
HifiSockAddr downstreamServerAddr {
|
|
||||||
downstreamServer[DOWNSTREAM_SERVER_ADDRESS].toString(),
|
|
||||||
(quint16) downstreamServer[DOWNSTREAM_SERVER_PORT].toString().toInt()
|
|
||||||
};
|
|
||||||
downstreamNodesInSettings.push_back(downstreamServerAddr);
|
|
||||||
|
|
||||||
bool knownNode = find(knownDownstreamNodes.cbegin(), knownDownstreamNodes.cend(),
|
|
||||||
downstreamServerAddr) != knownDownstreamNodes.cend();
|
|
||||||
if (!knownNode) {
|
if (!knownNode) {
|
||||||
// manually add the downstream node to our node list
|
// manually add the replication node to our node list
|
||||||
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), downstreamNodeType,
|
auto node = nodeList->addOrUpdateNode(QUuid::createUuid(), replicationServer.nodeType,
|
||||||
downstreamServerAddr, downstreamServerAddr);
|
replicationServer.sockAddr, replicationServer.sockAddr,
|
||||||
|
false, direction == Upstream);
|
||||||
node->setIsForcedNeverSilent(true);
|
node->setIsForcedNeverSilent(true);
|
||||||
|
|
||||||
qDebug() << "Adding downstream node:" << node->getUUID() << downstreamServerAddr;
|
qDebug() << "Adding" << (direction == Upstream ? "upstream" : "downstream")
|
||||||
|
<< "node:" << node->getUUID() << replicationServer.sockAddr;
|
||||||
|
|
||||||
// manually activate the public socket for the downstream node
|
// manually activate the public socket for the replication node
|
||||||
node->activatePublicSocket();
|
node->activatePublicSocket();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2288,11 +2322,13 @@ void DomainServer::updateDownstreamNodes() {
|
||||||
// we cannot recursively take the write lock required by handleKillNode)
|
// we cannot recursively take the write lock required by handleKillNode)
|
||||||
std::vector<SharedNodePointer> nodesToKill;
|
std::vector<SharedNodePointer> nodesToKill;
|
||||||
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
|
nodeList->eachNode([&](const SharedNodePointer& otherNode) {
|
||||||
if (NodeType::isDownstream(otherNode->getType())) {
|
if ((direction == Upstream && NodeType::isUpstream(otherNode->getType()))
|
||||||
bool nodeInSettings = find(downstreamNodesInSettings.cbegin(), downstreamNodesInSettings.cend(),
|
|| (direction == Downstream && NodeType::isDownstream(otherNode->getType()))) {
|
||||||
otherNode->getPublicSocket()) != downstreamNodesInSettings.cend();
|
bool nodeInSettings = find(replicationNodesInSettings.cbegin(), replicationNodesInSettings.cend(),
|
||||||
|
otherNode->getPublicSocket()) != replicationNodesInSettings.cend();
|
||||||
if (!nodeInSettings) {
|
if (!nodeInSettings) {
|
||||||
qDebug() << "Removing downstream node:" << otherNode->getUUID() << otherNode->getPublicSocket();
|
qDebug() << "Removing" << replicationDirection
|
||||||
|
<< "node:" << otherNode->getUUID() << otherNode->getPublicSocket();
|
||||||
nodesToKill.push_back(otherNode);
|
nodesToKill.push_back(otherNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2304,6 +2340,14 @@ void DomainServer::updateDownstreamNodes() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DomainServer::updateDownstreamNodes() {
|
||||||
|
updateReplicationNodes(Downstream);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DomainServer::updateUpstreamNodes() {
|
||||||
|
updateReplicationNodes(Upstream);
|
||||||
|
}
|
||||||
|
|
||||||
void DomainServer::updateReplicatedNodes() {
|
void DomainServer::updateReplicatedNodes() {
|
||||||
// Make sure we have downstream nodes in our list
|
// Make sure we have downstream nodes in our list
|
||||||
auto settings = _settingsManager.getSettingsMap();
|
auto settings = _settingsManager.getSettingsMap();
|
||||||
|
|
|
@ -39,6 +39,11 @@ typedef QMultiHash<QUuid, WalletTransaction*> TransactionHash;
|
||||||
using Subnet = QPair<QHostAddress, int>;
|
using Subnet = QPair<QHostAddress, int>;
|
||||||
using SubnetList = std::vector<Subnet>;
|
using SubnetList = std::vector<Subnet>;
|
||||||
|
|
||||||
|
enum ReplicationServerDirection {
|
||||||
|
Upstream,
|
||||||
|
Downstream
|
||||||
|
};
|
||||||
|
|
||||||
class DomainServer : public QCoreApplication, public HTTPSRequestHandler {
|
class DomainServer : public QCoreApplication, public HTTPSRequestHandler {
|
||||||
Q_OBJECT
|
Q_OBJECT
|
||||||
public:
|
public:
|
||||||
|
@ -104,6 +109,7 @@ private slots:
|
||||||
|
|
||||||
void updateReplicatedNodes();
|
void updateReplicatedNodes();
|
||||||
void updateDownstreamNodes();
|
void updateDownstreamNodes();
|
||||||
|
void updateUpstreamNodes();
|
||||||
|
|
||||||
signals:
|
signals:
|
||||||
void iceServerChanged();
|
void iceServerChanged();
|
||||||
|
@ -170,6 +176,8 @@ private:
|
||||||
|
|
||||||
QString pathForRedirect(QString path = QString()) const;
|
QString pathForRedirect(QString path = QString()) const;
|
||||||
|
|
||||||
|
void updateReplicationNodes(ReplicationServerDirection direction);
|
||||||
|
|
||||||
SubnetList _acSubnetWhitelist;
|
SubnetList _acSubnetWhitelist;
|
||||||
|
|
||||||
std::vector<QString> _replicatedUsernames;
|
std::vector<QString> _replicatedUsernames;
|
||||||
|
|
|
@ -257,8 +257,40 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
|
||||||
PacketType headerType = NLPacket::typeInHeader(packet);
|
PacketType headerType = NLPacket::typeInHeader(packet);
|
||||||
|
|
||||||
if (NON_SOURCED_PACKETS.contains(headerType)) {
|
if (NON_SOURCED_PACKETS.contains(headerType)) {
|
||||||
emit dataReceived(NodeType::Unassigned, packet.getPayloadSize());
|
if (REPLICATED_PACKET_MAPPING.key(headerType) != PacketType::Unknown) {
|
||||||
return true;
|
// this is a replicated packet type - make sure the socket that sent it to us matches
|
||||||
|
// one from one of our current upstream nodes
|
||||||
|
|
||||||
|
NodeType_t sendingNodeType { NodeType::Unassigned };
|
||||||
|
|
||||||
|
eachNodeBreakable([&packet, &sendingNodeType](const SharedNodePointer& node){
|
||||||
|
if (NodeType::isUpstream(node->getType()) && node->getPublicSocket() == packet.getSenderSockAddr()) {
|
||||||
|
sendingNodeType = node->getType();
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (sendingNodeType != NodeType::Unassigned) {
|
||||||
|
emit dataReceived(sendingNodeType, packet.getPayloadSize());
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
static const QString UNSOLICITED_REPLICATED_REGEX =
|
||||||
|
"Replicated packet of type \\d+ \\([\\sa-zA-Z:]+\\) received from unknown upstream";
|
||||||
|
static QString repeatedMessage
|
||||||
|
= LogHandler::getInstance().addRepeatedMessageRegex(UNSOLICITED_REPLICATED_REGEX);
|
||||||
|
|
||||||
|
qCDebug(networking) << "Replicated packet of type" << headerType
|
||||||
|
<< "received from unknown upstream" << packet.getSenderSockAddr();
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
emit dataReceived(NodeType::Unassigned, packet.getPayloadSize());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
QUuid sourceID = NLPacket::sourceIDInHeader(packet);
|
QUuid sourceID = NLPacket::sourceIDInHeader(packet);
|
||||||
|
|
||||||
|
@ -583,14 +615,14 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
|
||||||
matchingNode->setPermissions(permissions);
|
matchingNode->setPermissions(permissions);
|
||||||
matchingNode->setConnectionSecret(connectionSecret);
|
matchingNode->setConnectionSecret(connectionSecret);
|
||||||
matchingNode->setIsReplicated(isReplicated);
|
matchingNode->setIsReplicated(isReplicated);
|
||||||
matchingNode->setIsUpstream(isUpstream);
|
matchingNode->setIsUpstream(isUpstream || NodeType::isUpstream(nodeType));
|
||||||
|
|
||||||
return matchingNode;
|
return matchingNode;
|
||||||
} else {
|
} else {
|
||||||
// we didn't have this node, so add them
|
// we didn't have this node, so add them
|
||||||
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
|
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
|
||||||
newNode->setIsReplicated(isReplicated);
|
newNode->setIsReplicated(isReplicated);
|
||||||
newNode->setIsUpstream(isUpstream);
|
newNode->setIsUpstream(isUpstream || NodeType::isUpstream(nodeType));
|
||||||
newNode->setConnectionSecret(connectionSecret);
|
newNode->setConnectionSecret(connectionSecret);
|
||||||
newNode->setPermissions(permissions);
|
newNode->setPermissions(permissions);
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,8 @@ void NodeType::init() {
|
||||||
TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer");
|
TypeNameHash.insert(NodeType::MessagesMixer, "Messages Mixer");
|
||||||
TypeNameHash.insert(NodeType::AssetServer, "Asset Server");
|
TypeNameHash.insert(NodeType::AssetServer, "Asset Server");
|
||||||
TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server");
|
TypeNameHash.insert(NodeType::EntityScriptServer, "Entity Script Server");
|
||||||
|
TypeNameHash.insert(NodeType::UpstreamAudioMixer, "Upstream Audio Mixer");
|
||||||
|
TypeNameHash.insert(NodeType::UpstreamAvatarMixer, "Upstream Avatar Mixer");
|
||||||
TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer");
|
TypeNameHash.insert(NodeType::DownstreamAudioMixer, "Downstream Audio Mixer");
|
||||||
TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer");
|
TypeNameHash.insert(NodeType::DownstreamAvatarMixer, "Downstream Avatar Mixer");
|
||||||
TypeNameHash.insert(NodeType::Unassigned, "Unassigned");
|
TypeNameHash.insert(NodeType::Unassigned, "Unassigned");
|
||||||
|
@ -52,8 +54,23 @@ const QString& NodeType::getNodeTypeName(NodeType_t nodeType) {
|
||||||
return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME;
|
return matchedTypeName != TypeNameHash.end() ? matchedTypeName.value() : UNKNOWN_NodeType_t_NAME;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool NodeType::isUpstream(NodeType_t nodeType) {
|
||||||
|
return nodeType == NodeType::UpstreamAudioMixer || nodeType == NodeType::UpstreamAvatarMixer;
|
||||||
|
}
|
||||||
|
|
||||||
bool NodeType::isDownstream(NodeType_t nodeType) {
|
bool NodeType::isDownstream(NodeType_t nodeType) {
|
||||||
return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer;
|
return nodeType == NodeType::DownstreamAudioMixer || nodeType == NodeType::DownstreamAvatarMixer;
|
||||||
|
}
|
||||||
|
|
||||||
|
NodeType_t NodeType::upstreamType(NodeType_t primaryType) {
|
||||||
|
switch (primaryType) {
|
||||||
|
case AudioMixer:
|
||||||
|
return UpstreamAudioMixer;
|
||||||
|
case AvatarMixer:
|
||||||
|
return UpstreamAvatarMixer;
|
||||||
|
default:
|
||||||
|
return Unassigned;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
|
NodeType_t NodeType::downstreamType(NodeType_t primaryType) {
|
||||||
|
|
|
@ -669,9 +669,11 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
|
||||||
SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket,
|
SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket,
|
||||||
nodeLocalSocket, isReplicated, false, connectionUUID, permissions);
|
nodeLocalSocket, isReplicated, false, connectionUUID, permissions);
|
||||||
|
|
||||||
// nodes that are downstream of our own type are kept alive when we hear about them from the domain server
|
// nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server
|
||||||
if (node->getType() == NodeType::downstreamType(_ownerType)) {
|
// and always have their public socket as their active socket
|
||||||
|
if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) {
|
||||||
node->setLastHeardMicrostamp(usecTimestampNow());
|
node->setLastHeardMicrostamp(usecTimestampNow());
|
||||||
|
node->activatePublicSocket();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,8 @@ namespace NodeType {
|
||||||
const NodeType_t AssetServer = 'A';
|
const NodeType_t AssetServer = 'A';
|
||||||
const NodeType_t MessagesMixer = 'm';
|
const NodeType_t MessagesMixer = 'm';
|
||||||
const NodeType_t EntityScriptServer = 'S';
|
const NodeType_t EntityScriptServer = 'S';
|
||||||
|
const NodeType_t UpstreamAudioMixer = 'B';
|
||||||
|
const NodeType_t UpstreamAvatarMixer = 'C';
|
||||||
const NodeType_t DownstreamAudioMixer = 'a';
|
const NodeType_t DownstreamAudioMixer = 'a';
|
||||||
const NodeType_t DownstreamAvatarMixer = 'w';
|
const NodeType_t DownstreamAvatarMixer = 'w';
|
||||||
const NodeType_t Unassigned = 1;
|
const NodeType_t Unassigned = 1;
|
||||||
|
@ -32,9 +34,12 @@ namespace NodeType {
|
||||||
void init();
|
void init();
|
||||||
|
|
||||||
const QString& getNodeTypeName(NodeType_t nodeType);
|
const QString& getNodeTypeName(NodeType_t nodeType);
|
||||||
|
bool isUpstream(NodeType_t nodeType);
|
||||||
bool isDownstream(NodeType_t nodeType);
|
bool isDownstream(NodeType_t nodeType);
|
||||||
|
NodeType_t upstreamType(NodeType_t primaryType);
|
||||||
NodeType_t downstreamType(NodeType_t primaryType);
|
NodeType_t downstreamType(NodeType_t primaryType);
|
||||||
|
|
||||||
|
|
||||||
NodeType_t fromString(QString type);
|
NodeType_t fromString(QString type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,6 +51,7 @@ const QHash<PacketType, PacketType> REPLICATED_PACKET_MAPPING {
|
||||||
{ PacketType::SilentAudioFrame, PacketType::ReplicatedSilentAudioFrame },
|
{ PacketType::SilentAudioFrame, PacketType::ReplicatedSilentAudioFrame },
|
||||||
{ PacketType::AvatarIdentity, PacketType::ReplicatedAvatarIdentity },
|
{ PacketType::AvatarIdentity, PacketType::ReplicatedAvatarIdentity },
|
||||||
{ PacketType::KillAvatar, PacketType::ReplicatedKillAvatar },
|
{ PacketType::KillAvatar, PacketType::ReplicatedKillAvatar },
|
||||||
|
{ PacketType::BulkAvatarData, PacketType::ReplicatedBulkAvatarData }
|
||||||
};
|
};
|
||||||
|
|
||||||
PacketVersion versionForPacketType(PacketType packetType) {
|
PacketVersion versionForPacketType(PacketType packetType) {
|
||||||
|
|
Loading…
Reference in a new issue