Merge pull request #547 from birarda/injector

fix the injector loop and DS comm with EC2 agents
This commit is contained in:
ZappoMan 2013-06-18 13:48:46 -07:00
commit 232c29ace4
2 changed files with 68 additions and 111 deletions

View file

@ -65,8 +65,8 @@ int main(int argc, const char * argv[])
// with the EC2 IP. Otherwise, we will replace the IP like we used to
// this allows developers to run a local domain without recompiling the
// domain server
bool useLocal = cmdOptionExists(argc, argv, "--local");
if (useLocal) {
bool isLocalMode = cmdOptionExists(argc, argv, "--local");
if (isLocalMode) {
printf("NOTE: Running in Local Mode!\n");
} else {
printf("--------------------------------------------------\n");
@ -103,14 +103,17 @@ int main(int argc, const char * argv[])
int numBytesSocket = unpackSocket(packetData + sizeof(PACKET_HEADER) + sizeof(AGENT_TYPE),
(sockaddr*) &agentLocalAddress);
sockaddr* destinationSocket = (sockaddr*) &agentPublicAddress;
// check the agent public address
// if it matches our local address we're on the same box
// so hardcode the EC2 public address for now
if (agentPublicAddress.sin_addr.s_addr == serverLocalAddress) {
// If we're not running "local" then we do replace the IP
// with the EC2 IP. Otherwise, we use our normal public IP
if (!useLocal) {
if (!isLocalMode) {
agentPublicAddress.sin_addr.s_addr = 895283510; // local IP in this format...
destinationSocket = (sockaddr*) &agentLocalAddress;
}
}
@ -178,9 +181,9 @@ int main(int argc, const char * argv[])
currentBufferPos += packAgentId(currentBufferPos, newAgent->getAgentID());
// send the constructed list back to this agent
agentList->getAgentSocket()->send((sockaddr*) &agentPublicAddress,
broadcastPacket,
(currentBufferPos - startPointer) + 1);
agentList->getAgentSocket()->send(destinationSocket,
broadcastPacket,
(currentBufferPos - startPointer) + 1);
}
}

View file

@ -35,11 +35,12 @@ enum {
};
// Command line parameter defaults
bool loopAudio = true;
bool shouldLoopAudio = true;
bool hasInjectedAudioOnce = false;
float sleepIntervalMin = 1.00;
float sleepIntervalMax = 2.00;
char *sourceAudioFile = NULL;
const char *allowedParameters = ":sb::t::c::a::f::d::r:";
const char *allowedParameters = ":sc::a::f::t::r:";
float floatArguments[4] = {0.0f, 0.0f, 0.0f, 0.0f};
unsigned char volume = DEFAULT_INJECTOR_VOLUME;
float triggerDistance = 0.0f;
@ -47,13 +48,11 @@ float radius = 0.0f;
void usage(void) {
std::cout << "High Fidelity - Interface audio injector" << std::endl;
std::cout << " -s Random sleep mode. If not specified will default to constant loop." << std::endl;
std::cout << " -b FLOAT Min. number of seconds to sleep. Only valid in random sleep mode. Default 1.0" << std::endl;
std::cout << " -t FLOAT Max. number of seconds to sleep. Only valid in random sleep mode. Default 2.0" << std::endl;
std::cout << " -s Single play mode. If not specified will default to constant loop." << std::endl;
std::cout << " -c FLOAT,FLOAT,FLOAT,FLOAT X,Y,Z,YAW position in universe where audio will be originating from and direction. Defaults to 0,0,0,0" << std::endl;
std::cout << " -a 0-255 Attenuation curve modifier, defaults to 255" << std::endl;
std::cout << " -f FILENAME Name of audio source file. Required - RAW format, 22050hz 16bit signed mono" << std::endl;
std::cout << " -d FLOAT Trigger distance for injection. If not specified will loop constantly" << std::endl;
std::cout << " -t FLOAT Trigger distance for injection. If not specified will loop constantly" << std::endl;
std::cout << " -r FLOAT Radius for spherical source. If not specified injected audio is point source" << std::endl;
}
@ -62,16 +61,8 @@ bool processParameters(int parameterCount, char* parameterData[]) {
while ((p = getopt(parameterCount, parameterData, allowedParameters)) != -1) {
switch (p) {
case 's':
::loopAudio = false;
std::cout << "[DEBUG] Random sleep mode enabled" << std::endl;
break;
case 'b':
::sleepIntervalMin = atof(optarg);
std::cout << "[DEBUG] Min delay between plays " << sleepIntervalMin << "sec" << std::endl;
break;
case 't':
::sleepIntervalMax = atof(optarg);
std::cout << "[DEBUG] Max delay between plays " << sleepIntervalMax << "sec" << std::endl;
::shouldLoopAudio = false;
std::cout << "[DEBUG] Single play mode enabled" << std::endl;
break;
case 'f':
::sourceAudioFile = optarg;
@ -97,7 +88,7 @@ bool processParameters(int parameterCount, char* parameterData[]) {
::volume = atoi(optarg);
std::cout << "[DEBUG] Attenuation modifier: " << optarg << std::endl;
break;
case 'd':
case 't':
::triggerDistance = atof(optarg);
std::cout << "[DEBUG] Trigger distance: " << optarg << std::endl;
break;
@ -113,35 +104,6 @@ bool processParameters(int parameterCount, char* parameterData[]) {
return true;
};
bool stopReceiveAgentDataThread;
void *receiveAgentData(void *args) {
sockaddr senderAddress;
ssize_t bytesReceived;
unsigned char incomingPacket[MAX_PACKET_SIZE];
AgentList* agentList = AgentList::getInstance();
while (!::stopReceiveAgentDataThread) {
if (agentList->getAgentSocket()->receive(&senderAddress, incomingPacket, &bytesReceived)) {
switch (incomingPacket[0]) {
case PACKET_HEADER_BULK_AVATAR_DATA:
// this is the positional data for other agents
// pass that off to the agentList processBulkAgentData method
agentList->processBulkAgentData(&senderAddress, incomingPacket, bytesReceived);
break;
default:
// have the agentList handle list of agents from DS, replies from other agents, etc.
agentList->processAgentData(&senderAddress, incomingPacket, bytesReceived);
break;
}
}
}
pthread_exit(0);
return NULL;
}
void createAvatarDataForAgent(Agent* agent) {
if (!agent->getLinkedData()) {
agent->setLinkedData(new AvatarData(agent));
@ -164,9 +126,6 @@ int main(int argc, char* argv[]) {
// create an AgentList instance to handle communication with other agents
AgentList* agentList = AgentList::createInstance(AGENT_TYPE_AUDIO_INJECTOR, AUDIO_UDP_SEND_PORT);
pthread_t receiveAgentDataThread;
pthread_create(&receiveAgentDataThread, NULL, receiveAgentData, NULL);
// start the agent list thread that will kill off agents when they stop talking
agentList->startSilentAgentRemovalThread();
@ -183,42 +142,47 @@ int main(int argc, char* argv[]) {
// register the callback for agent data creation
agentList->linkedDataCreateCallback = createAvatarDataForAgent;
unsigned char broadcastPacket = PACKET_HEADER_INJECT_AUDIO;
timeval thisSend;
long long numMicrosecondsSleep = 0;
timeval lastSend = {};
unsigned char broadcastPacket = PACKET_HEADER_INJECT_AUDIO;
timeval lastDomainServerCheckIn = {};
// the audio injector needs to know about the avatar mixer and the audio mixer
const char INJECTOR_AGENTS_OF_INTEREST[] = {AGENT_TYPE_AVATAR_MIXER, AGENT_TYPE_AUDIO_MIXER};
AgentList::getInstance()->setAgentTypesOfInterest(INJECTOR_AGENTS_OF_INTEREST, sizeof(INJECTOR_AGENTS_OF_INTEREST));
sockaddr senderAddress;
ssize_t bytesReceived;
unsigned char incomingPacket[MAX_PACKET_SIZE];
while (true) {
// the audio injector needs to know about the avatar mixer and the audio mixer
const char INJECTOR_AGENTS_OF_INTEREST[] = {AGENT_TYPE_AUDIO_MIXER, AGENT_TYPE_AVATAR_MIXER};
int bytesAgentsOfInterest = (::triggerDistance > 0)
? sizeof(INJECTOR_AGENTS_OF_INTEREST)
: sizeof(INJECTOR_AGENTS_OF_INTEREST) - 1;
AgentList::getInstance()->setAgentTypesOfInterest(INJECTOR_AGENTS_OF_INTEREST, bytesAgentsOfInterest);
while (true) {
// send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed
if (usecTimestampNow() - usecTimestamp(&lastDomainServerCheckIn) >= DOMAIN_SERVER_CHECK_IN_USECS) {
gettimeofday(&lastDomainServerCheckIn, NULL);
AgentList::getInstance()->sendDomainServerCheckIn();
}
if (::triggerDistance) {
// update the thisSend timeval to the current time
gettimeofday(&thisSend, NULL);
// find the current avatar mixer
Agent* avatarMixer = agentList->soloAgentOfType(AGENT_TYPE_AVATAR_MIXER);
// make sure we actually have an avatar mixer with an active socket
if (avatarMixer && avatarMixer->getActiveSocket() != NULL) {
// use the UDPSocket instance attached to our agent list to ask avatar mixer for a list of avatars
agentList->getAgentSocket()->send(avatarMixer->getActiveSocket(),
&broadcastPacket,
sizeof(broadcastPacket));
while (agentList->getAgentSocket()->receive(&senderAddress, incomingPacket, &bytesReceived)) {
switch (incomingPacket[0]) {
case PACKET_HEADER_BULK_AVATAR_DATA:
// this is the positional data for other agents
// pass that off to the agentList processBulkAgentData method
agentList->processBulkAgentData(&senderAddress, incomingPacket, bytesReceived);
break;
default:
// have the agentList handle list of agents from DS, replies from other agents, etc.
agentList->processAgentData(&senderAddress, incomingPacket, bytesReceived);
break;
}
}
if (::triggerDistance) {
if (!injector.isInjectingAudio()) {
// enumerate the other agents to decide if one is close enough that we should inject
for (AgentList::iterator agent = agentList->begin(); agent != agentList->end(); agent++) {
@ -226,49 +190,39 @@ int main(int argc, char* argv[]) {
if (avatarData) {
glm::vec3 tempVector = injector.getPosition() - avatarData->getPosition();
float squareDistance = glm::dot(tempVector, tempVector);
if (squareDistance <= ::triggerDistance) {
// look for an audio mixer in our agent list
Agent* audioMixer = AgentList::getInstance()->soloAgentOfType(AGENT_TYPE_AUDIO_MIXER);
if (audioMixer) {
// we have an active audio mixer we can send data to
AudioInjectionManager::threadInjector(&injector);
}
if (glm::dot(tempVector, tempVector) <= ::triggerDistance) {
// use the AudioInjectionManager to thread this injector
AudioInjectionManager::threadInjector(&injector);
}
}
}
}
// sleep for the correct amount of time to have data send be consistently timed
if ((numMicrosecondsSleep = (AVATAR_MIXER_DATA_SEND_INTERVAL_MSECS * 1000) -
(usecTimestampNow() - usecTimestamp(&thisSend))) > 0) {
usleep(numMicrosecondsSleep);
// find the current avatar mixer
Agent* avatarMixer = agentList->soloAgentOfType(AGENT_TYPE_AVATAR_MIXER);
// make sure we actually have an avatar mixer with an active socket
if (avatarMixer && avatarMixer->getActiveSocket() != NULL
&& (usecTimestampNow() - usecTimestamp(&lastSend) > AVATAR_MIXER_DATA_SEND_INTERVAL_MSECS)) {
// update the lastSend timeval to the current time
gettimeofday(&lastSend, NULL);
// use the UDPSocket instance attached to our agent list to ask avatar mixer for a list of avatars
agentList->getAgentSocket()->send(avatarMixer->getActiveSocket(),
&broadcastPacket,
sizeof(broadcastPacket));
}
} else {
// look for an audio mixer in our agent list
Agent* audioMixer = AgentList::getInstance()->soloAgentOfType(AGENT_TYPE_AUDIO_MIXER);
if (audioMixer) {
injector.injectAudio(agentList->getAgentSocket(), audioMixer->getActiveSocket());
if (!injector.isInjectingAudio() && (::shouldLoopAudio || !::hasInjectedAudioOnce)) {
// use the AudioInjectionManager to thread this injector
AudioInjectionManager::threadInjector(&injector);
::hasInjectedAudioOnce = true;
}
float delay = 0;
int usecDelay = 0;
if (!::loopAudio) {
delay = randFloatInRange(::sleepIntervalMin, ::sleepIntervalMax);
usecDelay = delay * 1000 * 1000;
usleep(usecDelay);
}
}
}
// stop the receive agent data thread
stopReceiveAgentDataThread = true;
pthread_join(receiveAgentDataThread, NULL);
// stop the agent list's threads
agentList->stopSilentAgentRemovalThread();
}