add method to parse bulk agent data from a mixer

This commit is contained in:
Stephen Birarda 2013-04-10 11:57:24 -07:00
parent 98db359db3
commit cf1f087b38
3 changed files with 73 additions and 50 deletions

View file

@ -1050,9 +1050,7 @@ void key(unsigned char k, int x, int y)
if (k == '.') addRandomSphere(wantColorRandomizer);
}
//
// Receive packets from other agents/servers and decide what to do with them!
//
void *networkReceive(void *args)
{
sockaddr senderAddress;
@ -1064,19 +1062,26 @@ void *networkReceive(void *args)
packetCount++;
bytesCount += bytesReceived;
if (incomingPacket[0] == PACKET_HEADER_TRANSMITTER_DATA) {
// Pass everything but transmitter data to the agent list
myHead.hand->processTransmitterData(incomingPacket, bytesReceived);
} else if (incomingPacket[0] == PACKET_HEADER_VOXEL_DATA ||
incomingPacket[0] == PACKET_HEADER_Z_COMMAND ||
incomingPacket[0] == PACKET_HEADER_ERASE_VOXEL) {
voxels.parseData(incomingPacket, bytesReceived);
} else {
agentList.processAgentData(&senderAddress, incomingPacket, bytesReceived);
switch (incomingPacket[0]) {
case PACKET_HEADER_TRANSMITTER_DATA:
myHead.hand->processTransmitterData(incomingPacket, bytesReceived);
break;
case PACKET_HEADER_VOXEL_DATA:
case PACKET_HEADER_Z_COMMAND:
case PACKET_HEADER_ERASE_VOXEL:
voxels.parseData(incomingPacket, bytesReceived);
break;
case PACKET_HEADER_HEAD_DATA:
agentList.processBulkAgentData(&senderAddress, incomingPacket, bytesReceived, sizeof(float) * 11);
break;
default:
agentList.processAgentData(&senderAddress, incomingPacket, bytesReceived);
break;
}
}
}
delete[] incomingPacket;
pthread_exit(0);
return NULL;
}

View file

