complete LimitedNodeList changes for new cuckoo hash

This commit is contained in:
Stephen Birarda 2014-11-05 14:40:56 -08:00
parent 35d0d31350
commit a492abc3a1
5 changed files with 79 additions and 119 deletions

View file

@ -68,7 +68,6 @@ LimitedNodeList* LimitedNodeList::getInstance() {
LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort) : LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort) :
_sessionUUID(), _sessionUUID(),
_nodeHash(), _nodeHash(),
_nodeHashMutex(QMutex::Recursive),
_nodeSocket(this), _nodeSocket(this),
_dtlsSocket(NULL), _dtlsSocket(NULL),
_localSockAddr(), _localSockAddr(),
@ -344,18 +343,11 @@ int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packe
} }
SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) { SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) {
const int WAIT_TIME = 10; // wait up to 10ms in the try lock case try {
SharedNodePointer node; return _nodeHash[nodeUUID];
// if caller wants us to block and guarantee the correct answer, then honor that request } catch (std::out_of_range) {
if (blockingLock) { return SharedNodePointer();
// this will block till we can get access
QMutexLocker locker(&_nodeHashMutex);
node = _nodeHash.value(nodeUUID);
} else if (_nodeHashMutex.tryLock(WAIT_TIME)) { // some callers are willing to get wrong answers but not block
node = _nodeHash.value(nodeUUID);
_nodeHashMutex.unlock();
} }
return node;
} }
SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet) { SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet) {
@ -365,22 +357,15 @@ SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet
return nodeWithUUID(nodeUUID); return nodeWithUUID(nodeUUID);
} }
NodeHash LimitedNodeList::getNodeHash() {
QMutexLocker locker(&_nodeHashMutex);
return NodeHash(_nodeHash);
}
void LimitedNodeList::eraseAllNodes() { void LimitedNodeList::eraseAllNodes() {
qDebug() << "Clearing the NodeList. Deleting all nodes in list."; qDebug() << "Clearing the NodeList. Deleting all nodes in list.";
QMutexLocker locker(&_nodeHashMutex); // iterate the current nodes and note that they are going down
for (auto it = _nodeHash.cbegin(); !it.is_end(); it++) {
NodeHash::iterator nodeItem = _nodeHash.begin(); emit nodeKilled(it->second);
// iterate the nodes in the list
while (nodeItem != _nodeHash.end()) {
nodeItem = killNodeAtHashIterator(nodeItem);
} }
_nodeHash.clear();
} }
void LimitedNodeList::reset() { void LimitedNodeList::reset() {
@ -388,20 +373,13 @@ void LimitedNodeList::reset() {
} }
void LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) { void LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) {
QMutexLocker locker(&_nodeHashMutex); SharedNodePointer matchingNode = nodeWithUUID(nodeUUID);
if (matchingNode) {
NodeHash::iterator nodeItemToKill = _nodeHash.find(nodeUUID); emit nodeKilled(matchingNode);
if (nodeItemToKill != _nodeHash.end()) { _nodeHash.erase(nodeUUID);
killNodeAtHashIterator(nodeItemToKill);
} }
} }
NodeHash::iterator LimitedNodeList::killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill) {
qDebug() << "Killed" << *nodeItemToKill.value();
emit nodeKilled(nodeItemToKill.value());
return _nodeHash.erase(nodeItemToKill);
}
void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) { void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) {
// read the node id // read the node id
QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader(dataByteArray), NUM_BYTES_RFC4122_UUID)); QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader(dataByteArray), NUM_BYTES_RFC4122_UUID));
@ -412,61 +390,32 @@ void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) {
SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType, SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, NodeType_t nodeType,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) { const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
_nodeHashMutex.lock(); try {
SharedNodePointer matchingNode = _nodeHash[uuid];
if (!_nodeHash.contains(uuid)) { matchingNode->updateSockets(publicSocket, localSocket);
return matchingNode;
} catch (std::out_of_range) {
// we didn't have this node, so add them // we didn't have this node, so add them
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket); Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater); SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater);
_nodeHash.insert(newNode->getUUID(), newNodeSharedPointer); _nodeHash.insert(newNode->getUUID(), newNodeSharedPointer);
_nodeHashMutex.unlock();
qDebug() << "Added" << *newNode; qDebug() << "Added" << *newNode;
emit nodeAdded(newNodeSharedPointer); emit nodeAdded(newNodeSharedPointer);
return newNodeSharedPointer; return newNodeSharedPointer;
} else {
_nodeHashMutex.unlock();
return updateSocketsForNode(uuid, publicSocket, localSocket);
} }
} }
SharedNodePointer LimitedNodeList::updateSocketsForNode(const QUuid& uuid,
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
SharedNodePointer matchingNode = nodeWithUUID(uuid);
if (matchingNode) {
// perform appropriate updates to this node
QMutexLocker locker(&matchingNode->getMutex());
// check if we need to change this node's public or local sockets
if (publicSocket != matchingNode->getPublicSocket()) {
matchingNode->setPublicSocket(publicSocket);
qDebug() << "Public socket change for node" << *matchingNode;
}
if (localSocket != matchingNode->getLocalSocket()) {
matchingNode->setLocalSocket(localSocket);
qDebug() << "Local socket change for node" << *matchingNode;
}
}
return matchingNode;
}
unsigned LimitedNodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) { unsigned LimitedNodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) {
unsigned n = 0; unsigned n = 0;
foreach (const SharedNodePointer& node, getNodeHash()) { SnapshotNodeHash snapshotHash = _nodeHash.snapshot_table();
// only send to the NodeTypes we are asked to send to. for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
if (destinationNodeTypes.contains(node->getType())) { if (destinationNodeTypes.contains(it->second->getType())) {
writeDatagram(packet, node); writeDatagram(packet, it->second);
++n; ++n;
} }
} }
@ -510,9 +459,11 @@ QByteArray LimitedNodeList::constructPingReplyPacket(const QByteArray& pingPacke
SharedNodePointer LimitedNodeList::soloNodeOfType(char nodeType) { SharedNodePointer LimitedNodeList::soloNodeOfType(char nodeType) {
if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) { if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) {
foreach (const SharedNodePointer& node, getNodeHash()) { SnapshotNodeHash snapshotHash = _nodeHash.snapshot_table();
if (node->getType() == nodeType) {
return node; for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
if (it->second->getType() == nodeType) {
return it->second;
} }
} }
} }
@ -532,27 +483,19 @@ void LimitedNodeList::resetPacketStats() {
void LimitedNodeList::removeSilentNodes() { void LimitedNodeList::removeSilentNodes() {
_nodeHashMutex.lock(); SnapshotNodeHash snapshotHash = _nodeHash.snapshot_table();
NodeHash::iterator nodeItem = _nodeHash.begin();
while (nodeItem != _nodeHash.end()) {
SharedNodePointer node = nodeItem.value();
for (auto it = snapshotHash.begin(); it != snapshotHash.end(); it++) {
SharedNodePointer node = it->second;
node->getMutex().lock(); node->getMutex().lock();
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * 1000)) { if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > (NODE_SILENCE_THRESHOLD_MSECS * 1000)) {
// call our private method to kill this node (removes it and emits the right signal) // call the NodeHash erase to get rid of this node
nodeItem = killNodeAtHashIterator(nodeItem); _nodeHash.erase(it->first);
} else {
// we didn't kill this node, push the iterator forwards
++nodeItem;
} }
node->getMutex().unlock(); node->getMutex().unlock();
} }
_nodeHashMutex.unlock();
} }
const uint32_t RFC_5389_MAGIC_COOKIE = 0x2112A442; const uint32_t RFC_5389_MAGIC_COOKIE = 0x2112A442;

