Merge pull request #203 from birarda/domain-rfd

distinction between RFD packets and keepalive packets when talking to domain server
This commit is contained in:
ZappoMan 2013-05-06 13:28:18 -07:00
commit 915d6e5024
7 changed files with 59 additions and 74 deletions

View file

@ -45,8 +45,6 @@ unsigned char packetData[MAX_PACKET_SIZE];
const int LOGOFF_CHECK_INTERVAL = 5000; const int LOGOFF_CHECK_INTERVAL = 5000;
#define DEBUG_TO_SELF 0
int lastActiveCount = 0; int lastActiveCount = 0;
unsigned char* addAgentToBroadcastPacket(unsigned char* currentPosition, Agent* agentToAdd) { unsigned char* addAgentToBroadcastPacket(unsigned char* currentPosition, Agent* agentToAdd) {
@ -81,13 +79,13 @@ int main(int argc, const char * argv[])
setvbuf(stdout, NULL, _IOLBF, 0); setvbuf(stdout, NULL, _IOLBF, 0);
ssize_t receivedBytes = 0; ssize_t receivedBytes = 0;
char agentType; char agentType = '\0';
unsigned char *broadcastPacket = new unsigned char[MAX_PACKET_SIZE]; unsigned char broadcastPacket[MAX_PACKET_SIZE];
*broadcastPacket = PACKET_HEADER_DOMAIN; broadcastPacket[0] = PACKET_HEADER_DOMAIN;
unsigned char *currentBufferPos; unsigned char* currentBufferPos;
unsigned char *startPointer; unsigned char* startPointer;
int packetBytesWithoutLeadingChar; int packetBytesWithoutLeadingChar;
sockaddr_in agentPublicAddress, agentLocalAddress; sockaddr_in agentPublicAddress, agentLocalAddress;
@ -101,8 +99,8 @@ int main(int argc, const char * argv[])
if (agentList->getAgentSocket().receive((sockaddr *)&agentPublicAddress, packetData, &receivedBytes)) { if (agentList->getAgentSocket().receive((sockaddr *)&agentPublicAddress, packetData, &receivedBytes)) {
std::map<char, Agent *> newestSoloAgents; std::map<char, Agent *> newestSoloAgents;
agentType = packetData[0]; agentType = packetData[1];
unpackSocket(&packetData[1], (sockaddr *)&agentLocalAddress); unpackSocket(&packetData[2], (sockaddr*) g&agentLocalAddress);
// check the agent public address // check the agent public address
// if it matches our local address we're on the same box // if it matches our local address we're on the same box
@ -115,21 +113,23 @@ int main(int argc, const char * argv[])
} }
} }
if (agentList->addOrUpdateAgent((sockaddr *)&agentPublicAddress, if (agentList->addOrUpdateAgent((sockaddr*) &agentPublicAddress,
(sockaddr *)&agentLocalAddress, (sockaddr*) &agentLocalAddress,
agentType, agentType,
agentList->getLastAgentId())) { agentList->getLastAgentId())) {
agentList->increaseAgentId(); agentList->increaseAgentId();
} else if (packetData[0] == PACKET_HEADER_DOMAIN_RFD) {
// if this is a previous agent, and they are re-reporting for duty
// then we need to update the first receive time
Agent* refreshedAgent = agentList->agentWithAddress((sockaddr*) &agentLocalAddress);
refreshedAgent->setWakeMicrostamp(usecTimestampNow());
} }
currentBufferPos = broadcastPacket + 1; currentBufferPos = broadcastPacket + 2;
startPointer = currentBufferPos; startPointer = currentBufferPos;
for (AgentList::iterator agent = agentList->begin(); agent != agentList->end(); agent++) { for (AgentList::iterator agent = agentList->begin(); agent != agentList->end(); agent++) {
if (DEBUG_TO_SELF || if (!agent->matches((sockaddr*) &agentPublicAddress, (sockaddr*) &agentLocalAddress, agentType)) {
!agent->matches((sockaddr *)&agentPublicAddress, (sockaddr *)&agentLocalAddress, agentType)) {
if (memchr(SOLO_AGENT_TYPES, agent->getType(), sizeof(SOLO_AGENT_TYPES)) == NULL) { if (memchr(SOLO_AGENT_TYPES, agent->getType(), sizeof(SOLO_AGENT_TYPES)) == NULL) {
// this is an agent of which there can be multiple, just add them to the packet // this is an agent of which there can be multiple, just add them to the packet
// don't send avatar agents to other avatars, that will come from avatar mixer // don't send avatar agents to other avatars, that will come from avatar mixer
@ -140,26 +140,26 @@ int main(int argc, const char * argv[])
} else { } else {
// solo agent, we need to only send newest // solo agent, we need to only send newest
if (newestSoloAgents[agent->getType()] == NULL || if (newestSoloAgents[agent->getType()] == NULL ||
newestSoloAgents[agent->getType()]->getFirstRecvTimeUsecs() < agent->getFirstRecvTimeUsecs()) { newestSoloAgents[agent->getType()]->getWakeMicrostamp() < agent->getWakeMicrostamp()) {
// we have to set the newer solo agent to add it to the broadcast later // we have to set the newer solo agent to add it to the broadcast later
newestSoloAgents[agent->getType()] = &(*agent); newestSoloAgents[agent->getType()] = &(*agent);
} }
} }
} else { } else {
// this is the agent, just update last receive to now // this is the agent, just update last receive to now
agent->setLastRecvTimeUsecs(usecTimestampNow()); agent->setLastHeardMicrostamp(usecTimestampNow());
} }
} }
for (std::map<char, Agent *>::iterator agentIterator = newestSoloAgents.begin(); for (std::map<char, Agent *>::iterator soloAgent = newestSoloAgents.begin();
agentIterator != newestSoloAgents.end(); soloAgent != newestSoloAgents.end();
agentIterator++) { soloAgent++) {
// this is the newest alive solo agent, add them to the packet // this is the newest alive solo agent, add them to the packet
currentBufferPos = addAgentToBroadcastPacket(currentBufferPos, agentIterator->second); currentBufferPos = addAgentToBroadcastPacket(currentBufferPos, soloAgent->second);
} }
if ((packetBytesWithoutLeadingChar = (currentBufferPos - startPointer))) { if ((packetBytesWithoutLeadingChar = (currentBufferPos - startPointer))) {
agentList->getAgentSocket().send((sockaddr *)&agentPublicAddress, agentList->getAgentSocket().send((sockaddr*) &agentPublicAddress,
broadcastPacket, broadcastPacket,
packetBytesWithoutLeadingChar + 1); packetBytesWithoutLeadingChar + 1);
} }

