This commit is contained in:
Andrzej Kapolka 2013-09-10 10:32:56 -07:00
commit 13a19e2986
7 changed files with 98 additions and 47 deletions

View file

@ -64,25 +64,25 @@ void childClient() {
if (nodeList->getNodeSocket()->receive(packetData, &receivedBytes) &&
packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT && packetVersionMatch(packetData)) {
// construct the deployed assignment from the packet data
Assignment deployedAssignment(packetData, receivedBytes);
qDebug() << "Received an assignment -" << deployedAssignment << "\n";
// switch our nodelist DOMAIN_IP to the ip receieved in the assignment
if (deployedAssignment.getDomainSocket()->sa_family == AF_INET) {
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getDomainSocket())->sin_addr;
if (deployedAssignment.getAttachedPublicSocket()->sa_family == AF_INET) {
in_addr domainSocketAddr = ((sockaddr_in*) deployedAssignment.getAttachedPublicSocket())->sin_addr;
nodeList->setDomainIP(inet_ntoa(domainSocketAddr));
qDebug("Changed Domain IP to %s\n", inet_ntoa(domainSocketAddr));
}
if (deployedAssignment.getType() == Assignment::AudioMixer) {
AudioMixer::run();
qDebug("Destination IP for assignment is %s\n", inet_ntoa(domainSocketAddr));
if (deployedAssignment.getType() == Assignment::AudioMixer) {
AudioMixer::run();
} else {
AvatarMixer::run();
}
} else {
AvatarMixer::run();
qDebug("Received a bad destination socket for assignment.\n");
}
qDebug("Assignment finished or never started - waiting for new assignment\n");

View file

@ -68,6 +68,15 @@ int main(int argc, const char* argv[]) {
&& strcmp((*assignment)->getPool(), requestAssignment.getPool()) == 0)
|| !eitherHasPool) {
// check if the requestor is on the same network as the destination for the assignment
if (senderSocket.sin_addr.s_addr ==
((sockaddr_in*) (*assignment)->getAttachedPublicSocket())->sin_addr.s_addr) {
// if this is the case we remove the public socket on the assignment by setting it to NULL
// this ensures the local IP and port sent to the requestor is the local address of destination
(*assignment)->setAttachedPublicSocket(NULL);
}
int numAssignmentBytes = (*assignment)->packToBuffer(assignmentPacket + numSendHeaderBytes);
// send the assignment
@ -96,10 +105,10 @@ int main(int argc, const char* argv[]) {
qDebug() << "Received a created assignment:" << *createdAssignment;
qDebug() << "Current queue size is" << assignmentQueue.size();
// assignment server is on a public server
// assignment server is likely on a public server
// assume that the address we now have for the sender is the public address/port
// and store that with the assignment so it can be given to the requestor later
createdAssignment->setDomainSocket((sockaddr*) &senderSocket);
// and store that with the assignment so it can be given to the requestor later if necessary
createdAssignment->setAttachedPublicSocket((sockaddr*) &senderSocket);
// add this assignment to the queue
assignmentQueue.push_back(createdAssignment);

View file

@ -106,6 +106,12 @@ int main(int argc, const char* argv[]) {
Assignment* audioAssignment = NULL;
Assignment* avatarAssignment = NULL;
// construct a local socket to send with our created assignments
sockaddr_in localSocket = {};
localSocket.sin_family = AF_INET;
localSocket.sin_port = htons(nodeList->getInstance()->getNodeSocket()->getListeningPort());
localSocket.sin_addr.s_addr = serverLocalAddress;
while (true) {
if (!nodeList->soloNodeOfType(NODE_TYPE_AUDIO_MIXER)) {
if (!audioAssignment
@ -113,6 +119,7 @@ int main(int argc, const char* argv[]) {
if (!audioAssignment) {
audioAssignment = new Assignment(Assignment::Create, Assignment::AudioMixer, assignmentPool);
audioAssignment->setAttachedLocalSocket((sockaddr*) &localSocket);
}
nodeList->sendAssignment(*audioAssignment);
@ -125,6 +132,7 @@ int main(int argc, const char* argv[]) {
|| usecTimestampNow() - usecTimestamp(&avatarAssignment->getTime()) >= ASSIGNMENT_SILENCE_MAX_USECS) {
if (!avatarAssignment) {
avatarAssignment = new Assignment(Assignment::Create, Assignment::AvatarMixer, assignmentPool);
avatarAssignment->setAttachedLocalSocket((sockaddr*) &localSocket);
}
nodeList->sendAssignment(*avatarAssignment);

View file

@ -17,7 +17,8 @@ Assignment::Assignment(Assignment::Direction direction, Assignment::Type type, c
_direction(direction),
_type(type),
_pool(NULL),
_domainSocket(NULL)
_attachedPublicSocket(NULL),
_attachedLocalSocket(NULL)
{
// set the create time on this assignment
gettimeofday(&_time, NULL);
@ -34,7 +35,8 @@ Assignment::Assignment(Assignment::Direction direction, Assignment::Type type, c
Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
_pool(NULL),
_domainSocket(NULL)
_attachedPublicSocket(NULL),
_attachedLocalSocket(NULL)
{
// set the create time on this assignment
gettimeofday(&_time, NULL);
@ -64,41 +66,55 @@ Assignment::Assignment(const unsigned char* dataBuffer, int numBytes) :
}
if (numBytes > numBytesRead) {
sockaddr* newSocket = NULL;
if (dataBuffer[numBytesRead++] == IPv4_ADDRESS_DESIGNATOR) {
// IPv4 address
sockaddr_in destinationSocket = {};
memcpy(&destinationSocket, dataBuffer + numBytesRead, sizeof(sockaddr_in));
destinationSocket.sin_family = AF_INET;
setDomainSocket((sockaddr*) &destinationSocket);
newSocket = (sockaddr*) new sockaddr_in;
unpackSocket(dataBuffer + numBytesRead, newSocket);
} else {
// IPv6 address
sockaddr_in6 destinationSocket = {};
memcpy(&destinationSocket, dataBuffer + numBytesRead, sizeof(sockaddr_in6));
setDomainSocket((sockaddr*) &destinationSocket);
// IPv6 address, or bad designator
qDebug("Received a socket that cannot be unpacked!\n");
}
if (_direction == Assignment::Create) {
delete _attachedLocalSocket;
_attachedLocalSocket = newSocket;
} else {
delete _attachedPublicSocket;
_attachedPublicSocket = newSocket;
}
}
}
Assignment::~Assignment() {
delete _domainSocket;
delete _attachedPublicSocket;
delete _attachedLocalSocket;
delete _pool;
}
void Assignment::setDomainSocket(const sockaddr* domainSocket) {
if (_domainSocket) {
// delete the old _domainSocket if it exists
delete _domainSocket;
_domainSocket = NULL;
void Assignment::setAttachedPublicSocket(const sockaddr* attachedPublicSocket) {
if (_attachedPublicSocket) {
// delete the old socket if it exists
delete _attachedPublicSocket;
_attachedPublicSocket = NULL;
}
// create a new sockaddr or sockaddr_in depending on what type of address this is
if (domainSocket->sa_family == AF_INET) {
_domainSocket = (sockaddr*) new sockaddr_in;
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in));
} else {
_domainSocket = (sockaddr*) new sockaddr_in6;
memcpy(_domainSocket, domainSocket, sizeof(sockaddr_in6));
if (attachedPublicSocket) {
copySocketToEmptySocketPointer(&_attachedPublicSocket, attachedPublicSocket);
}
}
void Assignment::setAttachedLocalSocket(const sockaddr* attachedLocalSocket) {
if (_attachedLocalSocket) {
// delete the old socket if it exists
delete _attachedLocalSocket;
_attachedLocalSocket = NULL;
}
if (attachedLocalSocket) {
copySocketToEmptySocketPointer(&_attachedLocalSocket, attachedLocalSocket);
}
}
@ -118,13 +134,14 @@ int Assignment::packToBuffer(unsigned char* buffer) {
numPackedBytes += sizeof(char);
}
if (_domainSocket) {
buffer[numPackedBytes++] = (_domainSocket->sa_family == AF_INET) ? IPv4_ADDRESS_DESIGNATOR : IPv6_ADDRESS_DESIGNATOR;
if (_attachedPublicSocket || _attachedLocalSocket) {
sockaddr* socketToPack = (_attachedPublicSocket) ? _attachedPublicSocket : _attachedLocalSocket;
int numSocketBytes = (_domainSocket->sa_family == AF_INET) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6);
// we have a socket to pack, add the designator
buffer[numPackedBytes++] = (socketToPack->sa_family == AF_INET)
? IPv4_ADDRESS_DESIGNATOR : IPv6_ADDRESS_DESIGNATOR;
memcpy(buffer + numPackedBytes, _domainSocket, numSocketBytes);
numPackedBytes += numSocketBytes;
numPackedBytes += packSocket(buffer + numPackedBytes, socketToPack);
}
return numPackedBytes;

View file

@ -43,8 +43,11 @@ public:
const char* getPool() const { return _pool; }
const timeval& getTime() const { return _time; }
const sockaddr* getDomainSocket() { return _domainSocket; }
void setDomainSocket(const sockaddr* domainSocket);
const sockaddr* getAttachedPublicSocket() { return _attachedPublicSocket; }
void setAttachedPublicSocket(const sockaddr* attachedPublicSocket);
const sockaddr* getAttachedLocalSocket() { return _attachedLocalSocket; }
void setAttachedLocalSocket(const sockaddr* attachedLocalSocket);
/// Packs the assignment to the passed buffer
/// \param buffer the buffer in which to pack the assignment
@ -58,7 +61,8 @@ private:
Assignment::Direction _direction; /// the direction of the assignment (Create, Deploy, Request)
Assignment::Type _type; /// the type of the assignment, defines what the assignee will do
char* _pool; /// the pool this assignment is for/from
sockaddr* _domainSocket; /// pointer to socket for domain server that created assignment
sockaddr* _attachedPublicSocket; /// pointer to a public socket that relates to assignment, depends on direction
sockaddr* _attachedLocalSocket; /// pointer to a local socket that relates to assignment, depends on direction
timeval _time; /// time the assignment was created (set in constructor)
};

View file

@ -68,13 +68,25 @@ int packSocket(unsigned char* packStore, sockaddr* socketToPack) {
return packSocket(packStore, ((sockaddr_in*) socketToPack)->sin_addr.s_addr, ((sockaddr_in*) socketToPack)->sin_port);
}
int unpackSocket(unsigned char* packedData, sockaddr* unpackDestSocket) {
int unpackSocket(const unsigned char* packedData, sockaddr* unpackDestSocket) {
sockaddr_in* destinationSocket = (sockaddr_in*) unpackDestSocket;
destinationSocket->sin_family = AF_INET;
destinationSocket->sin_addr.s_addr = (packedData[0] << 24) + (packedData[1] << 16) + (packedData[2] << 8) + packedData[3];
destinationSocket->sin_port = (packedData[4] << 8) + packedData[5];
return 6; // this could be more if we ever need IPv6
}
void copySocketToEmptySocketPointer(sockaddr** destination, const sockaddr* source) {
// create a new sockaddr or sockaddr_in depending on what type of address this is
if (source->sa_family == AF_INET) {
*destination = (sockaddr*) new sockaddr_in;
memcpy(*destination, source, sizeof(sockaddr_in));
} else {
*destination = (sockaddr*) new sockaddr_in6;
memcpy(*destination, source, sizeof(sockaddr_in6));
}
}
int getLocalAddress() {
// get this node's local address so we can pass that to DS
struct ifaddrs* ifAddrStruct = NULL;

View file

@ -44,7 +44,8 @@ private:
bool socketMatch(const sockaddr* first, const sockaddr* second);
int packSocket(unsigned char* packStore, in_addr_t inAddress, in_port_t networkOrderPort);
int packSocket(unsigned char* packStore, sockaddr* socketToPack);
int unpackSocket(unsigned char* packedData, sockaddr* unpackDestSocket);
int unpackSocket(const unsigned char* packedData, sockaddr* unpackDestSocket);
void copySocketToEmptySocketPointer(sockaddr** destination, const sockaddr* source);
int getLocalAddress();
unsigned short loadBufferWithSocketInfo(char* addressBuffer, sockaddr* socket);
sockaddr_in socketForHostnameAndHostOrderPort(const char* hostname, unsigned short port = 0);