View file

@ -55,6 +55,7 @@ typedef QSharedPointer<Node> SharedNodePointer;
Q_DECLARE_METATYPE(SharedNodePointer) Q_DECLARE_METATYPE(SharedNodePointer)
typedef cuckoohash_map<QUuid, SharedNodePointer, UUIDCityHasher > NodeHash; typedef cuckoohash_map<QUuid, SharedNodePointer, UUIDCityHasher > NodeHash;
typedef std::vector<std::pair<QUuid, SharedNodePointer> > SnapshotNodeHash;
typedef quint8 PingType_t; typedef quint8 PingType_t;
namespace PingType { namespace PingType {
@ -95,7 +96,7 @@ public:
void(*linkedDataCreateCallback)(Node *); void(*linkedDataCreateCallback)(Node *);
NodeHash getNodeHash(); const NodeHash& getNodeHash() { return _nodeHash; }
int size() const { return _nodeHash.size(); } int size() const { return _nodeHash.size(); }
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true); SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true);
@ -155,14 +156,10 @@ protected:
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr, qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret); const QUuid& connectionSecret);
NodeHash::iterator killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill);
void changeSocketBufferSizes(int numBytes); void changeSocketBufferSizes(int numBytes);
QUuid _sessionUUID; QUuid _sessionUUID;
cuckoohash_map<QByteArray, SharedNodePointer> _nodeHash; NodeHash _nodeHash;
QMutex _nodeHashMutex;
QUdpSocket _nodeSocket; QUdpSocket _nodeSocket;
QUdpSocket* _dtlsSocket; QUdpSocket* _dtlsSocket;
HifiSockAddr _localSockAddr; HifiSockAddr _localSockAddr;