@ -74,10 +74,6 @@ void AgentList::processAgentData(sockaddr *senderAddress, void *packetData, size
updateList((unsigned char *)packetData, dataBytes);
break;
}
case PACKET_HEADER_HEAD_DATA: {
updateDataForAllAgents(senderAddress, (unsigned char *)packetData, dataBytes);
break;
}
case PACKET_HEADER_PING: {
agentSocket.send(senderAddress, &PACKET_HEADER_PING_REPLY, 1);
break;
@ -89,33 +85,36 @@ void AgentList::processAgentData(sockaddr *senderAddress, void *packetData, size
}
}
void AgentList::updateDataForAllAgents(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes) {
void AgentList::processBulkAgentData(sockaddr *senderAddress, void *packetData, int numTotalBytes, int numBytesPerAgent) {
// find the avatar mixer in our agent list and update the lastRecvTime from it
int avatarMixerIndex = indexOfMatchingAgent(senderAddress);
Agent *avatarMixerAgent = &agents[avatarMixerIndex];
avatarMixerAgent->setLastRecvTimeUsecs(usecTimestampNow());
int bulkSendAgentIndex = indexOfMatchingAgent(senderAddress);
int bytesForEachAgent = sizeof(float) * 11;
unsigned char *currentPosition = packetData + 1;
unsigned char *startPosition = packetData;
unsigned char *packetHolder = new unsigned char[bytesForEachAgent + 1];
if (bulkSendAgentIndex >= 0) {
Agent *bulkSendAgent = &agents[bulkSendAgentIndex];
bulkSendAgent->setLastRecvTimeUsecs(usecTimestampNow());
}
unsigned char *startPosition = (unsigned char *)packetData;
unsigned char *currentPosition = startPosition + 1;
unsigned char *packetHolder = new unsigned char[numBytesPerAgent + 1];
packetHolder[0] = PACKET_HEADER_HEAD_DATA;
uint16_t agentId;
uint16_t agentID = -1;
sockaddr_in *agentActiveSocket = new sockaddr_in;
agentActiveSocket->sin_family = AF_INET;
while ((currentPosition - startPosition) < dataBytes) {
currentPosition += unpackAgentId(currentPosition, &agentId);
currentPosition += unpackSocket(currentPosition, (sockaddr *)&agentActiveSocket);
memcpy(packetHolder + 1, currentPosition, bytesForEachAgent);
while ((currentPosition - startPosition) < numTotalBytes) {
currentPosition += unpackAgentId(currentPosition, &agentID);
memcpy(packetHolder + 1, currentPosition, numBytesPerAgent);
updateAgentWithData((sockaddr *)agentActiveSocket, packetHolder, bytesForEachAgent + 1);
currentPosition += bytesForEachAgent;
int matchingAgentIndex = indexOfMatchingAgent(agentID);
if (matchingAgentIndex >= 0) {
updateAgentWithData(&agents[matchingAgentIndex], packetHolder, numBytesPerAgent + 1);
}
currentPosition += numBytesPerAgent;
}
delete[] packetHolder;
}
void AgentList::updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes) {
@ -123,19 +122,20 @@ void AgentList::updateAgentWithData(sockaddr *senderAddress, void *packetData, s
int agentIndex = indexOfMatchingAgent(senderAddress);
if (agentIndex != -1) {
Agent *matchingAgent = &agents[agentIndex];
matchingAgent->setLastRecvTimeUsecs(usecTimestampNow());
if (matchingAgent->getLinkedData() == NULL) {
if (linkedDataCreateCallback != NULL) {
linkedDataCreateCallback(matchingAgent);
}
}
matchingAgent->getLinkedData()->parseData(packetData, dataBytes);
updateAgentWithData(&agents[agentIndex], packetData, dataBytes);
}
}
void AgentList::updateAgentWithData(Agent *agent, void *packetData, int dataBytes) {
agent->setLastRecvTimeUsecs(usecTimestampNow());
if (agent->getLinkedData() == NULL) {
if (linkedDataCreateCallback != NULL) {
linkedDataCreateCallback(agent);
}
}
agent->getLinkedData()->parseData(packetData, dataBytes);
}
int AgentList::indexOfMatchingAgent(sockaddr *senderAddress) {
@ -148,6 +148,16 @@ int AgentList::indexOfMatchingAgent(sockaddr *senderAddress) {
return -1;
}
int AgentList::indexOfMatchingAgent(uint16_t agentID) {
for(std::vector<Agent>::iterator agent = agents.begin(); agent != agents.end(); agent++) {
if (agent->getActiveSocket() != NULL && agent->getAgentId() == agentID) {
return agent - agents.begin();
}
}
return -1;
}
uint16_t AgentList::getLastAgentId() {
return lastAgentId;
}

View file

@ -49,14 +49,22 @@ public:
std::vector<Agent>& getAgents();
UDPSocket& getAgentSocket();
int updateList(unsigned char *packetData, size_t dataBytes);
int indexOfMatchingAgent(sockaddr *senderAddress);
uint16_t getLastAgentId();
void increaseAgentId();
int updateList(unsigned char *packetData, size_t dataBytes);
int indexOfMatchingAgent(sockaddr *senderAddress);
int indexOfMatchingAgent(uint16_t agentID);
bool addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType, uint16_t agentId);
void processAgentData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void updateDataForAllAgents(sockaddr *senderAddress, unsigned char *packetData, size_t dataBytes);
void processBulkAgentData(sockaddr *senderAddress, void *packetData, int numTotalBytes, int numBytesPerAgent);
void updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
void updateAgentWithData(Agent *agent, void *packetData, int dataBytes);
void broadcastToAgents(char *broadcastData, size_t dataBytes, const char* agentTypes);
void pingAgents();
char getOwnerType();
@ -69,7 +77,7 @@ public:
};
int unpackAgentId(unsigned char *packedData, uint16_t *agentId);
int packAgentId(unsigned char *packStore, uint16_t agentId);
int unpackAgentId(char *packedData, uint16_t *agentId);
int packAgentId(char *packStore, uint16_t agentId);
#endif /* defined(__hifi__AgentList__) */