Fix nodelist connections not resetting on both ends

This change adds a connection ID to ping packets. Each node keeps a
connection id for each other node that it has connected to. When a node
is removed the connection id is incremented. If a node sees another node
with a higher connection id, it will reset its connection with the new
connection id, ensuring that local state is reset on both ends when
nodes lose contact.
This commit is contained in:
Ryan Huffman 2018-03-28 15:05:51 -07:00
parent 46fb2033f2
commit 2546ff91ca
4 changed files with 58 additions and 16 deletions

View file

@ -451,11 +451,12 @@ SharedNodePointer DomainGatekeeper::processAgentConnectRequest(const NodeConnect
return SharedNodePointer();
}
QUuid hintNodeID;
QUuid existingNodeID;
// in case this is a node that's failing to connect
// double check we don't have the same node whose sockets match exactly already in the list
limitedNodeList->eachNodeBreakable([&](const SharedNodePointer& node){
if (node->getPublicSocket() == nodeConnection.publicSockAddr && node->getLocalSocket() == nodeConnection.localSockAddr) {
// we have a node that already has these exact sockets - this can occur if a node
// is failing to connect to the domain
@ -465,15 +466,20 @@ SharedNodePointer DomainGatekeeper::processAgentConnectRequest(const NodeConnect
auto existingNodeData = static_cast<DomainServerNodeData*>(node->getLinkedData());
if (existingNodeData->getUsername() == username) {
hintNodeID = node->getUUID();
qDebug() << "Deleting existing connection from same sockaddr: " << node->getUUID();
existingNodeID = node->getUUID();
return false;
}
}
return true;
});
if (!existingNodeID.isNull()) {
limitedNodeList->killNodeWithUUID(existingNodeID);
}
// add the connecting node (or re-use the matched one from eachNodeBreakable above)
SharedNodePointer newNode = addVerifiedNodeFromConnectRequest(nodeConnection, hintNodeID);
SharedNodePointer newNode = addVerifiedNodeFromConnectRequest(nodeConnection);
// set the edit rights for this user
newNode->setPermissions(userPerms);

View file

@ -578,9 +578,10 @@ void LimitedNodeList::reset() {
// we need to make sure any socket connections are gone so wait on that here
_nodeSocket.clearConnections();
_connectionIDs.clear();
}
bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) {
bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID) {
QReadLocker readLocker(&_nodeMutex);
NodeHash::iterator it = _nodeHash.find(nodeUUID);
@ -594,7 +595,7 @@ bool LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) {
_nodeHash.unsafe_erase(it);
}
handleNodeKill(matchingNode);
handleNodeKill(matchingNode, newConnectionID);
return true;
}
@ -609,7 +610,7 @@ void LimitedNodeList::processKillNode(ReceivedMessage& message) {
killNodeWithUUID(nodeUUID);
}
void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) {
void LimitedNodeList::handleNodeKill(const SharedNodePointer& node, ConnectionID nextConnectionID) {
qCDebug(networking) << "Killed" << *node;
node->stopPingTimer();
emit nodeKilled(node);
@ -617,6 +618,15 @@ void LimitedNodeList::handleNodeKill(const SharedNodePointer& node) {
if (auto activeSocket = node->getActiveSocket()) {
_nodeSocket.cleanupConnection(*activeSocket);
}
auto it = _connectionIDs.find(node->getUUID());
if (it != _connectionIDs.end()) {
if (nextConnectionID == NULL_CONNECTION_ID) {
it->second++;
} else {
it->second = nextConnectionID;
}
}
}
SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
@ -638,6 +648,11 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
return matchingNode;
} else {
auto it = _connectionIDs.find(uuid);
if (it == _connectionIDs.end()) {
_connectionIDs[uuid] = INITIAL_CONNECTION_ID;
}
// we didn't have this node, so add them
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
newNode->setIsReplicated(isReplicated);
@ -712,13 +727,13 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
}
}
std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(PingType_t pingType) {
int packetSize = sizeof(PingType_t) + sizeof(quint64);
std::unique_ptr<NLPacket> LimitedNodeList::constructPingPacket(const QUuid& nodeId, PingType_t pingType) {
int packetSize = sizeof(PingType_t) + sizeof(quint64) + sizeof(int64_t);
auto pingPacket = NLPacket::create(PacketType::Ping, packetSize);
pingPacket->writePrimitive(pingType);
pingPacket->writePrimitive(usecTimestampNow());
pingPacket->writePrimitive(_connectionIDs[nodeId]);
return pingPacket;
}

View file

@ -66,6 +66,10 @@ const QHostAddress DEFAULT_ASSIGNMENT_CLIENT_MONITOR_HOSTNAME = QHostAddress::Lo
const QString USERNAME_UUID_REPLACEMENT_STATS_KEY = "$username";
using ConnectionID = int64_t;
const ConnectionID NULL_CONNECTION_ID { -1 };
const ConnectionID INITIAL_CONNECTION_ID { 0 };
typedef std::pair<QUuid, SharedNodePointer> UUIDNodePair;
typedef tbb::concurrent_unordered_map<QUuid, SharedNodePointer, UUIDHasher> NodeHash;
@ -180,7 +184,7 @@ public:
void getPacketStats(float& packetsInPerSecond, float& bytesInPerSecond, float& packetsOutPerSecond, float& bytesOutPerSecond);
void resetPacketStats();
std::unique_ptr<NLPacket> constructPingPacket(PingType_t pingType = PingType::Agnostic);
std::unique_ptr<NLPacket> constructPingPacket(const QUuid& nodeId, PingType_t pingType = PingType::Agnostic);
std::unique_ptr<NLPacket> constructPingReplyPacket(ReceivedMessage& message);
static std::unique_ptr<NLPacket> constructICEPingPacket(PingType_t pingType, const QUuid& iceID);
@ -319,7 +323,7 @@ public slots:
void startSTUNPublicSocketUpdate();
virtual void sendSTUNRequest();
bool killNodeWithUUID(const QUuid& nodeUUID);
bool killNodeWithUUID(const QUuid& nodeUUID, ConnectionID newConnectionID = NULL_CONNECTION_ID);
signals:
void dataSent(quint8 channelType, int bytes);
@ -371,7 +375,7 @@ protected:
bool packetSourceAndHashMatchAndTrackBandwidth(const udt::Packet& packet, Node* sourceNode = nullptr);
void processSTUNResponse(std::unique_ptr<udt::BasePacket> packet);
void handleNodeKill(const SharedNodePointer& node);
void handleNodeKill(const SharedNodePointer& node, ConnectionID newConnectionID = NULL_CONNECTION_ID);
void stopInitialSTUNUpdate(bool success);
@ -418,6 +422,7 @@ protected:
}
}
std::unordered_map<QUuid, ConnectionID> _connectionIDs;
private slots:
void flagTimeForConnectionStep(ConnectionStep connectionStep, quint64 timestamp);

