diff --git a/assignment-client/src/audio/AudioMixer.cpp b/assignment-client/src/audio/AudioMixer.cpp index 1b44c2e905..31de1b6317 100644 --- a/assignment-client/src/audio/AudioMixer.cpp +++ b/assignment-client/src/audio/AudioMixer.cpp @@ -81,6 +81,9 @@ void AudioMixer::run() { NodeList *nodeList = NodeList::getInstance(); nodeList->setOwnerType(NODE_TYPE_AUDIO_MIXER); + const char AUDIO_MIXER_NODE_TYPES_OF_INTEREST[2] = { NODE_TYPE_AGENT, NODE_TYPE_AUDIO_INJECTOR }; + nodeList->setNodeTypesOfInterest(AUDIO_MIXER_NODE_TYPES_OF_INTEREST, sizeof(AUDIO_MIXER_NODE_TYPES_OF_INTEREST)); + ssize_t receivedBytes = 0; nodeList->linkedDataCreateCallback = attachNewBufferToNode; @@ -144,6 +147,9 @@ void AudioMixer::run() { } } + // get the NodeList to ping any inactive nodes, for hole punching + nodeList->possiblyPingInactiveNodes(); + for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { PositionalAudioRingBuffer* positionalRingBuffer = (PositionalAudioRingBuffer*) node->getLinkedData(); if (positionalRingBuffer && positionalRingBuffer->shouldBeAddedToMix(JITTER_BUFFER_SAMPLES)) { @@ -353,39 +359,24 @@ void AudioMixer::run() { // pull any new audio data from nodes off of the network stack while (nodeList->getNodeSocket()->receive(nodeAddress, packetData, &receivedBytes) && packetVersionMatch(packetData)) { - if (packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_NO_ECHO || - packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_WITH_ECHO) { + if (packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_NO_ECHO + || packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_WITH_ECHO + || packetData[0] == PACKET_TYPE_INJECT_AUDIO) { - unsigned char* currentBuffer = packetData + numBytesForPacketHeader(packetData); - QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*) currentBuffer, NUM_BYTES_RFC4122_UUID)); - - Node* avatarNode = nodeList->addOrUpdateNode(nodeUUID, - NODE_TYPE_AGENT, - nodeAddress, - nodeAddress); - - // temp activation of public socket before server ping/reply is setup - if (!avatarNode->getActiveSocket()) { - avatarNode->activatePublicSocket(); - } - - nodeList->updateNodeWithData(nodeAddress, packetData, receivedBytes); - - if (std::isnan(((PositionalAudioRingBuffer *)avatarNode->getLinkedData())->getOrientation().x)) { - // kill off this node - temporary solution to mixer crash on mac sleep - avatarNode->setAlive(false); - } - } else if (packetData[0] == PACKET_TYPE_INJECT_AUDIO) { QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*) packetData + numBytesForPacketHeader(packetData), NUM_BYTES_RFC4122_UUID)); - Node* matchingInjector = nodeList->addOrUpdateNode(nodeUUID, - NODE_TYPE_AUDIO_INJECTOR, - NULL, - NULL); + Node* matchingNode = nodeList->nodeWithUUID(nodeUUID); - // give the new audio data to the matching injector node - nodeList->updateNodeWithData(matchingInjector, packetData, receivedBytes); + if (matchingNode) { + nodeList->updateNodeWithData(matchingNode, nodeAddress, packetData, receivedBytes); + + if (packetData[0] != PACKET_TYPE_INJECT_AUDIO + && std::isnan(((PositionalAudioRingBuffer *)matchingNode->getLinkedData())->getOrientation().x)) { + // kill off this node - temporary solution to mixer crash on mac sleep + matchingNode->setAlive(false); + } + } } else { // let processNodeData handle it. nodeList->processNodeData(nodeAddress, packetData, receivedBytes); diff --git a/assignment-client/src/avatars/AvatarMixer.cpp b/assignment-client/src/avatars/AvatarMixer.cpp index b515f1400c..4e58f0b014 100644 --- a/assignment-client/src/avatars/AvatarMixer.cpp +++ b/assignment-client/src/avatars/AvatarMixer.cpp @@ -97,6 +97,8 @@ void AvatarMixer::run() { NodeList* nodeList = NodeList::getInstance(); nodeList->setOwnerType(NODE_TYPE_AVATAR_MIXER); + nodeList->setNodeTypesOfInterest(&NODE_TYPE_AGENT, 1); + nodeList->linkedDataCreateCallback = attachAvatarDataToNode; nodeList->startSilentNodeRemovalThread(); @@ -123,6 +125,8 @@ void AvatarMixer::run() { NodeList::getInstance()->sendDomainServerCheckIn(); } + nodeList->possiblyPingInactiveNodes(); + if (nodeList->getNodeSocket()->receive(&nodeAddress, packetData, &receivedBytes) && packetVersionMatch(packetData)) { switch (packetData[0]) { @@ -131,10 +135,14 @@ void AvatarMixer::run() { NUM_BYTES_RFC4122_UUID)); // add or update the node in our list - avatarNode = nodeList->addOrUpdateNode(nodeUUID, NODE_TYPE_AGENT, &nodeAddress, &nodeAddress); + avatarNode = nodeList->nodeWithUUID(nodeUUID); - // parse positional data from an node - nodeList->updateNodeWithData(avatarNode, packetData, receivedBytes); + if (avatarNode) { + // parse positional data from an node + nodeList->updateNodeWithData(avatarNode, &nodeAddress, packetData, receivedBytes); + } else { + break; + } case PACKET_TYPE_INJECT_AUDIO: broadcastAvatarData(nodeList, nodeUUID, &nodeAddress); break; diff --git a/libraries/shared/src/NodeList.cpp b/libraries/shared/src/NodeList.cpp index 6510276f9a..d0520e00ba 100644 --- a/libraries/shared/src/NodeList.cpp +++ b/libraries/shared/src/NodeList.cpp @@ -151,7 +151,7 @@ void NodeList::processNodeData(sockaddr* senderAddress, unsigned char* packetDat } case PACKET_TYPE_PING_REPLY: { // activate the appropriate socket for this node, if not yet updated - activateSocketFromPingReply(senderAddress); + activateSocketFromNodeCommunication(senderAddress); // set the ping time for this node for stat collection timePingReply(senderAddress, packetData); @@ -199,6 +199,7 @@ void NodeList::processBulkNodeData(sockaddr *senderAddress, unsigned char *packe } currentPosition += updateNodeWithData(matchingNode, + NULL, packetHolder, numTotalBytes - (currentPosition - startPosition)); @@ -206,35 +207,32 @@ void NodeList::processBulkNodeData(sockaddr *senderAddress, unsigned char *packe } } -int NodeList::updateNodeWithData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes) { - // find the node by the sockaddr - Node* matchingNode = nodeWithAddress(senderAddress); - - if (matchingNode) { - return updateNodeWithData(matchingNode, packetData, dataBytes); - } else { - return 0; - } -} - -int NodeList::updateNodeWithData(Node *node, unsigned char *packetData, int dataBytes) { +int NodeList::updateNodeWithData(Node *node, sockaddr* senderAddress, unsigned char *packetData, int dataBytes) { node->lock(); node->setLastHeardMicrostamp(usecTimestampNow()); - if (node->getActiveSocket()) { + if (senderAddress) { + activateSocketFromNodeCommunication(senderAddress); + } + + if (node->getActiveSocket() || !senderAddress) { node->recordBytesReceived(dataBytes); + + if (!node->getLinkedData() && linkedDataCreateCallback) { + linkedDataCreateCallback(node); + } + + int numParsedBytes = node->getLinkedData()->parseData(packetData, dataBytes); + + node->unlock(); + + return numParsedBytes; + } else { + // we weren't able to match the sender address to the address we have for this node, unlock and don't parse + node->unlock(); + return 0; } - - if (!node->getLinkedData() && linkedDataCreateCallback) { - linkedDataCreateCallback(node); - } - - int numParsedBytes = node->getLinkedData()->parseData(packetData, dataBytes); - - node->unlock(); - - return numParsedBytes; } Node* NodeList::nodeWithAddress(sockaddr *senderAddress) { @@ -671,7 +669,25 @@ unsigned NodeList::broadcastToNodes(unsigned char* broadcastData, size_t dataByt return n; } -void NodeList::activateSocketFromPingReply(sockaddr *nodeAddress) { +const uint64_t PING_INACTIVE_NODE_INTERVAL_USECS = 1 * 1000 * 1000; + +void NodeList::possiblyPingInactiveNodes() { + static timeval lastPing = {}; + + // make sure PING_INACTIVE_NODE_INTERVAL_USECS has elapsed since last ping + if (usecTimestampNow() - usecTimestamp(&lastPing) >= PING_INACTIVE_NODE_INTERVAL_USECS) { + gettimeofday(&lastPing, NULL); + + for(NodeList::iterator node = begin(); node != end(); node++) { + if (!node->getActiveSocket()) { + // we don't have an active link to this node, ping it to set that up + pingPublicAndLocalSocketsForInactiveNode(&(*node)); + } + } + } +} + +void NodeList::activateSocketFromNodeCommunication(sockaddr *nodeAddress) { for(NodeList::iterator node = begin(); node != end(); node++) { if (!node->getActiveSocket()) { // check both the public and local addresses for each node to see if we find a match diff --git a/libraries/shared/src/NodeList.h b/libraries/shared/src/NodeList.h index ed7f0e637a..f98ae7070c 100644 --- a/libraries/shared/src/NodeList.h +++ b/libraries/shared/src/NodeList.h @@ -117,8 +117,7 @@ public: void processNodeData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes); void processBulkNodeData(sockaddr *senderAddress, unsigned char *packetData, int numTotalBytes); - int updateNodeWithData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes); - int updateNodeWithData(Node *node, unsigned char *packetData, int dataBytes); + int updateNodeWithData(Node *node, sockaddr* senderAddress, unsigned char *packetData, int dataBytes); unsigned broadcastToNodes(unsigned char *broadcastData, size_t dataBytes, const char* nodeTypes, int numNodeTypes); @@ -140,6 +139,7 @@ public: void addDomainListener(DomainChangeListener* listener); void removeDomainListener(DomainChangeListener* listener); + void possiblyPingInactiveNodes(); private: static NodeList* _sharedInstance; @@ -172,7 +172,7 @@ private: uint16_t _publicPort; bool _shouldUseDomainServerAsSTUN; - void activateSocketFromPingReply(sockaddr *nodeAddress); + void activateSocketFromNodeCommunication(sockaddr *nodeAddress); void timePingReply(sockaddr *nodeAddress, unsigned char *packetData); std::vector _hooks; diff --git a/libraries/voxel-server-library/src/VoxelServer.cpp b/libraries/voxel-server-library/src/VoxelServer.cpp index a24f13854b..188a2c7e05 100644 --- a/libraries/voxel-server-library/src/VoxelServer.cpp +++ b/libraries/voxel-server-library/src/VoxelServer.cpp @@ -327,6 +327,9 @@ void VoxelServer::run() { NodeList* nodeList = NodeList::getInstance(); nodeList->setOwnerType(NODE_TYPE_VOXEL_SERVER); + // we need to ask the DS about agents so we can ping/reply with them + nodeList->setNodeTypesOfInterest(&NODE_TYPE_AGENT, 1); + setvbuf(stdout, NULL, _IOLBF, 0); // tell our NodeList about our desire to get notifications @@ -434,6 +437,9 @@ void VoxelServer::run() { NodeList::getInstance()->sendDomainServerCheckIn(); } + // ping our inactive nodes to punch holes with them + nodeList->possiblyPingInactiveNodes(); + if (nodeList->getNodeSocket()->receive(&senderAddress, packetData, &packetLength) && packetVersionMatch(packetData)) { @@ -445,23 +451,16 @@ void VoxelServer::run() { QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*)packetData + numBytesPacketHeader, NUM_BYTES_RFC4122_UUID)); - Node* node = NodeList::getInstance()->addOrUpdateNode(nodeUUID, - NODE_TYPE_AGENT, - &senderAddress, - &senderAddress); + Node* node = nodeList->nodeWithUUID(nodeUUID); - // temp activation of public socket before server ping/reply is setup - if (!node->getActiveSocket()) { - node->activatePublicSocket(); + if (node) { + NodeList::getInstance()->updateNodeWithData(node, &senderAddress, packetData, packetLength); + + VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData(); + if (nodeData && !nodeData->isVoxelSendThreadInitalized()) { + nodeData->initializeVoxelSendThread(this); + } } - - NodeList::getInstance()->updateNodeWithData(node, packetData, packetLength); - - VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData(); - if (nodeData && !nodeData->isVoxelSendThreadInitalized()) { - nodeData->initializeVoxelSendThread(this); - } - } else if (packetData[0] == PACKET_TYPE_PING || packetData[0] == PACKET_TYPE_DOMAIN || packetData[0] == PACKET_TYPE_STUN_RESPONSE) {