add silent agent removal to AgentList class

This commit is contained in:
Stephen Birarda 2013-02-22 13:38:33 -08:00
parent 345ec8b938
commit 5d2cba0b2a
7 changed files with 102 additions and 7 deletions

View file

@ -942,6 +942,9 @@ int main(int argc, char** argv)
agentList.linkedDataCreateCallback = &attachNewHeadToAgent;
agentList.audioMixerSocketUpdate = &audioMixerUpdate;
// start the thread which checks for silent agents
agentList.startSilentAgentRemovalThread();
// create thread for receipt of data via UDP
pthread_create(&networkReceiveThread, NULL, networkReceive, NULL);

View file

@ -10,10 +10,9 @@
#include <arpa/inet.h>
#include <cstring>
#include "UDPSocket.h"
#include "SharedUtil.h"
Agent::Agent() {
}
Agent::Agent() {}
Agent::Agent(sockaddr *agentPublicSocket, sockaddr *agentLocalSocket, char agentType) {
publicSocket = new sockaddr;
@ -23,6 +22,7 @@ Agent::Agent(sockaddr *agentPublicSocket, sockaddr *agentLocalSocket, char agent
memcpy(localSocket, agentLocalSocket, sizeof(sockaddr));
type = agentType;
lastRecvTimeUsecs = usecTimestampNow();
activeSocket = NULL;
linkedData = NULL;

View file

@ -28,6 +28,7 @@ class Agent {
char type;
timeval pingStarted;
int pingMsecs;
double lastRecvTimeUsecs;
bool isSelf;
AgentData *linkedData;

View file

@ -8,6 +8,11 @@
#include "AgentList.h"
#include <arpa/inet.h>
#include <pthread.h>
#include "SharedUtil.h"
bool stopAgentRemovalThread = false;
pthread_mutex_t vectorChangeMutex = PTHREAD_MUTEX_INITIALIZER;
AgentList::AgentList() : agentSocket(AGENT_SOCKET_LISTEN_PORT) {
linkedDataCreateCallback = NULL;
@ -15,7 +20,12 @@ AgentList::AgentList() : agentSocket(AGENT_SOCKET_LISTEN_PORT) {
}
AgentList::AgentList(int socketListenPort) : agentSocket(socketListenPort) {
linkedDataCreateCallback = NULL;
audioMixerSocketUpdate = NULL;
}
AgentList::~AgentList() {
stopSilentAgentRemovalThread();
}
UDPSocket * AgentList::getAgentSocket() {
@ -59,6 +69,8 @@ void AgentList::updateAgentWithData(sockaddr *senderAddress, void *packetData, s
if (agentIndex != -1) {
Agent *matchingAgent = &agents[agentIndex];
matchingAgent->lastRecvTimeUsecs = usecTimestampNow();
if (matchingAgent->linkedData == NULL) {
if (linkedDataCreateCallback != NULL) {
linkedDataCreateCallback(matchingAgent);
@ -135,7 +147,9 @@ bool AgentList::addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket,
std::cout << "Added agent - " << &newAgent << "\n";
agents.push_back(newAgent);
pthread_mutex_lock(&vectorChangeMutex);
agents.push_back(newAgent);
pthread_mutex_unlock(&vectorChangeMutex);
return true;
} else {
@ -155,7 +169,6 @@ void AgentList::broadcastToAgents(char *broadcastData, size_t dataBytes) {
}
}
void AgentList::pingAgents() {
char payload[] = "P";
@ -193,4 +206,39 @@ void AgentList::handlePingReply(sockaddr *agentAddress) {
break;
}
}
}
void *removeSilentAgents(void *args) {
std::vector<Agent> *agents = (std::vector<Agent> *)args;
double checkTimeUSecs, sleepTime;
while (!stopAgentRemovalThread) {
checkTimeUSecs = usecTimestampNow();
for(std::vector<Agent>::iterator agent = agents->begin(); agent != agents->end();) {
if ((checkTimeUSecs - agent->lastRecvTimeUsecs) > AGENT_SILENCE_THRESHOLD_USECS) {
std::cout << "Killing agent " << &(*agent) << "\n";
pthread_mutex_lock(&vectorChangeMutex);
agent = agents->erase(agent);
pthread_mutex_unlock(&vectorChangeMutex);
} else {
agent++;
}
}
sleepTime = AGENT_SILENCE_THRESHOLD_USECS - (usecTimestampNow() - checkTimeUSecs);
usleep(sleepTime);
}
pthread_exit(0);
}
void AgentList::startSilentAgentRemovalThread() {
pthread_create(&removeSilentAgentsThread, NULL, removeSilentAgents, (void *)&agents);
}
void AgentList::stopSilentAgentRemovalThread() {
stopAgentRemovalThread = true;
pthread_join(removeSilentAgentsThread, NULL);
}

View file

@ -15,11 +15,13 @@
#include "UDPSocket.h"
const unsigned short AGENT_SOCKET_LISTEN_PORT = 40103;
const int AGENT_SILENCE_THRESHOLD_USECS = 2 * 1000000;
class AgentList {
public:
AgentList();
AgentList(int socketListenPort);
~AgentList();
std::vector<Agent> agents;
void(*linkedDataCreateCallback)(Agent *);
void(*audioMixerSocketUpdate)(in_addr_t, in_port_t);
@ -30,12 +32,16 @@ class AgentList {
bool addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType);
void processAgentData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void broadcastToAgents(char *broadcastData, size_t dataBytes);
void sendToAgent(Agent *destAgent, void *packetData, size_t dataBytes);
void pingAgents();
void startSilentAgentRemovalThread();
void stopSilentAgentRemovalThread();
private:
UDPSocket agentSocket;
int indexOfMatchingAgent(sockaddr *senderAddress);
void updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void handlePingReply(sockaddr *agentAddress);
UDPSocket agentSocket;
pthread_t removeSilentAgentsThread;
};
#endif /* defined(__hifi__AgentList__) */

19
shared/src/SharedUtil.cpp Normal file
View file

@ -0,0 +1,19 @@
//
// SharedUtil.cpp
// hifi
//
// Created by Stephen Birarda on 2/22/13.
//
//
#include "SharedUtil.h"
double usecTimestamp(timeval *time) {
return (time->tv_sec * 1000000.0);
}
double usecTimestampNow() {
timeval now;
gettimeofday(&now, NULL);
return (now.tv_sec * 1000000.0);
}

18
shared/src/SharedUtil.h Normal file
View file

@ -0,0 +1,18 @@
//
// SharedUtil.h
// hifi
//
// Created by Stephen Birarda on 2/22/13.
//
//
#ifndef __hifi__SharedUtil__
#define __hifi__SharedUtil__
#include <iostream>
#include <sys/time.h>
double usecTimestamp(timeval *time);
double usecTimestampNow();
#endif /* defined(__hifi__SharedUtil__) */