From 80320635125577c0370c7ebae39872f40525b518 Mon Sep 17 00:00:00 2001 From: Clement Date: Wed, 27 Feb 2019 19:14:36 -0800 Subject: [PATCH] Move to LimitedNodeList --- libraries/networking/src/LimitedNodeList.cpp | 57 +++++++++++++++++++- libraries/networking/src/LimitedNodeList.h | 27 ++++++++++ libraries/networking/src/NodeList.cpp | 56 ++++--------------- libraries/networking/src/NodeList.h | 24 --------- 4 files changed, 93 insertions(+), 71 deletions(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index eaa02f059e..f00614141c 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -40,6 +40,9 @@ static Setting::Handle LIMITED_NODELIST_LOCAL_PORT("LimitedNodeList.LocalPort", 0); +using namespace std::chrono_literals; +static const std::chrono::milliseconds CONNECTION_RATE_INTERVAL = 1s; + const std::set SOLO_NODE_TYPES = { NodeType::AvatarMixer, NodeType::AudioMixer, @@ -88,6 +91,11 @@ LimitedNodeList::LimitedNodeList(int socketListenPort, int dtlsListenPort) : connect(statsSampleTimer, &QTimer::timeout, this, &LimitedNodeList::sampleConnectionStats); statsSampleTimer->start(CONNECTION_STATS_SAMPLE_INTERVAL_MSECS); + // Flush delayed adds every second + QTimer* delayedAddsFlushTimer = new QTimer(this); + connect(delayedAddsFlushTimer, &QTimer::timeout, this, &NodeList::processDelayedAdds); + delayedAddsFlushTimer->start(CONNECTION_RATE_INTERVAL); + // check the local socket right now updateLocalSocket(); @@ -367,7 +375,7 @@ bool LimitedNodeList::packetSourceAndHashMatchAndTrackBandwidth(const udt::Packe return true; - } else { + } else if (!isDelayedNode(sourceID)){ HIFI_FCDEBUG(networking(), "Packet of type" << headerType << "received from unknown node with Local ID" << sourceLocalID); } @@ -736,6 +744,53 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t return newNodePointer; } +void LimitedNodeList::addNewNode(NewNodeInfo info) { + // Throttle connection of new agents. + if (info.type == NodeType::Agent && _nodesAddedInCurrentTimeSlice >= _maxConnectionRate) { + delayNodeAdd(info); + return; + } + + SharedNodePointer node = addOrUpdateNode(info.uuid, info.type, info.publicSocket, info.localSocket, + info.sessionLocalID, info.isReplicated, false, + info.connectionSecretUUID, info.permissions); + + ++_nodesAddedInCurrentTimeSlice; +} + +void LimitedNodeList::delayNodeAdd(NewNodeInfo info) { + _delayedNodeAdds.push_back(info); +} + +void LimitedNodeList::removeDelayedAdd(QUuid nodeUUID) { + auto it = std::find_if(_delayedNodeAdds.begin(), _delayedNodeAdds.end(), [&](auto info) { + return info.uuid == nodeUUID; + }); + if (it != _delayedNodeAdds.end()) { + _delayedNodeAdds.erase(it); + } +} + +bool LimitedNodeList::isDelayedNode(QUuid nodeUUID) { + auto it = std::find_if(_delayedNodeAdds.begin(), _delayedNodeAdds.end(), [&](auto info) { + return info.uuid == nodeUUID; + }); + return it != _delayedNodeAdds.end(); +} + +void LimitedNodeList::processDelayedAdds() { + _nodesAddedInCurrentTimeSlice = 0; + + auto nodesToAdd = glm::min(_delayedNodeAdds.size(), _maxConnectionRate); + auto firstNodeToAdd = _delayedNodeAdds.begin(); + auto lastNodeToAdd = firstNodeToAdd + nodesToAdd; + + for (auto it = firstNodeToAdd; it != lastNodeToAdd; ++it) { + addNewNode(*it); + } + _delayedNodeAdds.erase(firstNodeToAdd, lastNodeToAdd); +} + std::unique_ptr LimitedNodeList::constructPingPacket(const QUuid& nodeId, PingType_t pingType) { int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(int64_t); diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 450fad96a9..fbd37ae065 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -51,6 +51,8 @@ const int INVALID_PORT = -1; const quint64 NODE_SILENCE_THRESHOLD_MSECS = 5 * 1000; +static const size_t DEFAULT_MAX_CONNECTION_RATE { 50 }; + extern const std::set SOLO_NODE_TYPES; const char DEFAULT_ASSIGNMENT_SERVER_HOSTNAME[] = "localhost"; @@ -316,6 +318,9 @@ public: void sendFakedHandshakeRequestToNode(SharedNodePointer node); #endif + size_t getMaxConnectionRate() { return _maxConnectionRate; } + void setMaxConnectionRate(size_t rate) { _maxConnectionRate = rate; } + int getInboundPPS() const { return _inboundPPS; } int getOutboundPPS() const { return _outboundPPS; } float getInboundKbps() const { return _inboundKbps; } @@ -367,7 +372,20 @@ protected slots: void clientConnectionToSockAddrReset(const HifiSockAddr& sockAddr); + void processDelayedAdds(); + protected: + struct NewNodeInfo { + qint8 type; + QUuid uuid; + HifiSockAddr publicSocket; + HifiSockAddr localSocket; + NodePermissions permissions; + bool isReplicated; + Node::LocalID sessionLocalID; + QUuid connectionSecretUUID; + }; + LimitedNodeList(int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT); LimitedNodeList(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton void operator=(LimitedNodeList const&) = delete; // Don't implement, needed to avoid copies of singleton @@ -390,6 +408,11 @@ protected: bool sockAddrBelongsToNode(const HifiSockAddr& sockAddr); + void addNewNode(NewNodeInfo info); + void delayNodeAdd(NewNodeInfo info); + void removeDelayedAdd(QUuid nodeUUID); + bool isDelayedNode(QUuid nodeUUID); + NodeHash _nodeHash; mutable QReadWriteLock _nodeMutex { QReadWriteLock::Recursive }; udt::Socket _nodeSocket; @@ -440,6 +463,10 @@ private: Node::LocalID _sessionLocalID { 0 }; bool _flagTimeForConnectionStep { false }; // only keep track in interface + size_t _maxConnectionRate { DEFAULT_MAX_CONNECTION_RATE }; + size_t _nodesAddedInCurrentTimeSlice { 0 }; + std::vector _delayedNodeAdds; + int _inboundPPS { 0 }; int _outboundPPS { 0 }; float _inboundKbps { 0.0f }; diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index 1de710e8ca..4097ddf1a3 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -38,7 +38,6 @@ #include using namespace std::chrono_literals; -static const std::chrono::milliseconds CONNECTION_RATE_INTERVAL = 1s; static const std::chrono::milliseconds KEEPALIVE_PING_INTERVAL = 1s; NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort) : @@ -118,11 +117,6 @@ NodeList::NodeList(char newOwnerType, int socketListenPort, int dtlsListenPort) // we definitely want STUN to update our public socket, so call the LNL to kick that off startSTUNPublicSocketUpdate(); - // check for local socket updates every so often - QTimer* delayedAddsFlushTimer = new QTimer(this); - connect(delayedAddsFlushTimer, &QTimer::timeout, this, &NodeList::processDelayedAdds); - delayedAddsFlushTimer->start(CONNECTION_RATE_INTERVAL); - auto& packetReceiver = getPacketReceiver(); packetReceiver.registerListener(PacketType::DomainList, this, "processDomainServerList"); packetReceiver.registerListener(PacketType::Ping, this, "processPingPacket"); @@ -714,6 +708,7 @@ void NodeList::processDomainServerRemovedNode(QSharedPointer me QUuid nodeUUID = QUuid::fromRfc4122(message->readWithoutCopy(NUM_BYTES_RFC4122_UUID)); qCDebug(networking) << "Received packet from domain-server to remove node with UUID" << uuidStringWithoutCurlyBraces(nodeUUID); killNodeWithUUID(nodeUUID); + removeDelayedAdd(nodeUUID); } void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { @@ -734,45 +729,7 @@ void NodeList::parseNodeFromPacketStream(QDataStream& packetStream) { info.publicSocket.setAddress(_domainHandler.getIP()); } - // Throttle connection of new agents. - if (info.type != NodeType::Agent - || _nodesAddedInCurrentTimeSlice < _maxConnectionRate) { - - addNewNode(info); - ++_nodesAddedInCurrentTimeSlice; - } else { - delayNodeAdd(info); - } -} - -void NodeList::addNewNode(NewNodeInfo info) { - SharedNodePointer node = addOrUpdateNode(info.uuid, info.type, info.publicSocket, info.localSocket, - info.sessionLocalID, info.isReplicated, false, - info.connectionSecretUUID, info.permissions); - - // nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server - // and always have their public socket as their active socket - if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) { - node->setLastHeardMicrostamp(usecTimestampNow()); - node->activatePublicSocket(); - } -}; - -void NodeList::delayNodeAdd(NewNodeInfo info) { - _delayedNodeAdds.push_back(info); -}; - -void NodeList::processDelayedAdds() { - _nodesAddedInCurrentTimeSlice = 0; - - auto nodesToAdd = glm::min(_delayedNodeAdds.size(), _maxConnectionRate); - auto firstNodeToAdd = _delayedNodeAdds.begin(); - auto lastNodeToAdd = firstNodeToAdd + nodesToAdd; - - for (auto it = firstNodeToAdd; it != lastNodeToAdd; ++it) { - addNewNode(*it); - } - _delayedNodeAdds.erase(firstNodeToAdd, lastNodeToAdd); + addNewNode(info); } void NodeList::sendAssignment(Assignment& assignment) { @@ -819,7 +776,6 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { } void NodeList::startNodeHolePunch(const SharedNodePointer& node) { - // we don't hole punch to downstream servers, since it is assumed that we have a direct line to them // we also don't hole punch to relayed upstream nodes, since we do not communicate directly with them @@ -833,6 +789,14 @@ void NodeList::startNodeHolePunch(const SharedNodePointer& node) { // ping this node immediately pingPunchForInactiveNode(node); } + + // nodes that are downstream or upstream of our own type are kept alive when we hear about them from the domain server + // and always have their public socket as their active socket + if (node->getType() == NodeType::downstreamType(_ownerType) || node->getType() == NodeType::upstreamType(_ownerType)) { + node->setLastHeardMicrostamp(usecTimestampNow()); + node->activatePublicSocket(); + } + } void NodeList::handleNodePingTimeout() { diff --git a/libraries/networking/src/NodeList.h b/libraries/networking/src/NodeList.h index f913b7c3b9..e135bc937d 100644 --- a/libraries/networking/src/NodeList.h +++ b/libraries/networking/src/NodeList.h @@ -38,8 +38,6 @@ const quint64 DOMAIN_SERVER_CHECK_IN_MSECS = 1 * 1000; -static const size_t DEFAULT_MAX_CONNECTION_RATE { 50 }; - using PacketOrPacketList = std::pair, std::unique_ptr>; using NodePacketOrPacketListPair = std::pair; @@ -95,9 +93,6 @@ public: bool getSendDomainServerCheckInEnabled() { return _sendDomainServerCheckInEnabled; } void setSendDomainServerCheckInEnabled(bool enabled) { _sendDomainServerCheckInEnabled = enabled; } - size_t getMaxConnectionRate() { return _maxConnectionRate; } - void setMaxConnectionRate(size_t rate) { _maxConnectionRate = rate; } - void removeFromIgnoreMuteSets(const QUuid& nodeID); virtual bool isDomainServer() const override { return false; } @@ -151,17 +146,6 @@ private slots: void maybeSendIgnoreSetToNode(SharedNodePointer node); private: - struct NewNodeInfo { - qint8 type; - QUuid uuid; - HifiSockAddr publicSocket; - HifiSockAddr localSocket; - NodePermissions permissions; - bool isReplicated; - Node::LocalID sessionLocalID; - QUuid connectionSecretUUID; - }; - NodeList() : LimitedNodeList(INVALID_PORT, INVALID_PORT) { assert(false); } // Not implemented, needed for DependencyManager templates compile NodeList(char ownerType, int socketListenPort = INVALID_PORT, int dtlsListenPort = INVALID_PORT); NodeList(NodeList const&) = delete; // Don't implement, needed to avoid copies of singleton @@ -180,10 +164,6 @@ private: bool sockAddrBelongsToDomainOrNode(const HifiSockAddr& sockAddr); - void addNewNode(NewNodeInfo info); - void delayNodeAdd(NewNodeInfo info); - void processDelayedAdds(); - std::atomic _ownerType; NodeSet _nodeTypesOfInterest; DomainHandler _domainHandler; @@ -201,10 +181,6 @@ private: mutable QReadWriteLock _avatarGainMapLock; tbb::concurrent_unordered_map _avatarGainMap; - size_t _maxConnectionRate { DEFAULT_MAX_CONNECTION_RATE }; - size_t _nodesAddedInCurrentTimeSlice { 0 }; - std::vector _delayedNodeAdds; - void sendIgnoreRadiusStateToNode(const SharedNodePointer& destinationNode); #if defined(Q_OS_ANDROID) Setting::Handle _ignoreRadiusEnabled { "IgnoreRadiusEnabled", false };