hook domain server up to AgentList, pack sockets

This commit is contained in:
Stephen Birarda 2013-02-20 18:32:23 -08:00
parent 7205817476
commit c57380012e
8 changed files with 133 additions and 199 deletions

View file

@ -27,167 +27,55 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/time.h> #include <sys/time.h>
#include "UDPSocket.h" #include "AgentList.h"
const int DOMAIN_LISTEN_PORT = 40102; const int DOMAIN_LISTEN_PORT = 40102;
const char DESTINATION_IP[] = "127.0.0.1";
const int MAX_PACKET_SIZE = 1500; const int MAX_PACKET_SIZE = 1500;
char packet_data[MAX_PACKET_SIZE]; unsigned char packetData[MAX_PACKET_SIZE];
const int MAX_AGENTS = 1000;
const int LOGOFF_CHECK_INTERVAL = 5000; const int LOGOFF_CHECK_INTERVAL = 5000;
struct AgentList {
char agentType;
uint32_t ip;
in_addr public_sin_addr;
unsigned short public_port;
char *private_addr;
unsigned short private_port;
float x, y, z;
bool active;
timeval time, connectTime;
} agents[MAX_AGENTS];
int num_agents = 0;
int lastActiveCount = 0; int lastActiveCount = 0;
AgentList agentList(DOMAIN_LISTEN_PORT);
UDPSocket domainSocket = UDPSocket(DOMAIN_LISTEN_PORT); int listForBroadcast(unsigned char *listBuffer) {
unsigned char *currentBufferPos = listBuffer + 1;
double diffclock(timeval clock1,timeval clock2) unsigned char *startPointer = currentBufferPos;
{
double diffms = (clock2.tv_sec - clock1.tv_sec) * 1000.0;
diffms += (clock2.tv_usec - clock1.tv_usec) / 1000.0; // us to ms
return diffms;
}
int addAgent(uint32_t ip, in_port_t port, char *private_ip, unsigned short private_port, char agentType, float x, float y, float z) {
// Search for agent in list and add if needed
int i = 0;
int is_new = 0;
while ((ip != agents[i].ip || port != agents[i].public_port) && (i < num_agents)) {
i++;
}
if ((i == num_agents) || (agents[i].active == false)) { for(std::vector<Agent>::iterator agent = agentList.agents.begin(); agent != agentList.agents.end(); agent++) {
is_new = 1; *currentBufferPos++ = agent->type;
agents[i].private_addr = new char[255];
}
agents[i].ip = ip;
agents[i].x = x;
agents[i].y = y;
agents[i].z = z;
agents[i].active = true;
agents[i].public_sin_addr.s_addr = ip;
agents[i].public_port = port;
strcpy(agents[i].private_addr, private_ip);
agents[i].private_port = private_port;
agents[i].agentType = agentType;
gettimeofday(&agents[i].time, NULL);
if (is_new) gettimeofday(&agents[i].connectTime, NULL);
if (i == num_agents) {
num_agents++;
}
return is_new;
}
void update_agent_list(timeval now) {
int i;
//std::cout << "Checking agent list" << "\n";
for (i = 0; i < num_agents; i++) {
if ((diffclock(agents[i].time, now) > LOGOFF_CHECK_INTERVAL) &&
agents[i].active) {
std::cout << "Expired Agent type " << agents[i].agentType << " from " <<
inet_ntoa(agents[i].public_sin_addr) << ":" << agents[i].public_port << "\n";
agents[i].active = false;
}
}
}
void send_agent_list(sockaddr_in *agentAddrPointer) {
int i, length = 0;
ssize_t sentBytes;
char buffer[MAX_PACKET_SIZE];
char * public_address;
char public_portstring[10];
char * private_address;
char private_portstring[10];
int numSent = 0;
buffer[length++] = 'D';
//std::cout << "send list to: " << inet_ntoa(dest_address->sin_addr) << "\n";
for (i = 0; i < num_agents; i++) {
if (agents[i].active) {
// Write the type of the agent
buffer[length++] = agents[i].agentType;
// Write agent's public IP address
public_address = inet_ntoa(agents[i].public_sin_addr);
memcpy(&buffer[length], public_address, strlen(public_address));
length += strlen(public_address);
// Add public port number
buffer[length++] = ' ';
sprintf(public_portstring, "%hd", agents[i].public_port);
memcpy(&buffer[length], public_portstring, strlen(public_portstring));
length += strlen(public_portstring);
// Write agent's private IP address
buffer[length++] = ' ';
private_address = agents[i].private_addr;
memcpy(&buffer[length], private_address, strlen(private_address));
length += strlen(private_address);
// Add private port number
buffer[length++] = ' ';
sprintf(private_portstring, "%hd,", agents[i].private_port);
memcpy(&buffer[length], private_portstring, strlen(private_portstring));
length += strlen(private_portstring);
numSent++;
}
}
std::cout << "The sent buffer was " << buffer << "\n";
if (length > 1) {
sentBytes = domainSocket.send(agentAddrPointer, buffer, length);
if (sentBytes < length) currentBufferPos += packSocket(currentBufferPos, agent->publicSocket);
std::cout << "Error sending agent list!\n"; currentBufferPos += packSocket(currentBufferPos, agent->localSocket);
else if (numSent != lastActiveCount) {
std::cout << numSent << " Active Agents\n";
lastActiveCount = numSent;
}
} }
return 1 + (currentBufferPos - startPointer); // 1 is added for the leading 'D'
} }
int main(int argc, const char * argv[]) int main(int argc, const char * argv[])
{ {
ssize_t receivedBytes = 0; ssize_t receivedBytes = 0;
timeval time, last_time; char agentType;
sockaddr_in agentAddress; unsigned char *broadcastPacket = new unsigned char[MAX_PACKET_SIZE];
*broadcastPacket = 'D';
gettimeofday(&last_time, NULL); sockaddr_in agentPublicAddress, agentLocalAddress;
agentLocalAddress.sin_family = AF_INET;
while (true) { while (true) {
if (domainSocket.receive(&agentAddress, packet_data, &receivedBytes)) { if (agentList.getAgentSocket()->receive((sockaddr *)&agentPublicAddress, packetData, &receivedBytes)) {
float x,y,z; agentType = packetData[0];
char agentType; unpackSocket(&packetData[1], (sockaddr *)&agentLocalAddress);
char private_ip[50];
unsigned short private_port; agentList.addOrUpdateAgent((sockaddr *)&agentPublicAddress, (sockaddr *)&agentLocalAddress, agentType);
sscanf(packet_data, "%c %f,%f,%f,%s %hd", &agentType, &x, &y, &z, private_ip, &private_port);
if (addAgent(agentAddress.sin_addr.s_addr, ntohs(agentAddress.sin_port), private_ip, private_port, agentType, x, y, z)) { int listBytes = listForBroadcast(broadcastPacket);
std::cout << "Added Agent, type " << agentType << " from " <<
inet_ntoa(agentAddress.sin_addr) << ":" << ntohs(agentAddress.sin_port) << if (listBytes > 0) {
" (" << private_ip << ":" << private_port << ")" << "\n"; agentList.getAgentSocket()->send((sockaddr *)&agentPublicAddress, broadcastPacket, listBytes);
} }
// Reply with packet listing nearby active agents
send_agent_list(&agentAddress);
}
gettimeofday(&time, NULL);
if (diffclock(last_time, time) > LOGOFF_CHECK_INTERVAL) {
gettimeofday(&last_time, NULL);
update_agent_list(last_time);
} }
} }
return 0; return 0;

View file

@ -55,14 +55,12 @@ int simulate_on = 1;
const int MAX_PACKET_SIZE = 1500; const int MAX_PACKET_SIZE = 1500;
char DOMAIN_HOSTNAME[] = "highfidelity.below92.com"; char DOMAIN_HOSTNAME[] = "highfidelity.below92.com";
char DOMAIN_IP[100] = ""; // IP Address will be used first if not empty string char DOMAIN_IP[100] = "192.168.1.47"; // IP Address will be used first if not empty string
const int DOMAINSERVER_PORT = 40102; const int DOMAINSERVER_PORT = 40102;
AgentList agentList; AgentList agentList;
// For testing, add milliseconds of delay for received UDP packets // For testing, add milliseconds of delay for received UDP packets
char* incomingPacket;
int packetcount = 0; int packetcount = 0;
int packets_per_second = 0; int packets_per_second = 0;
int bytes_per_second = 0; int bytes_per_second = 0;
@ -79,7 +77,7 @@ int WIDTH = 1200;
int HEIGHT = 800; int HEIGHT = 800;
int fullscreen = 0; int fullscreen = 0;
char localAddressBuffer[INET_ADDRSTRLEN]; in_addr_t localAddress;
Oscilloscope audioScope(512,200,true); Oscilloscope audioScope(512,200,true);
@ -222,11 +220,11 @@ void Timer(int extra)
// //
// Send a message to the domainserver telling it we are ALIVE // Send a message to the domainserver telling it we are ALIVE
// //
char output[100]; unsigned char output[7];
sprintf(output, "%c %f,%f,%f,%s %hd", 'I', location[0], location[1], location[2], localAddressBuffer, AGENT_SOCKET_LISTEN_PORT); output[0] = 'I';
std::cout << "sending " << output << " to domain server\n"; packSocket(output + 1, localAddress, htons(AGENT_SOCKET_LISTEN_PORT));
int packet_size = strlen(output);
agentList.getAgentSocket()->send(DOMAIN_IP, DOMAINSERVER_PORT, output, packet_size); agentList.getAgentSocket()->send(DOMAIN_IP, DOMAINSERVER_PORT, output, 7);
// Ping the agents we can see // Ping the agents we can see
agentList.pingAgents(); agentList.pingAgents();
@ -764,8 +762,10 @@ void *networkReceive(void *args)
{ {
sockaddr senderAddress; sockaddr senderAddress;
ssize_t bytesReceived; ssize_t bytesReceived;
unsigned char *incomingPacket = new unsigned char[MAX_PACKET_SIZE];
while (true) { while (true) {
if (agentList.getAgentSocket()->receive(&senderAddress, (void *)incomingPacket, &bytesReceived)) { if (agentList.getAgentSocket()->receive(&senderAddress, incomingPacket, &bytesReceived)) {
packetcount++; packetcount++;
bytescount += bytesReceived; bytescount += bytesReceived;
@ -926,20 +926,15 @@ void *poll_marker_capture(void *threadarg){
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
// Create network socket and buffer
incomingPacket = new char[MAX_PACKET_SIZE];
struct ifaddrs * ifAddrStruct=NULL; struct ifaddrs * ifAddrStruct=NULL;
struct ifaddrs * ifa=NULL; struct ifaddrs * ifa=NULL;
void * tmpAddrPtr=NULL;
getifaddrs(&ifAddrStruct); getifaddrs(&ifAddrStruct);
for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) { for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
if (ifa ->ifa_addr->sa_family==AF_INET) { // check it is IP4 if (ifa ->ifa_addr->sa_family==AF_INET) { // check it is IP4
// is a valid IP4 Address // is a valid IP4 Address
tmpAddrPtr=&((struct sockaddr_in *)ifa->ifa_addr)->sin_addr; localAddress = ((struct sockaddr_in *)ifa->ifa_addr)->sin_addr.s_addr;
// inet_ntop(AF_INET, tmpAddrPtr, localAddressBuffer, INET_ADDRSTRLEN);
} }
} }

View file

@ -77,4 +77,14 @@ bool Agent::matches(sockaddr *otherPublicSocket, sockaddr *otherLocalSocket, cha
return type == otherAgentType return type == otherAgentType
&& socketMatch(publicSocket, otherPublicSocket) && socketMatch(publicSocket, otherPublicSocket)
&& socketMatch(localSocket, otherLocalSocket); && socketMatch(localSocket, otherLocalSocket);
}
std::ostream& operator<<(std::ostream& os, const Agent* agent) {
sockaddr_in *agentPublicSocket = (sockaddr_in *)agent->publicSocket;
sockaddr_in *agentLocalSocket = (sockaddr_in *)agent->localSocket;
os << "T: " << agent->type << " PA: " << inet_ntoa(agentPublicSocket->sin_addr) <<
":" << ntohs(agentPublicSocket->sin_port) << " LA: " << inet_ntoa(agentLocalSocket->sin_addr) <<
":" << ntohs(agentLocalSocket->sin_port);
return os;
} }

View file

@ -21,16 +21,21 @@ class Agent {
Agent& operator=(Agent otherAgent); Agent& operator=(Agent otherAgent);
bool operator==(const Agent& otherAgent); bool operator==(const Agent& otherAgent);
~Agent(); ~Agent();
bool matches(sockaddr *otherPublicSocket, sockaddr *otherLocalSocket, char otherAgentType); bool matches(sockaddr *otherPublicSocket, sockaddr *otherLocalSocket, char otherAgentType);
sockaddr *publicSocket, *localSocket, *activeSocket; sockaddr *publicSocket, *localSocket, *activeSocket;
char type; char type;
timeval pingStarted; timeval pingStarted;
int pingMsecs; int pingMsecs;
bool isSelf; bool isSelf;
AgentData *linkedData; AgentData *linkedData;
friend std::ostream& operator<<(std::ostream& os, const Agent* agent);
private: private:
void swap(Agent &first, Agent &second); void swap(Agent &first, Agent &second);
}; };
std::ostream& operator<<(std::ostream& os, const Agent* agent);
#endif /* defined(__hifi__Agent__) */ #endif /* defined(__hifi__Agent__) */

View file

@ -7,21 +7,26 @@
// //
#include "AgentList.h" #include "AgentList.h"
#include <arpa/inet.h>
AgentList::AgentList() : agentSocket(AGENT_SOCKET_LISTEN_PORT) { AgentList::AgentList() : agentSocket(AGENT_SOCKET_LISTEN_PORT) {
} }
AgentList::AgentList(int socketListenPort) : agentSocket(socketListenPort) {
}
UDPSocket * AgentList::getAgentSocket() { UDPSocket * AgentList::getAgentSocket() {
return &agentSocket; return &agentSocket;
} }
void AgentList::processAgentData(sockaddr *senderAddress, char *packetData, size_t dataBytes) { void AgentList::processAgentData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes) {
switch (packetData[0]) { switch (packetData[0]) {
case 'D': case 'D':
{ {
// list of agents from domain server // list of agents from domain server
updateList(packetData); updateList(packetData, dataBytes);
break; break;
} }
case 'P': case 'P':
@ -40,52 +45,54 @@ void AgentList::processAgentData(sockaddr *senderAddress, char *packetData, size
} }
} }
int AgentList::updateList(char *packetData) { int AgentList::updateList(unsigned char *packetData, size_t dataBytes) {
int readAgents = 0; int readAgents = 0;
int scannedItems = 0;
char agentType; char agentType;
// assumes only IPv4 addresses // assumes only IPv4 addresses
sockaddr_in *agentPublicSocket = new sockaddr_in; sockaddr_in agentPublicSocket;
agentPublicSocket->sin_family = AF_INET; agentPublicSocket.sin_family = AF_INET;
sockaddr_in *agentLocalSocket = new sockaddr_in; sockaddr_in agentLocalSocket;
agentLocalSocket->sin_family = AF_INET; agentLocalSocket.sin_family = AF_INET;
unsigned char *readPtr = packetData + 1;
unsigned char *startPtr = packetData;
Agent newAgent; while((readPtr - startPtr) < dataBytes) {
agentType = *readPtr++;
while((scannedItems = sscanf(packetData, readPtr += unpackSocket(readPtr, (sockaddr *)&agentPublicSocket);
"%c %u %hd %u %hd,", readPtr += unpackSocket(readPtr, (sockaddr *)&agentLocalSocket);
&agentType,
&agentPublicSocket->sin_addr.s_addr,
&agentPublicSocket->sin_port,
&agentLocalSocket->sin_addr.s_addr,
&agentLocalSocket->sin_port
))) {
std::vector<Agent>::iterator agent; addOrUpdateAgent((sockaddr *)&agentPublicSocket, (sockaddr *)&agentLocalSocket, agentType);
}
for(agent = agents.begin(); agent != agents.end(); agent++) {
if (agent->matches((sockaddr *)&agentPublicSocket, (sockaddr *)&agentLocalSocket, agentType)) { return readAgents;
// we already have this agent, stop checking }
break;
} bool AgentList::addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType) {
std::vector<Agent>::iterator agent;
for(agent = agents.begin(); agent != agents.end(); agent++) {
if (agent->matches(publicSocket, localSocket, agentType)) {
// we already have this agent, stop checking
break;
} }
if (agent == agents.end()) {
// we didn't have this agent, so add them
newAgent = Agent((sockaddr *)agentPublicSocket, (sockaddr *)agentLocalSocket, agentType);
std::cout << "Added new agent - PS: " << agentPublicSocket << " LS: " << agentLocalSocket << " AT: " << agentType << "\n";
agents.push_back(newAgent);
} else {
// we had this agent already, don't do anything
}
readAgents++;
} }
return readAgents; if (agent == agents.end()) {
// we didn't have this agent, so add them
Agent newAgent = Agent(publicSocket, localSocket, agentType);
std::cout << "Added agent - " << &newAgent << "\n";
agents.push_back(newAgent);
return true;
} else {
// we had this agent already
return false;
}
} }
void AgentList::pingAgents() { void AgentList::pingAgents() {

View file

@ -19,16 +19,19 @@ const unsigned short AGENT_SOCKET_LISTEN_PORT = 40103;
class AgentList { class AgentList {
public: public:
AgentList(); AgentList();
AgentList(int socketListenPort);
std::vector<Agent> agents; std::vector<Agent> agents;
int updateList(char *packetData);
void processAgentData(sockaddr *senderAddress, char *packetData, size_t dataBytes);
int broadcastToAgents(char *broadcastData, size_t dataBytes, bool sendToSelf);
void pingAgents();
UDPSocket* getAgentSocket(); UDPSocket* getAgentSocket();
int updateList(unsigned char *packetData, size_t dataBytes);
bool addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType);
void processAgentData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes);
void broadcastToAgents(char *broadcastData, size_t dataBytes, bool sendToSelf);
void pingAgents();
private: private:
UDPSocket agentSocket; UDPSocket agentSocket;
int addAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType);
void handlePingReply(sockaddr *agentAddress); void handlePingReply(sockaddr *agentAddress);
}; };

View file

@ -36,6 +36,29 @@ bool socketMatch(sockaddr *first, sockaddr *second) {
} }
} }
int packSocket(unsigned char *packStore, in_addr_t inAddress, in_port_t networkOrderPort) {
packStore[0] = inAddress >> 24;
packStore[1] = inAddress >> 16;
packStore[2] = inAddress >> 8;
packStore[3] = inAddress;
packStore[4] = networkOrderPort >> 8;
packStore[5] = networkOrderPort;
return 6; // could be dynamically more if we need IPv6
}
int packSocket(unsigned char *packStore, sockaddr *socketToPack) {
return packSocket(packStore, ((sockaddr_in *) socketToPack)->sin_addr.s_addr, ((sockaddr_in *) socketToPack)->sin_port);
}
int unpackSocket(unsigned char *packedData, sockaddr *unpackDestSocket) {
sockaddr_in *destinationSocket = (sockaddr_in *) unpackDestSocket;
destinationSocket->sin_addr.s_addr = (packedData[0] << 24) + (packedData[1] << 16) + (packedData[2] << 8) + packedData[3];
destinationSocket->sin_port = (packedData[4] << 8) + packedData[5];
return 6; // this could be more if we ever need IPv6
}
UDPSocket::UDPSocket(int listeningPort) { UDPSocket::UDPSocket(int listeningPort) {
// create the socket // create the socket
handle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); handle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);

View file

@ -28,5 +28,8 @@ class UDPSocket {
}; };
bool socketMatch(sockaddr *first, sockaddr *second); bool socketMatch(sockaddr *first, sockaddr *second);
int packSocket(unsigned char *packStore, in_addr_t inAddress, in_port_t networkOrderPort);
int packSocket(unsigned char *packStore, sockaddr *socketToPack);
int unpackSocket(unsigned char *packedData, sockaddr *unpackDestSocket);
#endif /* defined(__interface__UDPSocket__) */ #endif /* defined(__interface__UDPSocket__) */