add SimpleMovingAverage to each Agent, collect bytes received stats

This commit is contained in:
Stephen Birarda 2013-04-18 15:02:26 -07:00
parent b9a1faf284
commit 588fc67cb9
5 changed files with 56 additions and 4 deletions

View file

@ -1269,7 +1269,7 @@ void *networkReceive(void *args)
case PACKET_HEADER_ERASE_VOXEL: case PACKET_HEADER_ERASE_VOXEL:
voxels.parseData(incomingPacket, bytesReceived); voxels.parseData(incomingPacket, bytesReceived);
break; break;
case PACKET_HEADER_BULK_AVATAR_DATA: case PACKET_HEADER_BULK_AVATAR_DATA:
AgentList::getInstance()->processBulkAgentData(&senderAddress, AgentList::getInstance()->processBulkAgentData(&senderAddress,
incomingPacket, incomingPacket,
bytesReceived, bytesReceived,

View file

@ -37,6 +37,7 @@ Agent::Agent(sockaddr *agentPublicSocket, sockaddr *agentLocalSocket, char agent
activeSocket = NULL; activeSocket = NULL;
linkedData = NULL; linkedData = NULL;
_movingAverage = NULL;
deleteMutex = new pthread_mutex_t; deleteMutex = new pthread_mutex_t;
pthread_mutex_init(deleteMutex, NULL); pthread_mutex_init(deleteMutex, NULL);
@ -69,6 +70,13 @@ Agent::Agent(const Agent &otherAgent) {
linkedData = NULL; linkedData = NULL;
} }
if (otherAgent._movingAverage != NULL) {
_movingAverage = new SimpleMovingAverage(100);
memcpy(_movingAverage, otherAgent._movingAverage, sizeof(SimpleMovingAverage));
} else {
_movingAverage = NULL;
}
deleteMutex = new pthread_mutex_t; deleteMutex = new pthread_mutex_t;
pthread_mutex_init(deleteMutex, NULL); pthread_mutex_init(deleteMutex, NULL);
} }
@ -89,6 +97,7 @@ void Agent::swap(Agent &first, Agent &second) {
swap(first.agentId, second.agentId); swap(first.agentId, second.agentId);
swap(first.firstRecvTimeUsecs, second.firstRecvTimeUsecs); swap(first.firstRecvTimeUsecs, second.firstRecvTimeUsecs);
swap(first.lastRecvTimeUsecs, second.lastRecvTimeUsecs); swap(first.lastRecvTimeUsecs, second.lastRecvTimeUsecs);
swap(first._movingAverage, second._movingAverage);
swap(first.deleteMutex, second.deleteMutex); swap(first.deleteMutex, second.deleteMutex);
} }
@ -99,6 +108,7 @@ Agent::~Agent() {
delete publicSocket; delete publicSocket;
delete localSocket; delete localSocket;
delete linkedData; delete linkedData;
delete _movingAverage;
} }
char Agent::getType() const { char Agent::getType() const {
@ -211,6 +221,31 @@ bool Agent::matches(sockaddr *otherPublicSocket, sockaddr *otherLocalSocket, cha
&& socketMatch(localSocket, otherLocalSocket); && socketMatch(localSocket, otherLocalSocket);
} }
void Agent::recordBytesReceived(int bytesReceived) {
if (_movingAverage == NULL) {
printf("Setting up the moving average for agent\n");
_movingAverage = new SimpleMovingAverage(100);
}
_movingAverage->updateAverage((float) bytesReceived);
}
float Agent::getAveragePacketsPerSecond() {
if (_movingAverage != NULL) {
return (1 / _movingAverage->getEventDeltaAverage());
} else {
return 0;
}
}
float Agent::getAverageKilobitsPerSecond() {
if (_movingAverage != NULL) {
return (_movingAverage->getAverageSampleValuePerSecond() * (8.0f / 1000));
} else {
return 0;
}
}
void Agent::printLog(Agent const& agent) { void Agent::printLog(Agent const& agent) {
sockaddr_in *agentPublicSocket = (sockaddr_in *) agent.publicSocket; sockaddr_in *agentPublicSocket = (sockaddr_in *) agent.publicSocket;

View file

@ -11,7 +11,6 @@
#include <stdint.h> #include <stdint.h>
#include <ostream> #include <ostream>
#include "AgentData.h"
#ifdef _WIN32 #ifdef _WIN32
#include "Syssocket.h" #include "Syssocket.h"
@ -19,6 +18,9 @@
#include <sys/socket.h> #include <sys/socket.h>
#endif #endif
#include "SimpleMovingAverage.h"
#include "AgentData.h"
class Agent { class Agent {
public: public:
Agent(sockaddr *agentPublicSocket, sockaddr *agentLocalSocket, char agentType, uint16_t thisAgentId); Agent(sockaddr *agentPublicSocket, sockaddr *agentLocalSocket, char agentType, uint16_t thisAgentId);
@ -34,32 +36,45 @@ public:
char getType() const; char getType() const;
const char* getTypeName() const; const char* getTypeName() const;
void setType(char newType); void setType(char newType);
uint16_t getAgentId(); uint16_t getAgentId();
void setAgentId(uint16_t thisAgentId); void setAgentId(uint16_t thisAgentId);
double getFirstRecvTimeUsecs(); double getFirstRecvTimeUsecs();
void setFirstRecvTimeUsecs(double newTimeUsecs); void setFirstRecvTimeUsecs(double newTimeUsecs);
double getLastRecvTimeUsecs(); double getLastRecvTimeUsecs();
void setLastRecvTimeUsecs(double newTimeUsecs); void setLastRecvTimeUsecs(double newTimeUsecs);
sockaddr* getPublicSocket(); sockaddr* getPublicSocket();
void setPublicSocket(sockaddr *newSocket); void setPublicSocket(sockaddr *newSocket);
sockaddr* getLocalSocket(); sockaddr* getLocalSocket();
void setLocalSocket(sockaddr *newSocket); void setLocalSocket(sockaddr *newSocket);
sockaddr* getActiveSocket(); sockaddr* getActiveSocket();
void activatePublicSocket(); void activatePublicSocket();
void activateLocalSocket(); void activateLocalSocket();
AgentData* getLinkedData(); AgentData* getLinkedData();
void setLinkedData(AgentData *newData); void setLinkedData(AgentData *newData);
void recordBytesReceived(int bytesReceived);
float getAverageKilobitsPerSecond();
float getAveragePacketsPerSecond();
static void printLog(Agent const&); static void printLog(Agent const&);
friend std::ostream& operator<<(std::ostream& os, const Agent* agent); friend std::ostream& operator<<(std::ostream& os, const Agent* agent);
private: private:
void swap(Agent &first, Agent &second); void swap(Agent &first, Agent &second);
sockaddr *publicSocket, *localSocket, *activeSocket; sockaddr *publicSocket, *localSocket, *activeSocket;
char type; char type;
uint16_t agentId; uint16_t agentId;
double firstRecvTimeUsecs; double firstRecvTimeUsecs;
double lastRecvTimeUsecs; double lastRecvTimeUsecs;
AgentData *linkedData; SimpleMovingAverage* _movingAverage;
AgentData* linkedData;
}; };
std::ostream& operator<<(std::ostream& os, const Agent* agent); std::ostream& operator<<(std::ostream& os, const Agent* agent);

View file

@ -108,6 +108,7 @@ void AgentList::processBulkAgentData(sockaddr *senderAddress, unsigned char *pac
if (bulkSendAgentIndex >= 0) { if (bulkSendAgentIndex >= 0) {
Agent *bulkSendAgent = &agents[bulkSendAgentIndex]; Agent *bulkSendAgent = &agents[bulkSendAgentIndex];
bulkSendAgent->setLastRecvTimeUsecs(usecTimestampNow()); bulkSendAgent->setLastRecvTimeUsecs(usecTimestampNow());
bulkSendAgent->recordBytesReceived(numTotalBytes);
} }
unsigned char *startPosition = packetData; unsigned char *startPosition = packetData;
@ -144,6 +145,7 @@ void AgentList::updateAgentWithData(sockaddr *senderAddress, unsigned char *pack
void AgentList::updateAgentWithData(Agent *agent, unsigned char *packetData, int dataBytes) { void AgentList::updateAgentWithData(Agent *agent, unsigned char *packetData, int dataBytes) {
agent->setLastRecvTimeUsecs(usecTimestampNow()); agent->setLastRecvTimeUsecs(usecTimestampNow());
agent->recordBytesReceived(dataBytes);
if (agent->getLinkedData() == NULL) { if (agent->getLinkedData() == NULL) {
if (linkedDataCreateCallback != NULL) { if (linkedDataCreateCallback != NULL) {

View file

@ -3,7 +3,7 @@
// hifi // hifi
// //
// Created by Stephen Birarda on 4/18/13. // Created by Stephen Birarda on 4/18/13.
// Based heavily on Brad Hefta-Gaub's CounterStats class (RIP) // Replaces Brad Hefta-Gaub's CounterStats class (RIP)
// //
// //