View file

@ -94,30 +94,48 @@ void Node::updateClockSkewUsec(int clockSkewSample) {
} }
void Node::setPublicSocket(const HifiSockAddr& publicSocket) { void Node::setPublicSocket(const HifiSockAddr& publicSocket) {
if (publicSocket != _publicSocket) {
if (_activeSocket == &_publicSocket) { if (_activeSocket == &_publicSocket) {
// if the active socket was the public socket then reset it to NULL // if the active socket was the public socket then reset it to NULL
_activeSocket = NULL; _activeSocket = NULL;
} }
if (!_publicSocket.isNull()) {
qDebug() << "Public socket change for node" << *this;
}
_publicSocket = publicSocket; _publicSocket = publicSocket;
}
} }
void Node::setLocalSocket(const HifiSockAddr& localSocket) { void Node::setLocalSocket(const HifiSockAddr& localSocket) {
if (localSocket != _localSocket) {
if (_activeSocket == &_localSocket) { if (_activeSocket == &_localSocket) {
// if the active socket was the local socket then reset it to NULL // if the active socket was the local socket then reset it to NULL
_activeSocket = NULL; _activeSocket = NULL;
} }
if (!_localSocket.isNull()) {
qDebug() << "Local socket change for node" << *this;
}
_localSocket = localSocket; _localSocket = localSocket;
}
} }
void Node::setSymmetricSocket(const HifiSockAddr& symmetricSocket) { void Node::setSymmetricSocket(const HifiSockAddr& symmetricSocket) {
if (symmetricSocket != _symmetricSocket) {
if (_activeSocket == &_symmetricSocket) { if (_activeSocket == &_symmetricSocket) {
// if the active socket was the symmetric socket then reset it to NULL // if the active socket was the symmetric socket then reset it to NULL
_activeSocket = NULL; _activeSocket = NULL;
} }
if (!_symmetricSocket.isNull()) {
qDebug() << "Symmetric socket change for node" << *this;
}
_symmetricSocket = symmetricSocket; _symmetricSocket = symmetricSocket;
}
} }
void Node::activateLocalSocket() { void Node::activateLocalSocket() {

View file

@ -81,6 +81,8 @@ public:
const HifiSockAddr& getSymmetricSocket() const { return _symmetricSocket; } const HifiSockAddr& getSymmetricSocket() const { return _symmetricSocket; }
virtual void setSymmetricSocket(const HifiSockAddr& symmetricSocket); virtual void setSymmetricSocket(const HifiSockAddr& symmetricSocket);
void updateSockets(const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);
const HifiSockAddr* getActiveSocket() const { return _activeSocket; } const HifiSockAddr* getActiveSocket() const { return _activeSocket; }
void activatePublicSocket(); void activatePublicSocket();

View file

@ -449,10 +449,10 @@ void NodeList::pingPunchForInactiveNode(const SharedNodePointer& node) {
} }
void NodeList::pingInactiveNodes() { void NodeList::pingInactiveNodes() {
foreach (const SharedNodePointer& node, getNodeHash()) { for (auto it = _nodeHash.cbegin(); !it.is_end(); it++) {
if (!node->getActiveSocket()) { if (!it->second->getActiveSocket()) {
// we don't have an active link to this node, ping it to set that up // we don't have an active link to this node, ping it to set that up
pingPunchForInactiveNode(node); pingPunchForInactiveNode(it->second);
} }
} }
} }