move networking lib to TBB concurrent_unordered_map

This commit is contained in:
Stephen Birarda 2014-11-06 11:19:16 -08:00
parent a39ed798ae
commit e0c4f14c81
6 changed files with 108 additions and 91 deletions

View file

@ -21,6 +21,8 @@
#include <LogHandler.h>
#include <tbb/parallel_for.h>
#include "AccountManager.h"
#include "Assignment.h"
#include "HifiSockAddr.h"
@ -342,9 +344,9 @@ int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packe
return 0;
}
SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) {
SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID) {
try {
return _nodeHash[nodeUUID];
return _nodeHash.at(nodeUUID);
} catch (std::out_of_range) {
return SharedNodePointer();
}
@ -360,12 +362,12 @@ SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet
void LimitedNodeList::eraseAllNodes() {
qDebug() << "Clearing the NodeList. Deleting all nodes in list.";
// iterate the current nodes and note that they are going down
for (auto it = _nodeHash.cbegin(); !it.is_end(); it++) {
// iterate the current nodes, emit that they are dying and remove them from the hash
QWriteLocker writeLock(&_nodeMutex);
for (NodeHash::iterator it = _nodeHash.begin(); it != _nodeHash.end(); ++it) {
emit nodeKilled(it->second);
it = _nodeHash.unsafe_erase(it);
}
_nodeHash.clear();
}
void LimitedNodeList::reset() {
@ -373,10 +375,13 @@ void LimitedNodeList::reset() {
}
void LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) {
SharedNodePointer matchingNode = nodeWithUUID(nodeUUID);
if (matchingNode) {
NodeHash::iterator it = _nodeHash.find(nodeUUID);
if (it != _nodeHash.end()) {
SharedNodePointer matchingNode = it->second;
QWriteLocker writeLocker(&_nodeMutex);
_nodeHash.unsafe_erase(it);
emit nodeKilled(matchingNode);
_nodeHash.erase(nodeUUID);
}
}
@ -391,7 +396,7 @@ void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) {
SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
try {
SharedNodePointer matchingNode = _nodeHash[uuid];
SharedNodePointer matchingNode = _nodeHash.at(uuid);
matchingNode->setPublicSocket(publicSocket);
matchingNode->setLocalSocket(localSocket);
@ -402,7 +407,7 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater);
_nodeHash.insert(newNode->getUUID(), newNodeSharedPointer);
_nodeHash.insert(UUIDNodePair(newNode->getUUID(), newNodeSharedPointer));
qDebug() << "Added" << *newNode;
@ -415,13 +420,12 @@ SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t
unsigned LimitedNodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) {
unsigned n = 0;
NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table();
for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
if (destinationNodeTypes.contains(it->second->getType())) {
writeDatagram(packet, it->second);
eachNode([&](const SharedNodePointer& node){
if (destinationNodeTypes.contains(node->getType())) {
writeDatagram(packet, node);
++n;
}
}
});
return n;
}
@ -460,17 +464,9 @@ QByteArray LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacke
}
SharedNodePointer LimitedNodeList::soloNodeOfType(char nodeType) {
if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) {
NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table();
for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
if (it->second->getType() == nodeType) {
return it->second;
}
}
}
return SharedNodePointer();
return nodeMatchingPredicate([&](const SharedNodePointer& node){
return node->getType() == nodeType;
});
}
void LimitedNodeList::getPacketStats(float& packetsPerSecond, float& bytesPerSecond) {
@ -485,20 +481,21 @@ void LimitedNodeList::resetPacketStats() {
}
void LimitedNodeList::removeSilentNodes() {
NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table();
for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
eachNodeHashIterator([this](NodeHash::iterator& it){
SharedNodePointer node = it->second;
node->getMutex().lock();
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * 1000)) {
// call the NodeHash erase to get rid of this node
_nodeHash.erase(it->first);
it = _nodeHash.unsafe_erase(it);
emit nodeKilled(node);
} else {
// we didn't erase this node, push the iterator forwards
++it;
}
node->getMutex().unlock();
}
});
}
const uint32_t RFC_5389_MAGIC_COOKIE = 0x2112A442;

View file

