activate appropriate socket after receiving ping reply

This commit is contained in:
Stephen Birarda 2013-10-16 17:51:32 -07:00
parent a4be44c2c5
commit 7de88898e1
5 changed files with 88 additions and 92 deletions

View file

@ -484,8 +484,6 @@ int DomainServer::run() {
nodePublicAddress.sin_family = AF_INET; nodePublicAddress.sin_family = AF_INET;
nodeLocalAddress.sin_family = AF_INET; nodeLocalAddress.sin_family = AF_INET;
in_addr_t serverLocalAddress = getLocalAddress();
nodeList->startSilentNodeRemovalThread(); nodeList->startSilentNodeRemovalThread();
if (!_staticAssignmentFile.exists() || _voxelServerConfig) { if (!_staticAssignmentFile.exists() || _voxelServerConfig) {
@ -628,20 +626,11 @@ int DomainServer::run() {
qDebug() << "Received a create assignment -" << *createAssignment << "\n"; qDebug() << "Received a create assignment -" << *createAssignment << "\n";
// check the node public address
// if it matches our local address
// or if it's the loopback address we're on the same box
if (nodePublicAddress.sin_addr.s_addr == serverLocalAddress ||
nodePublicAddress.sin_addr.s_addr == htonl(INADDR_LOOPBACK)) {
nodePublicAddress.sin_addr.s_addr = 0;
}
// make sure we have a matching node with the UUID packed with the assignment // make sure we have a matching node with the UUID packed with the assignment
// if the node has sent no types of interest, assume they want nothing but their own ID back // if the node has sent no types of interest, assume they want nothing but their own ID back
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) { for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData() if (node->getLinkedData()
&& socketMatch((sockaddr*) &nodePublicAddress, node->getPublicSocket()) && socketMatch((sockaddr*) &senderAddress, node->getPublicSocket())
&& ((Assignment*) node->getLinkedData())->getUUID() == createAssignment->getUUID()) { && ((Assignment*) node->getLinkedData())->getUUID() == createAssignment->getUUID()) {
// give the create assignment a new UUID // give the create assignment a new UUID

View file

@ -106,10 +106,7 @@ inline void Audio::performIO(int16_t* inputLeft, int16_t* outputLeft, int16_t* o
Node* audioMixer = nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER); Node* audioMixer = nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER);
if (audioMixer) { if (audioMixer) {
audioMixer->lock(); if (audioMixer->getActiveSocket()) {
sockaddr_in audioSocket = *(sockaddr_in*) audioMixer->getActiveSocket();
audioMixer->unlock();
glm::vec3 headPosition = interfaceAvatar->getHeadJointPosition(); glm::vec3 headPosition = interfaceAvatar->getHeadJointPosition();
glm::quat headOrientation = interfaceAvatar->getHead().getOrientation(); glm::quat headOrientation = interfaceAvatar->getHead().getOrientation();
@ -164,13 +161,15 @@ inline void Audio::performIO(int16_t* inputLeft, int16_t* outputLeft, int16_t* o
// copy the audio data to the last BUFFER_LENGTH_BYTES bytes of the data packet // copy the audio data to the last BUFFER_LENGTH_BYTES bytes of the data packet
memcpy(currentPacketPtr, inputLeft, BUFFER_LENGTH_BYTES_PER_CHANNEL); memcpy(currentPacketPtr, inputLeft, BUFFER_LENGTH_BYTES_PER_CHANNEL);
nodeList->getNodeSocket()->send((sockaddr*) &audioSocket, nodeList->getNodeSocket()->send(audioMixer->getActiveSocket(),
dataPacket, dataPacket,
BUFFER_LENGTH_BYTES_PER_CHANNEL + leadingBytes); BUFFER_LENGTH_BYTES_PER_CHANNEL + leadingBytes);
interface->getBandwidthMeter()->outputStream(BandwidthMeter::AUDIO).updateValue(BUFFER_LENGTH_BYTES_PER_CHANNEL interface->getBandwidthMeter()->outputStream(BandwidthMeter::AUDIO).updateValue(BUFFER_LENGTH_BYTES_PER_CHANNEL
+ leadingBytes); + leadingBytes);
} else {
nodeList->pingPublicAndLocalSocketsForInactiveNode(audioMixer);
}
} }
} }

View file

@ -106,10 +106,12 @@ const char* Node::getTypeName() const {
} }
void Node::activateLocalSocket() { void Node::activateLocalSocket() {
qDebug() << "Activating local socket for node" << *this << "\n";
_activeSocket = _localSocket; _activeSocket = _localSocket;
} }
void Node::activatePublicSocket() { void Node::activatePublicSocket() {
qDebug() << "Activating public socket for node" << *this << "\n";
_activeSocket = _publicSocket; _activeSocket = _publicSocket;
} }

View file

@ -143,13 +143,16 @@ void NodeList::processNodeData(sockaddr* senderAddress, unsigned char* packetDat
break; break;
} }
case PACKET_TYPE_PING: { case PACKET_TYPE_PING: {
char pingPacket[dataBytes]; // send it right back
memcpy(pingPacket, packetData, dataBytes); populateTypeAndVersion(packetData, PACKET_TYPE_PING_REPLY);
populateTypeAndVersion((unsigned char*) pingPacket, PACKET_TYPE_PING_REPLY); _nodeSocket.send(senderAddress, packetData, dataBytes);
_nodeSocket.send(senderAddress, pingPacket, dataBytes);
break; break;
} }
case PACKET_TYPE_PING_REPLY: { case PACKET_TYPE_PING_REPLY: {
// activate the appropriate socket for this node, if not yet updated
handlePingReply(senderAddress);
// set the ping time for this node for stat collection
timePingReply(senderAddress, packetData); timePingReply(senderAddress, packetData);
break; break;
} }
@ -553,6 +556,22 @@ void NodeList::sendAssignment(Assignment& assignment) {
_nodeSocket.send(assignmentServerSocket, assignmentPacket, numHeaderBytes + numAssignmentBytes); _nodeSocket.send(assignmentServerSocket, assignmentPacket, numHeaderBytes + numAssignmentBytes);
} }
void NodeList::pingPublicAndLocalSocketsForInactiveNode(Node* node) const {
uint64_t currentTime = 0;
// setup a ping packet to send to this node
unsigned char pingPacket[numBytesForPacketHeader((uchar*) &PACKET_TYPE_PING) + sizeof(currentTime)];
int numHeaderBytes = populateTypeAndVersion(pingPacket, PACKET_TYPE_PING);
currentTime = usecTimestampNow();
memcpy(pingPacket + numHeaderBytes, &currentTime, sizeof(currentTime));
// send the ping packet to the local and public sockets for this node
_nodeSocket.send(node->getLocalSocket(), pingPacket, sizeof(pingPacket));
_nodeSocket.send(node->getPublicSocket(), pingPacket, sizeof(pingPacket));
}
Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, char nodeType, uint16_t nodeId) { Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, char nodeType, uint16_t nodeId) {
NodeList::iterator node = end(); NodeList::iterator node = end();
@ -569,25 +588,10 @@ Node* NodeList::addOrUpdateNode(sockaddr* publicSocket, sockaddr* localSocket, c
// we didn't have this node, so add them // we didn't have this node, so add them
Node* newNode = new Node(publicSocket, localSocket, nodeType, nodeId); Node* newNode = new Node(publicSocket, localSocket, nodeType, nodeId);
if (socketMatch(publicSocket, localSocket)) {
// likely debugging scenario with two nodes on local network
// set the node active right away
newNode->activatePublicSocket();
}
if (newNode->getType() == NODE_TYPE_VOXEL_SERVER ||
newNode->getType() == NODE_TYPE_AVATAR_MIXER ||
newNode->getType() == NODE_TYPE_AUDIO_MIXER) {
// this is currently the cheat we use to talk directly to our test servers on EC2
// to be removed when we have a proper identification strategy
newNode->activatePublicSocket();
}
addNodeToList(newNode); addNodeToList(newNode);
return newNode; return newNode;
} else { } else {
if (node->getType() == NODE_TYPE_AUDIO_MIXER || if (node->getType() == NODE_TYPE_AUDIO_MIXER ||
node->getType() == NODE_TYPE_VOXEL_SERVER) { node->getType() == NODE_TYPE_VOXEL_SERVER) {
// until the Audio class also uses our nodeList, we need to update // until the Audio class also uses our nodeList, we need to update

View file

@ -112,6 +112,8 @@ public:
void setAssignmentServerSocket(sockaddr* serverSocket) { _assignmentServerSocket = serverSocket; } void setAssignmentServerSocket(sockaddr* serverSocket) { _assignmentServerSocket = serverSocket; }
void sendAssignment(Assignment& assignment); void sendAssignment(Assignment& assignment);
void pingPublicAndLocalSocketsForInactiveNode(Node* node) const;
Node* nodeWithAddress(sockaddr *senderAddress); Node* nodeWithAddress(sockaddr *senderAddress);
Node* nodeWithID(uint16_t nodeID); Node* nodeWithID(uint16_t nodeID);