mirror of
https://github.com/JulianGro/overte.git
synced 2025-04-13 23:46:29 +02:00
refactor mixer to use AgentList, AudioRingBuffer as AgentData
This commit is contained in:
parent
e2c3c253a8
commit
abfd3a0168
10 changed files with 222 additions and 241 deletions
|
@ -18,18 +18,12 @@
|
|||
|
||||
Oscilloscope * scope;
|
||||
|
||||
const short BUFFER_LENGTH_BYTES = 1024;
|
||||
const short BUFFER_LENGTH_SAMPLES = BUFFER_LENGTH_BYTES / sizeof(int16_t);
|
||||
|
||||
const short PACKET_LENGTH_BYTES = 1024;
|
||||
const short PACKET_LENGTH_SAMPLES = PACKET_LENGTH_BYTES / sizeof(int16_t);
|
||||
|
||||
const int PHASE_DELAY_AT_90 = 20;
|
||||
const float AMPLITUDE_RATIO_AT_90 = 0.5;
|
||||
|
||||
const short RING_BUFFER_FRAMES = 10;
|
||||
const short RING_BUFFER_SIZE_SAMPLES = RING_BUFFER_FRAMES * BUFFER_LENGTH_SAMPLES;
|
||||
|
||||
const int SAMPLE_RATE = 22050;
|
||||
const float JITTER_BUFFER_LENGTH_MSECS = 30.0;
|
||||
const short JITTER_BUFFER_SAMPLES = JITTER_BUFFER_LENGTH_MSECS * (SAMPLE_RATE / 1000.0);
|
||||
|
@ -90,7 +84,7 @@ int audioCallback (const void *inputBuffer,
|
|||
audioMixerSocket.sin_family = AF_INET;
|
||||
audioMixerSocket.sin_addr.s_addr = data->mixerAddress;
|
||||
audioMixerSocket.sin_port = data->mixerPort;
|
||||
data->audioSocket->send((sockaddr *)&audioMixerSocket, (void *)inputLeft, BUFFER_LENGTH_BYTES);
|
||||
data->audioSocket->send((sockaddr *)&audioMixerSocket, (void *)inputLeft, BUFFER_LENGTH_BYTES);
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -134,27 +128,27 @@ int audioCallback (const void *inputBuffer,
|
|||
// if we've been reset, and there isn't any new packets yet
|
||||
// just play some silence
|
||||
|
||||
if (ringBuffer->endOfLastWrite != NULL) {
|
||||
if (ringBuffer->getEndOfLastWrite() != NULL) {
|
||||
|
||||
if (!ringBuffer->started && ringBuffer->diffLastWriteNextOutput() <= PACKET_LENGTH_SAMPLES + JITTER_BUFFER_SAMPLES) {
|
||||
if (!ringBuffer->isStarted() && ringBuffer->diffLastWriteNextOutput() <= PACKET_LENGTH_SAMPLES + JITTER_BUFFER_SAMPLES) {
|
||||
printf("Held back\n");
|
||||
} else if (ringBuffer->diffLastWriteNextOutput() < PACKET_LENGTH_SAMPLES) {
|
||||
ringBuffer->started = false;
|
||||
ringBuffer->setStarted(false);
|
||||
|
||||
starve_counter++;
|
||||
printf("Starved #%d\n", starve_counter);
|
||||
data->wasStarved = 10; // Frames to render the indication that the system was starved.
|
||||
} else {
|
||||
ringBuffer->started = true;
|
||||
ringBuffer->setStarted(true);
|
||||
// play whatever we have in the audio buffer
|
||||
|
||||
// no sample overlap, either a direct copy of the audio data, or a copy with some appended silence
|
||||
memcpy(queueBuffer, ringBuffer->nextOutput, BUFFER_LENGTH_BYTES);
|
||||
memcpy(queueBuffer, ringBuffer->getNextOutput(), BUFFER_LENGTH_BYTES);
|
||||
|
||||
ringBuffer->nextOutput += BUFFER_LENGTH_SAMPLES;
|
||||
ringBuffer->setNextOutput(ringBuffer->getNextOutput() + BUFFER_LENGTH_SAMPLES);
|
||||
|
||||
if (ringBuffer->nextOutput == ringBuffer->buffer + RING_BUFFER_SIZE_SAMPLES) {
|
||||
ringBuffer->nextOutput = ringBuffer->buffer;
|
||||
if (ringBuffer->getNextOutput() == ringBuffer->getBuffer() + RING_BUFFER_SAMPLES) {
|
||||
ringBuffer->setNextOutput(ringBuffer->getBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -204,7 +198,6 @@ void *receiveAudioViaUDP(void *args) {
|
|||
|
||||
while (!stopAudioReceiveThread) {
|
||||
if (sharedAudioData->audioSocket->receive((void *)receivedData, &receivedBytes)) {
|
||||
|
||||
bool firstSample = (currentReceiveTime.tv_sec == 0);
|
||||
|
||||
gettimeofday(¤tReceiveTime, NULL);
|
||||
|
@ -233,28 +226,24 @@ void *receiveAudioViaUDP(void *args) {
|
|||
|
||||
AudioRingBuffer *ringBuffer = sharedAudioData->ringBuffer;
|
||||
|
||||
if (ringBuffer->endOfLastWrite == NULL) {
|
||||
ringBuffer->endOfLastWrite = ringBuffer->buffer;
|
||||
} else if (ringBuffer->diffLastWriteNextOutput() > RING_BUFFER_SIZE_SAMPLES - PACKET_LENGTH_SAMPLES) {
|
||||
std::cout << "NAB: " << ringBuffer->nextOutput - ringBuffer->buffer << "\n";
|
||||
std::cout << "LAW: " << ringBuffer->endOfLastWrite - ringBuffer->buffer << "\n";
|
||||
std::cout << "D: " << ringBuffer->diffLastWriteNextOutput() << "\n";
|
||||
std::cout << "Full\n";
|
||||
if (ringBuffer->getEndOfLastWrite() == NULL) {
|
||||
ringBuffer->setEndOfLastWrite(ringBuffer->getBuffer());
|
||||
} else if (ringBuffer->diffLastWriteNextOutput() > RING_BUFFER_SAMPLES - PACKET_LENGTH_SAMPLES) {
|
||||
|
||||
// reset us to started state
|
||||
ringBuffer->endOfLastWrite = ringBuffer->buffer;
|
||||
ringBuffer->nextOutput = ringBuffer->buffer;
|
||||
ringBuffer->started = false;
|
||||
ringBuffer->setEndOfLastWrite(ringBuffer->getBuffer());
|
||||
ringBuffer->setNextOutput(ringBuffer->getBuffer());
|
||||
ringBuffer->setStarted(false);
|
||||
}
|
||||
|
||||
int16_t *copyToPointer = ringBuffer->endOfLastWrite;
|
||||
int16_t *copyToPointer = ringBuffer->getEndOfLastWrite();
|
||||
|
||||
// just copy the recieved data to the right spot and then add packet length to previous pointer
|
||||
memcpy(copyToPointer, receivedData, PACKET_LENGTH_BYTES);
|
||||
ringBuffer->endOfLastWrite += PACKET_LENGTH_SAMPLES;
|
||||
ringBuffer->setEndOfLastWrite(ringBuffer->getEndOfLastWrite() + PACKET_LENGTH_SAMPLES);
|
||||
|
||||
if (ringBuffer->endOfLastWrite == ringBuffer->buffer + RING_BUFFER_SIZE_SAMPLES) {
|
||||
ringBuffer->endOfLastWrite = ringBuffer->buffer;
|
||||
if (ringBuffer->getEndOfLastWrite() == ringBuffer->getBuffer() + RING_BUFFER_SAMPLES) {
|
||||
ringBuffer->setEndOfLastWrite(ringBuffer->getBuffer());
|
||||
}
|
||||
|
||||
if (LOG_SAMPLE_DELAY) {
|
||||
|
@ -284,7 +273,7 @@ Audio::Audio(Oscilloscope * s)
|
|||
|
||||
// setup a UDPSocket
|
||||
audioData->audioSocket = new UDPSocket(AUDIO_UDP_LISTEN_PORT);
|
||||
audioData->ringBuffer = new AudioRingBuffer(RING_BUFFER_SIZE_SAMPLES);
|
||||
audioData->ringBuffer = new AudioRingBuffer();
|
||||
|
||||
AudioRecThreadStruct threadArgs;
|
||||
threadArgs.sharedAudioData = audioData;
|
||||
|
@ -359,7 +348,7 @@ void Audio::render(int screenWidth, int screenHeight)
|
|||
float timeLeftInCurrentBuffer = 0;
|
||||
if (audioData->lastCallback.tv_usec > 0) timeLeftInCurrentBuffer = diffclock(&audioData->lastCallback, ¤tTime)/(1000.0*(float)BUFFER_LENGTH_SAMPLES/(float)SAMPLE_RATE) * frameWidth;
|
||||
|
||||
if (audioData->ringBuffer->endOfLastWrite != NULL)
|
||||
if (audioData->ringBuffer->getEndOfLastWrite() != NULL)
|
||||
remainingBuffer = audioData->ringBuffer->diffLastWriteNextOutput() / BUFFER_LENGTH_SAMPLES * frameWidth;
|
||||
|
||||
if (audioData->wasStarved == 0) glColor3f(0, 1, 0);
|
||||
|
|
|
@ -64,6 +64,10 @@ Head::~Head() {
|
|||
// all data is primitive, do nothing
|
||||
}
|
||||
|
||||
Head* Head::clone() const {
|
||||
return new Head(*this);
|
||||
}
|
||||
|
||||
void Head::reset()
|
||||
{
|
||||
Pitch = Yaw = Roll = 0;
|
||||
|
|
|
@ -24,6 +24,8 @@ class Head : public AgentData {
|
|||
public:
|
||||
Head();
|
||||
~Head();
|
||||
Head* clone() const;
|
||||
|
||||
void reset();
|
||||
void UpdatePos(float frametime, SerialInterface * serialInterface, int head_mirror, glm::vec3 * gravity);
|
||||
void setNoise (float mag) { noise = mag; }
|
||||
|
|
|
@ -13,24 +13,21 @@
|
|||
#include <fstream>
|
||||
#include <limits>
|
||||
#include "AudioRingBuffer.h"
|
||||
#include "UDPSocket.h"
|
||||
#include <AgentList.h>
|
||||
#include <SharedUtil.h>
|
||||
|
||||
const int MAX_AGENTS = 1000;
|
||||
const int LOGOFF_CHECK_INTERVAL = 1000;
|
||||
|
||||
const unsigned short MIXER_LISTEN_PORT = 55443;
|
||||
|
||||
const int BUFFER_LENGTH_BYTES = 1024;
|
||||
const int BUFFER_LENGTH_SAMPLES = BUFFER_LENGTH_BYTES / sizeof(int16_t);
|
||||
|
||||
const float SAMPLE_RATE = 22050.0;
|
||||
const float BUFFER_SEND_INTERVAL_USECS = (BUFFER_LENGTH_SAMPLES/SAMPLE_RATE) * 1000000;
|
||||
|
||||
const short JITTER_BUFFER_MSECS = 20;
|
||||
const short JITTER_BUFFER_SAMPLES = JITTER_BUFFER_MSECS * (SAMPLE_RATE / 1000.0);
|
||||
|
||||
const short RING_BUFFER_FRAMES = 10;
|
||||
const short RING_BUFFER_SAMPLES = RING_BUFFER_FRAMES * BUFFER_LENGTH_SAMPLES;
|
||||
|
||||
const long MAX_SAMPLE_VALUE = std::numeric_limits<int16_t>::max();
|
||||
const long MIN_SAMPLE_VALUE = std::numeric_limits<int16_t>::min();
|
||||
|
||||
|
@ -40,190 +37,100 @@ const int DOMAINSERVER_PORT = 40102;
|
|||
|
||||
const int MAX_SOURCE_BUFFERS = 20;
|
||||
|
||||
sockaddr_in agentAddress;
|
||||
|
||||
UDPSocket audioSocket = UDPSocket(MIXER_LISTEN_PORT);
|
||||
|
||||
struct AgentList {
|
||||
char *address;
|
||||
unsigned short port;
|
||||
bool active;
|
||||
timeval time;
|
||||
bool bufferTransmitted;
|
||||
} agents[MAX_AGENTS];
|
||||
|
||||
int numAgents = 0;
|
||||
|
||||
AudioRingBuffer *sourceBuffers[MAX_SOURCE_BUFFERS];
|
||||
|
||||
double diffclock(timeval *clock1, timeval *clock2)
|
||||
{
|
||||
double diffms = (clock2->tv_sec - clock1->tv_sec) * 1000.0;
|
||||
diffms += (clock2->tv_usec - clock1->tv_usec) / 1000.0; // us to ms
|
||||
return diffms;
|
||||
}
|
||||
|
||||
double usecTimestamp(timeval *time, double addedUsecs = 0) {
|
||||
return (time->tv_sec * 1000000.0) + time->tv_usec + addedUsecs;
|
||||
}
|
||||
AgentList agentList(MIXER_LISTEN_PORT);
|
||||
|
||||
void *sendBuffer(void *args)
|
||||
{
|
||||
int sentBytes;
|
||||
int currentFrame = 1;
|
||||
timeval startTime, sendTime, now;
|
||||
int nextFrame = 0;
|
||||
timeval startTime;
|
||||
|
||||
int16_t *clientMix = new int16_t[BUFFER_LENGTH_SAMPLES];
|
||||
long *masterMix = new long[BUFFER_LENGTH_SAMPLES];
|
||||
|
||||
|
||||
gettimeofday(&startTime, NULL);
|
||||
|
||||
while (true) {
|
||||
sentBytes = 0;
|
||||
|
||||
for (int wb = 0; wb < BUFFER_LENGTH_SAMPLES; wb++) {
|
||||
masterMix[wb] = 0;
|
||||
|
||||
for (int ms = 0; ms < BUFFER_LENGTH_SAMPLES; ms++) {
|
||||
masterMix[ms] = 0;
|
||||
}
|
||||
|
||||
gettimeofday(&sendTime, NULL);
|
||||
|
||||
for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) {
|
||||
if (sourceBuffers[b]->endOfLastWrite != NULL) {
|
||||
if (!sourceBuffers[b]->started
|
||||
&& sourceBuffers[b]->diffLastWriteNextOutput() <= BUFFER_LENGTH_SAMPLES + JITTER_BUFFER_SAMPLES) {
|
||||
std::cout << "Held back buffer " << b << ".\n";
|
||||
} else if (sourceBuffers[b]->diffLastWriteNextOutput() < BUFFER_LENGTH_SAMPLES) {
|
||||
std::cout << "Buffer " << b << " starved.\n";
|
||||
sourceBuffers[b]->started = false;
|
||||
} else {
|
||||
sourceBuffers[b]->started = true;
|
||||
agents[b].bufferTransmitted = true;
|
||||
|
||||
for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) {
|
||||
masterMix[s] += sourceBuffers[b]->nextOutput[s];
|
||||
}
|
||||
|
||||
sourceBuffers[b]->nextOutput += BUFFER_LENGTH_SAMPLES;
|
||||
|
||||
if (sourceBuffers[b]->nextOutput >= sourceBuffers[b]->buffer + RING_BUFFER_SAMPLES) {
|
||||
sourceBuffers[b]->nextOutput = sourceBuffers[b]->buffer;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int a = 0; a < numAgents; a++) {
|
||||
if (diffclock(&agents[a].time, &sendTime) <= LOGOFF_CHECK_INTERVAL) {
|
||||
|
||||
int16_t *previousOutput = NULL;
|
||||
if (agents[a].bufferTransmitted) {
|
||||
previousOutput = (sourceBuffers[a]->nextOutput == sourceBuffers[a]->buffer)
|
||||
? sourceBuffers[a]->buffer + RING_BUFFER_SAMPLES - BUFFER_LENGTH_SAMPLES
|
||||
: sourceBuffers[a]->nextOutput - BUFFER_LENGTH_SAMPLES;
|
||||
agents[a].bufferTransmitted = false;
|
||||
}
|
||||
|
||||
for(int as = 0; as < BUFFER_LENGTH_SAMPLES; as++) {
|
||||
long longSample = previousOutput != NULL
|
||||
? masterMix[as] - previousOutput[as]
|
||||
: masterMix[as];
|
||||
|
||||
int16_t shortSample;
|
||||
|
||||
if (longSample < 0) {
|
||||
shortSample = std::max(longSample, MIN_SAMPLE_VALUE);
|
||||
} else {
|
||||
shortSample = std::min(longSample, MAX_SAMPLE_VALUE);
|
||||
}
|
||||
|
||||
clientMix[as] = shortSample;
|
||||
|
||||
// std::cout << as << " - CM: " << clientMix[as] << " MM: " << masterMix[as] << "\n";
|
||||
// std::cout << previousOutput - sourceBuffers[a]->buffer << "\n";
|
||||
|
||||
if (previousOutput != NULL) {
|
||||
// std::cout << "PO: " << previousOutput[as] << "\n";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
sentBytes = audioSocket.send(agents[a].address, agents[a].port, clientMix, BUFFER_LENGTH_BYTES);
|
||||
for (int ab = 0; ab < agentList.getAgents().size(); ab++) {
|
||||
AudioRingBuffer *agentBuffer = (AudioRingBuffer *)agentList.getAgents()[ab].getLinkedData();
|
||||
|
||||
if (sentBytes < BUFFER_LENGTH_BYTES) {
|
||||
std::cout << "Error sending mix packet! " << sentBytes << strerror(errno) << "\n";
|
||||
if (agentBuffer != NULL && agentBuffer->getEndOfLastWrite() != NULL) {
|
||||
if (!agentBuffer->isStarted() && agentBuffer->diffLastWriteNextOutput() <= BUFFER_LENGTH_SAMPLES + JITTER_BUFFER_SAMPLES) {
|
||||
printf("Held back buffer %d.\n", ab);
|
||||
} else if (agentBuffer->diffLastWriteNextOutput() < BUFFER_LENGTH_SAMPLES) {
|
||||
printf("Buffer %d starved.\n", ab);
|
||||
agentBuffer->setStarted(false);
|
||||
} else {
|
||||
// good buffer, add this to the mix
|
||||
agentBuffer->setStarted(true);
|
||||
agentBuffer->setAddedToMix(true);
|
||||
|
||||
for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) {
|
||||
masterMix[s] += agentBuffer->getNextOutput()[s];
|
||||
}
|
||||
|
||||
agentBuffer->setNextOutput(agentBuffer->getNextOutput() + BUFFER_LENGTH_SAMPLES);
|
||||
|
||||
if (agentBuffer->getNextOutput() >= agentBuffer->getBuffer() + RING_BUFFER_SAMPLES) {
|
||||
agentBuffer->setNextOutput(agentBuffer->getBuffer());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
gettimeofday(&now, NULL);
|
||||
}
|
||||
|
||||
double usecToSleep = usecTimestamp(&startTime, (currentFrame * BUFFER_SEND_INTERVAL_USECS)) - usecTimestamp(&now);
|
||||
for (int ab = 0; ab < agentList.getAgents().size(); ab++) {
|
||||
Agent *agent = &agentList.getAgents()[ab];
|
||||
AudioRingBuffer *agentBuffer = (AudioRingBuffer *)agent->getLinkedData();
|
||||
|
||||
int16_t *previousOutput = NULL;
|
||||
|
||||
if (agentBuffer != NULL && agentBuffer->wasAddedToMix()) {
|
||||
previousOutput = (agentBuffer->getNextOutput() == agentBuffer->getBuffer())
|
||||
? agentBuffer->getBuffer() + RING_BUFFER_SAMPLES - BUFFER_LENGTH_SAMPLES
|
||||
: agentBuffer->getNextOutput() - BUFFER_LENGTH_SAMPLES;
|
||||
agentBuffer->setAddedToMix(false);
|
||||
}
|
||||
|
||||
for (int s = 0; s < BUFFER_LENGTH_SAMPLES; s++) {
|
||||
long longSample = (previousOutput != NULL)
|
||||
? masterMix[s] - previousOutput[s]
|
||||
: masterMix[s];
|
||||
|
||||
int16_t shortSample;
|
||||
|
||||
if (longSample < 0) {
|
||||
shortSample = std::max(longSample, MIN_SAMPLE_VALUE);
|
||||
} else {
|
||||
shortSample = std::min(longSample, MAX_SAMPLE_VALUE);
|
||||
}
|
||||
|
||||
clientMix[s] = shortSample;
|
||||
}
|
||||
|
||||
agentList.getAgentSocket().send(agent->getPublicSocket(), clientMix, BUFFER_LENGTH_BYTES);
|
||||
}
|
||||
|
||||
double usecToSleep = usecTimestamp(&startTime) + (++nextFrame * BUFFER_SEND_INTERVAL_USECS) - usecTimestampNow();
|
||||
|
||||
if (usecToSleep > 0) {
|
||||
usleep(usecToSleep);
|
||||
} else {
|
||||
std::cout << "NOT SLEEPING!";
|
||||
std::cout << "Took too much time, not sleeping!\n";
|
||||
}
|
||||
|
||||
currentFrame++;
|
||||
}
|
||||
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
int addAgent(sockaddr_in *newAddress, void *audioData) {
|
||||
// Search for agent in list and add if needed
|
||||
int is_new = 0;
|
||||
int i = 0;
|
||||
|
||||
for (i = 0; i < numAgents; i++) {
|
||||
if (strcmp(inet_ntoa(newAddress->sin_addr), agents[i].address) == 0
|
||||
&& ntohs(newAddress->sin_port) == agents[i].port) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ((i == numAgents) || (agents[i].active == false)) {
|
||||
is_new = 1;
|
||||
|
||||
agents[i].address = new char();
|
||||
strcpy(agents[i].address, inet_ntoa(newAddress->sin_addr));
|
||||
|
||||
agents[i].bufferTransmitted = false;
|
||||
}
|
||||
|
||||
|
||||
agents[i].port = ntohs(newAddress->sin_port);
|
||||
agents[i].active = true;
|
||||
gettimeofday(&agents[i].time, NULL);
|
||||
|
||||
if (sourceBuffers[i]->endOfLastWrite == NULL) {
|
||||
sourceBuffers[i]->endOfLastWrite = sourceBuffers[i]->buffer;
|
||||
} else if (sourceBuffers[i]->diffLastWriteNextOutput() > RING_BUFFER_SAMPLES - BUFFER_LENGTH_SAMPLES) {
|
||||
// reset us to started state
|
||||
sourceBuffers[i]->endOfLastWrite = sourceBuffers[i]->buffer;
|
||||
sourceBuffers[i]->nextOutput = sourceBuffers[i]->buffer;
|
||||
sourceBuffers[i]->started = false;
|
||||
}
|
||||
|
||||
memcpy(sourceBuffers[i]->endOfLastWrite, audioData, BUFFER_LENGTH_BYTES);
|
||||
|
||||
sourceBuffers[i]->endOfLastWrite += BUFFER_LENGTH_SAMPLES;
|
||||
|
||||
if (sourceBuffers[i]->endOfLastWrite >= sourceBuffers[i]->buffer + RING_BUFFER_SAMPLES) {
|
||||
sourceBuffers[i]->endOfLastWrite = sourceBuffers[i]->buffer;
|
||||
}
|
||||
|
||||
if (i == numAgents) {
|
||||
numAgents++;
|
||||
}
|
||||
|
||||
return is_new;
|
||||
}
|
||||
|
||||
void *reportAliveToDS(void *args) {
|
||||
|
||||
timeval lastSend, now;
|
||||
timeval lastSend;
|
||||
unsigned char output[7];
|
||||
|
||||
while (true) {
|
||||
|
@ -231,11 +138,9 @@ void *reportAliveToDS(void *args) {
|
|||
|
||||
*output = 'M';
|
||||
packSocket(output + 1, 895283510, htons(MIXER_LISTEN_PORT));
|
||||
audioSocket.send(DOMAIN_IP, DOMAINSERVER_PORT, output, 7);
|
||||
agentList.getAgentSocket().send(DOMAIN_IP, DOMAINSERVER_PORT, output, 7);
|
||||
|
||||
gettimeofday(&now, NULL);
|
||||
|
||||
double usecToSleep = 1000000 - (usecTimestamp(&now) - usecTimestamp(&lastSend));
|
||||
double usecToSleep = 1000000 - (usecTimestampNow() - usecTimestamp(&lastSend));
|
||||
|
||||
if (usecToSleep > 0) {
|
||||
usleep(usecToSleep);
|
||||
|
@ -245,11 +150,18 @@ void *reportAliveToDS(void *args) {
|
|||
}
|
||||
}
|
||||
|
||||
void attachNewBufferToAgent(Agent *newAgent) {
|
||||
if (newAgent->getLinkedData() == NULL) {
|
||||
newAgent->setLinkedData(new AudioRingBuffer());
|
||||
}
|
||||
}
|
||||
|
||||
int main(int argc, const char * argv[])
|
||||
{
|
||||
timeval lastAgentUpdate;
|
||||
{
|
||||
ssize_t receivedBytes = 0;
|
||||
|
||||
agentList.linkedDataCreateCallback = attachNewBufferToAgent;
|
||||
|
||||
// setup the agentSocket to report to domain server
|
||||
pthread_t reportAliveThread;
|
||||
pthread_create(&reportAliveThread, NULL, reportAliveToDS, NULL);
|
||||
|
@ -270,25 +182,19 @@ int main(int argc, const char * argv[])
|
|||
printf("Using static domainserver IP: %s\n", DOMAIN_IP);
|
||||
}
|
||||
|
||||
gettimeofday(&lastAgentUpdate, NULL);
|
||||
|
||||
int16_t packetData[BUFFER_LENGTH_SAMPLES];
|
||||
|
||||
for (int b = 0; b < MAX_SOURCE_BUFFERS; b++) {
|
||||
sourceBuffers[b] = new AudioRingBuffer(10 * BUFFER_LENGTH_SAMPLES);
|
||||
}
|
||||
int16_t *packetData = new int16_t[BUFFER_LENGTH_SAMPLES];
|
||||
|
||||
pthread_t sendBufferThread;
|
||||
pthread_create(&sendBufferThread, NULL, sendBuffer, NULL);
|
||||
|
||||
sockaddr *agentAddress = new sockaddr;
|
||||
|
||||
while (true) {
|
||||
if(audioSocket.receive((sockaddr *)&agentAddress, packetData, &receivedBytes)) {
|
||||
if(agentList.getAgentSocket().receive(agentAddress, packetData, &receivedBytes)) {
|
||||
if (receivedBytes == BUFFER_LENGTH_BYTES) {
|
||||
if (addAgent(&agentAddress, packetData)) {
|
||||
std::cout << "Added agent: " <<
|
||||
inet_ntoa(agentAddress.sin_addr) << " on " <<
|
||||
ntohs(agentAddress.sin_port) << "\n";
|
||||
}
|
||||
// add or update the existing interface agent
|
||||
agentList.addOrUpdateAgent(agentAddress, agentAddress, 'I');
|
||||
agentList.updateAgentWithData(agentAddress, (void *)packetData, receivedBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,8 +49,11 @@ Agent::Agent(const Agent &otherAgent) {
|
|||
lastRecvTimeUsecs = otherAgent.lastRecvTimeUsecs;
|
||||
type = otherAgent.type;
|
||||
|
||||
// linked data is transient, gets re-assigned on next packet receive
|
||||
linkedData = NULL;
|
||||
if (otherAgent.linkedData != NULL) {
|
||||
linkedData = otherAgent.linkedData->clone();
|
||||
} else {
|
||||
linkedData = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
Agent& Agent::operator=(Agent otherAgent) {
|
||||
|
@ -116,8 +119,6 @@ void Agent::activatePublicSocket() {
|
|||
activeSocket = publicSocket;
|
||||
}
|
||||
|
||||
|
||||
|
||||
AgentData* Agent::getLinkedData() {
|
||||
return linkedData;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ class AgentData {
|
|||
public:
|
||||
virtual ~AgentData() = 0;
|
||||
virtual void parseData(void * data, int size) = 0;
|
||||
virtual AgentData* clone() const = 0;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
|
|
@ -33,6 +33,7 @@ class AgentList {
|
|||
int updateList(unsigned char *packetData, size_t dataBytes);
|
||||
bool addOrUpdateAgent(sockaddr *publicSocket, sockaddr *localSocket, char agentType);
|
||||
void processAgentData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
|
||||
void updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
|
||||
void broadcastToAgents(char *broadcastData, size_t dataBytes);
|
||||
void sendToAgent(Agent *destAgent, void *packetData, size_t dataBytes);
|
||||
void pingAgents();
|
||||
|
@ -44,7 +45,6 @@ class AgentList {
|
|||
pthread_t removeSilentAgentsThread;
|
||||
|
||||
int indexOfMatchingAgent(sockaddr *senderAddress);
|
||||
void updateAgentWithData(sockaddr *senderAddress, void *packetData, size_t dataBytes);
|
||||
void handlePingReply(sockaddr *agentAddress);
|
||||
};
|
||||
|
||||
|
|
|
@ -8,20 +8,90 @@
|
|||
|
||||
#include "AudioRingBuffer.h"
|
||||
|
||||
AudioRingBuffer::AudioRingBuffer(short ringBufferSamples) {
|
||||
ringBufferLengthSamples = ringBufferSamples;
|
||||
AudioRingBuffer::AudioRingBuffer() {
|
||||
started = false;
|
||||
addedToMix = false;
|
||||
|
||||
endOfLastWrite = NULL;
|
||||
|
||||
buffer = new int16_t[ringBufferLengthSamples];
|
||||
buffer = new int16_t[RING_BUFFER_SAMPLES];
|
||||
nextOutput = buffer;
|
||||
};
|
||||
|
||||
AudioRingBuffer::AudioRingBuffer(const AudioRingBuffer &otherRingBuffer) {
|
||||
started = otherRingBuffer.started;
|
||||
addedToMix = otherRingBuffer.addedToMix;
|
||||
|
||||
buffer = new int16_t[RING_BUFFER_SAMPLES];
|
||||
memcpy(buffer, otherRingBuffer.buffer, sizeof(int16_t) * RING_BUFFER_SAMPLES);
|
||||
|
||||
nextOutput = buffer + (otherRingBuffer.nextOutput - otherRingBuffer.buffer);
|
||||
endOfLastWrite = buffer + (otherRingBuffer.endOfLastWrite - otherRingBuffer.buffer);
|
||||
}
|
||||
|
||||
AudioRingBuffer::~AudioRingBuffer() {
|
||||
delete[] buffer;
|
||||
};
|
||||
|
||||
AudioRingBuffer* AudioRingBuffer::clone() const {
|
||||
return new AudioRingBuffer(*this);
|
||||
}
|
||||
|
||||
int16_t* AudioRingBuffer::getNextOutput() {
|
||||
return nextOutput;
|
||||
}
|
||||
|
||||
void AudioRingBuffer::setNextOutput(int16_t *newPointer) {
|
||||
nextOutput = newPointer;
|
||||
}
|
||||
|
||||
int16_t* AudioRingBuffer::getEndOfLastWrite() {
|
||||
return endOfLastWrite;
|
||||
}
|
||||
|
||||
void AudioRingBuffer::setEndOfLastWrite(int16_t *newPointer) {
|
||||
endOfLastWrite = newPointer;
|
||||
}
|
||||
|
||||
int16_t* AudioRingBuffer::getBuffer() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
bool AudioRingBuffer::isStarted() {
|
||||
return started;
|
||||
}
|
||||
|
||||
void AudioRingBuffer::setStarted(bool status) {
|
||||
started = status;
|
||||
}
|
||||
|
||||
bool AudioRingBuffer::wasAddedToMix() {
|
||||
return addedToMix;
|
||||
}
|
||||
|
||||
void AudioRingBuffer::setAddedToMix(bool added) {
|
||||
addedToMix = added;
|
||||
}
|
||||
|
||||
void AudioRingBuffer::parseData(void *data, int size) {
|
||||
int16_t *audioData = (int16_t *)data;
|
||||
|
||||
if (endOfLastWrite == NULL) {
|
||||
endOfLastWrite = buffer;
|
||||
} else if (diffLastWriteNextOutput() > RING_BUFFER_SAMPLES - BUFFER_LENGTH_SAMPLES) {
|
||||
endOfLastWrite = buffer;
|
||||
nextOutput = buffer;
|
||||
started = false;
|
||||
}
|
||||
|
||||
memcpy(endOfLastWrite, audioData, BUFFER_LENGTH_BYTES);
|
||||
endOfLastWrite += BUFFER_LENGTH_SAMPLES;
|
||||
|
||||
if (endOfLastWrite >= buffer + RING_BUFFER_SAMPLES) {
|
||||
endOfLastWrite = buffer;
|
||||
}
|
||||
}
|
||||
|
||||
short AudioRingBuffer::diffLastWriteNextOutput()
|
||||
{
|
||||
if (endOfLastWrite == NULL) {
|
||||
|
@ -30,20 +100,9 @@ short AudioRingBuffer::diffLastWriteNextOutput()
|
|||
short sampleDifference = endOfLastWrite - nextOutput;
|
||||
|
||||
if (sampleDifference < 0) {
|
||||
sampleDifference += ringBufferLengthSamples;
|
||||
sampleDifference += RING_BUFFER_SAMPLES;
|
||||
}
|
||||
|
||||
return sampleDifference;
|
||||
}
|
||||
}
|
||||
|
||||
short AudioRingBuffer::bufferOverlap(int16_t *pointer, short addedDistance)
|
||||
{
|
||||
short samplesLeft = (buffer + ringBufferLengthSamples) - pointer;
|
||||
|
||||
if (samplesLeft < addedDistance) {
|
||||
return addedDistance - samplesLeft;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,20 +11,40 @@
|
|||
|
||||
#include <iostream>
|
||||
#include <stdint.h>
|
||||
#include "AgentData.h"
|
||||
|
||||
class AudioRingBuffer {
|
||||
const int BUFFER_LENGTH_BYTES = 1024;
|
||||
const int BUFFER_LENGTH_SAMPLES = BUFFER_LENGTH_BYTES / sizeof(int16_t);
|
||||
|
||||
const short RING_BUFFER_FRAMES = 10;
|
||||
const short RING_BUFFER_SAMPLES = RING_BUFFER_FRAMES * BUFFER_LENGTH_SAMPLES;
|
||||
|
||||
class AudioRingBuffer : public AgentData {
|
||||
public:
|
||||
AudioRingBuffer();
|
||||
~AudioRingBuffer();
|
||||
AudioRingBuffer(const AudioRingBuffer &otherRingBuffer);
|
||||
|
||||
void parseData(void *data, int size);
|
||||
AudioRingBuffer* clone() const;
|
||||
|
||||
int16_t* getNextOutput();
|
||||
void setNextOutput(int16_t *newPointer);
|
||||
int16_t* getEndOfLastWrite();
|
||||
void setEndOfLastWrite(int16_t *newPointer);
|
||||
int16_t* getBuffer();
|
||||
bool isStarted();
|
||||
void setStarted(bool status);
|
||||
bool wasAddedToMix();
|
||||
void setAddedToMix(bool added);
|
||||
|
||||
short diffLastWriteNextOutput();
|
||||
private:
|
||||
int16_t *nextOutput;
|
||||
int16_t *endOfLastWrite;
|
||||
int16_t *buffer;
|
||||
short ringBufferLengthSamples;
|
||||
bool started;
|
||||
|
||||
short diffLastWriteNextOutput();
|
||||
short bufferOverlap(int16_t *pointer, short addedDistance);
|
||||
|
||||
AudioRingBuffer(short ringBufferSamples);
|
||||
~AudioRingBuffer();
|
||||
bool addedToMix;
|
||||
};
|
||||
|
||||
#endif /* defined(__interface__AudioRingBuffer__) */
|
||||
|
|
|
@ -113,7 +113,6 @@ bool UDPSocket::receive(sockaddr *recvAddress, void *receivedData, ssize_t *rece
|
|||
|
||||
int UDPSocket::send(sockaddr *destAddress, const void *data, size_t byteLength) {
|
||||
// send data via UDP
|
||||
|
||||
int sent_bytes = sendto(handle, (const char*)data, byteLength,
|
||||
0, (sockaddr *) destAddress, sizeof(sockaddr_in));
|
||||
|
||||
|
|
Loading…
Reference in a new issue