// // AgentList.cpp // hifi // // Created by Stephen Birarda on 2/15/13. // Copyright (c) 2013 High Fidelity, Inc. All rights reserved. // #include #include #include #include #include "AgentList.h" #include "AgentTypes.h" #include "PacketHeaders.h" #include "SharedUtil.h" #include "shared_Log.h" #ifdef _WIN32 #include "Syssocket.h" #else #include #endif using shared_lib::printLog; const char SOLO_AGENT_TYPES[3] = { AGENT_TYPE_AVATAR_MIXER, AGENT_TYPE_AUDIO_MIXER, AGENT_TYPE_VOXEL }; char DOMAIN_HOSTNAME[] = "highfidelity.below92.com"; char DOMAIN_IP[100] = ""; // IP Address will be re-set by lookup on startup const int DOMAINSERVER_PORT = 40102; bool silentAgentThreadStopFlag = false; bool domainServerCheckinStopFlag = false; bool pingUnknownAgentThreadStopFlag = false; pthread_mutex_t vectorChangeMutex = PTHREAD_MUTEX_INITIALIZER; AgentList* AgentList::_sharedInstance = NULL; AgentList* AgentList::createInstance(char ownerType, unsigned int socketListenPort) { if (_sharedInstance == NULL) { _sharedInstance = new AgentList(ownerType, socketListenPort); } else { printLog("AgentList createInstance called with existing instance.\n"); } return _sharedInstance; } AgentList* AgentList::getInstance() { if (_sharedInstance == NULL) { printLog("AgentList getInstance called before call to createInstance. Returning NULL pointer.\n"); } return _sharedInstance; } AgentList::AgentList(char newOwnerType, unsigned int newSocketListenPort) : _agentBuckets(), _numAgents(0), agentSocket(newSocketListenPort), ownerType(newOwnerType), socketListenPort(newSocketListenPort), lastAgentId(0) { } AgentList::~AgentList() { // stop the spawned threads, if they were started stopSilentAgentRemovalThread(); stopDomainServerCheckInThread(); stopPingUnknownAgentsThread(); } UDPSocket& AgentList::getAgentSocket() { return agentSocket; } char AgentList::getOwnerType() { return ownerType; } unsigned int AgentList::getSocketListenPort() { return socketListenPort; } void AgentList::processAgentData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes) { switch (((char *)packetData)[0]) { case PACKET_HEADER_DOMAIN: { updateList(packetData, dataBytes); break; } case PACKET_HEADER_PING: { agentSocket.send(senderAddress, &PACKET_HEADER_PING_REPLY, 1); break; } case PACKET_HEADER_PING_REPLY: { handlePingReply(senderAddress); break; } } } void AgentList::processBulkAgentData(sockaddr *senderAddress, unsigned char *packetData, int numTotalBytes) { // find the avatar mixer in our agent list and update the lastRecvTime from it Agent* bulkSendAgent = agentWithAddress(senderAddress); if (bulkSendAgent) { bulkSendAgent->setLastRecvTimeUsecs(usecTimestampNow()); bulkSendAgent->recordBytesReceived(numTotalBytes); } unsigned char *startPosition = packetData; unsigned char *currentPosition = startPosition + 1; unsigned char packetHolder[numTotalBytes]; packetHolder[0] = PACKET_HEADER_HEAD_DATA; uint16_t agentID = -1; while ((currentPosition - startPosition) < numTotalBytes) { currentPosition += unpackAgentId(currentPosition, &agentID); memcpy(packetHolder + 1, currentPosition, numTotalBytes - (currentPosition - startPosition)); Agent* matchingAgent = agentWithID(agentID); if (!matchingAgent) { // we're missing this agent, we need to add it to the list addOrUpdateAgent(NULL, NULL, AGENT_TYPE_AVATAR, agentID); // TODO: this is a really stupid way to do this // Add a reverse iterator and go from the end of the list matchingAgent = agentWithID(agentID); } currentPosition += updateAgentWithData(matchingAgent, packetHolder, numTotalBytes - (currentPosition - startPosition)); } } int AgentList::updateAgentWithData(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes) { // find the agent by the sockaddr Agent* matchingAgent = agentWithAddress(senderAddress); if (matchingAgent) { return updateAgentWithData(matchingAgent, packetData, dataBytes); } else { return 0; } } int AgentList::updateAgentWithData(Agent *agent, unsigned char *packetData, int dataBytes) { agent->setLastRecvTimeUsecs(usecTimestampNow()); if (agent->getActiveSocket() != NULL) { agent->recordBytesReceived(dataBytes); } if (agent->getLinkedData() == NULL) { if (linkedDataCreateCallback != NULL) { linkedDataCreateCallback(agent); } } return agent->getLinkedData()->parseData(packetData, dataBytes); } Agent* AgentList::agentWithAddress(sockaddr *senderAddress) { for(AgentListIterator agent = begin(); agent != end(); agent++) { if ((*agent).getActiveSocket() != NULL && socketMatch((*agent).getActiveSocket(), senderAddress)) { return &*agent; } } return NULL; } Agent* AgentList::agentWithID(uint16_t agentID) { for(AgentListIterator agent = begin(); agent != end(); agent++) { if ((*agent).getAgentId() == agentID) { return &*agent; } } return NULL; } uint16_t AgentList::getLastAgentId() { return lastAgentId; } void AgentList::increaseAgentId() { ++lastAgentId; } int AgentList::updateList(unsigned char *packetData, size_t dataBytes) { int readAgents = 0; char agentType; uint16_t agentId; // assumes only IPv4 addresses sockaddr_in agentPublicSocket; agentPublicSocket.sin_family = AF_INET; sockaddr_in agentLocalSocket; agentLocalSocket.sin_family = AF_INET; unsigned char *readPtr = packetData + 1; unsigned char *startPtr = packetData; while((readPtr - startPtr) < dataBytes) { agentType = *readPtr++; readPtr += unpackAgentId(readPtr, (uint16_t *)&agentId); readPtr += unpackSocket(readPtr, (sockaddr *)&agentPublicSocket); readPtr += unpackSocket(readPtr, (sockaddr *)&agentLocalSocket); addOrUpdateAgent((sockaddr *)&agentPublicSocket, (sockaddr *)&agentLocalSocket, agentType, agentId); } return readAgents; } bool AgentList::addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType, uint16_t agentId) { AgentListIterator agent = end(); if (publicSocket != NULL) { for (agent = begin(); agent != end(); agent++) { if ((*agent).matches(publicSocket, localSocket, agentType)) { // we already have this agent, stop checking break; } } } if (agent == end()) { // we didn't have this agent, so add them Agent *newAgent = new Agent(publicSocket, localSocket, agentType, agentId); if (socketMatch(publicSocket, localSocket)) { // likely debugging scenario with two agents on local network // set the agent active right away newAgent->activatePublicSocket(); } if (newAgent->getType() == AGENT_TYPE_AUDIO_MIXER && audioMixerSocketUpdate != NULL) { // this is an audio mixer // for now that means we need to tell the audio class // to use the local socket information the domain server gave us sockaddr_in *publicSocketIn = (sockaddr_in *)publicSocket; audioMixerSocketUpdate(publicSocketIn->sin_addr.s_addr, publicSocketIn->sin_port); } else if (newAgent->getType() == AGENT_TYPE_VOXEL) { newAgent->activatePublicSocket(); } addAgentToList(newAgent); return true; } else { if ((*agent).getType() == AGENT_TYPE_AUDIO_MIXER || (*agent).getType() == AGENT_TYPE_VOXEL) { // until the Audio class also uses our agentList, we need to update // the lastRecvTimeUsecs for the audio mixer so it doesn't get killed and re-added continously (*agent).setLastRecvTimeUsecs(usecTimestampNow()); } // we had this agent already, do nothing for now return false; } } void AgentList::addAgentToList(Agent* newAgent) { // find the correct array to add this agent to int bucketIndex = _numAgents / AGENTS_PER_BUCKET; if (!_agentBuckets[bucketIndex]) { _agentBuckets[bucketIndex] = new Agent*[AGENTS_PER_BUCKET](); } _agentBuckets[bucketIndex][_numAgents % AGENTS_PER_BUCKET] = newAgent; ++_numAgents; printLog("Added agent - "); Agent::printLog(*newAgent); } void AgentList::broadcastToAgents(unsigned char *broadcastData, size_t dataBytes, const char* agentTypes, int numAgentTypes) { for(AgentListIterator agent = begin(); agent != end(); agent++) { // only send to the AgentTypes we are asked to send to. if ((*agent).getActiveSocket() != NULL && memchr(agentTypes, (*agent).getType(), numAgentTypes)) { // we know which socket is good for this agent, send there agentSocket.send((*agent).getActiveSocket(), broadcastData, dataBytes); } } } void AgentList::handlePingReply(sockaddr *agentAddress) { for(AgentListIterator agent = begin(); agent != end(); agent++) { // check both the public and local addresses for each agent to see if we find a match // prioritize the private address so that we prune erroneous local matches if (socketMatch((*agent).getPublicSocket(), agentAddress)) { (*agent).activatePublicSocket(); break; } else if (socketMatch((*agent).getLocalSocket(), agentAddress)) { (*agent).activateLocalSocket(); break; } } } Agent* AgentList::soloAgentOfType(char agentType) { if (memchr(SOLO_AGENT_TYPES, agentType, sizeof(SOLO_AGENT_TYPES)) != NULL) { for(AgentListIterator agent = begin(); agent != end(); agent++) { if ((*agent).getType() == agentType) { return &*agent; } } } return NULL; } void *pingUnknownAgents(void *args) { AgentList *agentList = (AgentList *)args; const int PING_INTERVAL_USECS = 1 * 1000000; timeval lastSend; while (!pingUnknownAgentThreadStopFlag) { gettimeofday(&lastSend, NULL); for(AgentListIterator agent = agentList->begin(); agent != agentList->end(); agent++) { if ((*agent).getActiveSocket() == NULL && ((*agent).getPublicSocket() != NULL && (*agent).getLocalSocket() != NULL)) { // ping both of the sockets for the agent so we can figure out // which socket we can use agentList->getAgentSocket().send((*agent).getPublicSocket(), &PACKET_HEADER_PING, 1); agentList->getAgentSocket().send((*agent).getLocalSocket(), &PACKET_HEADER_PING, 1); } } double usecToSleep = PING_INTERVAL_USECS - (usecTimestampNow() - usecTimestamp(&lastSend)); if (usecToSleep > 0) { usleep(usecToSleep); } } return NULL; } void AgentList::startPingUnknownAgentsThread() { pthread_create(&pingUnknownAgentsThread, NULL, pingUnknownAgents, (void *)this); } void AgentList::stopPingUnknownAgentsThread() { pingUnknownAgentThreadStopFlag = true; pthread_join(pingUnknownAgentsThread, NULL); } void *removeSilentAgents(void *args) { AgentList *agentList = (AgentList *)args; double checkTimeUSecs, sleepTime; while (!silentAgentThreadStopFlag) { checkTimeUSecs = usecTimestampNow(); for(AgentListIterator agent = agentList->begin(); agent != agentList->end(); agent++) { if ((checkTimeUSecs - (*agent).getLastRecvTimeUsecs()) > AGENT_SILENCE_THRESHOLD_USECS && (*agent).getType() != AGENT_TYPE_VOXEL) { printLog("Killing agent - "); Agent::printLog(*agent); (*agent).setAlive(false); } else { agent++; } } sleepTime = AGENT_SILENCE_THRESHOLD_USECS - (usecTimestampNow() - checkTimeUSecs); #ifdef _WIN32 Sleep( static_cast(1000.0f*sleepTime) ); #else usleep(sleepTime); #endif } pthread_exit(0); return NULL; } void AgentList::startSilentAgentRemovalThread() { pthread_create(&removeSilentAgentsThread, NULL, removeSilentAgents, (void *)this); } void AgentList::stopSilentAgentRemovalThread() { silentAgentThreadStopFlag = true; pthread_join(removeSilentAgentsThread, NULL); } void *checkInWithDomainServer(void *args) { const int DOMAIN_SERVER_CHECK_IN_USECS = 1 * 1000000; AgentList *parentAgentList = (AgentList *)args; timeval lastSend; unsigned char output[7]; in_addr_t localAddress = getLocalAddress(); // Lookup the IP address of the domain server if we need to if (atoi(DOMAIN_IP) == 0) { struct hostent* pHostInfo; if ((pHostInfo = gethostbyname(DOMAIN_HOSTNAME)) != NULL) { sockaddr_in tempAddress; memcpy(&tempAddress.sin_addr, pHostInfo->h_addr_list[0], pHostInfo->h_length); strcpy(DOMAIN_IP, inet_ntoa(tempAddress.sin_addr)); printLog("Domain server: %s - %s\n", DOMAIN_HOSTNAME, DOMAIN_IP); } else { printLog("Failed lookup domainserver\n"); } } else printLog("Using static domainserver IP: %s\n", DOMAIN_IP); while (!domainServerCheckinStopFlag) { gettimeofday(&lastSend, NULL); output[0] = parentAgentList->getOwnerType(); packSocket(output + 1, localAddress, htons(parentAgentList->getSocketListenPort())); parentAgentList->getAgentSocket().send(DOMAIN_IP, DOMAINSERVER_PORT, output, 7); double usecToSleep = DOMAIN_SERVER_CHECK_IN_USECS - (usecTimestampNow() - usecTimestamp(&lastSend)); if (usecToSleep > 0) { usleep(usecToSleep); } } pthread_exit(0); return NULL; } void AgentList::startDomainServerCheckInThread() { pthread_create(&checkInWithDomainServerThread, NULL, checkInWithDomainServer, (void *)this); } void AgentList::stopDomainServerCheckInThread() { domainServerCheckinStopFlag = true; pthread_join(checkInWithDomainServerThread, NULL); } AgentListIterator AgentList::begin() const { Agent** agentBucket = NULL; for (int i = 0; i < _numAgents; i++) { if (i % AGENTS_PER_BUCKET == 0) { agentBucket = _agentBuckets[i / AGENTS_PER_BUCKET]; } if (agentBucket[i % AGENTS_PER_BUCKET]->isAlive()) { return AgentListIterator(this, i); } } return AgentListIterator(this, 0); } AgentListIterator AgentList::end() const { Agent** agentBucket = _agentBuckets[(_numAgents - 1) / AGENTS_PER_BUCKET]; for (int i = _numAgents - 1; i >= 0; i--) { if (i % AGENTS_PER_BUCKET == 0) { agentBucket = _agentBuckets[i / AGENTS_PER_BUCKET]; } if (agentBucket[i % AGENTS_PER_BUCKET]->isAlive()) { return AgentListIterator(this, i + 1); } } return AgentListIterator(this, 0); } AgentListIterator::AgentListIterator(const AgentList* agentList, int agentIndex) : _agentIndex(agentIndex) { _agentList = agentList; } AgentListIterator& AgentListIterator::operator=(const AgentListIterator& otherValue) { _agentList = otherValue._agentList; _agentIndex = otherValue._agentIndex; return (*this); } bool AgentListIterator::operator==(const AgentListIterator &otherValue) { return _agentIndex == otherValue._agentIndex; } bool AgentListIterator::operator!=(const AgentListIterator &otherValue) { return !(*this == otherValue); } Agent& AgentListIterator::operator*() { Agent** agentBucket = _agentList->_agentBuckets[_agentIndex / AGENTS_PER_BUCKET]; return *agentBucket[_agentIndex % AGENTS_PER_BUCKET]; } AgentListIterator& AgentListIterator::operator++() { if (*this != _agentList->end()) { ++_agentIndex; } return (*this); } AgentListIterator& AgentListIterator::operator++(int) { if (*this != _agentList->end()) { ++_agentIndex; } return (*this); }