abstract check in with DS to AgentList

This commit is contained in:
Stephen Birarda 2013-03-25 16:23:13 -07:00
parent 896047d2aa
commit 3ca5263582
4 changed files with 135 additions and 38 deletions

View file

@ -17,22 +17,25 @@
#endif
const char * SOLO_AGENT_TYPES_STRING = "MV";
char DOMAIN_HOSTNAME[] = "highfidelity.below92.com";
char DOMAIN_IP[100] = "192.168.1.47"; // IP Address will be re-set by lookup on startup
const int DOMAINSERVER_PORT = 40102;
bool stopAgentRemovalThread = false;
bool silentAgentThreadStopFlag = false;
bool domainServerCheckinStopFlag = false;
pthread_mutex_t vectorChangeMutex = PTHREAD_MUTEX_INITIALIZER;
AgentList::AgentList() : agentSocket(AGENT_SOCKET_LISTEN_PORT) {
linkedDataCreateCallback = NULL;
audioMixerSocketUpdate = NULL;
}
AgentList::AgentList(int socketListenPort) : agentSocket(socketListenPort) {
AgentList::AgentList(char newOwnerType, unsigned int newSocketListenPort) : agentSocket(newSocketListenPort) {
ownerType = newOwnerType;
socketListenPort = newSocketListenPort;
linkedDataCreateCallback = NULL;
audioMixerSocketUpdate = NULL;
}
AgentList::~AgentList() {
// stop the spawned threads, if they were started
stopSilentAgentRemovalThread();
stopDomainServerCheckInThread();
}
std::vector<Agent>& AgentList::getAgents() {
@ -43,6 +46,14 @@ UDPSocket& AgentList::getAgentSocket() {
return agentSocket;
}
char AgentList::getOwnerType() {
return ownerType;
}
unsigned int AgentList::getSocketListenPort() {
return socketListenPort;
}
void AgentList::processAgentData(sockaddr *senderAddress, void *packetData, size_t dataBytes) {
switch (((char *)packetData)[0]) {
case 'D':
@ -154,10 +165,10 @@ bool AgentList::addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket,
// this is an audio mixer
// for now that means we need to tell the audio class
// to use the local socket information the domain server gave us
sockaddr_in *localSocketIn = (sockaddr_in *)localSocket;
audioMixerSocketUpdate(localSocketIn->sin_addr.s_addr, localSocketIn->sin_port);
sockaddr_in *publicSocketIn = (sockaddr_in *)publicSocket;
audioMixerSocketUpdate(publicSocketIn->sin_addr.s_addr, publicSocketIn->sin_port);
} else if (newAgent.getType() == 'V') {
newAgent.activateLocalSocket();
newAgent.activatePublicSocket();
}
std::cout << "Added agent - " << &newAgent << "\n";
@ -227,7 +238,7 @@ void *removeSilentAgents(void *args) {
std::vector<Agent> *agents = (std::vector<Agent> *)args;
double checkTimeUSecs, sleepTime;
while (!stopAgentRemovalThread) {
while (!silentAgentThreadStopFlag) {
checkTimeUSecs = usecTimestampNow();
for(std::vector<Agent>::iterator agent = agents->begin(); agent != agents->end();) {
@ -259,6 +270,58 @@ void AgentList::startSilentAgentRemovalThread() {
}
void AgentList::stopSilentAgentRemovalThread() {
stopAgentRemovalThread = true;
silentAgentThreadStopFlag = true;
pthread_join(removeSilentAgentsThread, NULL);
}
void *checkInWithDomainServer(void *args) {
AgentList *parentAgentList = (AgentList *)args;
timeval lastSend;
unsigned char output[7];
in_addr_t localAddress = getLocalAddress();
// Lookup the IP address of the domain server if we need to
if (atoi(DOMAIN_IP) == 0) {
struct hostent* pHostInfo;
if ((pHostInfo = gethostbyname(DOMAIN_HOSTNAME)) != NULL) {
sockaddr_in tempAddress;
memcpy(&tempAddress.sin_addr, pHostInfo->h_addr_list[0], pHostInfo->h_length);
strcpy(DOMAIN_IP, inet_ntoa(tempAddress.sin_addr));
printf("Domain server %s: %s\n", DOMAIN_HOSTNAME, DOMAIN_IP);
} else {
printf("Failed lookup domainserver\n");
}
} else printf("Using static domainserver IP: %s\n", DOMAIN_IP);
while (!domainServerCheckinStopFlag) {
gettimeofday(&lastSend, NULL);
output[0] = parentAgentList->getOwnerType();
packSocket(output + 1, localAddress, htons(parentAgentList->getSocketListenPort()));
parentAgentList->getAgentSocket().send(DOMAIN_IP, DOMAINSERVER_PORT, output, 7);
double usecToSleep = 1000000 - (usecTimestampNow() - usecTimestamp(&lastSend));
if (usecToSleep > 0) {
usleep(usecToSleep);
}
}
pthread_exit(0);
return NULL;
}
void AgentList::startDomainServerCheckInThread() {
pthread_create(&checkInWithDomainServerThread, NULL, checkInWithDomainServer, (void *)this);
}
void AgentList::stopDomainServerCheckInThread() {
domainServerCheckinStopFlag = true;
pthread_join(checkInWithDomainServerThread, NULL);
}

View file

@ -19,38 +19,48 @@
#endif
const int MAX_PACKET_SIZE = 1500;
const unsigned short AGENT_SOCKET_LISTEN_PORT = 40103;
const unsigned int AGENT_SOCKET_LISTEN_PORT = 40103;
const int AGENT_SILENCE_THRESHOLD_USECS = 2 * 1000000;
extern const char *SOLO_AGENT_TYPES_STRING;
extern char DOMAIN_HOSTNAME[];
extern char DOMAIN_IP[100]; // IP Address will be re-set by lookup on startup
extern const int DOMAINSERVER_PORT;
class AgentList {
public:
AgentList();
AgentList(int socketListenPort);
~AgentList();
void(*linkedDataCreateCallback)(Agent *);
void(*audioMixerSocketUpdate)(in_addr_t, in_port_t);
std::vector<Agent>& getAgents();
UDPSocket& getAgentSocket();
UDPSocket agentSocket;
char ownerType;
unsigned int socketListenPort;
std::vector<Agent> agents;
pthread_t removeSilentAgentsThread;
pthread_t checkInWithDomainServerThread;
int updateList(unsigned char *packetData, size_t dataBytes);
int indexOfMatchingAgent(sockaddr *senderAddress);
bool addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType);
void processAgentData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void broadcastToAgents(char *broadcastData, size_t dataBytes);
void sendToAgent(Agent *destAgent, void *packetData, size_t dataBytes);
void pingAgents();
void startSilentAgentRemovalThread();
void stopSilentAgentRemovalThread();
private:
UDPSocket agentSocket;
std::vector<Agent> agents;
pthread_t removeSilentAgentsThread;
void handlePingReply(sockaddr *agentAddress);
public:
AgentList(char ownerType, unsigned int socketListenPort = AGENT_SOCKET_LISTEN_PORT);
~AgentList();
void(*linkedDataCreateCallback)(Agent *);
void(*audioMixerSocketUpdate)(in_addr_t, in_port_t);
void handlePingReply(sockaddr *agentAddress);
std::vector<Agent>& getAgents();
UDPSocket& getAgentSocket();
int updateList(unsigned char *packetData, size_t dataBytes);
int indexOfMatchingAgent(sockaddr *senderAddress);
bool addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType);
void processAgentData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void broadcastToAgents(char *broadcastData, size_t dataBytes);
void sendToAgent(Agent *destAgent, void *packetData, size_t dataBytes);
void pingAgents();
char getOwnerType();
unsigned int getSocketListenPort();
void startSilentAgentRemovalThread();
void stopSilentAgentRemovalThread();
void startDomainServerCheckInThread();
void stopDomainServerCheckInThread();
};
#endif /* defined(__hifi__AgentList__) */

View file

@ -17,6 +17,7 @@
#else
#include <sys/socket.h>
#include <arpa/inet.h>
#include <ifaddrs.h>
#endif
sockaddr_in destSockaddr, senderAddress;
@ -64,6 +65,28 @@ int unpackSocket(unsigned char *packedData, sockaddr *unpackDestSocket) {
return 6; // this could be more if we ever need IPv6
}
int getLocalAddress() {
// get this agent's local address so we can pass that to DS
struct ifaddrs * ifAddrStruct = NULL;
struct ifaddrs * ifa = NULL;
int family;
int localAddress = 0;
getifaddrs(&ifAddrStruct);
for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
family = ifa->ifa_addr->sa_family;
if (family == AF_INET) {
localAddress = ((sockaddr_in *)ifa->ifa_addr)->sin_addr.s_addr;
}
}
freeifaddrs(ifAddrStruct);
return localAddress;
}
UDPSocket::UDPSocket(int listeningPort) {
// create the socket
handle = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);

View file

@ -36,5 +36,6 @@ bool socketMatch(sockaddr *first, sockaddr *second);
int packSocket(unsigned char *packStore, in_addr_t inAddress, in_port_t networkOrderPort);
int packSocket(unsigned char *packStore, sockaddr *socketToPack);
int unpackSocket(unsigned char *packedData, sockaddr *unpackDestSocket);
int getLocalAddress();
#endif /* defined(__interface__UDPSocket__) */