From a492abc3a1410e5eaab76e72a203976c925b69ef Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Wed, 5 Nov 2014 14:40:56 -0800 Subject: [PATCH] complete LimitedNodeList changes for new cuckoo hash --- libraries/networking/src/LimitedNodeList.cpp | 133 ++++++------------- libraries/networking/src/LimitedNodeList.h | 9 +- libraries/networking/src/Node.cpp | 48 ++++--- libraries/networking/src/Node.h | 2 + libraries/networking/src/NodeList.cpp | 6 +- 5 files changed, 79 insertions(+), 119 deletions(-) diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 043f0621bb..0f86719761 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -68,7 +68,6 @@ LimitedNodeList* LimitedNodeList::getInstance() { LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort) : _sessionUUID(), _nodeHash(), - _nodeHashMutex(QMutex::Recursive), _nodeSocket(this), _dtlsSocket(NULL), _localSockAddr(), @@ -344,18 +343,11 @@ int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packe } SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) { - const int WAIT_TIME = 10; // wait up to 10ms in the try lock case - SharedNodePointer node; - // if caller wants us to block and guarantee the correct answer, then honor that request - if (blockingLock) { - // this will block till we can get access - QMutexLocker locker(&_nodeHashMutex); - node = _nodeHash.value(nodeUUID); - } else if (_nodeHashMutex.tryLock(WAIT_TIME)) { // some callers are willing to get wrong answers but not block - node = _nodeHash.value(nodeUUID); - _nodeHashMutex.unlock(); + try { + return _nodeHash[nodeUUID]; + } catch (std::out_of_range) { + return SharedNodePointer(); } - return node; } SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet) { @@ -365,22 +357,15 @@ SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet return nodeWithUUID(nodeUUID); } -NodeHash LimitedNodeList::getNodeHash() { - QMutexLocker locker(&_nodeHashMutex); - return NodeHash(_nodeHash); -} - void LimitedNodeList::eraseAllNodes() { qDebug() << "Clearing the NodeList. Deleting all nodes in list."; - QMutexLocker locker(&_nodeHashMutex); - - NodeHash::iterator nodeItem = _nodeHash.begin(); - - // iterate the nodes in the list - while (nodeItem != _nodeHash.end()) { - nodeItem = killNodeAtHashIterator(nodeItem); + // iterate the current nodes and note that they are going down + for (auto it = _nodeHash.cbegin(); !it.is_end(); it++) { + emit nodeKilled(it->second); } + + _nodeHash.clear(); } void LimitedNodeList::reset() { @@ -388,20 +373,13 @@ void LimitedNodeList::reset() { } void LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) { - QMutexLocker locker(&_nodeHashMutex); - - NodeHash::iterator nodeItemToKill = _nodeHash.find(nodeUUID); - if (nodeItemToKill != _nodeHash.end()) { - killNodeAtHashIterator(nodeItemToKill); + SharedNodePointer matchingNode = nodeWithUUID(nodeUUID); + if (matchingNode) { + emit nodeKilled(matchingNode); + _nodeHash.erase(nodeUUID); } } -NodeHash::iterator LimitedNodeList::killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill) { - qDebug() << "Killed" << *nodeItemToKill.value(); - emit nodeKilled(nodeItemToKill.value()); - return _nodeHash.erase(nodeItemToKill); -} - void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) { // read the node id QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader(dataByteArray), NUM_BYTES_RFC4122_UUID)); @@ -411,62 +389,33 @@ void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) { } SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, - const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) { - _nodeHashMutex.lock(); - - if (!_nodeHash.contains(uuid)) { - + const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) { + try { + SharedNodePointer matchingNode = _nodeHash[uuid]; + matchingNode->updateSockets(publicSocket, localSocket); + return matchingNode; + } catch (std::out_of_range) { // we didn't have this node, so add them Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater); _nodeHash.insert(newNode->getUUID(), newNodeSharedPointer); - _nodeHashMutex.unlock(); - qDebug() << "Added" << *newNode; - + emit nodeAdded(newNodeSharedPointer); - + return newNodeSharedPointer; - } else { - _nodeHashMutex.unlock(); - - return updateSocketsForNode(uuid, publicSocket, localSocket); } } -SharedNodePointer LimitedNodeList::updateSocketsForNode(const QUuid& uuid, - const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) { - - SharedNodePointer matchingNode = nodeWithUUID(uuid); - - if (matchingNode) { - // perform appropriate updates to this node - QMutexLocker locker(&matchingNode->getMutex()); - - // check if we need to change this node's public or local sockets - if (publicSocket != matchingNode->getPublicSocket()) { - matchingNode->setPublicSocket(publicSocket); - qDebug() << "Public socket change for node" << *matchingNode; - } - - if (localSocket != matchingNode->getLocalSocket()) { - matchingNode->setLocalSocket(localSocket); - qDebug() << "Local socket change for node" << *matchingNode; - } - } - - return matchingNode; -} - unsigned LimitedNodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) { unsigned n = 0; - - foreach (const SharedNodePointer& node, getNodeHash()) { - // only send to the NodeTypes we are asked to send to. - if (destinationNodeTypes.contains(node->getType())) { - writeDatagram(packet, node); + + SnapshotNodeHash snapshotHash = _nodeHash.snapshot_table(); + for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { + if (destinationNodeTypes.contains(it->second->getType())) { + writeDatagram(packet, it->second); ++n; } } @@ -510,9 +459,11 @@ QByteArray LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacke SharedNodePointer LimitedNodeList::soloNodeOfType(char nodeType) { if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) { - foreach (const SharedNodePointer& node, getNodeHash()) { - if (node->getType() == nodeType) { - return node; + SnapshotNodeHash snapshotHash = _nodeHash.snapshot_table(); + + for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { + if (it->second->getType() == nodeType) { + return it->second; } } } @@ -531,28 +482,20 @@ void LimitedNodeList::resetPacketStats() { } void LimitedNodeList::removeSilentNodes() { - - _nodeHashMutex.lock(); - NodeHash::iterator nodeItem = _nodeHash.begin(); - - while (nodeItem != _nodeHash.end()) { - SharedNodePointer node = nodeItem.value(); - + SnapshotNodeHash snapshotHash = _nodeHash.snapshot_table(); + + for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { + SharedNodePointer node = it->second; node->getMutex().lock(); - + if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * 1000)) { - // call our private method to kill this node (removes it and emits the right signal) - nodeItem = killNodeAtHashIterator(nodeItem); - } else { - // we didn't kill this node, push the iterator forwards - ++nodeItem; + // call the NodeHash erase to get rid of this node + _nodeHash.erase(it->first); } node->getMutex().unlock(); } - - _nodeHashMutex.unlock(); } const uint32_t RFC_5389_MAGIC_COOKIE = 0x2112A442; diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 100e3a53ea..81daa7ace8 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -55,6 +55,7 @@ typedef QSharedPointer SharedNodePointer; Q_DECLARE_METATYPE(SharedNodePointer) typedef cuckoohash_map NodeHash; +typedef std::vector > SnapshotNodeHash; typedef quint8 PingType_t; namespace PingType { @@ -95,7 +96,7 @@ public: void(*linkedDataCreateCallback)(Node *); - NodeHash getNodeHash(); + const NodeHash& getNodeHash() { return _nodeHash; } int size() const { return _nodeHash.size(); } SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true); @@ -154,15 +155,11 @@ protected: qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr, const QUuid& connectionSecret); - - NodeHash::iterator killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill); - void changeSocketBufferSizes(int numBytes); QUuid _sessionUUID; - cuckoohash_map _nodeHash; - QMutex _nodeHashMutex; + NodeHash _nodeHash; QUdpSocket _nodeSocket; QUdpSocket* _dtlsSocket; HifiSockAddr _localSockAddr; diff --git a/libraries/networking/src/Node.cpp b/libraries/networking/src/Node.cpp index c2b35d3b7d..de82dde7f0 100644 --- a/libraries/networking/src/Node.cpp +++ b/libraries/networking/src/Node.cpp @@ -94,30 +94,48 @@ void Node::updateClockSkewUsec(int clockSkewSample) { } void Node::setPublicSocket(const HifiSockAddr& publicSocket) { - if (_activeSocket == &_publicSocket) { - // if the active socket was the public socket then reset it to NULL - _activeSocket = NULL; + if (publicSocket != _publicSocket) { + if (_activeSocket == &_publicSocket) { + // if the active socket was the public socket then reset it to NULL + _activeSocket = NULL; + } + + if (!_publicSocket.isNull()) { + qDebug() << "Public socket change for node" << *this; + } + + _publicSocket = publicSocket; } - - _publicSocket = publicSocket; } void Node::setLocalSocket(const HifiSockAddr& localSocket) { - if (_activeSocket == &_localSocket) { - // if the active socket was the local socket then reset it to NULL - _activeSocket = NULL; + if (localSocket != _localSocket) { + if (_activeSocket == &_localSocket) { + // if the active socket was the local socket then reset it to NULL + _activeSocket = NULL; + } + + if (!_localSocket.isNull()) { + qDebug() << "Local socket change for node" << *this; + } + + _localSocket = localSocket; } - - _localSocket = localSocket; } void Node::setSymmetricSocket(const HifiSockAddr& symmetricSocket) { - if (_activeSocket == &_symmetricSocket) { - // if the active socket was the symmetric socket then reset it to NULL - _activeSocket = NULL; + if (symmetricSocket != _symmetricSocket) { + if (_activeSocket == &_symmetricSocket) { + // if the active socket was the symmetric socket then reset it to NULL + _activeSocket = NULL; + } + + if (!_symmetricSocket.isNull()) { + qDebug() << "Symmetric socket change for node" << *this; + } + + _symmetricSocket = symmetricSocket; } - - _symmetricSocket = symmetricSocket; } void Node::activateLocalSocket() { diff --git a/libraries/networking/src/Node.h b/libraries/networking/src/Node.h index acb6c6f453..05b32d0528 100644 --- a/libraries/networking/src/Node.h +++ b/libraries/networking/src/Node.h @@ -81,6 +81,8 @@ public: const HifiSockAddr& getSymmetricSocket() const { return _symmetricSocket; } virtual void setSymmetricSocket(const HifiSockAddr& symmetricSocket); + void updateSockets(const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket); + const HifiSockAddr* getActiveSocket() const { return _activeSocket; } void activatePublicSocket(); diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index ff3b86880d..6621a6d2e2 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -449,10 +449,10 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { } void NodeList::pingInactiveNodes() { - foreach (const SharedNodePointer& node, getNodeHash()) { - if (!node->getActiveSocket()) { + for (auto it = _nodeHash.cbegin(); !it.is_end(); it++) { + if (!it->second->getActiveSocket()) { // we don't have an active link to this node, ping it to set that up - pingPunchForInactiveNode(node); + pingPunchForInactiveNode(it->second); } } }