mirror of
https://github.com/overte-org/overte.git
synced 2025-04-20 03:44:02 +02:00
use a LimitedNodeList in domain-server
This commit is contained in:
parent
62041d91a8
commit
45c6ae44a6
5 changed files with 582 additions and 467 deletions
|
@ -60,12 +60,12 @@ DomainServer::DomainServer(int argc, char* argv[]) :
|
|||
// DTLS requires that IP_DONTFRAG be set
|
||||
// This is not accessible on some platforms (OS X) so we need to make sure DTLS still works without it
|
||||
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
LimitedNodeList* nodeList = LimitedNodeList::getInstance();
|
||||
|
||||
#if defined(IP_DONTFRAG) || defined(IP_MTU_DISCOVER)
|
||||
qDebug() << "Making required DTLS changes to NodeList DTLS socket.";
|
||||
|
||||
int socketHandle = NodeList::getInstance()->getDTLSSocket().socketDescriptor();
|
||||
int socketHandle = LimitedNodeList::getInstance()->getDTLSSocket().socketDescriptor();
|
||||
#if defined(IP_DONTFRAG)
|
||||
int optValue = 1;yea
|
||||
setsockopt(socketHandle, IPPROTO_IP, IP_DONTFRAG, (const void*) optValue, sizeof(optValue));
|
||||
|
@ -207,13 +207,10 @@ void DomainServer::setupNodeListAndAssignments(const QUuid& sessionUUID) {
|
|||
|
||||
populateDefaultStaticAssignmentsExcludingTypes(parsedTypes);
|
||||
|
||||
NodeList* nodeList = NodeList::createInstance(NodeType::DomainServer, domainServerPort, domainServerDTLSPort);
|
||||
LimitedNodeList* nodeList = LimitedNodeList::createInstance(domainServerPort, domainServerDTLSPort);
|
||||
|
||||
// create a random UUID for this session for the domain-server
|
||||
nodeList->setSessionUUID(sessionUUID);
|
||||
|
||||
connect(nodeList, &NodeList::nodeAdded, this, &DomainServer::nodeAdded);
|
||||
connect(nodeList, &NodeList::nodeKilled, this, &DomainServer::nodeKilled);
|
||||
connect(nodeList, &LimitedNodeList::nodeAdded, this, &DomainServer::nodeAdded);
|
||||
connect(nodeList, &LimitedNodeList::nodeKilled, this, &DomainServer::nodeKilled);
|
||||
|
||||
QTimer* silentNodeTimer = new QTimer(this);
|
||||
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
|
||||
|
@ -394,7 +391,7 @@ void DomainServer::addNodeToNodeListAndConfirmConnection(const QByteArray& packe
|
|||
// create a new session UUID for this node
|
||||
QUuid nodeUUID = QUuid::createUuid();
|
||||
|
||||
SharedNodePointer newNode = NodeList::getInstance()->addOrUpdateNode(nodeUUID, nodeType, publicSockAddr, localSockAddr);
|
||||
SharedNodePointer newNode = LimitedNodeList::getInstance()->addOrUpdateNode(nodeUUID, nodeType, publicSockAddr, localSockAddr);
|
||||
|
||||
// when the newNode is created the linked data is also created, if this was a static assignment set the UUID
|
||||
reinterpret_cast<DomainServerNodeData*>(newNode->getLinkedData())->setStaticAssignmentUUID(assignmentUUID);
|
||||
|
@ -477,7 +474,7 @@ void DomainServer::sendDomainListToNode(const SharedNodePointer& node, const Hif
|
|||
|
||||
DomainServerNodeData* nodeData = reinterpret_cast<DomainServerNodeData*>(node->getLinkedData());
|
||||
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
LimitedNodeList* nodeList = LimitedNodeList::getInstance();
|
||||
|
||||
|
||||
if (nodeInterestList.size() > 0) {
|
||||
|
@ -532,7 +529,7 @@ void DomainServer::sendDomainListToNode(const SharedNodePointer& node, const Hif
|
|||
}
|
||||
|
||||
void DomainServer::readAvailableDatagrams() {
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
LimitedNodeList* nodeList = LimitedNodeList::getInstance();
|
||||
|
||||
HifiSockAddr senderSockAddr;
|
||||
QByteArray receivedPacket;
|
||||
|
@ -610,7 +607,7 @@ void DomainServer::readAvailableDTLSDatagrams() {
|
|||
}
|
||||
|
||||
void DomainServer::processDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr) {
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
LimitedNodeList* nodeList = LimitedNodeList::getInstance();
|
||||
|
||||
if (nodeList->packetVersionAndHashMatch(receivedPacket)) {
|
||||
PacketType requestType = packetTypeForPacket(receivedPacket);
|
||||
|
@ -702,7 +699,7 @@ bool DomainServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url
|
|||
QJsonObject assignedNodesJSON;
|
||||
|
||||
// enumerate the NodeList to find the assigned nodes
|
||||
foreach (const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) {
|
||||
foreach (const SharedNodePointer& node, LimitedNodeList::getInstance()->getNodeHash()) {
|
||||
if (_staticAssignmentHash.value(node->getUUID())) {
|
||||
// add the node using the UUID as the key
|
||||
QString uuidString = uuidStringWithoutCurlyBraces(node->getUUID());
|
||||
|
@ -744,7 +741,7 @@ bool DomainServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url
|
|||
QJsonObject nodesJSON;
|
||||
|
||||
// enumerate the NodeList to find the assigned nodes
|
||||
NodeList* nodeList = NodeList::getInstance();
|
||||
LimitedNodeList* nodeList = LimitedNodeList::getInstance();
|
||||
|
||||
foreach (const SharedNodePointer& node, nodeList->getNodeHash()) {
|
||||
// add the node using the UUID as the key
|
||||
|
@ -769,7 +766,7 @@ bool DomainServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url
|
|||
QUuid matchingUUID = QUuid(nodeShowRegex.cap(1));
|
||||
|
||||
// see if we have a node that matches this ID
|
||||
SharedNodePointer matchingNode = NodeList::getInstance()->nodeWithUUID(matchingUUID);
|
||||
SharedNodePointer matchingNode = LimitedNodeList::getInstance()->nodeWithUUID(matchingUUID);
|
||||
if (matchingNode) {
|
||||
// create a QJsonDocument with the stats QJsonObject
|
||||
QJsonObject statsObject =
|
||||
|
@ -848,14 +845,14 @@ bool DomainServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url
|
|||
// pull the captured string, if it exists
|
||||
QUuid deleteUUID = QUuid(nodeDeleteRegex.cap(1));
|
||||
|
||||
SharedNodePointer nodeToKill = NodeList::getInstance()->nodeWithUUID(deleteUUID);
|
||||
SharedNodePointer nodeToKill = LimitedNodeList::getInstance()->nodeWithUUID(deleteUUID);
|
||||
|
||||
if (nodeToKill) {
|
||||
// start with a 200 response
|
||||
connection->respond(HTTPConnection::StatusCode200);
|
||||
|
||||
// we have a valid UUID and node - kill the node that has this assignment
|
||||
QMetaObject::invokeMethod(NodeList::getInstance(), "killNodeWithUUID", Q_ARG(const QUuid&, deleteUUID));
|
||||
QMetaObject::invokeMethod(LimitedNodeList::getInstance(), "killNodeWithUUID", Q_ARG(const QUuid&, deleteUUID));
|
||||
|
||||
// successfully processed request
|
||||
return true;
|
||||
|
@ -864,7 +861,7 @@ bool DomainServer::handleHTTPRequest(HTTPConnection* connection, const QUrl& url
|
|||
return true;
|
||||
} else if (allNodesDeleteRegex.indexIn(url.path()) != -1) {
|
||||
qDebug() << "Received request to kill all nodes.";
|
||||
NodeList::getInstance()->eraseAllNodes();
|
||||
LimitedNodeList::getInstance()->eraseAllNodes();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -911,7 +908,7 @@ void DomainServer::nodeKilled(SharedNodePointer node) {
|
|||
|
||||
// cleanup the connection secrets that we set up for this node (on the other nodes)
|
||||
foreach (const QUuid& otherNodeSessionUUID, nodeData->getSessionSecretHash().keys()) {
|
||||
SharedNodePointer otherNode = NodeList::getInstance()->nodeWithUUID(otherNodeSessionUUID);
|
||||
SharedNodePointer otherNode = LimitedNodeList::getInstance()->nodeWithUUID(otherNodeSessionUUID);
|
||||
if (otherNode) {
|
||||
reinterpret_cast<DomainServerNodeData*>(otherNode->getLinkedData())->getSessionSecretHash().remove(node->getUUID());
|
||||
}
|
||||
|
@ -996,7 +993,7 @@ void DomainServer::addStaticAssignmentsToQueue() {
|
|||
bool foundMatchingAssignment = false;
|
||||
|
||||
// enumerate the nodes and check if there is one with an attached assignment with matching UUID
|
||||
foreach (const SharedNodePointer& node, NodeList::getInstance()->getNodeHash()) {
|
||||
foreach (const SharedNodePointer& node, LimitedNodeList::getInstance()->getNodeHash()) {
|
||||
if (node->getUUID() == staticAssignment->data()->getUUID()) {
|
||||
foundMatchingAssignment = true;
|
||||
}
|
||||
|
|
429
libraries/shared/src/LimitedNodeList.cpp
Normal file
429
libraries/shared/src/LimitedNodeList.cpp
Normal file
|
@ -0,0 +1,429 @@
|
|||
//
|
||||
// LimitedNodeList.cpp
|
||||
// hifi
|
||||
//
|
||||
// Created by Stephen Birarda on 2/15/13.
|
||||
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
|
||||
//
|
||||
|
||||
#include <cstring>
|
||||
#include <cstdlib>
|
||||
#include <cstdio>
|
||||
|
||||
#include <QtCore/QDataStream>
|
||||
#include <QtCore/QDebug>
|
||||
#include <QtCore/QJsonDocument>
|
||||
#include <QtCore/QUrl>
|
||||
#include <QtNetwork/QHostInfo>
|
||||
|
||||
#include "AccountManager.h"
|
||||
#include "Assignment.h"
|
||||
#include "HifiSockAddr.h"
|
||||
#include "Logging.h"
|
||||
#include "LimitedNodeList.h"
|
||||
#include "PacketHeaders.h"
|
||||
#include "SharedUtil.h"
|
||||
#include "UUID.h"
|
||||
|
||||
const char SOLO_NODE_TYPES[2] = {
|
||||
NodeType::AvatarMixer,
|
||||
NodeType::AudioMixer
|
||||
};
|
||||
|
||||
const QUrl DEFAULT_NODE_AUTH_URL = QUrl("https://data-web.highfidelity.io");
|
||||
|
||||
LimitedNodeList* LimitedNodeList::_sharedInstance = NULL;
|
||||
|
||||
LimitedNodeList* LimitedNodeList::createInstance(unsigned short socketListenPort, unsigned short dtlsPort) {
|
||||
if (!_sharedInstance) {
|
||||
NodeType::init();
|
||||
|
||||
_sharedInstance = new LimitedNodeList(socketListenPort, dtlsPort);
|
||||
|
||||
// register the SharedNodePointer meta-type for signals/slots
|
||||
qRegisterMetaType<SharedNodePointer>();
|
||||
} else {
|
||||
qDebug("NodeList createInstance called with existing instance.");
|
||||
}
|
||||
|
||||
return _sharedInstance;
|
||||
}
|
||||
|
||||
LimitedNodeList* LimitedNodeList::getInstance() {
|
||||
if (!_sharedInstance) {
|
||||
qDebug("NodeList getInstance called before call to createInstance. Returning NULL pointer.");
|
||||
}
|
||||
|
||||
return _sharedInstance;
|
||||
}
|
||||
|
||||
|
||||
LimitedNodeList::LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort) :
|
||||
_nodeHash(),
|
||||
_nodeHashMutex(QMutex::Recursive),
|
||||
_nodeSocket(this),
|
||||
_dtlsSocket(NULL),
|
||||
_numCollectedPackets(0),
|
||||
_numCollectedBytes(0),
|
||||
_packetStatTimer()
|
||||
{
|
||||
_nodeSocket.bind(QHostAddress::AnyIPv4, socketListenPort);
|
||||
qDebug() << "NodeList socket is listening on" << _nodeSocket.localPort();
|
||||
|
||||
if (dtlsListenPort > 0) {
|
||||
// only create the DTLS socket during constructor if a custom port is passed
|
||||
_dtlsSocket = new QUdpSocket(this);
|
||||
|
||||
_dtlsSocket->bind(QHostAddress::AnyIPv4, dtlsListenPort);
|
||||
qDebug() << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort();
|
||||
}
|
||||
|
||||
const int LARGER_SNDBUF_SIZE = 1048576;
|
||||
changeSendSocketBufferSize(LARGER_SNDBUF_SIZE);
|
||||
|
||||
_packetStatTimer.start();
|
||||
}
|
||||
|
||||
QUdpSocket& LimitedNodeList::getDTLSSocket() {
|
||||
if (!_dtlsSocket) {
|
||||
// DTLS socket getter called but no DTLS socket exists, create it now
|
||||
_dtlsSocket = new QUdpSocket(this);
|
||||
|
||||
_dtlsSocket->bind(QHostAddress::AnyIPv4, 0, QAbstractSocket::DontShareAddress);
|
||||
qDebug() << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort();
|
||||
}
|
||||
|
||||
return *_dtlsSocket;
|
||||
}
|
||||
|
||||
void LimitedNodeList::changeSendSocketBufferSize(int numSendBytes) {
|
||||
// change the socket send buffer size to be 1MB
|
||||
int oldBufferSize = 0;
|
||||
|
||||
#ifdef Q_OS_WIN
|
||||
int sizeOfInt = sizeof(oldBufferSize);
|
||||
#else
|
||||
unsigned int sizeOfInt = sizeof(oldBufferSize);
|
||||
#endif
|
||||
|
||||
getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&oldBufferSize), &sizeOfInt);
|
||||
|
||||
setsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, SO_SNDBUF, reinterpret_cast<const char*>(&numSendBytes),
|
||||
sizeof(numSendBytes));
|
||||
|
||||
int newBufferSize = 0;
|
||||
getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&newBufferSize), &sizeOfInt);
|
||||
|
||||
qDebug() << "Changed socket send buffer size from" << oldBufferSize << "to" << newBufferSize << "bytes";
|
||||
}
|
||||
|
||||
bool LimitedNodeList::packetVersionAndHashMatch(const QByteArray& packet) {
|
||||
PacketType checkType = packetTypeForPacket(packet);
|
||||
if (packet[1] != versionForPacketType(checkType)
|
||||
&& checkType != PacketTypeStunResponse) {
|
||||
PacketType mismatchType = packetTypeForPacket(packet);
|
||||
int numPacketTypeBytes = numBytesArithmeticCodingFromBuffer(packet.data());
|
||||
|
||||
static QMultiMap<QUuid, PacketType> versionDebugSuppressMap;
|
||||
|
||||
QUuid senderUUID = uuidFromPacketHeader(packet);
|
||||
if (!versionDebugSuppressMap.contains(senderUUID, checkType)) {
|
||||
qDebug() << "Packet version mismatch on" << packetTypeForPacket(packet) << "- Sender"
|
||||
<< uuidFromPacketHeader(packet) << "sent" << qPrintable(QString::number(packet[numPacketTypeBytes])) << "but"
|
||||
<< qPrintable(QString::number(versionForPacketType(mismatchType))) << "expected.";
|
||||
|
||||
versionDebugSuppressMap.insert(senderUUID, checkType);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!NON_VERIFIED_PACKETS.contains(checkType)) {
|
||||
// figure out which node this is from
|
||||
SharedNodePointer sendingNode = sendingNodeForPacket(packet);
|
||||
if (sendingNode) {
|
||||
// check if the md5 hash in the header matches the hash we would expect
|
||||
if (hashFromPacketHeader(packet) == hashForPacketAndConnectionUUID(packet, sendingNode->getConnectionSecret())) {
|
||||
return true;
|
||||
} else {
|
||||
qDebug() << "Packet hash mismatch on" << checkType << "- Sender"
|
||||
<< uuidFromPacketHeader(packet);
|
||||
}
|
||||
} else {
|
||||
qDebug() << "Packet of type" << checkType << "received from unknown node with UUID"
|
||||
<< uuidFromPacketHeader(packet);
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr,
|
||||
const QUuid& connectionSecret) {
|
||||
QByteArray datagramCopy = datagram;
|
||||
|
||||
if (!connectionSecret.isNull()) {
|
||||
// setup the MD5 hash for source verification in the header
|
||||
replaceHashInPacketGivenConnectionUUID(datagramCopy, connectionSecret);
|
||||
}
|
||||
|
||||
// stat collection for packets
|
||||
++_numCollectedPackets;
|
||||
_numCollectedBytes += datagram.size();
|
||||
|
||||
qint64 bytesWritten = _nodeSocket.writeDatagram(datagramCopy,
|
||||
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
|
||||
|
||||
if (bytesWritten < 0) {
|
||||
qDebug() << "ERROR in writeDatagram:" << _nodeSocket.error() << "-" << _nodeSocket.errorString();
|
||||
}
|
||||
|
||||
return bytesWritten;
|
||||
}
|
||||
|
||||
qint64 LimitedNodeList::writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr) {
|
||||
if (destinationNode) {
|
||||
// if we don't have an ovveriden address, assume they want to send to the node's active socket
|
||||
const HifiSockAddr* destinationSockAddr = &overridenSockAddr;
|
||||
if (overridenSockAddr.isNull()) {
|
||||
if (destinationNode->getActiveSocket()) {
|
||||
// use the node's active socket as the destination socket
|
||||
destinationSockAddr = destinationNode->getActiveSocket();
|
||||
} else {
|
||||
// we don't have a socket to send to, return 0
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
writeDatagram(datagram, *destinationSockAddr, destinationNode->getConnectionSecret());
|
||||
}
|
||||
|
||||
// didn't have a destinationNode to send to, return 0
|
||||
return 0;
|
||||
}
|
||||
|
||||
qint64 LimitedNodeList::writeUnverifiedDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr) {
|
||||
return writeDatagram(datagram, destinationSockAddr, QUuid());
|
||||
}
|
||||
|
||||
qint64 LimitedNodeList::writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr) {
|
||||
return writeDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);
|
||||
}
|
||||
|
||||
void LimitedNodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet) {
|
||||
// the node decided not to do anything with this packet
|
||||
// if it comes from a known source we should keep that node alive
|
||||
SharedNodePointer matchingNode = sendingNodeForPacket(packet);
|
||||
if (matchingNode) {
|
||||
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
|
||||
}
|
||||
}
|
||||
|
||||
int LimitedNodeList::updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray &packet) {
|
||||
QMutexLocker locker(&matchingNode->getMutex());
|
||||
|
||||
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
|
||||
matchingNode->recordBytesReceived(packet.size());
|
||||
|
||||
if (!matchingNode->getLinkedData() && linkedDataCreateCallback) {
|
||||
linkedDataCreateCallback(matchingNode.data());
|
||||
}
|
||||
|
||||
QMutexLocker linkedDataLocker(&matchingNode->getLinkedData()->getMutex());
|
||||
|
||||
return matchingNode->getLinkedData()->parseData(packet);
|
||||
}
|
||||
|
||||
int LimitedNodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packet) {
|
||||
SharedNodePointer matchingNode = sendingNodeForPacket(packet);
|
||||
|
||||
if (matchingNode) {
|
||||
updateNodeWithDataFromPacket(matchingNode, packet);
|
||||
}
|
||||
|
||||
// we weren't able to match the sender address to the address we have for this node, unlock and don't parse
|
||||
return 0;
|
||||
}
|
||||
|
||||
SharedNodePointer LimitedNodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) {
|
||||
const int WAIT_TIME = 10; // wait up to 10ms in the try lock case
|
||||
SharedNodePointer node;
|
||||
// if caller wants us to block and guarantee the correct answer, then honor that request
|
||||
if (blockingLock) {
|
||||
// this will block till we can get access
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
node = _nodeHash.value(nodeUUID);
|
||||
} else if (_nodeHashMutex.tryLock(WAIT_TIME)) { // some callers are willing to get wrong answers but not block
|
||||
node = _nodeHash.value(nodeUUID);
|
||||
_nodeHashMutex.unlock();
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
SharedNodePointer LimitedNodeList::sendingNodeForPacket(const QByteArray& packet) {
|
||||
QUuid nodeUUID = uuidFromPacketHeader(packet);
|
||||
|
||||
// return the matching node, or NULL if there is no match
|
||||
return nodeWithUUID(nodeUUID);
|
||||
}
|
||||
|
||||
NodeHash LimitedNodeList::getNodeHash() {
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
return NodeHash(_nodeHash);
|
||||
}
|
||||
|
||||
void LimitedNodeList::eraseAllNodes() {
|
||||
qDebug() << "Clearing the NodeList. Deleting all nodes in list.";
|
||||
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
|
||||
NodeHash::iterator nodeItem = _nodeHash.begin();
|
||||
|
||||
// iterate the nodes in the list
|
||||
while (nodeItem != _nodeHash.end()) {
|
||||
nodeItem = killNodeAtHashIterator(nodeItem);
|
||||
}
|
||||
}
|
||||
|
||||
void LimitedNodeList::reset() {
|
||||
eraseAllNodes();
|
||||
}
|
||||
|
||||
void LimitedNodeList::killNodeWithUUID(const QUuid& nodeUUID) {
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
|
||||
NodeHash::iterator nodeItemToKill = _nodeHash.find(nodeUUID);
|
||||
if (nodeItemToKill != _nodeHash.end()) {
|
||||
killNodeAtHashIterator(nodeItemToKill);
|
||||
}
|
||||
}
|
||||
|
||||
NodeHash::iterator LimitedNodeList::killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill) {
|
||||
qDebug() << "Killed" << *nodeItemToKill.value();
|
||||
emit nodeKilled(nodeItemToKill.value());
|
||||
return _nodeHash.erase(nodeItemToKill);
|
||||
}
|
||||
|
||||
void LimitedNodeList::processKillNode(const QByteArray& dataByteArray) {
|
||||
// read the node id
|
||||
QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader(dataByteArray), NUM_BYTES_RFC4122_UUID));
|
||||
|
||||
// kill the node with this UUID, if it exists
|
||||
killNodeWithUUID(nodeUUID);
|
||||
}
|
||||
|
||||
SharedNodePointer LimitedNodeList::addOrUpdateNode(const QUuid& uuid, char nodeType,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
|
||||
_nodeHashMutex.lock();
|
||||
|
||||
if (!_nodeHash.contains(uuid)) {
|
||||
|
||||
// we didn't have this node, so add them
|
||||
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
|
||||
SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater);
|
||||
|
||||
_nodeHash.insert(newNode->getUUID(), newNodeSharedPointer);
|
||||
|
||||
_nodeHashMutex.unlock();
|
||||
|
||||
qDebug() << "Added" << *newNode;
|
||||
|
||||
emit nodeAdded(newNodeSharedPointer);
|
||||
|
||||
return newNodeSharedPointer;
|
||||
} else {
|
||||
_nodeHashMutex.unlock();
|
||||
|
||||
return updateSocketsForNode(uuid, publicSocket, localSocket);
|
||||
}
|
||||
}
|
||||
|
||||
SharedNodePointer LimitedNodeList::updateSocketsForNode(const QUuid& uuid,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
|
||||
|
||||
SharedNodePointer matchingNode = nodeWithUUID(uuid);
|
||||
|
||||
if (matchingNode) {
|
||||
// perform appropriate updates to this node
|
||||
QMutexLocker locker(&matchingNode->getMutex());
|
||||
|
||||
// check if we need to change this node's public or local sockets
|
||||
if (publicSocket != matchingNode->getPublicSocket()) {
|
||||
matchingNode->setPublicSocket(publicSocket);
|
||||
qDebug() << "Public socket change for node" << *matchingNode;
|
||||
}
|
||||
|
||||
if (localSocket != matchingNode->getLocalSocket()) {
|
||||
matchingNode->setLocalSocket(localSocket);
|
||||
qDebug() << "Local socket change for node" << *matchingNode;
|
||||
}
|
||||
}
|
||||
|
||||
return matchingNode;
|
||||
}
|
||||
|
||||
unsigned LimitedNodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) {
|
||||
unsigned n = 0;
|
||||
|
||||
foreach (const SharedNodePointer& node, getNodeHash()) {
|
||||
// only send to the NodeTypes we are asked to send to.
|
||||
if (destinationNodeTypes.contains(node->getType())) {
|
||||
writeDatagram(packet, node);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
SharedNodePointer LimitedNodeList::soloNodeOfType(char nodeType) {
|
||||
|
||||
if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) {
|
||||
foreach (const SharedNodePointer& node, getNodeHash()) {
|
||||
if (node->getType() == nodeType) {
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
||||
return SharedNodePointer();
|
||||
}
|
||||
|
||||
void LimitedNodeList::getPacketStats(float& packetsPerSecond, float& bytesPerSecond) {
|
||||
packetsPerSecond = (float) _numCollectedPackets / ((float) _packetStatTimer.elapsed() / 1000.0f);
|
||||
bytesPerSecond = (float) _numCollectedBytes / ((float) _packetStatTimer.elapsed() / 1000.0f);
|
||||
}
|
||||
|
||||
void LimitedNodeList::resetPacketStats() {
|
||||
_numCollectedPackets = 0;
|
||||
_numCollectedBytes = 0;
|
||||
_packetStatTimer.restart();
|
||||
}
|
||||
|
||||
void LimitedNodeList::removeSilentNodes() {
|
||||
|
||||
_nodeHashMutex.lock();
|
||||
|
||||
NodeHash::iterator nodeItem = _nodeHash.begin();
|
||||
|
||||
while (nodeItem != _nodeHash.end()) {
|
||||
SharedNodePointer node = nodeItem.value();
|
||||
|
||||
node->getMutex().lock();
|
||||
|
||||
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > NODE_SILENCE_THRESHOLD_USECS) {
|
||||
// call our private method to kill this node (removes it and emits the right signal)
|
||||
nodeItem = killNodeAtHashIterator(nodeItem);
|
||||
} else {
|
||||
// we didn't kill this node, push the iterator forwards
|
||||
++nodeItem;
|
||||
}
|
||||
|
||||
node->getMutex().unlock();
|
||||
}
|
||||
|
||||
_nodeHashMutex.unlock();
|
||||
}
|
128
libraries/shared/src/LimitedNodeList.h
Normal file
128
libraries/shared/src/LimitedNodeList.h
Normal file
|
@ -0,0 +1,128 @@
|
|||
//
|
||||
// LimitedNodeList.h
|
||||
// hifi
|
||||
//
|
||||
// Created by Stephen Birarda on 2/15/13.
|
||||
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
|
||||
//
|
||||
|
||||
#ifndef __hifi__LimitedNodeList__
|
||||
#define __hifi__LimitedNodeList__
|
||||
|
||||
#ifdef _WIN32
|
||||
#include "Syssocket.h"
|
||||
#else
|
||||
#include <netinet/in.h>
|
||||
#endif
|
||||
#include <stdint.h>
|
||||
#include <iterator>
|
||||
|
||||
#ifndef _WIN32
|
||||
#include <unistd.h> // not on windows, not needed for mac or windows
|
||||
#endif
|
||||
|
||||
#include <QtCore/QElapsedTimer>
|
||||
#include <QtCore/QMutex>
|
||||
#include <QtCore/QSet>
|
||||
#include <QtCore/QSettings>
|
||||
#include <QtCore/QSharedPointer>
|
||||
#include <QtNetwork/QHostAddress>
|
||||
#include <QtNetwork/QUdpSocket>
|
||||
|
||||
#include <gnutls/gnutls.h>
|
||||
|
||||
#include "DomainHandler.h"
|
||||
#include "Node.h"
|
||||
|
||||
const quint64 NODE_SILENCE_THRESHOLD_USECS = 2 * 1000 * 1000;
|
||||
|
||||
extern const char SOLO_NODE_TYPES[2];
|
||||
|
||||
extern const QUrl DEFAULT_NODE_AUTH_URL;
|
||||
|
||||
const char DEFAULT_ASSIGNMENT_SERVER_HOSTNAME[] = "localhost";
|
||||
|
||||
class HifiSockAddr;
|
||||
|
||||
typedef QSet<NodeType_t> NodeSet;
|
||||
|
||||
typedef QSharedPointer<Node> SharedNodePointer;
|
||||
typedef QHash<QUuid, SharedNodePointer> NodeHash;
|
||||
Q_DECLARE_METATYPE(SharedNodePointer)
|
||||
|
||||
class LimitedNodeList : public QObject {
|
||||
Q_OBJECT
|
||||
public:
|
||||
static LimitedNodeList* createInstance(unsigned short socketListenPort = 0, unsigned short dtlsPort = 0);
|
||||
static LimitedNodeList* getInstance();
|
||||
|
||||
QUdpSocket& getNodeSocket() { return _nodeSocket; }
|
||||
QUdpSocket& getDTLSSocket();
|
||||
|
||||
bool packetVersionAndHashMatch(const QByteArray& packet);
|
||||
|
||||
qint64 writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
|
||||
qint64 writeUnverifiedDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr);
|
||||
qint64 writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
|
||||
|
||||
void(*linkedDataCreateCallback)(Node *);
|
||||
|
||||
NodeHash getNodeHash();
|
||||
int size() const { return _nodeHash.size(); }
|
||||
|
||||
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true);
|
||||
SharedNodePointer sendingNodeForPacket(const QByteArray& packet);
|
||||
|
||||
SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);
|
||||
SharedNodePointer updateSocketsForNode(const QUuid& uuid,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);
|
||||
|
||||
void processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
|
||||
void processKillNode(const QByteArray& datagram);
|
||||
|
||||
int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet);
|
||||
int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet);
|
||||
|
||||
unsigned broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes);
|
||||
SharedNodePointer soloNodeOfType(char nodeType);
|
||||
|
||||
void getPacketStats(float &packetsPerSecond, float &bytesPerSecond);
|
||||
void resetPacketStats();
|
||||
public slots:
|
||||
void reset();
|
||||
void eraseAllNodes();
|
||||
|
||||
void removeSilentNodes();
|
||||
|
||||
void killNodeWithUUID(const QUuid& nodeUUID);
|
||||
signals:
|
||||
void nodeAdded(SharedNodePointer);
|
||||
void nodeKilled(SharedNodePointer);
|
||||
protected:
|
||||
static LimitedNodeList* _sharedInstance;
|
||||
|
||||
LimitedNodeList(unsigned short socketListenPort, unsigned short dtlsListenPort);
|
||||
LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
|
||||
void operator=(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
|
||||
|
||||
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr,
|
||||
const QUuid& connectionSecret);
|
||||
|
||||
NodeHash::iterator killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill);
|
||||
|
||||
|
||||
void changeSendSocketBufferSize(int numSendBytes);
|
||||
|
||||
NodeHash _nodeHash;
|
||||
QMutex _nodeHashMutex;
|
||||
QUdpSocket _nodeSocket;
|
||||
QUdpSocket* _dtlsSocket;
|
||||
int _numCollectedPackets;
|
||||
int _numCollectedBytes;
|
||||
QElapsedTimer _packetStatTimer;
|
||||
};
|
||||
|
||||
#endif /* defined(__hifi__LimitedNodeList__) */
|
|
@ -6,10 +6,6 @@
|
|||
// Copyright (c) 2013 High Fidelity, Inc. All rights reserved.
|
||||
//
|
||||
|
||||
#include <cstring>
|
||||
#include <cstdlib>
|
||||
#include <cstdio>
|
||||
|
||||
#include <QtCore/QDataStream>
|
||||
#include <QtCore/QDebug>
|
||||
#include <QtCore/QJsonDocument>
|
||||
|
@ -25,13 +21,6 @@
|
|||
#include "SharedUtil.h"
|
||||
#include "UUID.h"
|
||||
|
||||
const char SOLO_NODE_TYPES[2] = {
|
||||
NodeType::AvatarMixer,
|
||||
NodeType::AudioMixer
|
||||
};
|
||||
|
||||
const QUrl DEFAULT_NODE_AUTH_URL = QUrl("https://data-web.highfidelity.io");
|
||||
|
||||
NodeList* NodeList::_sharedInstance = NULL;
|
||||
|
||||
NodeList* NodeList::createInstance(char ownerType, unsigned short socketListenPort, unsigned short dtlsPort) {
|
||||
|
@ -57,12 +46,8 @@ NodeList* NodeList::getInstance() {
|
|||
return _sharedInstance;
|
||||
}
|
||||
|
||||
|
||||
NodeList::NodeList(char newOwnerType, unsigned short socketListenPort, unsigned short dtlsListenPort) :
|
||||
_nodeHash(),
|
||||
_nodeHashMutex(QMutex::Recursive),
|
||||
_nodeSocket(this),
|
||||
_dtlsSocket(NULL),
|
||||
LimitedNodeList(socketListenPort, dtlsListenPort),
|
||||
_ownerType(newOwnerType),
|
||||
_nodeTypesOfInterest(),
|
||||
_sessionUUID(),
|
||||
|
@ -70,162 +55,13 @@ NodeList::NodeList(char newOwnerType, unsigned short socketListenPort, unsigned
|
|||
_assignmentServerSocket(),
|
||||
_publicSockAddr(),
|
||||
_hasCompletedInitialSTUNFailure(false),
|
||||
_stunRequestsSinceSuccess(0),
|
||||
_numCollectedPackets(0),
|
||||
_numCollectedBytes(0),
|
||||
_packetStatTimer()
|
||||
_stunRequestsSinceSuccess(0)
|
||||
{
|
||||
_nodeSocket.bind(QHostAddress::AnyIPv4, socketListenPort);
|
||||
qDebug() << "NodeList socket is listening on" << _nodeSocket.localPort();
|
||||
|
||||
if (dtlsListenPort > 0) {
|
||||
// only create the DTLS socket during constructor if a custom port is passed
|
||||
_dtlsSocket = new QUdpSocket(this);
|
||||
|
||||
_dtlsSocket->bind(QHostAddress::AnyIPv4, dtlsListenPort);
|
||||
qDebug() << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort();
|
||||
}
|
||||
|
||||
// clear our NodeList when the domain changes
|
||||
connect(&_DomainHandler, &DomainHandler::hostnameChanged, this, &NodeList::reset);
|
||||
|
||||
// clear our NodeList when logout is requested
|
||||
connect(&AccountManager::getInstance(), &AccountManager::logoutComplete , this, &NodeList::reset);
|
||||
|
||||
const int LARGER_SNDBUF_SIZE = 1048576;
|
||||
changeSendSocketBufferSize(LARGER_SNDBUF_SIZE);
|
||||
|
||||
_packetStatTimer.start();
|
||||
}
|
||||
|
||||
QUdpSocket& NodeList::getDTLSSocket() {
|
||||
if (!_dtlsSocket) {
|
||||
// DTLS socket getter called but no DTLS socket exists, create it now
|
||||
_dtlsSocket = new QUdpSocket(this);
|
||||
|
||||
_dtlsSocket->bind(QHostAddress::AnyIPv4, 0, QAbstractSocket::DontShareAddress);
|
||||
qDebug() << "NodeList DTLS socket is listening on" << _dtlsSocket->localPort();
|
||||
}
|
||||
|
||||
return *_dtlsSocket;
|
||||
}
|
||||
|
||||
void NodeList::changeSendSocketBufferSize(int numSendBytes) {
|
||||
// change the socket send buffer size to be 1MB
|
||||
int oldBufferSize = 0;
|
||||
|
||||
#ifdef Q_OS_WIN
|
||||
int sizeOfInt = sizeof(oldBufferSize);
|
||||
#else
|
||||
unsigned int sizeOfInt = sizeof(oldBufferSize);
|
||||
#endif
|
||||
|
||||
getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&oldBufferSize), &sizeOfInt);
|
||||
|
||||
setsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, SO_SNDBUF, reinterpret_cast<const char*>(&numSendBytes),
|
||||
sizeof(numSendBytes));
|
||||
|
||||
int newBufferSize = 0;
|
||||
getsockopt(_nodeSocket.socketDescriptor(), SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&newBufferSize), &sizeOfInt);
|
||||
|
||||
qDebug() << "Changed socket send buffer size from" << oldBufferSize << "to" << newBufferSize << "bytes";
|
||||
}
|
||||
|
||||
bool NodeList::packetVersionAndHashMatch(const QByteArray& packet) {
|
||||
PacketType checkType = packetTypeForPacket(packet);
|
||||
if (packet[1] != versionForPacketType(checkType)
|
||||
&& checkType != PacketTypeStunResponse) {
|
||||
PacketType mismatchType = packetTypeForPacket(packet);
|
||||
int numPacketTypeBytes = numBytesArithmeticCodingFromBuffer(packet.data());
|
||||
|
||||
static QMultiMap<QUuid, PacketType> versionDebugSuppressMap;
|
||||
|
||||
QUuid senderUUID = uuidFromPacketHeader(packet);
|
||||
if (!versionDebugSuppressMap.contains(senderUUID, checkType)) {
|
||||
qDebug() << "Packet version mismatch on" << packetTypeForPacket(packet) << "- Sender"
|
||||
<< uuidFromPacketHeader(packet) << "sent" << qPrintable(QString::number(packet[numPacketTypeBytes])) << "but"
|
||||
<< qPrintable(QString::number(versionForPacketType(mismatchType))) << "expected.";
|
||||
|
||||
versionDebugSuppressMap.insert(senderUUID, checkType);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!NON_VERIFIED_PACKETS.contains(checkType)) {
|
||||
// figure out which node this is from
|
||||
SharedNodePointer sendingNode = sendingNodeForPacket(packet);
|
||||
if (sendingNode) {
|
||||
// check if the md5 hash in the header matches the hash we would expect
|
||||
if (hashFromPacketHeader(packet) == hashForPacketAndConnectionUUID(packet, sendingNode->getConnectionSecret())) {
|
||||
return true;
|
||||
} else {
|
||||
qDebug() << "Packet hash mismatch on" << checkType << "- Sender"
|
||||
<< uuidFromPacketHeader(packet);
|
||||
}
|
||||
} else {
|
||||
qDebug() << "Packet of type" << checkType << "received from unknown node with UUID"
|
||||
<< uuidFromPacketHeader(packet);
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
qint64 NodeList::writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr,
|
||||
const QUuid& connectionSecret) {
|
||||
QByteArray datagramCopy = datagram;
|
||||
|
||||
if (!connectionSecret.isNull()) {
|
||||
// setup the MD5 hash for source verification in the header
|
||||
replaceHashInPacketGivenConnectionUUID(datagramCopy, connectionSecret);
|
||||
}
|
||||
|
||||
// stat collection for packets
|
||||
++_numCollectedPackets;
|
||||
_numCollectedBytes += datagram.size();
|
||||
|
||||
qint64 bytesWritten = _nodeSocket.writeDatagram(datagramCopy,
|
||||
destinationSockAddr.getAddress(), destinationSockAddr.getPort());
|
||||
|
||||
if (bytesWritten < 0) {
|
||||
qDebug() << "ERROR in writeDatagram:" << _nodeSocket.error() << "-" << _nodeSocket.errorString();
|
||||
}
|
||||
|
||||
return bytesWritten;
|
||||
}
|
||||
|
||||
qint64 NodeList::writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr) {
|
||||
if (destinationNode) {
|
||||
// if we don't have an ovveriden address, assume they want to send to the node's active socket
|
||||
const HifiSockAddr* destinationSockAddr = &overridenSockAddr;
|
||||
if (overridenSockAddr.isNull()) {
|
||||
if (destinationNode->getActiveSocket()) {
|
||||
// use the node's active socket as the destination socket
|
||||
destinationSockAddr = destinationNode->getActiveSocket();
|
||||
} else {
|
||||
// we don't have a socket to send to, return 0
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
writeDatagram(datagram, *destinationSockAddr, destinationNode->getConnectionSecret());
|
||||
}
|
||||
|
||||
// didn't have a destinationNode to send to, return 0
|
||||
return 0;
|
||||
}
|
||||
|
||||
qint64 NodeList::writeUnverifiedDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr) {
|
||||
return writeDatagram(datagram, destinationSockAddr, QUuid());
|
||||
}
|
||||
|
||||
qint64 NodeList::writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr) {
|
||||
return writeDatagram(QByteArray(data, size), destinationNode, overridenSockAddr);
|
||||
}
|
||||
|
||||
qint64 NodeList::sendStatsToDomainServer(const QJsonObject& statsObject) {
|
||||
|
@ -315,85 +151,14 @@ void NodeList::processNodeData(const HifiSockAddr& senderSockAddr, const QByteAr
|
|||
break;
|
||||
}
|
||||
default:
|
||||
// the node decided not to do anything with this packet
|
||||
// if it comes from a known source we should keep that node alive
|
||||
SharedNodePointer matchingNode = sendingNodeForPacket(packet);
|
||||
if (matchingNode) {
|
||||
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
|
||||
}
|
||||
|
||||
LimitedNodeList::processNodeData(senderSockAddr, packet);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int NodeList::updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray &packet) {
|
||||
QMutexLocker locker(&matchingNode->getMutex());
|
||||
|
||||
matchingNode->setLastHeardMicrostamp(usecTimestampNow());
|
||||
matchingNode->recordBytesReceived(packet.size());
|
||||
|
||||
if (!matchingNode->getLinkedData() && linkedDataCreateCallback) {
|
||||
linkedDataCreateCallback(matchingNode.data());
|
||||
}
|
||||
|
||||
QMutexLocker linkedDataLocker(&matchingNode->getLinkedData()->getMutex());
|
||||
|
||||
return matchingNode->getLinkedData()->parseData(packet);
|
||||
}
|
||||
|
||||
int NodeList::findNodeAndUpdateWithDataFromPacket(const QByteArray& packet) {
|
||||
SharedNodePointer matchingNode = sendingNodeForPacket(packet);
|
||||
|
||||
if (matchingNode) {
|
||||
updateNodeWithDataFromPacket(matchingNode, packet);
|
||||
}
|
||||
|
||||
// we weren't able to match the sender address to the address we have for this node, unlock and don't parse
|
||||
return 0;
|
||||
}
|
||||
|
||||
SharedNodePointer NodeList::nodeWithUUID(const QUuid& nodeUUID, bool blockingLock) {
|
||||
const int WAIT_TIME = 10; // wait up to 10ms in the try lock case
|
||||
SharedNodePointer node;
|
||||
// if caller wants us to block and guarantee the correct answer, then honor that request
|
||||
if (blockingLock) {
|
||||
// this will block till we can get access
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
node = _nodeHash.value(nodeUUID);
|
||||
} else if (_nodeHashMutex.tryLock(WAIT_TIME)) { // some callers are willing to get wrong answers but not block
|
||||
node = _nodeHash.value(nodeUUID);
|
||||
_nodeHashMutex.unlock();
|
||||
}
|
||||
return node;
|
||||
}
|
||||
|
||||
SharedNodePointer NodeList::sendingNodeForPacket(const QByteArray& packet) {
|
||||
QUuid nodeUUID = uuidFromPacketHeader(packet);
|
||||
|
||||
// return the matching node, or NULL if there is no match
|
||||
return nodeWithUUID(nodeUUID);
|
||||
}
|
||||
|
||||
NodeHash NodeList::getNodeHash() {
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
return NodeHash(_nodeHash);
|
||||
}
|
||||
|
||||
void NodeList::eraseAllNodes() {
|
||||
qDebug() << "Clearing the NodeList. Deleting all nodes in list.";
|
||||
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
|
||||
NodeHash::iterator nodeItem = _nodeHash.begin();
|
||||
|
||||
// iterate the nodes in the list
|
||||
while (nodeItem != _nodeHash.end()) {
|
||||
nodeItem = killNodeAtHashIterator(nodeItem);
|
||||
}
|
||||
}
|
||||
|
||||
void NodeList::reset() {
|
||||
eraseAllNodes();
|
||||
LimitedNodeList::reset();
|
||||
|
||||
_numNoReplyDomainCheckIns = 0;
|
||||
|
||||
// refresh the owner UUID to the NULL UUID
|
||||
|
@ -547,29 +312,6 @@ void NodeList::processSTUNResponse(const QByteArray& packet) {
|
|||
}
|
||||
}
|
||||
|
||||
void NodeList::killNodeWithUUID(const QUuid& nodeUUID) {
|
||||
QMutexLocker locker(&_nodeHashMutex);
|
||||
|
||||
NodeHash::iterator nodeItemToKill = _nodeHash.find(nodeUUID);
|
||||
if (nodeItemToKill != _nodeHash.end()) {
|
||||
killNodeAtHashIterator(nodeItemToKill);
|
||||
}
|
||||
}
|
||||
|
||||
NodeHash::iterator NodeList::killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill) {
|
||||
qDebug() << "Killed" << *nodeItemToKill.value();
|
||||
emit nodeKilled(nodeItemToKill.value());
|
||||
return _nodeHash.erase(nodeItemToKill);
|
||||
}
|
||||
|
||||
void NodeList::processKillNode(const QByteArray& dataByteArray) {
|
||||
// read the node id
|
||||
QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader(dataByteArray), NUM_BYTES_RFC4122_UUID));
|
||||
|
||||
// kill the node with this UUID, if it exists
|
||||
killNodeWithUUID(nodeUUID);
|
||||
}
|
||||
|
||||
void NodeList::sendDomainServerCheckIn() {
|
||||
if (_publicSockAddr.isNull() && !_hasCompletedInitialSTUNFailure) {
|
||||
// we don't know our public socket and we need to send it to the domain server
|
||||
|
@ -734,70 +476,6 @@ void NodeList::pingPublicAndLocalSocketsForInactiveNode(const SharedNodePointer&
|
|||
writeDatagram(publicPingPacket, node, node->getPublicSocket());
|
||||
}
|
||||
|
||||
SharedNodePointer NodeList::addOrUpdateNode(const QUuid& uuid, char nodeType,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
|
||||
_nodeHashMutex.lock();
|
||||
|
||||
if (!_nodeHash.contains(uuid)) {
|
||||
|
||||
// we didn't have this node, so add them
|
||||
Node* newNode = new Node(uuid, nodeType, publicSocket, localSocket);
|
||||
SharedNodePointer newNodeSharedPointer(newNode, &QObject::deleteLater);
|
||||
|
||||
_nodeHash.insert(newNode->getUUID(), newNodeSharedPointer);
|
||||
|
||||
_nodeHashMutex.unlock();
|
||||
|
||||
qDebug() << "Added" << *newNode;
|
||||
|
||||
emit nodeAdded(newNodeSharedPointer);
|
||||
|
||||
return newNodeSharedPointer;
|
||||
} else {
|
||||
_nodeHashMutex.unlock();
|
||||
|
||||
return updateSocketsForNode(uuid, publicSocket, localSocket);
|
||||
}
|
||||
}
|
||||
|
||||
SharedNodePointer NodeList::updateSocketsForNode(const QUuid& uuid,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket) {
|
||||
|
||||
SharedNodePointer matchingNode = nodeWithUUID(uuid);
|
||||
|
||||
if (matchingNode) {
|
||||
// perform appropriate updates to this node
|
||||
QMutexLocker locker(&matchingNode->getMutex());
|
||||
|
||||
// check if we need to change this node's public or local sockets
|
||||
if (publicSocket != matchingNode->getPublicSocket()) {
|
||||
matchingNode->setPublicSocket(publicSocket);
|
||||
qDebug() << "Public socket change for node" << *matchingNode;
|
||||
}
|
||||
|
||||
if (localSocket != matchingNode->getLocalSocket()) {
|
||||
matchingNode->setLocalSocket(localSocket);
|
||||
qDebug() << "Local socket change for node" << *matchingNode;
|
||||
}
|
||||
}
|
||||
|
||||
return matchingNode;
|
||||
}
|
||||
|
||||
unsigned NodeList::broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes) {
|
||||
unsigned n = 0;
|
||||
|
||||
foreach (const SharedNodePointer& node, getNodeHash()) {
|
||||
// only send to the NodeTypes we are asked to send to.
|
||||
if (destinationNodeTypes.contains(node->getType())) {
|
||||
writeDatagram(packet, node);
|
||||
++n;
|
||||
}
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
void NodeList::pingInactiveNodes() {
|
||||
foreach (const SharedNodePointer& node, getNodeHash()) {
|
||||
if (!node->getActiveSocket()) {
|
||||
|
@ -824,54 +502,6 @@ void NodeList::activateSocketFromNodeCommunication(const QByteArray& packet, con
|
|||
}
|
||||
}
|
||||
|
||||
SharedNodePointer NodeList::soloNodeOfType(char nodeType) {
|
||||
|
||||
if (memchr(SOLO_NODE_TYPES, nodeType, sizeof(SOLO_NODE_TYPES))) {
|
||||
foreach (const SharedNodePointer& node, getNodeHash()) {
|
||||
if (node->getType() == nodeType) {
|
||||
return node;
|
||||
}
|
||||
}
|
||||
}
|
||||
return SharedNodePointer();
|
||||
}
|
||||
|
||||
void NodeList::getPacketStats(float& packetsPerSecond, float& bytesPerSecond) {
|
||||
packetsPerSecond = (float) _numCollectedPackets / ((float) _packetStatTimer.elapsed() / 1000.0f);
|
||||
bytesPerSecond = (float) _numCollectedBytes / ((float) _packetStatTimer.elapsed() / 1000.0f);
|
||||
}
|
||||
|
||||
void NodeList::resetPacketStats() {
|
||||
_numCollectedPackets = 0;
|
||||
_numCollectedBytes = 0;
|
||||
_packetStatTimer.restart();
|
||||
}
|
||||
|
||||
void NodeList::removeSilentNodes() {
|
||||
|
||||
_nodeHashMutex.lock();
|
||||
|
||||
NodeHash::iterator nodeItem = _nodeHash.begin();
|
||||
|
||||
while (nodeItem != _nodeHash.end()) {
|
||||
SharedNodePointer node = nodeItem.value();
|
||||
|
||||
node->getMutex().lock();
|
||||
|
||||
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > NODE_SILENCE_THRESHOLD_USECS) {
|
||||
// call our private method to kill this node (removes it and emits the right signal)
|
||||
nodeItem = killNodeAtHashIterator(nodeItem);
|
||||
} else {
|
||||
// we didn't kill this node, push the iterator forwards
|
||||
++nodeItem;
|
||||
}
|
||||
|
||||
node->getMutex().unlock();
|
||||
}
|
||||
|
||||
_nodeHashMutex.unlock();
|
||||
}
|
||||
|
||||
const QString QSETTINGS_GROUP_NAME = "NodeList";
|
||||
const QString DOMAIN_SERVER_SETTING_KEY = "domainServerHostname";
|
||||
|
||||
|
|
|
@ -32,28 +32,14 @@
|
|||
#include <gnutls/gnutls.h>
|
||||
|
||||
#include "DomainHandler.h"
|
||||
#include "LimitedNodeList.h"
|
||||
#include "Node.h"
|
||||
|
||||
const quint64 NODE_SILENCE_THRESHOLD_USECS = 2 * 1000 * 1000;
|
||||
const quint64 DOMAIN_SERVER_CHECK_IN_USECS = 1 * 1000000;
|
||||
const quint64 PING_INACTIVE_NODE_INTERVAL_USECS = 1 * 1000 * 1000;
|
||||
|
||||
extern const char SOLO_NODE_TYPES[2];
|
||||
|
||||
extern const QUrl DEFAULT_NODE_AUTH_URL;
|
||||
|
||||
const char DEFAULT_ASSIGNMENT_SERVER_HOSTNAME[] = "localhost";
|
||||
|
||||
const int MAX_SILENT_DOMAIN_SERVER_CHECK_INS = 5;
|
||||
|
||||
class Assignment;
|
||||
class HifiSockAddr;
|
||||
|
||||
typedef QSet<NodeType_t> NodeSet;
|
||||
|
||||
typedef QSharedPointer<Node> SharedNodePointer;
|
||||
typedef QHash<QUuid, SharedNodePointer> NodeHash;
|
||||
Q_DECLARE_METATYPE(SharedNodePointer)
|
||||
|
||||
typedef quint8 PingType_t;
|
||||
namespace PingType {
|
||||
|
@ -62,7 +48,7 @@ namespace PingType {
|
|||
const PingType_t Public = 2;
|
||||
}
|
||||
|
||||
class NodeList : public QObject {
|
||||
class NodeList : public LimitedNodeList {
|
||||
Q_OBJECT
|
||||
public:
|
||||
static NodeList* createInstance(char ownerType, unsigned short socketListenPort = 0, unsigned short dtlsPort = 0);
|
||||
|
@ -73,23 +59,8 @@ public:
|
|||
const QUuid& getSessionUUID() const { return _sessionUUID; }
|
||||
void setSessionUUID(const QUuid& sessionUUID);
|
||||
|
||||
QUdpSocket& getNodeSocket() { return _nodeSocket; }
|
||||
QUdpSocket& getDTLSSocket();
|
||||
|
||||
bool packetVersionAndHashMatch(const QByteArray& packet);
|
||||
|
||||
qint64 writeDatagram(const QByteArray& datagram, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
|
||||
qint64 writeUnverifiedDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr);
|
||||
qint64 writeDatagram(const char* data, qint64 size, const SharedNodePointer& destinationNode,
|
||||
const HifiSockAddr& overridenSockAddr = HifiSockAddr());
|
||||
qint64 sendStatsToDomainServer(const QJsonObject& statsObject);
|
||||
|
||||
void(*linkedDataCreateCallback)(Node *);
|
||||
|
||||
NodeHash getNodeHash();
|
||||
int size() const { return _nodeHash.size(); }
|
||||
|
||||
int getNumNoReplyDomainCheckIns() const { return _numNoReplyDomainCheckIns; }
|
||||
DomainHandler& getDomainHandler() { return _DomainHandler; }
|
||||
|
||||
|
@ -98,6 +69,7 @@ public:
|
|||
void addSetOfNodeTypesToNodeInterestSet(const NodeSet& setOfNodeTypes);
|
||||
void resetNodeInterestSet() { _nodeTypesOfInterest.clear(); }
|
||||
|
||||
void processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
|
||||
int processDomainServerList(const QByteArray& packet);
|
||||
|
||||
void setAssignmentServerSocket(const HifiSockAddr& serverSocket) { _assignmentServerSocket = serverSocket; }
|
||||
|
@ -106,42 +78,15 @@ public:
|
|||
QByteArray constructPingPacket(PingType_t pingType = PingType::Agnostic);
|
||||
QByteArray constructPingReplyPacket(const QByteArray& pingPacket);
|
||||
void pingPublicAndLocalSocketsForInactiveNode(const SharedNodePointer& node);
|
||||
|
||||
SharedNodePointer nodeWithUUID(const QUuid& nodeUUID, bool blockingLock = true);
|
||||
SharedNodePointer sendingNodeForPacket(const QByteArray& packet);
|
||||
|
||||
SharedNodePointer addOrUpdateNode(const QUuid& uuid, char nodeType,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);
|
||||
SharedNodePointer updateSocketsForNode(const QUuid& uuid,
|
||||
const HifiSockAddr& publicSocket, const HifiSockAddr& localSocket);
|
||||
|
||||
void processNodeData(const HifiSockAddr& senderSockAddr, const QByteArray& packet);
|
||||
void processKillNode(const QByteArray& datagram);
|
||||
|
||||
int updateNodeWithDataFromPacket(const SharedNodePointer& matchingNode, const QByteArray& packet);
|
||||
int findNodeAndUpdateWithDataFromPacket(const QByteArray& packet);
|
||||
|
||||
unsigned broadcastToNodes(const QByteArray& packet, const NodeSet& destinationNodeTypes);
|
||||
SharedNodePointer soloNodeOfType(char nodeType);
|
||||
|
||||
void getPacketStats(float &packetsPerSecond, float &bytesPerSecond);
|
||||
void resetPacketStats();
|
||||
|
||||
void loadData(QSettings* settings);
|
||||
void saveData(QSettings* settings);
|
||||
public slots:
|
||||
void reset();
|
||||
void eraseAllNodes();
|
||||
|
||||
void sendDomainServerCheckIn();
|
||||
void pingInactiveNodes();
|
||||
void removeSilentNodes();
|
||||
|
||||
void killNodeWithUUID(const QUuid& nodeUUID);
|
||||
signals:
|
||||
void uuidChanged(const QUuid& ownerUUID);
|
||||
void nodeAdded(SharedNodePointer);
|
||||
void nodeKilled(SharedNodePointer);
|
||||
void limitOfSilentDomainCheckInsReached();
|
||||
private:
|
||||
static NodeList* _sharedInstance;
|
||||
|
@ -152,22 +97,11 @@ private:
|
|||
void sendSTUNRequest();
|
||||
void processSTUNResponse(const QByteArray& packet);
|
||||
|
||||
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr,
|
||||
const QUuid& connectionSecret);
|
||||
|
||||
NodeHash::iterator killNodeAtHashIterator(NodeHash::iterator& nodeItemToKill);
|
||||
|
||||
void processDomainServerAuthRequest(const QByteArray& packet);
|
||||
void requestAuthForDomainServer();
|
||||
void activateSocketFromNodeCommunication(const QByteArray& packet, const SharedNodePointer& sendingNode);
|
||||
void timePingReply(const QByteArray& packet, const SharedNodePointer& sendingNode);
|
||||
|
||||
void changeSendSocketBufferSize(int numSendBytes);
|
||||
|
||||
NodeHash _nodeHash;
|
||||
QMutex _nodeHashMutex;
|
||||
QUdpSocket _nodeSocket;
|
||||
QUdpSocket* _dtlsSocket;
|
||||
NodeType_t _ownerType;
|
||||
NodeSet _nodeTypesOfInterest;
|
||||
DomainHandler _DomainHandler;
|
||||
|
@ -177,9 +111,6 @@ private:
|
|||
HifiSockAddr _publicSockAddr;
|
||||
bool _hasCompletedInitialSTUNFailure;
|
||||
unsigned int _stunRequestsSinceSuccess;
|
||||
int _numCollectedPackets;
|
||||
int _numCollectedBytes;
|
||||
QElapsedTimer _packetStatTimer;
|
||||
};
|
||||
|
||||
#endif /* defined(__hifi__NodeList__) */
|
||||
|
|
Loading…
Reference in a new issue