continued STUN requests for keepalive, propagate socket changes

This commit is contained in:
Stephen Birarda 2013-10-18 15:05:16 -07:00
parent 8d8278daef
commit 32680bbaad
4 changed files with 173 additions and 128 deletions

View file

@ -33,17 +33,8 @@ Node::Node(const QUuid& uuid, char type, sockaddr* publicSocket, sockaddr* local
_linkedData(NULL), _linkedData(NULL),
_isAlive(true) _isAlive(true)
{ {
if (publicSocket) { setPublicSocket(publicSocket);
_publicSocket = new sockaddr(*publicSocket); setLocalSocket(localSocket);
} else {
_publicSocket = NULL;
}
if (localSocket) {
_localSocket = new sockaddr(*localSocket);
} else {
_localSocket = NULL;
}
pthread_mutex_init(&_mutex, 0); pthread_mutex_init(&_mutex, 0);
} }
@ -95,6 +86,32 @@ const char* Node::getTypeName() const {
} }
} }
void Node::setPublicSocket(sockaddr* publicSocket) {
if (_activeSocket == _publicSocket) {
// if the active socket was the public socket then reset it to NULL
_activeSocket = NULL;
}
if (publicSocket) {
_publicSocket = new sockaddr(*publicSocket);
} else {
_publicSocket = NULL;
}
}
void Node::setLocalSocket(sockaddr* localSocket) {
if (_activeSocket == _localSocket) {
// if the active socket was the local socket then reset it to NULL
_activeSocket = NULL;
}
if (localSocket) {
_localSocket = new sockaddr(*localSocket);
} else {
_localSocket = NULL;
}
}
void Node::activateLocalSocket() { void Node::activateLocalSocket() {
qDebug() << "Activating local socket for node" << *this << "\n"; qDebug() << "Activating local socket for node" << *this << "\n";
_activeSocket = _localSocket; _activeSocket = _localSocket;

View file

@ -47,9 +47,9 @@ public:
void setLastHeardMicrostamp(uint64_t lastHeardMicrostamp) { _lastHeardMicrostamp = lastHeardMicrostamp; } void setLastHeardMicrostamp(uint64_t lastHeardMicrostamp) { _lastHeardMicrostamp = lastHeardMicrostamp; }
sockaddr* getPublicSocket() const { return _publicSocket; } sockaddr* getPublicSocket() const { return _publicSocket; }
void setPublicSocket(sockaddr* publicSocket) { _publicSocket = publicSocket; } void setPublicSocket(sockaddr* publicSocket);
sockaddr* getLocalSocket() const { return _localSocket; } sockaddr* getLocalSocket() const { return _localSocket; }
void setLocalSocket(sockaddr* localSocket) { _localSocket = localSocket; } void setLocalSocket(sockaddr* localSocket);
sockaddr* getActiveSocket() const { return _activeSocket; } sockaddr* getActiveSocket() const { return _activeSocket; }

View file

@ -71,11 +71,10 @@ NodeList::NodeList(char newOwnerType, unsigned short int newSocketListenPort) :
_ownerUUID(QUuid::createUuid()), _ownerUUID(QUuid::createUuid()),
_numNoReplyDomainCheckIns(0), _numNoReplyDomainCheckIns(0),
_assignmentServerSocket(NULL), _assignmentServerSocket(NULL),
_checkInPacket(NULL),
_numBytesCheckInPacket(0),
_publicAddress(), _publicAddress(),
_publicPort(0), _publicPort(0),
_shouldUseDomainServerAsSTUN(0) _hasCompletedInitialSTUNFailure(false),
_stunRequestsSinceSuccess(0)
{ {
} }
@ -288,11 +287,6 @@ void NodeList::reset() {
clear(); clear();
_numNoReplyDomainCheckIns = 0; _numNoReplyDomainCheckIns = 0;
delete[] _checkInPacket;
_checkInPacket = NULL;
_numBytesCheckInPacket = 0;
delete _nodeTypesOfInterest; delete _nodeTypesOfInterest;
_nodeTypesOfInterest = NULL; _nodeTypesOfInterest = NULL;
@ -316,62 +310,67 @@ void NodeList::sendSTUNRequest() {
const char STUN_SERVER_HOSTNAME[] = "stun.highfidelity.io"; const char STUN_SERVER_HOSTNAME[] = "stun.highfidelity.io";
const unsigned short STUN_SERVER_PORT = 3478; const unsigned short STUN_SERVER_PORT = 3478;
static int failedStunRequests = 0; unsigned char stunRequestPacket[NUM_BYTES_STUN_HEADER];
if (failedStunRequests < NUM_STUN_REQUESTS_BEFORE_FALLBACK) { int packetIndex = 0;
unsigned char stunRequestPacket[NUM_BYTES_STUN_HEADER];
const uint32_t RFC_5389_MAGIC_COOKIE_NETWORK_ORDER = htonl(RFC_5389_MAGIC_COOKIE);
int packetIndex = 0;
// leading zeros + message type
const uint32_t RFC_5389_MAGIC_COOKIE_NETWORK_ORDER = htonl(RFC_5389_MAGIC_COOKIE); const uint16_t REQUEST_MESSAGE_TYPE = htons(0x0001);
memcpy(stunRequestPacket + packetIndex, &REQUEST_MESSAGE_TYPE, sizeof(REQUEST_MESSAGE_TYPE));
// leading zeros + message type packetIndex += sizeof(REQUEST_MESSAGE_TYPE);
const uint16_t REQUEST_MESSAGE_TYPE = htons(0x0001);
memcpy(stunRequestPacket + packetIndex, &REQUEST_MESSAGE_TYPE, sizeof(REQUEST_MESSAGE_TYPE)); // message length (no additional attributes are included)
packetIndex += sizeof(REQUEST_MESSAGE_TYPE); uint16_t messageLength = 0;
memcpy(stunRequestPacket + packetIndex, &messageLength, sizeof(messageLength));
// message length (no additional attributes are included) packetIndex += sizeof(messageLength);
uint16_t messageLength = 0;
memcpy(stunRequestPacket + packetIndex, &messageLength, sizeof(messageLength)); memcpy(stunRequestPacket + packetIndex, &RFC_5389_MAGIC_COOKIE_NETWORK_ORDER, sizeof(RFC_5389_MAGIC_COOKIE_NETWORK_ORDER));
packetIndex += sizeof(messageLength); packetIndex += sizeof(RFC_5389_MAGIC_COOKIE_NETWORK_ORDER);
memcpy(stunRequestPacket + packetIndex, &RFC_5389_MAGIC_COOKIE_NETWORK_ORDER, sizeof(RFC_5389_MAGIC_COOKIE_NETWORK_ORDER)); // transaction ID (random 12-byte unsigned integer)
packetIndex += sizeof(RFC_5389_MAGIC_COOKIE_NETWORK_ORDER); const uint NUM_TRANSACTION_ID_BYTES = 12;
unsigned char transactionID[NUM_TRANSACTION_ID_BYTES];
// transaction ID (random 12-byte unsigned integer) loadRandomIdentifier(transactionID, NUM_TRANSACTION_ID_BYTES);
const uint NUM_TRANSACTION_ID_BYTES = 12; memcpy(stunRequestPacket + packetIndex, &transactionID, sizeof(transactionID));
unsigned char transactionID[NUM_TRANSACTION_ID_BYTES];
loadRandomIdentifier(transactionID, NUM_TRANSACTION_ID_BYTES); // lookup the IP for the STUN server
memcpy(stunRequestPacket + packetIndex, &transactionID, sizeof(transactionID)); static QHostInfo stunInfo = QHostInfo::fromName(STUN_SERVER_HOSTNAME);
// lookup the IP for the STUN server for (int i = 0; i < stunInfo.addresses().size(); i++) {
static QHostInfo stunInfo = QHostInfo::fromName(STUN_SERVER_HOSTNAME); if (stunInfo.addresses()[i].protocol() == QAbstractSocket::IPv4Protocol) {
QString stunIPAddress = stunInfo.addresses()[i].toString();
for (int i = 0; i < stunInfo.addresses().size(); i++) {
if (stunInfo.addresses()[i].protocol() == QAbstractSocket::IPv4Protocol) { if (!_hasCompletedInitialSTUNFailure) {
QString stunIPAddress = stunInfo.addresses()[i].toString(); qDebug("Sending intial stun request to %s\n", stunIPAddress.toLocal8Bit().constData());
qDebug("Sending a stun request to %s\n", stunIPAddress.toLocal8Bit().constData());
_nodeSocket.send(stunIPAddress.toLocal8Bit().constData(),
STUN_SERVER_PORT,
stunRequestPacket,
sizeof(stunRequestPacket));
break;
} }
_nodeSocket.send(stunIPAddress.toLocal8Bit().constData(),
STUN_SERVER_PORT,
stunRequestPacket,
sizeof(stunRequestPacket));
break;
} }
failedStunRequests++;
return;
} }
// if we're here this was the last failed STUN request _stunRequestsSinceSuccess++;
// use our DS as our stun server
qDebug("Failed to lookup public address via STUN server at %s:%hu. Using DS for STUN.\n", if (_stunRequestsSinceSuccess >= NUM_STUN_REQUESTS_BEFORE_FALLBACK) {
STUN_SERVER_HOSTNAME, STUN_SERVER_PORT); if (!_hasCompletedInitialSTUNFailure) {
_shouldUseDomainServerAsSTUN = true; // if we're here this was the last failed STUN request
// use our DS as our stun server
qDebug("Failed to lookup public address via STUN server at %s:%hu. Using DS for STUN.\n",
STUN_SERVER_HOSTNAME, STUN_SERVER_PORT);
_hasCompletedInitialSTUNFailure = true;
}
// reset the public address and port
_publicAddress = QHostAddress::Null;
_publicPort = 0;
}
} }
void NodeList::processSTUNResponse(unsigned char* packetData, size_t dataBytes) { void NodeList::processSTUNResponse(unsigned char* packetData, size_t dataBytes) {
@ -395,6 +394,9 @@ void NodeList::processSTUNResponse(unsigned char* packetData, size_t dataBytes)
const int NUM_BYTES_FAMILY_ALIGN = 1; const int NUM_BYTES_FAMILY_ALIGN = 1;
const uint8_t IPV4_FAMILY_NETWORK_ORDER = htons(0x01) >> 8; const uint8_t IPV4_FAMILY_NETWORK_ORDER = htons(0x01) >> 8;
// reset the number of failed STUN requests since last success
_stunRequestsSinceSuccess = 0;
int byteIndex = attributeStartIndex + NUM_BYTES_STUN_ATTR_TYPE_AND_LENGTH + NUM_BYTES_FAMILY_ALIGN; int byteIndex = attributeStartIndex + NUM_BYTES_STUN_ATTR_TYPE_AND_LENGTH + NUM_BYTES_FAMILY_ALIGN;
uint8_t addressFamily = 0; uint8_t addressFamily = 0;
@ -416,12 +418,20 @@ void NodeList::processSTUNResponse(unsigned char* packetData, size_t dataBytes)
memcpy(&xorMappedAddress, packetData + byteIndex, sizeof(xorMappedAddress)); memcpy(&xorMappedAddress, packetData + byteIndex, sizeof(xorMappedAddress));
uint32_t stunAddress = ntohl(xorMappedAddress) ^ ntohl(RFC_5389_MAGIC_COOKIE_NETWORK_ORDER); uint32_t stunAddress = ntohl(xorMappedAddress) ^ ntohl(RFC_5389_MAGIC_COOKIE_NETWORK_ORDER);
_publicAddress = QHostAddress(stunAddress);
qDebug("Public socket received from STUN server is %s:%hu\n", QHostAddress newPublicAddress = QHostAddress(stunAddress);
_publicAddress.toString().toLocal8Bit().constData(),
_publicPort);
if (newPublicAddress != _publicAddress) {
_publicAddress = newPublicAddress;
qDebug("New public socket received from STUN server is %s:%hu\n",
_publicAddress.toString().toLocal8Bit().constData(),
_publicPort);
}
_hasCompletedInitialSTUNFailure = true;
break; break;
} }
} else { } else {
@ -469,62 +479,66 @@ void NodeList::sendDomainServerCheckIn() {
printedDomainServerIP = true; printedDomainServerIP = true;
} }
if (_publicAddress.isNull() && !_shouldUseDomainServerAsSTUN) { if (_publicAddress.isNull() && !_hasCompletedInitialSTUNFailure) {
// we don't know our public socket and we need to send it to the domain server // we don't know our public socket and we need to send it to the domain server
// send a STUN request to figure it out // send a STUN request to figure it out
sendSTUNRequest(); sendSTUNRequest();
} else { } else {
// construct the DS check in packet if we need to // construct the DS check in packet if we need to
if (!_checkInPacket) { int numBytesNodesOfInterest = _nodeTypesOfInterest ? strlen((char*) _nodeTypesOfInterest) : 0;
int numBytesNodesOfInterest = _nodeTypesOfInterest ? strlen((char*) _nodeTypesOfInterest) : 0;
const int IP_ADDRESS_BYTES = 4;
const int IP_ADDRESS_BYTES = 4;
// check in packet has header, optional UUID, node type, port, IP, node types of interest, null termination
// check in packet has header, optional UUID, node type, port, IP, node types of interest, null termination int numPacketBytes = sizeof(PACKET_TYPE) + sizeof(PACKET_VERSION) + sizeof(NODE_TYPE) +
int numPacketBytes = sizeof(PACKET_TYPE) + sizeof(PACKET_VERSION) + sizeof(NODE_TYPE) + NUM_BYTES_RFC4122_UUID + (2 * (sizeof(uint16_t) + IP_ADDRESS_BYTES)) +
NUM_BYTES_RFC4122_UUID + (2 * (sizeof(uint16_t) + IP_ADDRESS_BYTES)) + numBytesNodesOfInterest + sizeof(unsigned char);
numBytesNodesOfInterest + sizeof(unsigned char);
unsigned char* checkInPacket = new unsigned char[numPacketBytes];
_checkInPacket = new unsigned char[numPacketBytes]; unsigned char* packetPosition = checkInPacket;
unsigned char* packetPosition = _checkInPacket;
PACKET_TYPE nodePacketType = (memchr(SOLO_NODE_TYPES, _ownerType, sizeof(SOLO_NODE_TYPES)))
PACKET_TYPE nodePacketType = (memchr(SOLO_NODE_TYPES, _ownerType, sizeof(SOLO_NODE_TYPES))) ? PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY
? PACKET_TYPE_DOMAIN_REPORT_FOR_DUTY : PACKET_TYPE_DOMAIN_LIST_REQUEST;
: PACKET_TYPE_DOMAIN_LIST_REQUEST;
packetPosition += populateTypeAndVersion(packetPosition, nodePacketType);
packetPosition += populateTypeAndVersion(packetPosition, nodePacketType);
*(packetPosition++) = _ownerType;
*(packetPosition++) = _ownerType;
// send our owner UUID or the null one
// send our owner UUID or the null one QByteArray rfcOwnerUUID = _ownerUUID.toRfc4122();
QByteArray rfcOwnerUUID = _ownerUUID.toRfc4122(); memcpy(packetPosition, rfcOwnerUUID.constData(), rfcOwnerUUID.size());
memcpy(packetPosition, rfcOwnerUUID.constData(), rfcOwnerUUID.size()); packetPosition += rfcOwnerUUID.size();
packetPosition += rfcOwnerUUID.size();
// pack our public address to send to domain-server
// pack our public address to send to domain-server packetPosition += packSocket(checkInPacket + (packetPosition - checkInPacket),
packetPosition += packSocket(_checkInPacket + (packetPosition - _checkInPacket), htonl(_publicAddress.toIPv4Address()), htons(_publicPort));
htonl(_publicAddress.toIPv4Address()), htons(_publicPort));
// pack our local address to send to domain-server
// pack our local address to send to domain-server packetPosition += packSocket(checkInPacket + (packetPosition - checkInPacket),
packetPosition += packSocket(_checkInPacket + (packetPosition - _checkInPacket), getLocalAddress(),
getLocalAddress(), htons(_nodeSocket.getListeningPort()));
htons(_nodeSocket.getListeningPort()));
// add the number of bytes for node types of interest
// add the number of bytes for node types of interest *(packetPosition++) = numBytesNodesOfInterest;
*(packetPosition++) = numBytesNodesOfInterest;
// copy over the bytes for node types of interest, if required
// copy over the bytes for node types of interest, if required if (numBytesNodesOfInterest > 0) {
if (numBytesNodesOfInterest > 0) { memcpy(packetPosition,
memcpy(packetPosition, _nodeTypesOfInterest,
_nodeTypesOfInterest, numBytesNodesOfInterest);
numBytesNodesOfInterest); packetPosition += numBytesNodesOfInterest;
packetPosition += numBytesNodesOfInterest;
}
_numBytesCheckInPacket = packetPosition - _checkInPacket;
} }
_nodeSocket.send(_domainIP.toString().toLocal8Bit().constData(), _domainPort, _checkInPacket, _numBytesCheckInPacket); _nodeSocket.send(_domainIP.toString().toLocal8Bit().constData(), _domainPort, checkInPacket, packetPosition - checkInPacket);
const int NUM_DOMAIN_SERVER_CHECKINS_PER_STUN_REQUEST = 5;
static unsigned int numDomainCheckins = 0;
// send a STUN request every Nth domain server check in so we update our public socket, if required
if (numDomainCheckins++ % NUM_DOMAIN_SERVER_CHECKINS_PER_STUN_REQUEST == 0) {
sendSTUNRequest();
}
// increment the count of un-replied check-ins // increment the count of un-replied check-ins
_numNoReplyDomainCheckIns++; _numNoReplyDomainCheckIns++;
@ -622,6 +636,8 @@ Node* NodeList::addOrUpdateNode(const QUuid& uuid, char nodeType, sockaddr* publ
return newNode; return newNode;
} else { } else {
node->lock();
if (node->getType() == NODE_TYPE_AUDIO_MIXER || if (node->getType() == NODE_TYPE_AUDIO_MIXER ||
node->getType() == NODE_TYPE_VOXEL_SERVER) { node->getType() == NODE_TYPE_VOXEL_SERVER) {
// until the Audio class also uses our nodeList, we need to update // until the Audio class also uses our nodeList, we need to update
@ -629,6 +645,19 @@ Node* NodeList::addOrUpdateNode(const QUuid& uuid, char nodeType, sockaddr* publ
node->setLastHeardMicrostamp(usecTimestampNow()); node->setLastHeardMicrostamp(usecTimestampNow());
} }
// check if we need to change this node's public or local sockets
if (!socketMatch(publicSocket, node->getPublicSocket())) {
node->setPublicSocket(publicSocket);
qDebug() << "Public socket change for node" << *node << "\n";
}
if (!socketMatch(localSocket, node->getLocalSocket())) {
node->setLocalSocket(localSocket);
qDebug() << "Local socket change for node" << *node << "\n";
}
node->unlock();
// we had this node already, do nothing for now // we had this node already, do nothing for now
return &*node; return &*node;
} }

View file

@ -167,11 +167,10 @@ private:
pthread_t checkInWithDomainServerThread; pthread_t checkInWithDomainServerThread;
int _numNoReplyDomainCheckIns; int _numNoReplyDomainCheckIns;
sockaddr* _assignmentServerSocket; sockaddr* _assignmentServerSocket;
uchar* _checkInPacket;
int _numBytesCheckInPacket;
QHostAddress _publicAddress; QHostAddress _publicAddress;
uint16_t _publicPort; uint16_t _publicPort;
bool _shouldUseDomainServerAsSTUN; bool _hasCompletedInitialSTUNFailure;
unsigned int _stunRequestsSinceSuccess;
void activateSocketFromNodeCommunication(sockaddr *nodeAddress); void activateSocketFromNodeCommunication(sockaddr *nodeAddress);
void timePingReply(sockaddr *nodeAddress, unsigned char *packetData); void timePingReply(sockaddr *nodeAddress, unsigned char *packetData);