From e0c4f14c8163f55c4c28abca1fa9df461b256440 Mon Sep 17 00:00:00 2001 From: Stephen Birarda Date: Thu, 6 Nov 2014 11:19:16 -0800 Subject: [PATCH] move networking lib to TBB concurrent_unordered_map --- libraries/networking/src/LimitedNodeList.cpp | 63 ++++++++++---------- libraries/networking/src/LimitedNodeList.h | 63 +++++++++++++++----- libraries/networking/src/NodeList.cpp | 9 ++- libraries/networking/src/UUIDCityHasher.cpp | 12 ---- libraries/networking/src/UUIDCityHasher.h | 26 -------- libraries/networking/src/UUIDHasher.h | 26 ++++++++ 6 files changed, 108 insertions(+), 91 deletions(-) delete mode 100644 libraries/networking/src/UUIDCityHasher.cpp delete mode 100644 libraries/networking/src/UUIDCityHasher.h create mode 100644 libraries/networking/src/UUIDHasher.h diff --git a/libraries/networking/src/LimitedNodeList.cpp b/libraries/networking/src/LimitedNodeList.cpp index 5a103076b6..818d35c0e5 100644 --- a/libraries/networking/src/LimitedNodeList.cpp +++ b/libraries/networking/src/LimitedNodeList.cpp @@ -21,6 +21,8 @@ #include +#include + #include "AccountManager.h" #include "Assignment.h" #include "HifiSockAddr.h" @@ -342,9 +344,9 @@ int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packe return 0; } -SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) { +SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID) { try { - return _nodeHash[nodeUUID]; + return _nodeHash.at(nodeUUID); } catch (std::out_of_range) { return SharedNodePointer(); } @@ -360,12 +362,12 @@ SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet void LimitedNodeList::eraseAllNodes() { qDebug() << "Clearing the NodeList. Deleting all nodes in list."; - // iterate the current nodes and note that they are going down - for (auto it = _nodeHash.cbegin(); !it.is_end(); it++) { + // iterate the current nodes, emit that they are dying and remove them from the hash + QWriteLocker writeLock(&_nodeMutex); + for (NodeHash::iterator it = _nodeHash.begin(); it != _nodeHash.end(); ++it) { emit nodeKilled(it->second); + it = _nodeHash.unsafe_erase(it); } - - _nodeHash.clear(); } void LimitedNodeList::reset() { @@ -373,10 +375,13 @@ void LimitedNodeList::reset() { } void LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) { - SharedNodePointer matchingNode = nodeWithUUID(nodeUUID); - if (matchingNode) { + NodeHash::iterator it = _nodeHash.find(nodeUUID); + if (it != _nodeHash.end()) { + SharedNodePointer matchingNode = it->second; + + QWriteLocker writeLocker(&_nodeMutex); + _nodeHash.unsafe_erase(it); emit nodeKilled(matchingNode); - _nodeHash.erase(nodeUUID); } } @@ -391,7 +396,7 @@ void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) { SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) { try { - SharedNodePointer matchingNode = _nodeHash[uuid]; + SharedNodePointer matchingNode = _nodeHash.at(uuid); matchingNode->setPublicSocket(publicSocket); matchingNode->setLocalSocket(localSocket); @@ -402,7 +407,7 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater); - _nodeHash.insert(newNode->getUUID(), newNodeSharedPointer); + _nodeHash.insert(UUIDNodePair(newNode->getUUID(), newNodeSharedPointer)); qDebug() << "Added" << *newNode; @@ -415,13 +420,12 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t unsigned LimitedNodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) { unsigned n = 0; - NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table(); - for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { - if (destinationNodeTypes.contains(it->second->getType())) { - writeDatagram(packet, it->second); + eachNode([&](const SharedNodePointer& node){ + if (destinationNodeTypes.contains(node->getType())) { + writeDatagram(packet, node); ++n; } - } + }); return n; } @@ -460,17 +464,9 @@ QByteArray LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacke } SharedNodePointer LimitedNodeList::soloNodeOfType(char nodeType) { - - if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) { - NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table(); - - for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { - if (it->second->getType() == nodeType) { - return it->second; - } - } - } - return SharedNodePointer(); + return nodeMatchingPredicate([&](const SharedNodePointer& node){ + return node->getType() == nodeType; + }); } void LimitedNodeList::getPacketStats(float& packetsPerSecond, float& bytesPerSecond) { @@ -485,20 +481,21 @@ void LimitedNodeList::resetPacketStats() { } void LimitedNodeList::removeSilentNodes() { - - NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table(); - - for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { + eachNodeHashIterator([this](NodeHash::iterator& it){ SharedNodePointer node = it->second; node->getMutex().lock(); if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * 1000)) { // call the NodeHash erase to get rid of this node - _nodeHash.erase(it->first); + it = _nodeHash.unsafe_erase(it); + emit nodeKilled(node); + } else { + // we didn't erase this node, push the iterator forwards + ++it; } node->getMutex().unlock(); - } + }); } const uint32_t RFC_5389_MAGIC_COOKIE = 0x2112A442; diff --git a/libraries/networking/src/LimitedNodeList.h b/libraries/networking/src/LimitedNodeList.h index 63f93458f3..c7a8ccecf5 100644 --- a/libraries/networking/src/LimitedNodeList.h +++ b/libraries/networking/src/LimitedNodeList.h @@ -20,19 +20,18 @@ #include // not on windows, not needed for mac or windows #endif -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include +#include -#include +#include #include "DomainHandler.h" #include "Node.h" -#include "UUIDCityHasher.h" +#include "UUIDHasher.h" const int MAX_PACKET_SIZE = 1500; @@ -54,8 +53,9 @@ typedef QSet NodeSet; typedef QSharedPointer SharedNodePointer; Q_DECLARE_METATYPE(SharedNodePointer) -typedef cuckoohash_map NodeHash; -typedef std::vector > NodeHashSnapshot; +using namespace tbb; +typedef std::pair UUIDNodePair; +typedef concurrent_unordered_map NodeHash; typedef quint8 PingType_t; namespace PingType { @@ -74,7 +74,6 @@ public: const QUuid& getSessionUUID() const { return _sessionUUID; } void setSessionUUID(const QUuid& sessionUUID); - void rebindNodeSocket(); QUdpSocket& getNodeSocket() { return _nodeSocket; } QUdpSocket& getDTLSSocket(); @@ -95,11 +94,10 @@ public: const HifiSockAddr& overridenSockAddr = HifiSockAddr()); void(*linkedDataCreateCallback)(Node *); - - const NodeHash& getNodeHash() { return _nodeHash; } + int size() const { return _nodeHash.size(); } - SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true); + SharedNodePointer nodeWithUUID(const QUuid& nodeUUID); SharedNodePointer sendingNodeForPacket(const QByteArray& packet); SharedNodePointer addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, @@ -128,6 +126,29 @@ public: void sendHeartbeatToIceServer(const HifiSockAddr& iceServerSockAddr, QUuid headerID = QUuid(), const QUuid& connectRequestID = QUuid()); + + template + void eachNode(NodeLambda functor) { + QReadLocker readLock(&_nodeMutex); + + for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) { + functor(it->second); + } + } + + template + SharedNodePointer nodeMatchingPredicate(const PredLambda predicate) { + QReadLocker readLock(&_nodeMutex); + + for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) { + if (predicate(it->second)) { + return it->second; + } + } + + return SharedNodePointer(); + } + public slots: void reset(); void eraseAllNodes(); @@ -158,6 +179,7 @@ protected: QUuid _sessionUUID; NodeHash _nodeHash; + QReadWriteLock _nodeMutex; QUdpSocket _nodeSocket; QUdpSocket* _dtlsSocket; HifiSockAddr _localSockAddr; @@ -166,6 +188,17 @@ protected: int _numCollectedPackets; int _numCollectedBytes; QElapsedTimer _packetStatTimer; + + template + void eachNodeHashIterator(IteratorLambda functor) { + QWriteLocker writeLock(&_nodeMutex); + NodeHash::iterator it = _nodeHash.begin(); + + while (it != _nodeHash.end()) { + functor(it); + } + } + }; #endif // hifi_LimitedNodeList_h diff --git a/libraries/networking/src/NodeList.cpp b/libraries/networking/src/NodeList.cpp index fe8efe42c4..bf992e7b88 100644 --- a/libraries/networking/src/NodeList.cpp +++ b/libraries/networking/src/NodeList.cpp @@ -449,13 +449,12 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) { } void NodeList::pingInactiveNodes() { - NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table(); - for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) { - if (!it->second->getActiveSocket()) { + eachNode([this](const SharedNodePointer& node){ + if (!node->getActiveSocket()) { // we don't have an active link to this node, ping it to set that up - pingPunchForInactiveNode(it->second); + pingPunchForInactiveNode(node); } - } + }); } void NodeList::activateSocketFromNodeCommunication(const QByteArray& packet, const SharedNodePointer& sendingNode) { diff --git a/libraries/networking/src/UUIDCityHasher.cpp b/libraries/networking/src/UUIDCityHasher.cpp deleted file mode 100644 index 731e287fc0..0000000000 --- a/libraries/networking/src/UUIDCityHasher.cpp +++ /dev/null @@ -1,12 +0,0 @@ -// -// UUIDCityHasher.cpp -// libraries/networking/src -// -// Created by Stephen Birarda on 2014-11-05. -// Copyright 2014 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#include "UUIDCityHasher.h" diff --git a/libraries/networking/src/UUIDCityHasher.h b/libraries/networking/src/UUIDCityHasher.h deleted file mode 100644 index e27e3bdf1b..0000000000 --- a/libraries/networking/src/UUIDCityHasher.h +++ /dev/null @@ -1,26 +0,0 @@ -// -// UUIDCityHasher.h -// libraries/networking/src -// -// Created by Stephen Birarda on 2014-11-05. -// Copyright 2014 High Fidelity, Inc. -// -// Distributed under the Apache License, Version 2.0. -// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html -// - -#ifndef hifi_UUIDCityHasher_h -#define hifi_UUIDCityHasher_h - -#include - -#include "UUID.h" - -class UUIDCityHasher { -public: - size_t operator()(const QUuid& key) const { - return CityHash64(key.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID); - } -}; - -#endif // hifi_UUIDCityHasher_h \ No newline at end of file diff --git a/libraries/networking/src/UUIDHasher.h b/libraries/networking/src/UUIDHasher.h new file mode 100644 index 0000000000..d5d16e21e9 --- /dev/null +++ b/libraries/networking/src/UUIDHasher.h @@ -0,0 +1,26 @@ +// +// UUIDHasher.h +// libraries/networking/src +// +// Created by Stephen Birarda on 2014-11-05. +// Copyright 2014 High Fidelity, Inc. +// +// Distributed under the Apache License, Version 2.0. +// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html +// + +#ifndef hifi_UUIDHasher_h +#define hifi_UUIDHasher_h + +#include "UUID.h" + +class UUIDHasher { +public: + size_t operator()(const QUuid& uuid) const { + return uuid.data1 ^ uuid.data2 ^ (uuid.data3 << 16) + ^ ((uuid.data4[0] << 24) | (uuid.data4[1] << 16) | (uuid.data4[2] << 8) | uuid.data4[3]) + ^ ((uuid.data4[4] << 24) | (uuid.data4[5] << 16) | (uuid.data4[6] << 8) | uuid.data4[7]); + } +}; + +#endif // hifi_UUIDHasher_h \ No newline at end of file