@ -20,19 +20,18 @@
#include <unistd.h> // not on windows, not needed for mac or windows
#endif
#include <QtCore/QElapsedTimer>
#include <QtCore/QMutex>
#include <QtCore/QSet>
#include <QtCore/QSettings>
#include <QtCore/QSharedPointer>
#include <QtNetwork/QHostAddress>
#include <QtNetwork/QUdpSocket>
#include <qelapsedtimer.h>
#include <qreadwritelock.h>
#include <qset.h>
#include <qsharedpointer.h>
#include <qhostaddress.h>
#include <qudpsocket.h>
#include <libcuckoo/cuckoohash_map.hh>
#include <tbb/concurrent_unordered_map.h>
#include "DomainHandler.h"
#include "Node.h"
#include "UUIDCityHasher.h"
#include "UUIDHasher.h"
const int MAX_PACKET_SIZE = 1500;
@ -54,8 +53,9 @@ typedef QSet<NodeType_t> NodeSet;
typedef QSharedPointer<Node> SharedNodePointer;
Q_DECLARE_METATYPE(SharedNodePointer)
typedef cuckoohash_map<QUuid, SharedNodePointer, UUIDCityHasher > NodeHash;
typedef std::vector<std::pair<QUuid, SharedNodePointer> > NodeHashSnapshot;
using namespace tbb;
typedef std::pair<QUuid, SharedNodePointer> UUIDNodePair;
typedef concurrent_unordered_map<QUuid, SharedNodePointer, UUIDHasher> NodeHash;
typedef quint8 PingType_t;
namespace PingType {
@ -74,7 +74,6 @@ public:
const QUuid& getSessionUUID() const { return _sessionUUID; }
void setSessionUUID(const QUuid& sessionUUID);
void rebindNodeSocket();
QUdpSocket& getNodeSocket() { return _nodeSocket; }
QUdpSocket& getDTLSSocket();
@ -95,11 +94,10 @@ public:
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
void(*linkedDataCreateCallback)(Node *);
const NodeHash& getNodeHash() { return _nodeHash; }
int size() const { return _nodeHash.size(); }
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true);
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID);
SharedNodePointer sendingNodeForPacket(const QByteArray& packet);
SharedNodePointer addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
@ -128,6 +126,29 @@ public:
void sendHeartbeatToIceServer(const HifiSockAddr& iceServerSockAddr,
QUuid headerID = QUuid(), const QUuid& connectRequestID = QUuid());
template<typename NodeLambda>
void eachNode(NodeLambda functor) {
QReadLocker readLock(&_nodeMutex);
for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) {
functor(it->second);
}
}
template<typename PredLambda>
SharedNodePointer nodeMatchingPredicate(const PredLambda predicate) {
QReadLocker readLock(&_nodeMutex);
for (NodeHash::const_iterator it = _nodeHash.cbegin(); it != _nodeHash.cend(); ++it) {
if (predicate(it->second)) {
return it->second;
}
}
return SharedNodePointer();
}
public slots:
void reset();
void eraseAllNodes();
@ -158,6 +179,7 @@ protected:
QUuid _sessionUUID;
NodeHash _nodeHash;
QReadWriteLock _nodeMutex;
QUdpSocket _nodeSocket;
QUdpSocket* _dtlsSocket;
HifiSockAddr _localSockAddr;
@ -166,6 +188,17 @@ protected:
int _numCollectedPackets;
int _numCollectedBytes;
QElapsedTimer _packetStatTimer;
template<typename IteratorLambda>
void eachNodeHashIterator(IteratorLambda functor) {
QWriteLocker writeLock(&_nodeMutex);
NodeHash::iterator it = _nodeHash.begin();
while (it != _nodeHash.end()) {
functor(it);
}
}
};
#endif // hifi_LimitedNodeList_h

View file

@ -449,13 +449,12 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) {
}
void NodeList::pingInactiveNodes() {
NodeHashSnapshot snapshotHash = _nodeHash.snapshot_table();
for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
if (!it->second->getActiveSocket()) {
eachNode([this](const SharedNodePointer& node){
if (!node->getActiveSocket()) {
// we don't have an active link to this node, ping it to set that up
pingPunchForInactiveNode(it->second);
pingPunchForInactiveNode(node);
}
}
});
}
void NodeList::activateSocketFromNodeCommunication(const QByteArray& packet, const SharedNodePointer& sendingNode) {

View file

@ -1,12 +0,0 @@
//
// UUIDCityHasher.cpp
// libraries/networking/src
//
// Created by Stephen Birarda on 2014-11-05.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#include "UUIDCityHasher.h"

View file

@ -1,26 +0,0 @@
//
// UUIDCityHasher.h
// libraries/networking/src
//
// Created by Stephen Birarda on 2014-11-05.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_UUIDCityHasher_h
#define hifi_UUIDCityHasher_h
#include <libcuckoo/city.h>
#include "UUID.h"
class UUIDCityHasher {
public:
size_t operator()(const QUuid& key) const {
return CityHash64(key.toRfc4122().constData(), NUM_BYTES_RFC4122_UUID);
}
};
#endif // hifi_UUIDCityHasher_h

View file

@ -0,0 +1,26 @@
//
// UUIDHasher.h
// libraries/networking/src
//
// Created by Stephen Birarda on 2014-11-05.
// Copyright 2014 High Fidelity, Inc.
//
// Distributed under the Apache License, Version 2.0.
// See the accompanying file LICENSE or http://www.apache.org/licenses/LICENSE-2.0.html
//
#ifndef hifi_UUIDHasher_h
#define hifi_UUIDHasher_h
#include "UUID.h"
class UUIDHasher {
public:
size_t operator()(const QUuid& uuid) const {
return uuid.data1 ^ uuid.data2 ^ (uuid.data3 << 16)
^ ((uuid.data4[0] << 24) | (uuid.data4[1] << 16) | (uuid.data4[2] << 8) | uuid.data4[3])
^ ((uuid.data4[4] << 24) | (uuid.data4[5] << 16) | (uuid.data4[6] << 8) | uuid.data4[7]);
}
};
#endif // hifi_UUIDHasher_h