View file

@ -34,11 +34,6 @@ const int GRAVITY_SAMPLES = 200; // Use the first samples to
const bool USING_INVENSENSE_MPU9150 = 1; const bool USING_INVENSENSE_MPU9150 = 1;
SerialInterface::SerialInterface() :
active(false),
_failedOpenAttempts(0) {
}
void SerialInterface::pair() { void SerialInterface::pair() {
#ifdef __APPLE__ #ifdef __APPLE__

View file

@ -36,7 +36,8 @@ extern const bool USING_INVENSENSE_MPU9150;
class SerialInterface { class SerialInterface {
public: public:
SerialInterface(); SerialInterface() : active(false),
_failedOpenAttempts(0) {}
void pair(); void pair();
void readData(); void readData();

View file

@ -52,8 +52,8 @@ Agent::Agent(sockaddr *agentPublicSocket, sockaddr *agentLocalSocket, char agent
type = agentType; type = agentType;
agentId = thisAgentId; agentId = thisAgentId;
firstRecvTimeUsecs = usecTimestampNow(); _wakeMicrostamp = usecTimestampNow();
lastRecvTimeUsecs = usecTimestampNow(); _lastHeardMicrostamp = usecTimestampNow();
activeSocket = NULL; activeSocket = NULL;
linkedData = NULL; linkedData = NULL;
@ -87,8 +87,8 @@ Agent::Agent(const Agent &otherAgent) {
activeSocket = NULL; activeSocket = NULL;
} }
firstRecvTimeUsecs = otherAgent.firstRecvTimeUsecs; _wakeMicrostamp = otherAgent._wakeMicrostamp;
lastRecvTimeUsecs = otherAgent.lastRecvTimeUsecs; _lastHeardMicrostamp = otherAgent._lastHeardMicrostamp;
type = otherAgent.type; type = otherAgent.type;
if (otherAgent.linkedData != NULL) { if (otherAgent.linkedData != NULL) {
@ -120,8 +120,8 @@ void Agent::swap(Agent &first, Agent &second) {
swap(first.type, second.type); swap(first.type, second.type);
swap(first.linkedData, second.linkedData); swap(first.linkedData, second.linkedData);
swap(first.agentId, second.agentId); swap(first.agentId, second.agentId);
swap(first.firstRecvTimeUsecs, second.firstRecvTimeUsecs); swap(first._wakeMicrostamp, second._wakeMicrostamp);
swap(first.lastRecvTimeUsecs, second.lastRecvTimeUsecs); swap(first._lastHeardMicrostamp, second._lastHeardMicrostamp);
swap(first._bytesReceivedMovingAverage, second._bytesReceivedMovingAverage); swap(first._bytesReceivedMovingAverage, second._bytesReceivedMovingAverage);
} }
@ -178,22 +178,6 @@ void Agent::setAgentId(uint16_t thisAgentId) {
agentId = thisAgentId; agentId = thisAgentId;
} }
double Agent::getFirstRecvTimeUsecs() {
return firstRecvTimeUsecs;
}
void Agent::setFirstRecvTimeUsecs(double newTimeUsecs) {
firstRecvTimeUsecs = newTimeUsecs;
}
double Agent::getLastRecvTimeUsecs() {
return lastRecvTimeUsecs;
}
void Agent::setLastRecvTimeUsecs(double newTimeUsecs) {
lastRecvTimeUsecs = newTimeUsecs;
}
sockaddr* Agent::getPublicSocket() { sockaddr* Agent::getPublicSocket() {
return publicSocket; return publicSocket;
} }

View file

@ -38,11 +38,11 @@ public:
uint16_t getAgentId(); uint16_t getAgentId();
void setAgentId(uint16_t thisAgentId); void setAgentId(uint16_t thisAgentId);
double getFirstRecvTimeUsecs(); double getWakeMicrostamp() const { return _wakeMicrostamp; }
void setFirstRecvTimeUsecs(double newTimeUsecs); void setWakeMicrostamp(double wakeMicrostamp) { _wakeMicrostamp = wakeMicrostamp; }
double getLastRecvTimeUsecs(); double getLastHeardMicrostamp() const { return _lastHeardMicrostamp; }
void setLastRecvTimeUsecs(double newTimeUsecs); void setLastHeardMicrostamp(double lastHeardMicrostamp) { _lastHeardMicrostamp = lastHeardMicrostamp; }
sockaddr* getPublicSocket(); sockaddr* getPublicSocket();
void setPublicSocket(sockaddr *newSocket); void setPublicSocket(sockaddr *newSocket);
@ -70,8 +70,8 @@ private:
sockaddr *publicSocket, *localSocket, *activeSocket; sockaddr *publicSocket, *localSocket, *activeSocket;
char type; char type;
uint16_t agentId; uint16_t agentId;
double firstRecvTimeUsecs; double _wakeMicrostamp;
double lastRecvTimeUsecs; double _lastHeardMicrostamp;
SimpleMovingAverage* _bytesReceivedMovingAverage; SimpleMovingAverage* _bytesReceivedMovingAverage;
AgentData* linkedData; AgentData* linkedData;
bool _isAlive; bool _isAlive;

View file

@ -64,8 +64,7 @@ AgentList::AgentList(char newOwnerType, unsigned int newSocketListenPort) :
agentSocket(newSocketListenPort), agentSocket(newSocketListenPort),
ownerType(newOwnerType), ownerType(newOwnerType),
socketListenPort(newSocketListenPort), socketListenPort(newSocketListenPort),
lastAgentId(0) lastAgentId(0) {
{
pthread_mutex_init(&mutex, 0); pthread_mutex_init(&mutex, 0);
} }
@ -114,7 +113,7 @@ void AgentList::processBulkAgentData(sockaddr *senderAddress, unsigned char *pac
Agent* bulkSendAgent = agentWithAddress(senderAddress); Agent* bulkSendAgent = agentWithAddress(senderAddress);
if (bulkSendAgent) { if (bulkSendAgent) {
bulkSendAgent->setLastRecvTimeUsecs(usecTimestampNow()); bulkSendAgent->setLastHeardMicrostamp(usecTimestampNow());
bulkSendAgent->recordBytesReceived(numTotalBytes); bulkSendAgent->recordBytesReceived(numTotalBytes);
} }
@ -161,7 +160,7 @@ int AgentList::updateAgentWithData(sockaddr *senderAddress, unsigned char *packe
} }
int AgentList::updateAgentWithData(Agent *agent, unsigned char *packetData, int dataBytes) { int AgentList::updateAgentWithData(Agent *agent, unsigned char *packetData, int dataBytes) {
agent->setLastRecvTimeUsecs(usecTimestampNow()); agent->setLastHeardMicrostamp(usecTimestampNow());
if (agent->getActiveSocket() != NULL) { if (agent->getActiveSocket() != NULL) {
agent->recordBytesReceived(dataBytes); agent->recordBytesReceived(dataBytes);
@ -273,7 +272,7 @@ bool AgentList::addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket,
if (agent->getType() == AGENT_TYPE_AUDIO_MIXER || agent->getType() == AGENT_TYPE_VOXEL) { if (agent->getType() == AGENT_TYPE_AUDIO_MIXER || agent->getType() == AGENT_TYPE_VOXEL) {
// until the Audio class also uses our agentList, we need to update // until the Audio class also uses our agentList, we need to update
// the lastRecvTimeUsecs for the audio mixer so it doesn't get killed and re-added continously // the lastRecvTimeUsecs for the audio mixer so it doesn't get killed and re-added continously
agent->setLastRecvTimeUsecs(usecTimestampNow()); agent->setLastHeardMicrostamp(usecTimestampNow());
} }
// we had this agent already, do nothing for now // we had this agent already, do nothing for now
@ -383,7 +382,7 @@ void *removeSilentAgents(void *args) {
for(AgentList::iterator agent = agentList->begin(); agent != agentList->end(); ++agent) { for(AgentList::iterator agent = agentList->begin(); agent != agentList->end(); ++agent) {
if ((checkTimeUSecs - agent->getLastRecvTimeUsecs()) > AGENT_SILENCE_THRESHOLD_USECS if ((checkTimeUSecs - agent->getLastHeardMicrostamp()) > AGENT_SILENCE_THRESHOLD_USECS
&& agent->getType() != AGENT_TYPE_VOXEL) { && agent->getType() != AGENT_TYPE_VOXEL) {
printLog("Killing agent - "); printLog("Killing agent - ");
@ -418,13 +417,6 @@ void *checkInWithDomainServer(void *args) {
const int DOMAIN_SERVER_CHECK_IN_USECS = 1 * 1000000; const int DOMAIN_SERVER_CHECK_IN_USECS = 1 * 1000000;
AgentList* parentAgentList = (AgentList*) args;
timeval lastSend;
unsigned char output[7];
in_addr_t localAddress = getLocalAddress();
// Lookup the IP address of the domain server if we need to // Lookup the IP address of the domain server if we need to
if (atoi(DOMAIN_IP) == 0) { if (atoi(DOMAIN_IP) == 0) {
struct hostent* pHostInfo; struct hostent* pHostInfo;
@ -439,14 +431,23 @@ void *checkInWithDomainServer(void *args) {
} }
} else printLog("Using static domainserver IP: %s\n", DOMAIN_IP); } else printLog("Using static domainserver IP: %s\n", DOMAIN_IP);
AgentList* parentAgentList = (AgentList*) args;
timeval lastSend;
in_addr_t localAddress = getLocalAddress();
unsigned char packet[8];
packet[0] = PACKET_HEADER_DOMAIN_RFD;
packet[1] = parentAgentList->getOwnerType();
while (!domainServerCheckinStopFlag) { while (!domainServerCheckinStopFlag) {
gettimeofday(&lastSend, NULL); gettimeofday(&lastSend, NULL);
output[0] = parentAgentList->getOwnerType(); packSocket(packet + 2, localAddress, htons(parentAgentList->getSocketListenPort()));
packSocket(output + 1, localAddress, htons(parentAgentList->getSocketListenPort()));
parentAgentList->getAgentSocket().send(DOMAIN_IP, DOMAINSERVER_PORT, output, 7); parentAgentList->getAgentSocket().send(DOMAIN_IP, DOMAINSERVER_PORT, packet, sizeof(packet));
packet[0] = PACKET_HEADER_DOMAIN_LIST_REQUEST;
double usecToSleep = DOMAIN_SERVER_CHECK_IN_USECS - (usecTimestampNow() - usecTimestamp(&lastSend)); double usecToSleep = DOMAIN_SERVER_CHECK_IN_USECS - (usecTimestampNow() - usecTimestamp(&lastSend));
@ -481,7 +482,8 @@ AgentList::iterator AgentList::begin() const {
} }
} }
return AgentListIterator(this, 0); // there's no alive agent to start from - return the end
return end();
} }
AgentList::iterator AgentList::end() const { AgentList::iterator AgentList::end() const {

View file

@ -1,3 +1,4 @@
// //
// PacketHeaders.h // PacketHeaders.h
// hifi // hifi
@ -23,5 +24,7 @@ const char PACKET_HEADER_ERASE_VOXEL = 'E';
const char PACKET_HEADER_VOXEL_DATA = 'V'; const char PACKET_HEADER_VOXEL_DATA = 'V';
const char PACKET_HEADER_BULK_AVATAR_DATA = 'X'; const char PACKET_HEADER_BULK_AVATAR_DATA = 'X';
const char PACKET_HEADER_TRANSMITTER_DATA = 't'; const char PACKET_HEADER_TRANSMITTER_DATA = 't';
const char PACKET_HEADER_DOMAIN_LIST_REQUEST = 'L';
const char PACKET_HEADER_DOMAIN_RFD = 'C';
#endif #endif