Merge commit 'a804d3532e6b04fb86e5e520147bd34c331b3f36'

This commit is contained in:
Roxanne Skelly 2019-03-05 15:08:39 -08:00
commit 38981a5f25
6 changed files with 132 additions and 38 deletions

View file

@ -954,6 +954,14 @@ void AvatarMixer::parseDomainServerSettings(const QJsonObject& domainSettings) {
qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads."; qCDebug(avatars) << "Avatar mixer will automatically determine number of threads to use. Using:" << _slavePool.numThreads() << "threads.";
} }
{
const QString CONNECTION_RATE = "connection_rate";
auto nodeList = DependencyManager::get<NodeList>();
auto defaultConnectionRate = nodeList->getMaxConnectionRate();
int connectionRate = avatarMixerGroupObject[CONNECTION_RATE].toInt((int)defaultConnectionRate);
nodeList->setMaxConnectionRate(connectionRate);
}
const QString AVATARS_SETTINGS_KEY = "avatars"; const QString AVATARS_SETTINGS_KEY = "avatars";
static const QString MIN_HEIGHT_OPTION = "min_avatar_height"; static const QString MIN_HEIGHT_OPTION = "min_avatar_height";

View file

@ -1302,6 +1302,14 @@
"placeholder": "1", "placeholder": "1",
"default": "1", "default": "1",
"advanced": true "advanced": true
},
{
"name": "connection_rate",
"label": "Connection Rate",
"help": "Number of new agents that can connect to the mixer every second",
"placeholder": "50",
"default": "50",
"advanced": true
} }
] ]
}, },

View file

