From 2546ff91ca4427de972968794fdccb55924dc9d4 Mon Sep 17 00:00:00 2001 From: Ryan Huffman Date: Wed, 28 Mar 2018 15:05:51 -0700 Subject: [PATCH 1/3] Fix nodelist connections not resetting on both ends This change adds a connection ID to ping packets. Each node keeps a connection id for each other node that it has connected to. When a node is removed the connection id is incremented. If a node sees another node with a higher connection id, it will reset its connection with the new connection id, ensuring that local state is reset on both ends when nodes lose contact. --- domain-server/src/DomainGatekeeper.cpp | 12 ++++++--- libraries/networking/src/LimitedNodeList.cpp | 27 +++++++++++++++----- libraries/networking/src/LimitedNodeList.h | 11 +++++--- libraries/networking/src/NodeList.cpp | 24 ++++++++++++++--- 4 files changed, 58 insertions(+), 16 deletions(-) diff --git a/domain-server/src/DomainGatekeeper.cpp b/domain-server/src/DomainGatekeeper.cpp index 7d0b538f6e..9f24036e92 100644 --- a/domain-server/src/DomainGatekeeper.cpp +++ b/domain-server/src/DomainGatekeeper.cpp @@ -451,11 +451,12 @@ SharedNodePointer DomainGatekeeper::processAgentConnectRequest(const NodeConnect return SharedNodePointer(); } - QUuid hintNodeID; + QUuid existingNodeID; // in case this is a node that's failing to connect // double check we don't have the same node whose sockets match exactly already in the list limitedNodeList->eachNodeBreakable([&](const SharedNodePointer& node){ + if (node->getPublicSocket() == nodeConnection.publicSockAddr && node->getLocalSocket() == nodeConnection.localSockAddr) { // we have a node that already has these exact sockets - this can occur if a node // is failing to connect to the domain @@ -465,15 +466,20 @@ SharedNodePointer DomainGatekeeper::processAgentConnectRequest(const NodeConnect auto existingNodeData = static_cast(node->getLinkedData()); if (existingNodeData->getUsername() == username) { - hintNodeID = node->getUUID(); + qDebug() << "Deleting existing connection from same sockaddr: " << node->getUUID(); + existingNodeID = node->getUUID(); return false; } } return true; }); + if (!existingNodeID.isNull()) { + limitedNodeList->killNodeWithUUID(existingNodeID); + } + // add the connecting node (or re-use the matched one from eachNodeBreakable above) - SharedNodePointer newNode = addVerifiedNodeFromConnectRequest(nodeConnection, hintNodeID); + SharedNodePointer newNode = addVerifiedNodeFromConnectRequest(nodeConnection); // set the edit rights for this user newNode->setPermissions(userPerms); diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 0803e380f2..e27e2d6d08 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -578,9 +578,10 @@ void LimitedNodeList::reset() { // we need to make sure any socket connections are gone so wait on that here _nodeSocket.clearConnections(); + _connectionIDs.clear(); } -bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) { +bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID) { QReadLocker readLocker(&_nodeMutex); NodeHash::iterator it = _nodeHash.find(nodeUUID); @@ -594,7 +595,7 @@ bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) { _nodeHash.unsafe_erase(it); } - handleNodeKill(matchingNode); + handleNodeKill(matchingNode, newConnectionID); return true; } @@ -609,7 +610,7 @@ void LimitedNodeList::processKillNode(ReceivedMessage& message) { killNodeWithUUID(nodeUUID); } -void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) { +void LimitedNodeList::handleNodeKill(const SharedNodePointer& node, ConnectionID nextConnectionID) { qCDebug(networking) << "Killed" << *node; node->stopPingTimer(); emit nodeKilled(node); @@ -617,6 +618,15 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) { if (auto activeSocket = node->getActiveSocket()) { _nodeSocket.cleanupConnection(*activeSocket); } + + auto it = _connectionIDs.find(node->getUUID()); + if (it != _connectionIDs.end()) { + if (nextConnectionID == NULL_CONNECTION_ID) { + it->second++; + } else { + it->second = nextConnectionID; + } + } } SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, @@ -638,6 +648,11 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t return matchingNode; } else { + auto it = _connectionIDs.find(uuid); + if (it == _connectionIDs.end()) { + _connectionIDs[uuid] = INITIAL_CONNECTION_ID; + } + // we didn't have this node, so add them Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); newNode->setIsReplicated(isReplicated); @@ -712,13 +727,13 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t } } -std::unique_ptr LimitedNodeList::constructPingPacket(PingType_t pingType) { - int packetSize = sizeof(PingType_t) + sizeof(quint64); +std::unique_ptr LimitedNodeList::constructPingPacket(const QUuid& nodeId, PingType_t pingType) { + int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(int64_t); auto pingPacket = NLPacket::create(PacketType::Ping, packetSize); - pingPacket->writePrimitive(pingType); pingPacket->writePrimitive(usecTimestampNow()); + pingPacket->writePrimitive(_connectionIDs[nodeId]); return pingPacket; } diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 7165b3dd63..7ec3a41450 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -66,6 +66,10 @@ const QHostAddress DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME = QHostAddress::Lo const QString USERNAME_UUID_REPLACEMENT_STATS_KEY = "$username"; +using ConnectionID = int64_t; +const ConnectionID NULL_CONNECTION_ID { -1 }; +const ConnectionID INITIAL_CONNECTION_ID { 0 }; + typedef std::pair UUIDNodePair; typedef tbb::concurrent_unordered_map NodeHash; @@ -180,7 +184,7 @@ public: void getPacketStats(float& packetsInPerSecond, float& bytesInPerSecond, float& packetsOutPerSecond, float& bytesOutPerSecond); void resetPacketStats(); - std::unique_ptr constructPingPacket(PingType_t pingType = PingType::Agnostic); + std::unique_ptr constructPingPacket(const QUuid& nodeId, PingType_t pingType = PingType::Agnostic); std::unique_ptr constructPingReplyPacket(ReceivedMessage& message); static std::unique_ptr constructICEPingPacket(PingType_t pingType, const QUuid& iceID); @@ -319,7 +323,7 @@ public slots: void startSTUNPublicSocketUpdate(); virtual void sendSTUNRequest(); - bool killNodeWithUUID(const QUuid& nodeUUID); + bool killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID = NULL_CONNECTION_ID); signals: void dataSent(quint8 channelType, int bytes); @@ -371,7 +375,7 @@ protected: bool packetSourceAndHashMatchAndTrackBandwidth(const udt::Packet& packet, Node* sourceNode = nullptr); void processSTUNResponse(std::unique_ptr packet); - void handleNodeKill(const SharedNodePointer& node); + void handleNodeKill(const SharedNodePointer& node, ConnectionID newConnectionID = NULL_CONNECTION_ID); void stopInitialSTUNUpdate(bool success); @@ -418,6 +422,7 @@ protected: } } + std::unordered_map _connectionIDs; private slots: void flagTimeForConnectionStep(ConnectionStep connectionStep, quint64 timestamp); diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index cb0d2e4cd5..d33a81841a 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -214,6 +214,20 @@ void NodeList::processPingPacket(QSharedPointer message, Shared sendingNode->setSymmetricSocket(senderSockAddr); } } + + int64_t connectionId; + + message->readPrimitive(&connectionId); + + auto it = _connectionIDs.find(sendingNode->getUUID()); + if (it != _connectionIDs.end()) { + if (connectionId > it->second) { + qDebug() << "Received a ping packet with a larger connection id (" << connectionId << ">" << it->second << ") from " + << sendingNode->getUUID(); + killNodeWithUUID(sendingNode->getUUID(), connectionId); + } + } + } void NodeList::processPingReplyPacket(QSharedPointer message, SharedNodePointer sendingNode) { @@ -689,16 +703,18 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { if (node->getConnectionAttempts() > 0 && node->getConnectionAttempts() % NUM_DEBUG_CONNECTION_ATTEMPTS == 0) { qCDebug(networking) << "No response to UDP hole punch pings for node" << node->getUUID() << "in last second."; } + + auto nodeId = node->getUUID(); // send the ping packet to the local and public sockets for this node - auto localPingPacket = constructPingPacket(PingType::Local); + auto localPingPacket = constructPingPacket(nodeId, PingType::Local); sendPacket(std::move(localPingPacket), *node, node->getLocalSocket()); - auto publicPingPacket = constructPingPacket(PingType::Public); + auto publicPingPacket = constructPingPacket(nodeId, PingType::Public); sendPacket(std::move(publicPingPacket), *node, node->getPublicSocket()); if (!node->getSymmetricSocket().isNull()) { - auto symmetricPingPacket = constructPingPacket(PingType::Symmetric); + auto symmetricPingPacket = constructPingPacket(nodeId, PingType::Symmetric); sendPacket(std::move(symmetricPingPacket), *node, node->getSymmetricSocket()); } @@ -768,7 +784,7 @@ void NodeList::sendKeepAlivePings() { auto type = node->getType(); return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type); }, [&](const SharedNodePointer& node) { - sendPacket(constructPingPacket(), *node); + sendPacket(constructPingPacket(node->getUUID()), *node); }); } From e9d291257677c20944148faae66a22497c6000ff Mon Sep 17 00:00:00 2001 From: Ryan Huffman Date: Mon, 16 Apr 2018 12:03:58 -0700 Subject: [PATCH 2/3] Update variable naming in NodeList from Id to ID --- libraries/networking/src/NodeList.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index d33a81841a..e099a2c527 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -215,16 +215,16 @@ void NodeList::processPingPacket(QSharedPointer message, Shared } } - int64_t connectionId; + int64_t connectionID; - message->readPrimitive(&connectionId); + message->readPrimitive(&connectionID); auto it = _connectionIDs.find(sendingNode->getUUID()); if (it != _connectionIDs.end()) { - if (connectionId > it->second) { - qDebug() << "Received a ping packet with a larger connection id (" << connectionId << ">" << it->second << ") from " + if (connectionID > it->second) { + qDebug() << "Received a ping packet with a larger connection id (" << connectionID << ">" << it->second << ") from " << sendingNode->getUUID(); - killNodeWithUUID(sendingNode->getUUID(), connectionId); + killNodeWithUUID(sendingNode->getUUID(), connectionID); } } @@ -704,17 +704,17 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { qCDebug(networking) << "No response to UDP hole punch pings for node" << node->getUUID() << "in last second."; } - auto nodeId = node->getUUID(); + auto nodeID = node->getUUID(); // send the ping packet to the local and public sockets for this node - auto localPingPacket = constructPingPacket(nodeId, PingType::Local); + auto localPingPacket = constructPingPacket(nodeID, PingType::Local); sendPacket(std::move(localPingPacket), *node, node->getLocalSocket()); - auto publicPingPacket = constructPingPacket(nodeId, PingType::Public); + auto publicPingPacket = constructPingPacket(nodeID, PingType::Public); sendPacket(std::move(publicPingPacket), *node, node->getPublicSocket()); if (!node->getSymmetricSocket().isNull()) { - auto symmetricPingPacket = constructPingPacket(nodeId, PingType::Symmetric); + auto symmetricPingPacket = constructPingPacket(nodeID, PingType::Symmetric); sendPacket(std::move(symmetricPingPacket), *node, node->getSymmetricSocket()); } From bef4eb1d05c8a6b98dbb7fdcf9d1f20585bf0de3 Mon Sep 17 00:00:00 2001 From: Ryan Huffman Date: Thu, 19 Apr 2018 14:03:51 -0700 Subject: [PATCH 3/3] Increase packet version for Ping --- libraries/networking/src/udt/PacketHeaders.cpp | 2 ++ libraries/networking/src/udt/PacketHeaders.h | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/libraries/networking/src/udt/PacketHeaders.cpp b/libraries/networking/src/udt/PacketHeaders.cpp index a83924ee58..11b2c516f8 100644 --- a/libraries/networking/src/udt/PacketHeaders.cpp +++ b/libraries/networking/src/udt/PacketHeaders.cpp @@ -75,6 +75,8 @@ PacketVersion versionForPacketType(PacketType packetType) { return static_cast(IcePingVersion::SendICEPeerID); case PacketType::DomainSettings: return 18; // replace min_avatar_scale and max_avatar_scale with min_avatar_height and max_avatar_height + case PacketType::Ping: + return static_cast(PingVersion::IncludeConnectionID); default: return 17; } diff --git a/libraries/networking/src/udt/PacketHeaders.h b/libraries/networking/src/udt/PacketHeaders.h index 09fd31a41e..091fcb1091 100644 --- a/libraries/networking/src/udt/PacketHeaders.h +++ b/libraries/networking/src/udt/PacketHeaders.h @@ -322,4 +322,8 @@ enum class IcePingVersion : PacketVersion { SendICEPeerID = 18 }; +enum class PingVersion : PacketVersion { + IncludeConnectionID = 18 +}; + #endif // hifi_PacketHeaders_h