add ping/reply to assignment-client servers

This commit is contained in:
Stephen Birarda 2013-10-17 16:56:52 -07:00
parent 964200cdab
commit b4ef3dbd8a
5 changed files with 88 additions and 74 deletions

View file

@ -81,6 +81,9 @@ void AudioMixer::run() {
NodeList *nodeList = NodeList::getInstance();
nodeList->setOwnerType(NODE_TYPE_AUDIO_MIXER);
const char AUDIO_MIXER_NODE_TYPES_OF_INTEREST[2] = { NODE_TYPE_AGENT, NODE_TYPE_AUDIO_INJECTOR };
nodeList->setNodeTypesOfInterest(AUDIO_MIXER_NODE_TYPES_OF_INTEREST, sizeof(AUDIO_MIXER_NODE_TYPES_OF_INTEREST));
ssize_t receivedBytes = 0;
nodeList->linkedDataCreateCallback = attachNewBufferToNode;
@ -144,6 +147,9 @@ void AudioMixer::run() {
}
}
// get the NodeList to ping any inactive nodes, for hole punching
nodeList->possiblyPingInactiveNodes();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
PositionalAudioRingBuffer* positionalRingBuffer = (PositionalAudioRingBuffer*) node->getLinkedData();
if (positionalRingBuffer && positionalRingBuffer->shouldBeAddedToMix(JITTER_BUFFER_SAMPLES)) {
@ -353,39 +359,24 @@ void AudioMixer::run() {
// pull any new audio data from nodes off of the network stack
while (nodeList->getNodeSocket()->receive(nodeAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_NO_ECHO ||
packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_WITH_ECHO) {
if (packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_NO_ECHO
|| packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_WITH_ECHO
|| packetData[0] == PACKET_TYPE_INJECT_AUDIO) {
unsigned char* currentBuffer = packetData + numBytesForPacketHeader(packetData);
QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*) currentBuffer, NUM_BYTES_RFC4122_UUID));
Node* avatarNode = nodeList->addOrUpdateNode(nodeUUID,
NODE_TYPE_AGENT,
nodeAddress,
nodeAddress);
// temp activation of public socket before server ping/reply is setup
if (!avatarNode->getActiveSocket()) {
avatarNode->activatePublicSocket();
}
nodeList->updateNodeWithData(nodeAddress, packetData, receivedBytes);
if (std::isnan(((PositionalAudioRingBuffer *)avatarNode->getLinkedData())->getOrientation().x)) {
// kill off this node - temporary solution to mixer crash on mac sleep
avatarNode->setAlive(false);
}
} else if (packetData[0] == PACKET_TYPE_INJECT_AUDIO) {
QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*) packetData + numBytesForPacketHeader(packetData),
NUM_BYTES_RFC4122_UUID));
Node* matchingInjector = nodeList->addOrUpdateNode(nodeUUID,
NODE_TYPE_AUDIO_INJECTOR,
NULL,
NULL);
Node* matchingNode = nodeList->nodeWithUUID(nodeUUID);
// give the new audio data to the matching injector node
nodeList->updateNodeWithData(matchingInjector, packetData, receivedBytes);
if (matchingNode) {
nodeList->updateNodeWithData(matchingNode, nodeAddress, packetData, receivedBytes);
if (packetData[0] != PACKET_TYPE_INJECT_AUDIO
&& std::isnan(((PositionalAudioRingBuffer *)matchingNode->getLinkedData())->getOrientation().x)) {
// kill off this node - temporary solution to mixer crash on mac sleep
matchingNode->setAlive(false);
}
}
} else {
// let processNodeData handle it.
nodeList->processNodeData(nodeAddress, packetData, receivedBytes);

View file

@ -97,6 +97,8 @@ void AvatarMixer::run() {
NodeList* nodeList = NodeList::getInstance();
nodeList->setOwnerType(NODE_TYPE_AVATAR_MIXER);
nodeList->setNodeTypesOfInterest(&NODE_TYPE_AGENT, 1);
nodeList->linkedDataCreateCallback = attachAvatarDataToNode;
nodeList->startSilentNodeRemovalThread();
@ -123,6 +125,8 @@ void AvatarMixer::run() {
NodeList::getInstance()->sendDomainServerCheckIn();
}
nodeList->possiblyPingInactiveNodes();
if (nodeList->getNodeSocket()->receive(&nodeAddress, packetData, &receivedBytes) &&
packetVersionMatch(packetData)) {
switch (packetData[0]) {
@ -131,10 +135,14 @@ void AvatarMixer::run() {
NUM_BYTES_RFC4122_UUID));
// add or update the node in our list
avatarNode = nodeList->addOrUpdateNode(nodeUUID, NODE_TYPE_AGENT, &nodeAddress, &nodeAddress);
avatarNode = nodeList->nodeWithUUID(nodeUUID);
// parse positional data from an node
nodeList->updateNodeWithData(avatarNode, packetData, receivedBytes);
if (avatarNode) {
// parse positional data from an node
nodeList->updateNodeWithData(avatarNode, &nodeAddress, packetData, receivedBytes);
} else {
break;
}
case PACKET_TYPE_INJECT_AUDIO:
broadcastAvatarData(nodeList, nodeUUID, &nodeAddress);
break;

View file

@ -151,7 +151,7 @@ void NodeList::processNodeData(sockaddr* senderAddress, unsigned char* packetDat
}
case PACKET_TYPE_PING_REPLY: {
// activate the appropriate socket for this node, if not yet updated
activateSocketFromPingReply(senderAddress);
activateSocketFromNodeCommunication(senderAddress);
// set the ping time for this node for stat collection
timePingReply(senderAddress, packetData);
@ -199,6 +199,7 @@ void NodeList::processBulkNodeData(sockaddr *senderAddress, unsigned char *packe
}
currentPosition += updateNodeWithData(matchingNode,
NULL,
packetHolder,
numTotalBytes - (currentPosition - startPosition));
@ -206,35 +207,32 @@ void NodeList::processBulkNodeData(sockaddr *senderAddress, unsigned char *packe
}
}
int NodeList::updateNodeWithData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes) {
// find the node by the sockaddr
Node* matchingNode = nodeWithAddress(senderAddress);
if (matchingNode) {
return updateNodeWithData(matchingNode, packetData, dataBytes);
} else {
return 0;
}
}
int NodeList::updateNodeWithData(Node *node, unsigned char *packetData, int dataBytes) {
int NodeList::updateNodeWithData(Node *node, sockaddr* senderAddress, unsigned char *packetData, int dataBytes) {
node->lock();
node->setLastHeardMicrostamp(usecTimestampNow());
if (node->getActiveSocket()) {
if (senderAddress) {
activateSocketFromNodeCommunication(senderAddress);
}
if (node->getActiveSocket() || !senderAddress) {
node->recordBytesReceived(dataBytes);
if (!node->getLinkedData() && linkedDataCreateCallback) {
linkedDataCreateCallback(node);
}
int numParsedBytes = node->getLinkedData()->parseData(packetData, dataBytes);
node->unlock();
return numParsedBytes;
} else {
// we weren't able to match the sender address to the address we have for this node, unlock and don't parse
node->unlock();
return 0;
}
if (!node->getLinkedData() && linkedDataCreateCallback) {
linkedDataCreateCallback(node);
}
int numParsedBytes = node->getLinkedData()->parseData(packetData, dataBytes);
node->unlock();
return numParsedBytes;
}
Node* NodeList::nodeWithAddress(sockaddr *senderAddress) {
@ -671,7 +669,25 @@ unsigned NodeList::broadcastToNodes(unsigned char* broadcastData, size_t dataByt
return n;
}
void NodeList::activateSocketFromPingReply(sockaddr *nodeAddress) {
const uint64_t PING_INACTIVE_NODE_INTERVAL_USECS = 1 * 1000 * 1000;
void NodeList::possiblyPingInactiveNodes() {
static timeval lastPing = {};
// make sure PING_INACTIVE_NODE_INTERVAL_USECS has elapsed since last ping
if (usecTimestampNow() - usecTimestamp(&lastPing) >= PING_INACTIVE_NODE_INTERVAL_USECS) {
gettimeofday(&lastPing, NULL);
for(NodeList::iterator node = begin(); node != end(); node++) {
if (!node->getActiveSocket()) {
// we don't have an active link to this node, ping it to set that up
pingPublicAndLocalSocketsForInactiveNode(&(*node));
}
}
}
}
void NodeList::activateSocketFromNodeCommunication(sockaddr *nodeAddress) {
for(NodeList::iterator node = begin(); node != end(); node++) {
if (!node->getActiveSocket()) {
// check both the public and local addresses for each node to see if we find a match

View file

@ -117,8 +117,7 @@ public:
void processNodeData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes);
void processBulkNodeData(sockaddr *senderAddress, unsigned char *packetData, int numTotalBytes);
int updateNodeWithData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes);
int updateNodeWithData(Node *node, unsigned char *packetData, int dataBytes);
int updateNodeWithData(Node *node, sockaddr* senderAddress, unsigned char *packetData, int dataBytes);
unsigned broadcastToNodes(unsigned char *broadcastData, size_t dataBytes, const char* nodeTypes, int numNodeTypes);
@ -140,6 +139,7 @@ public:
void addDomainListener(DomainChangeListener* listener);
void removeDomainListener(DomainChangeListener* listener);
void possiblyPingInactiveNodes();
private:
static NodeList* _sharedInstance;
@ -172,7 +172,7 @@ private:
uint16_t _publicPort;
bool _shouldUseDomainServerAsSTUN;
void activateSocketFromPingReply(sockaddr *nodeAddress);
void activateSocketFromNodeCommunication(sockaddr *nodeAddress);
void timePingReply(sockaddr *nodeAddress, unsigned char *packetData);
std::vector<NodeListHook*> _hooks;

View file

@ -327,6 +327,9 @@ void VoxelServer::run() {
NodeList* nodeList = NodeList::getInstance();
nodeList->setOwnerType(NODE_TYPE_VOXEL_SERVER);
// we need to ask the DS about agents so we can ping/reply with them
nodeList->setNodeTypesOfInterest(&NODE_TYPE_AGENT, 1);
setvbuf(stdout, NULL, _IOLBF, 0);
// tell our NodeList about our desire to get notifications
@ -434,6 +437,9 @@ void VoxelServer::run() {
NodeList::getInstance()->sendDomainServerCheckIn();
}
// ping our inactive nodes to punch holes with them
nodeList->possiblyPingInactiveNodes();
if (nodeList->getNodeSocket()->receive(&senderAddress, packetData, &packetLength) &&
packetVersionMatch(packetData)) {
@ -445,23 +451,16 @@ void VoxelServer::run() {
QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*)packetData + numBytesPacketHeader,
NUM_BYTES_RFC4122_UUID));
Node* node = NodeList::getInstance()->addOrUpdateNode(nodeUUID,
NODE_TYPE_AGENT,
&senderAddress,
&senderAddress);
Node* node = nodeList->nodeWithUUID(nodeUUID);
// temp activation of public socket before server ping/reply is setup
if (!node->getActiveSocket()) {
node->activatePublicSocket();
if (node) {
NodeList::getInstance()->updateNodeWithData(node, &senderAddress, packetData, packetLength);
VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData();
if (nodeData && !nodeData->isVoxelSendThreadInitalized()) {
nodeData->initializeVoxelSendThread(this);
}
}
NodeList::getInstance()->updateNodeWithData(node, packetData, packetLength);
VoxelNodeData* nodeData = (VoxelNodeData*) node->getLinkedData();
if (nodeData && !nodeData->isVoxelSendThreadInitalized()) {
nodeData->initializeVoxelSendThread(this);
}
} else if (packetData[0] == PACKET_TYPE_PING
|| packetData[0] == PACKET_TYPE_DOMAIN
|| packetData[0] == PACKET_TYPE_STUN_RESPONSE) {