@ -1243,12 +1243,11 @@ void DomainServer::broadcastNewNode(const SharedNodePointer& addedNode) {
limitedNodeList->eachMatchingNode( limitedNodeList->eachMatchingNode(
[this, addedNode](const SharedNodePointer& node)->bool { [this, addedNode](const SharedNodePointer& node)->bool {
if (node->getLinkedData() && node->getActiveSocket() && node != addedNode) { // is the added Node in this node's interest list?
// is the added Node in this node's interest list? return node->getLinkedData()
return isInInterestSet(node, addedNode); && node->getActiveSocket()
} else { && node != addedNode
return false; && isInInterestSet(node, addedNode);
}
}, },
[this, &addNodePacket, connectionSecretIndex, addedNode, limitedNodeListWeak](const SharedNodePointer& node) { [this, &addNodePacket, connectionSecretIndex, addedNode, limitedNodeListWeak](const SharedNodePointer& node) {
// send off this packet to the node // send off this packet to the node

View file

@ -40,6 +40,9 @@
static Setting::Handle<quint16> LIMITED_NODELIST_LOCAL_PORT("LimitedNodeList.LocalPort", 0); static Setting::Handle<quint16> LIMITED_NODELIST_LOCAL_PORT("LimitedNodeList.LocalPort", 0);
using namespace std::chrono_literals;
static const std::chrono::milliseconds CONNECTION_RATE_INTERVAL_MS = 1s;
const std::set<NodeType_t> SOLO_NODE_TYPES = { const std::set<NodeType_t> SOLO_NODE_TYPES = {
NodeType::AvatarMixer, NodeType::AvatarMixer,
NodeType::AudioMixer, NodeType::AudioMixer,
@ -88,6 +91,11 @@ LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) :
connect(statsSampleTimer, &QTimer::timeout, this, &LimitedNodeList::sampleConnectionStats); connect(statsSampleTimer, &QTimer::timeout, this, &LimitedNodeList::sampleConnectionStats);
statsSampleTimer->start(CONNECTION_STATS_SAMPLE_INTERVAL_MSECS); statsSampleTimer->start(CONNECTION_STATS_SAMPLE_INTERVAL_MSECS);
// Flush delayed adds every second
QTimer* delayedAddsFlushTimer = new QTimer(this);
connect(delayedAddsFlushTimer, &QTimer::timeout, this, &NodeList::processDelayedAdds);
delayedAddsFlushTimer->start(CONNECTION_RATE_INTERVAL_MS.count());
// check the local socket right now // check the local socket right now
updateLocalSocket(); updateLocalSocket();
@ -367,7 +375,7 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe
return true; return true;
} else { } else if (!isDelayedNode(sourceID)){
HIFI_FCDEBUG(networking(), HIFI_FCDEBUG(networking(),
"Packet of type" << headerType << "received from unknown node with Local ID" << sourceLocalID); "Packet of type" << headerType << "received from unknown node with Local ID" << sourceLocalID);
} }
@ -736,6 +744,53 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
return newNodePointer; return newNodePointer;
} }
void LimitedNodeList::addNewNode(NewNodeInfo info) {
// Throttle connection of new agents.
if (info.type == NodeType::Agent && _nodesAddedInCurrentTimeSlice >= _maxConnectionRate) {
delayNodeAdd(info);
return;
}
SharedNodePointer node = addOrUpdateNode(info.uuid, info.type, info.publicSocket, info.localSocket,
info.sessionLocalID, info.isReplicated, false,
info.connectionSecretUUID, info.permissions);
++_nodesAddedInCurrentTimeSlice;
}
void LimitedNodeList::delayNodeAdd(NewNodeInfo info) {
_delayedNodeAdds.push_back(info);
}
void LimitedNodeList::removeDelayedAdd(QUuid nodeUUID) {
auto it = std::find_if(_delayedNodeAdds.begin(), _delayedNodeAdds.end(), [&](auto info) {
return info.uuid == nodeUUID;
});
if (it != _delayedNodeAdds.end()) {
_delayedNodeAdds.erase(it);
}
}
bool LimitedNodeList::isDelayedNode(QUuid nodeUUID) {
auto it = std::find_if(_delayedNodeAdds.begin(), _delayedNodeAdds.end(), [&](auto info) {
return info.uuid == nodeUUID;
});
return it != _delayedNodeAdds.end();
}
void LimitedNodeList::processDelayedAdds() {
_nodesAddedInCurrentTimeSlice = 0;
auto nodesToAdd = glm::min(_delayedNodeAdds.size(), _maxConnectionRate);
auto firstNodeToAdd = _delayedNodeAdds.begin();
auto lastNodeToAdd = firstNodeToAdd + nodesToAdd;
for (auto it = firstNodeToAdd; it != lastNodeToAdd; ++it) {
addNewNode(*it);
}
_delayedNodeAdds.erase(firstNodeToAdd, lastNodeToAdd);
}
std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(const QUuid& nodeId, PingType_t pingType) { std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(const QUuid& nodeId, PingType_t pingType) {
int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(int64_t); int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(int64_t);
@ -793,13 +848,13 @@ unsigned int LimitedNodeList::broadcastToNodes(std::unique_ptr<NLPacket> packet,
eachNode([&](const SharedNodePointer& node){ eachNode([&](const SharedNodePointer& node){
if (node && destinationNodeTypes.contains(node->getType())) { if (node && destinationNodeTypes.contains(node->getType())) {
if (packet->isReliable()) { if (packet->isReliable()) {
auto packetCopy = NLPacket::createCopy(*packet); auto packetCopy = NLPacket::createCopy(*packet);
sendPacket(std::move(packetCopy), *node); sendPacket(std::move(packetCopy), *node);
} else { } else {
sendUnreliablePacket(*packet, *node); sendUnreliablePacket(*packet, *node);
} }
++n; ++n;
} }
}); });

View file

@ -51,6 +51,8 @@ const int INVALID_PORT = -1;
const quint64 NODE_SILENCE_THRESHOLD_MSECS = 5 * 1000; const quint64 NODE_SILENCE_THRESHOLD_MSECS = 5 * 1000;
static const size_t DEFAULT_MAX_CONNECTION_RATE { std::numeric_limits<size_t>::max() };
extern const std::set<NodeType_t> SOLO_NODE_TYPES; extern const std::set<NodeType_t> SOLO_NODE_TYPES;
const char DEFAULT_ASSIGNMENT_SERVER_HOSTNAME[] = "localhost"; const char DEFAULT_ASSIGNMENT_SERVER_HOSTNAME[] = "localhost";
@ -316,6 +318,9 @@ public:
void sendFakedHandshakeRequestToNode(SharedNodePointer node); void sendFakedHandshakeRequestToNode(SharedNodePointer node);
#endif #endif
size_t getMaxConnectionRate() const { return _maxConnectionRate; }
void setMaxConnectionRate(size_t rate) { _maxConnectionRate = rate; }
int getInboundPPS() const { return _inboundPPS; } int getInboundPPS() const { return _inboundPPS; }
int getOutboundPPS() const { return _outboundPPS; } int getOutboundPPS() const { return _outboundPPS; }
float getInboundKbps() const { return _inboundKbps; } float getInboundKbps() const { return _inboundKbps; }
@ -367,7 +372,20 @@ protected slots:
void clientConnectionToSockAddrReset(const HifiSockAddr& sockAddr); void clientConnectionToSockAddrReset(const HifiSockAddr& sockAddr);
void processDelayedAdds();
protected: protected:
struct NewNodeInfo {
qint8 type;
QUuid uuid;
HifiSockAddr publicSocket;
HifiSockAddr localSocket;
NodePermissions permissions;
bool isReplicated;
Node::LocalID sessionLocalID;
QUuid connectionSecretUUID;
};
LimitedNodeList(int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT); LimitedNodeList(int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT);
LimitedNodeList(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton LimitedNodeList(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton
void operator=(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton void operator=(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton
@ -390,6 +408,11 @@ protected:
bool sockAddrBelongsToNode(const HifiSockAddr& sockAddr); bool sockAddrBelongsToNode(const HifiSockAddr& sockAddr);
void addNewNode(NewNodeInfo info);
void delayNodeAdd(NewNodeInfo info);
void removeDelayedAdd(QUuid nodeUUID);
bool isDelayedNode(QUuid nodeUUID);
NodeHash _nodeHash; NodeHash _nodeHash;
mutable QReadWriteLock _nodeMutex { QReadWriteLock::Recursive }; mutable QReadWriteLock _nodeMutex { QReadWriteLock::Recursive };
udt::Socket _nodeSocket; udt::Socket _nodeSocket;
@ -440,6 +463,10 @@ private:
Node::LocalID _sessionLocalID { 0 }; Node::LocalID _sessionLocalID { 0 };
bool _flagTimeForConnectionStep { false }; // only keep track in interface bool _flagTimeForConnectionStep { false }; // only keep track in interface
size_t _maxConnectionRate { DEFAULT_MAX_CONNECTION_RATE };
size_t _nodesAddedInCurrentTimeSlice { 0 };
std::vector<NewNodeInfo> _delayedNodeAdds;
int _inboundPPS { 0 }; int _inboundPPS { 0 };
int _outboundPPS { 0 }; int _outboundPPS { 0 };
float _inboundKbps { 0.0f }; float _inboundKbps { 0.0f };

View file

@ -200,7 +200,6 @@ void NodeList::timePingReply(ReceivedMessage& message, const SharedNodePointer&
} }
void NodeList::processPingPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) { void NodeList::processPingPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
// send back a reply // send back a reply
auto replyPacket = constructPingReplyPacket(*message); auto replyPacket = constructPingReplyPacket(*message);
const HifiSockAddr& senderSockAddr = message->getSenderSockAddr(); const HifiSockAddr& senderSockAddr = message->getSenderSockAddr();
@ -708,37 +707,28 @@ void NodeList::processDomainServerRemovedNode(QSharedPointer<ReceivedMessage> me
QUuid nodeUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); QUuid nodeUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID));
qCDebug(networking) << "Received packet from domain-server to remove node with UUID" << uuidStringWithoutCurlyBraces(nodeUUID); qCDebug(networking) << "Received packet from domain-server to remove node with UUID" << uuidStringWithoutCurlyBraces(nodeUUID);
killNodeWithUUID(nodeUUID); killNodeWithUUID(nodeUUID);
removeDelayedAdd(nodeUUID);
} }
void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) {
// setup variables to read into from QDataStream NewNodeInfo info;
qint8 nodeType;
QUuid nodeUUID, connectionSecretUUID;
HifiSockAddr nodePublicSocket, nodeLocalSocket;
NodePermissions permissions;
bool isReplicated;
Node::LocalID sessionLocalID;
packetStream >> nodeType >> nodeUUID >> nodePublicSocket >> nodeLocalSocket >> permissions packetStream >> info.type
>> isReplicated >> sessionLocalID; >> info.uuid
>> info.publicSocket
>> info.localSocket
>> info.permissions
>> info.isReplicated
>> info.sessionLocalID
>> info.connectionSecretUUID;
// if the public socket address is 0 then it's reachable at the same IP // if the public socket address is 0 then it's reachable at the same IP
// as the domain server // as the domain server
if (nodePublicSocket.getAddress().isNull()) { if (info.publicSocket.getAddress().isNull()) {
nodePublicSocket.setAddress(_domainHandler.getIP()); info.publicSocket.setAddress(_domainHandler.getIP());
} }
packetStream >> connectionSecretUUID; addNewNode(info);
SharedNodePointer node = addOrUpdateNode(nodeUUID, nodeType, nodePublicSocket, nodeLocalSocket,
sessionLocalID, isReplicated, false, connectionSecretUUID, permissions);
// nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server
// and always have their public socket as their active socket
if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) {
node->setLastHeardMicrostamp(usecTimestampNow());
node->activatePublicSocket();
}
} }
void NodeList::sendAssignment(Assignment& assignment) { void NodeList::sendAssignment(Assignment& assignment) {
@ -785,7 +775,6 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) {
} }
void NodeList::startNodeHolePunch(const SharedNodePointer& node) { void NodeList::startNodeHolePunch(const SharedNodePointer& node) {
// we don't hole punch to downstream servers, since it is assumed that we have a direct line to them // we don't hole punch to downstream servers, since it is assumed that we have a direct line to them
// we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them // we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them
@ -799,6 +788,14 @@ void NodeList::startNodeHolePunch(const SharedNodePointer& node) {
// ping this node immediately // ping this node immediately
pingPunchForInactiveNode(node); pingPunchForInactiveNode(node);
} }
// nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server
// and always have their public socket as their active socket
if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) {
node->setLastHeardMicrostamp(usecTimestampNow());
node->activatePublicSocket();
}
} }
void NodeList::handleNodePingTimeout() { void NodeList::handleNodePingTimeout() {