View file

@ -214,6 +214,20 @@ void NodeList::processPingPacket(QSharedPointer<ReceivedMessage> message, Shared
sendingNode->setSymmetricSocket(senderSockAddr);
}
}
int64_t connectionId;
message->readPrimitive(&connectionId);
auto it = _connectionIDs.find(sendingNode->getUUID());
if (it != _connectionIDs.end()) {
if (connectionId > it->second) {
qDebug() << "Received a ping packet with a larger connection id (" << connectionId << ">" << it->second << ") from "
<< sendingNode->getUUID();
killNodeWithUUID(sendingNode->getUUID(), connectionId);
}
}
}
void NodeList::processPingReplyPacket(QSharedPointer<ReceivedMessage> message, SharedNodePointer sendingNode) {
@ -689,16 +703,18 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) {
if (node->getConnectionAttempts() > 0 && node->getConnectionAttempts() % NUM_DEBUG_CONNECTION_ATTEMPTS == 0) {
qCDebug(networking) << "No response to UDP hole punch pings for node" << node->getUUID() << "in last second.";
}
auto nodeId = node->getUUID();
// send the ping packet to the local and public sockets for this node
auto localPingPacket = constructPingPacket(PingType::Local);
auto localPingPacket = constructPingPacket(nodeId, PingType::Local);
sendPacket(std::move(localPingPacket), *node, node->getLocalSocket());
auto publicPingPacket = constructPingPacket(PingType::Public);
auto publicPingPacket = constructPingPacket(nodeId, PingType::Public);
sendPacket(std::move(publicPingPacket), *node, node->getPublicSocket());
if (!node->getSymmetricSocket().isNull()) {
auto symmetricPingPacket = constructPingPacket(PingType::Symmetric);
auto symmetricPingPacket = constructPingPacket(nodeId, PingType::Symmetric);
sendPacket(std::move(symmetricPingPacket), *node, node->getSymmetricSocket());
}
@ -768,7 +784,7 @@ void NodeList::sendKeepAlivePings() {
auto type = node->getType();
return !node->isUpstream() && _nodeTypesOfInterest.contains(type) && !NodeType::isDownstream(type);
}, [&](const SharedNodePointer& node) {
sendPacket(constructPingPacket(), *node);
sendPacket(constructPingPacket(node->getUUID()), *node);
});
}