hook AudioMixer to new event-driven assignment setup

This commit is contained in:
Stephen Birarda 2013-12-02 16:34:38 -08:00
parent 49191826af
commit 1e279cf99c
9 changed files with 223 additions and 167 deletions

View file

@ -26,7 +26,8 @@ AssignmentClient::AssignmentClient(int &argc, char **argv,
const HifiSockAddr& customAssignmentServerSocket,
const char* requestAssignmentPool) :
QCoreApplication(argc, argv),
_requestAssignment(Assignment::RequestCommand, requestAssignmentType, requestAssignmentPool)
_requestAssignment(Assignment::RequestCommand, requestAssignmentType, requestAssignmentPool),
_currentAssignment(NULL)
{
// set the logging target to the the CHILD_TARGET_NAME
Logging::setTargetName(ASSIGNMENT_CLIENT_TARGET_NAME);
@ -45,8 +46,92 @@ AssignmentClient::AssignmentClient(int &argc, char **argv,
QTimer* timer = new QTimer(this);
connect(timer, SIGNAL(timeout()), SLOT(sendAssignmentRequest()));
timer->start(ASSIGNMENT_REQUEST_INTERVAL_MSECS);
// connect our readPendingDatagrams method to the readyRead() signal of the socket
connect(&nodeList->getNodeSocket(), SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
}
void AssignmentClient::sendAssignmentRequest() {
NodeList::getInstance()->sendAssignment(_requestAssignment);
if (!_currentAssignment) {
NodeList::getInstance()->sendAssignment(_requestAssignment);
}
}
void AssignmentClient::readPendingDatagrams() {
NodeList* nodeList = NodeList::getInstance();
static unsigned char packetData[1500];
static qint64 receivedBytes = 0;
static HifiSockAddr senderSockAddr;
while (nodeList->getNodeSocket().hasPendingDatagrams()) {
if ((receivedBytes = nodeList->getNodeSocket().readDatagram((char*) packetData, MAX_PACKET_SIZE,
senderSockAddr.getAddressPointer(),
senderSockAddr.getPortPointer()))
&& packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_DEPLOY_ASSIGNMENT || packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT) {
if (_currentAssignment) {
qDebug() << "Dropping received assignment since we are currently running one.\n";
} else {
// construct the deployed assignment from the packet data
_currentAssignment = AssignmentFactory::unpackAssignment(packetData, receivedBytes);
qDebug() << "Received an assignment -" << *_currentAssignment << "\n";
// switch our nodelist domain IP and port to whoever sent us the assignment
if (packetData[0] == PACKET_TYPE_CREATE_ASSIGNMENT) {
nodeList->setDomainSockAddr(senderSockAddr);
nodeList->setOwnerUUID(_currentAssignment->getUUID());
qDebug("Destination IP for assignment is %s\n",
nodeList->getDomainIP().toString().toStdString().c_str());
// start the deployed assignment
QThread *workerThread = new QThread(this);
connect(workerThread, SIGNAL(started()), _currentAssignment, SLOT(run()));
connect(_currentAssignment, SIGNAL(finished()), workerThread, SLOT(quit()));
connect(_currentAssignment, SIGNAL(finished()), _currentAssignment, SLOT(deleteLater()));
connect(_currentAssignment, SIGNAL(finished()), workerThread, SLOT(deleteLater()));
_currentAssignment->moveToThread(workerThread);
// Starts an event loop, and emits workerThread->started()
workerThread->start();
} else {
qDebug("Received a bad destination socket for assignment.\n");
}
}
} else if (_currentAssignment) {
qRegisterMetaType<HifiSockAddr>("HifiSockAddr");
// have the threaded current assignment handle this datagram
QMetaObject::invokeMethod(_currentAssignment, "processDatagram", Qt::QueuedConnection,
Q_ARG(const QByteArray&, QByteArray((char*) packetData, receivedBytes)),
Q_ARG(const HifiSockAddr&, senderSockAddr));
} else {
// have the NodeList attempt to handle it
nodeList->processNodeData(senderSockAddr, packetData, receivedBytes);
}
}
}
}
void AssignmentClient::assignmentCompleted() {
qDebug("Assignment finished or never started - waiting for new assignment\n");
// the _currentAssignment is being deleted, set our pointer to NULL
_currentAssignment = NULL;
NodeList* nodeList = NodeList::getInstance();
// reset our NodeList by switching back to unassigned and clearing the list
nodeList->setOwnerType(NODE_TYPE_UNASSIGNED);
nodeList->reset();
// reset the logging target to the the CHILD_TARGET_NAME
Logging::setTargetName(ASSIGNMENT_CLIENT_TARGET_NAME);
}

View file

@ -20,8 +20,11 @@ public:
const char* requestAssignmentPool = NULL);
private slots:
void sendAssignmentRequest();
void readPendingDatagrams();
void assignmentCompleted();
private:
Assignment _requestAssignment;
Assignment* _currentAssignment;
};
#endif /* defined(__hifi__AssignmentClient__) */

View file

@ -32,6 +32,8 @@
#include <glm/gtx/norm.hpp>
#include <glm/gtx/vector_angle.hpp>
#include <QtCore/QTimer>
#include <Logging.h>
#include <NodeList.h>
#include <Node.h>
@ -51,7 +53,7 @@
const short JITTER_BUFFER_MSECS = 12;
const short JITTER_BUFFER_SAMPLES = JITTER_BUFFER_MSECS * (SAMPLE_RATE / 1000.0);
const unsigned int BUFFER_SEND_INTERVAL_USECS = floorf((BUFFER_LENGTH_SAMPLES_PER_CHANNEL / SAMPLE_RATE) * 1000000);
const unsigned int BUFFER_SEND_INTERVAL_MSECS = floorf((BUFFER_LENGTH_SAMPLES_PER_CHANNEL / SAMPLE_RATE) * 1000);
const int MAX_SAMPLE_VALUE = std::numeric_limits<int16_t>::max();
const int MIN_SAMPLE_VALUE = std::numeric_limits<int16_t>::min();
@ -217,6 +219,80 @@ void AudioMixer::prepareMixForListeningNode(Node* node) {
}
}
void AudioMixer::processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr) {
// pull any new audio data from nodes off of the network stack
if (dataByteArray.data()[0] == PACKET_TYPE_MICROPHONE_AUDIO_NO_ECHO
|| dataByteArray.data()[0] == PACKET_TYPE_MICROPHONE_AUDIO_WITH_ECHO
|| dataByteArray.data()[0] == PACKET_TYPE_INJECT_AUDIO) {
QUuid nodeUUID = QUuid::fromRfc4122(dataByteArray.mid(numBytesForPacketHeader((unsigned char*) dataByteArray.data()),
NUM_BYTES_RFC4122_UUID));
NodeList* nodeList = NodeList::getInstance();
Node* matchingNode = nodeList->nodeWithUUID(nodeUUID);
if (matchingNode) {
nodeList->updateNodeWithData(matchingNode, senderSockAddr, (unsigned char*) dataByteArray.data(), dataByteArray.size());
if (!matchingNode->getActiveSocket()) {
// we don't have an active socket for this node, but they're talking to us
// this means they've heard from us and can reply, let's assume public is active
matchingNode->activatePublicSocket();
}
}
} else {
// let processNodeData handle it.
NodeList::getInstance()->processNodeData(senderSockAddr, (unsigned char*) dataByteArray.data(), dataByteArray.size());
}
}
void AudioMixer::checkInWithDomainServerOrExit() {
if (NodeList::getInstance()->getNumNoReplyDomainCheckIns() == MAX_SILENT_DOMAIN_SERVER_CHECK_INS) {
emit finished();
} else {
NodeList::getInstance()->sendDomainServerCheckIn();
}
}
void AudioMixer::sendClientMixes() {
NodeList* nodeList = NodeList::getInstance();
// get the NodeList to ping any inactive nodes, for hole punching
nodeList->possiblyPingInactiveNodes();
int numBytesPacketHeader = numBytesForPacketHeader((unsigned char*) &PACKET_TYPE_MIXED_AUDIO);
unsigned char clientPacket[BUFFER_LENGTH_BYTES_STEREO + numBytesPacketHeader];
populateTypeAndVersion(clientPacket, PACKET_TYPE_MIXED_AUDIO);
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
((AudioMixerClientData*) node->getLinkedData())->checkBuffersBeforeFrameSend(JITTER_BUFFER_SAMPLES);
}
}
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getType() == NODE_TYPE_AGENT && node->getActiveSocket() && node->getLinkedData()
&& ((AudioMixerClientData*) node->getLinkedData())->getAvatarAudioRingBuffer()) {
prepareMixForListeningNode(&(*node));
memcpy(clientPacket + numBytesPacketHeader, _clientSamples, sizeof(_clientSamples));
nodeList->getNodeSocket().writeDatagram((char*) clientPacket, sizeof(clientPacket),
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
}
}
// push forward the next output pointers for any audio buffers we used
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
((AudioMixerClientData*) node->getLinkedData())->pushBuffersAfterFrameSend();
}
}
}
void AudioMixer::run() {
// change the logging target name while this is running
Logging::setTargetName(AUDIO_MIXER_LOGGING_TARGET_NAME);
@ -227,140 +303,17 @@ void AudioMixer::run() {
const char AUDIO_MIXER_NODE_TYPES_OF_INTEREST[2] = { NODE_TYPE_AGENT, NODE_TYPE_AUDIO_INJECTOR };
nodeList->setNodeTypesOfInterest(AUDIO_MIXER_NODE_TYPES_OF_INTEREST, sizeof(AUDIO_MIXER_NODE_TYPES_OF_INTEREST));
ssize_t receivedBytes = 0;
nodeList->linkedDataCreateCallback = attachNewBufferToNode;
nodeList->startSilentNodeRemovalThread();
QTimer* domainServerTimer = new QTimer(this);
connect(domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_USECS / 1000);
unsigned char packetData[MAX_PACKET_SIZE] = {};
QTimer* silentNodeTimer = new QTimer(this);
connect(silentNodeTimer, SIGNAL(timeout()), nodeList, SLOT(removeSilentNodes()));
silentNodeTimer->start(NODE_SILENCE_THRESHOLD_USECS / 1000);
HifiSockAddr nodeSockAddr;
int nextFrame = 0;
timeval startTime;
int numBytesPacketHeader = numBytesForPacketHeader((unsigned char*) &PACKET_TYPE_MIXED_AUDIO);
unsigned char clientPacket[BUFFER_LENGTH_BYTES_STEREO + numBytesPacketHeader];
populateTypeAndVersion(clientPacket, PACKET_TYPE_MIXED_AUDIO);
gettimeofday(&startTime, NULL);
timeval lastDomainServerCheckIn = {};
timeval beginSendTime, endSendTime;
float sumFrameTimePercentages = 0.0f;
int numStatCollections = 0;
// if we'll be sending stats, call the Logstash::socket() method to make it load the logstash IP outside the loop
if (Logging::shouldSendStats()) {
Logging::socket();
}
while (true) {
if (NodeList::getInstance()->getNumNoReplyDomainCheckIns() == MAX_SILENT_DOMAIN_SERVER_CHECK_INS) {
break;
}
if (Logging::shouldSendStats()) {
gettimeofday(&beginSendTime, NULL);
}
// send a check in packet to the domain server if DOMAIN_SERVER_CHECK_IN_USECS has elapsed
if (usecTimestampNow() - usecTimestamp(&lastDomainServerCheckIn) >= DOMAIN_SERVER_CHECK_IN_USECS) {
gettimeofday(&lastDomainServerCheckIn, NULL);
NodeList::getInstance()->sendDomainServerCheckIn();
if (Logging::shouldSendStats() && numStatCollections > 0) {
// if we should be sending stats to Logstash send the appropriate average now
const char MIXER_LOGSTASH_METRIC_NAME[] = "audio-mixer-frame-time-usage";
float averageFrameTimePercentage = sumFrameTimePercentages / numStatCollections;
Logging::stashValue(STAT_TYPE_TIMER, MIXER_LOGSTASH_METRIC_NAME, averageFrameTimePercentage);
sumFrameTimePercentages = 0.0f;
numStatCollections = 0;
}
}
// get the NodeList to ping any inactive nodes, for hole punching
nodeList->possiblyPingInactiveNodes();
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
((AudioMixerClientData*) node->getLinkedData())->checkBuffersBeforeFrameSend(JITTER_BUFFER_SAMPLES);
}
}
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getType() == NODE_TYPE_AGENT && node->getActiveSocket() && node->getLinkedData()
&& ((AudioMixerClientData*) node->getLinkedData())->getAvatarAudioRingBuffer()) {
prepareMixForListeningNode(&(*node));
memcpy(clientPacket + numBytesPacketHeader, _clientSamples, sizeof(_clientSamples));
nodeList->getNodeSocket().writeDatagram((char*) clientPacket, sizeof(clientPacket),
node->getActiveSocket()->getAddress(),
node->getActiveSocket()->getPort());
}
}
// push forward the next output pointers for any audio buffers we used
for (NodeList::iterator node = nodeList->begin(); node != nodeList->end(); node++) {
if (node->getLinkedData()) {
((AudioMixerClientData*) node->getLinkedData())->pushBuffersAfterFrameSend();
}
}
// pull any new audio data from nodes off of the network stack
while (nodeList->getNodeSocket().hasPendingDatagrams() &&
(receivedBytes = nodeList->getNodeSocket().readDatagram((char*) packetData, MAX_PACKET_SIZE,
nodeSockAddr.getAddressPointer(),
nodeSockAddr.getPortPointer())) &&
packetVersionMatch(packetData)) {
if (packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_NO_ECHO
|| packetData[0] == PACKET_TYPE_MICROPHONE_AUDIO_WITH_ECHO
|| packetData[0] == PACKET_TYPE_INJECT_AUDIO) {
QUuid nodeUUID = QUuid::fromRfc4122(QByteArray((char*) packetData + numBytesForPacketHeader(packetData),
NUM_BYTES_RFC4122_UUID));
Node* matchingNode = nodeList->nodeWithUUID(nodeUUID);
if (matchingNode) {
nodeList->updateNodeWithData(matchingNode, nodeSockAddr, packetData, receivedBytes);
if (!matchingNode->getActiveSocket()) {
// we don't have an active socket for this node, but they're talking to us
// this means they've heard from us and can reply, let's assume public is active
matchingNode->activatePublicSocket();
}
}
} else {
// let processNodeData handle it.
nodeList->processNodeData(nodeSockAddr, packetData, receivedBytes);
}
}
if (Logging::shouldSendStats()) {
// send a packet to our logstash instance
// calculate the percentage value for time elapsed for this send (of the max allowable time)
gettimeofday(&endSendTime, NULL);
float percentageOfMaxElapsed = ((float) (usecTimestamp(&endSendTime) - usecTimestamp(&beginSendTime))
/ BUFFER_SEND_INTERVAL_USECS) * 100.0f;
sumFrameTimePercentages += percentageOfMaxElapsed;
numStatCollections++;
}
int usecToSleep = usecTimestamp(&startTime) + (++nextFrame * BUFFER_SEND_INTERVAL_USECS) - usecTimestampNow();
if (usecToSleep > 0) {
usleep(usecToSleep);
} else {
qDebug("Took too much time, not sleeping!\n");
}
}
QTimer* mixSendTimer = new QTimer(this);
connect(mixSendTimer, SIGNAL(timeout()), this, SLOT(sendClientMixes()));
mixSendTimer->start(BUFFER_SEND_INTERVAL_MSECS);
}

View file

@ -9,6 +9,7 @@
#ifndef __hifi__AudioMixer__
#define __hifi__AudioMixer__
#include <Assignment.h>
#include <AudioRingBuffer.h>
@ -17,11 +18,16 @@ class AvatarAudioRingBuffer;
/// Handles assignments of type AudioMixer - mixing streams of audio and re-distributing to various clients.
class AudioMixer : public Assignment {
Q_OBJECT
public:
AudioMixer(const unsigned char* dataBuffer, int numBytes);
public slots:
/// runs the audio mixer
void run();
void processDatagram(const QByteArray& dataByteArray, const HifiSockAddr& senderSockAddr);
signals:
void finished();
private:
/// adds one buffer to the mix for a listening node
void addBufferToMixForListeningNodeWithBuffer(PositionalAudioRingBuffer* bufferToAdd,
@ -32,6 +38,9 @@ private:
int16_t _clientSamples[BUFFER_LENGTH_SAMPLES_PER_CHANNEL * 2];
private slots:
void checkInWithDomainServerOrExit();
void sendClientMixes();
};
#endif /* defined(__hifi__AudioMixer__) */

View file

@ -11,6 +11,7 @@
#include <sys/time.h>
#include <QTCore/QThread>
#include <QtCore/QUuid>
#include "NodeList.h"
@ -89,7 +90,7 @@ public:
// implement parseData to return 0 so we can be a subclass of NodeData
int parseData(unsigned char* sourceBuffer, int numBytes) { return 0; }
/// blocking run of the assignment
/// threaded run of assignment
virtual void run();
friend QDebug operator<<(QDebug debug, const Assignment& assignment);

View file

@ -45,4 +45,6 @@ private:
quint32 getLocalAddress();
Q_DECLARE_METATYPE(HifiSockAddr)
#endif /* defined(__hifi__HifiSockAddr__) */

View file

@ -14,7 +14,7 @@
class Node;
class NodeData : public QObject {
Q_OBJECT
Q_OBJECT
public:
NodeData(Node* owningNode = NULL);

View file

@ -75,7 +75,7 @@ NodeList::NodeList(char newOwnerType, unsigned short int newSocketListenPort) :
_hasCompletedInitialSTUNFailure(false),
_stunRequestsSinceSuccess(0)
{
_nodeSocket.bind(QHostAddress::LocalHost, newSocketListenPort);
_nodeSocket.bind(QHostAddress::AnyIPv4, newSocketListenPort);
}
NodeList::~NodeList() {
@ -339,23 +339,15 @@ void NodeList::sendSTUNRequest() {
memcpy(stunRequestPacket + packetIndex, &transactionID, sizeof(transactionID));
// lookup the IP for the STUN server
static QHostInfo stunInfo = QHostInfo::fromName(STUN_SERVER_HOSTNAME);
static HifiSockAddr stunSockAddr(STUN_SERVER_HOSTNAME, STUN_SERVER_PORT);
for (int i = 0; i < stunInfo.addresses().size(); i++) {
if (stunInfo.addresses()[i].protocol() == QAbstractSocket::IPv4Protocol) {
QString stunIPAddress = stunInfo.addresses()[i].toString();
if (!_hasCompletedInitialSTUNFailure) {
qDebug("Sending intial stun request to %s\n", stunIPAddress.toLocal8Bit().constData());
}
_nodeSocket.writeDatagram((char*) stunRequestPacket, sizeof(stunRequestPacket),
QHostAddress(stunIPAddress), STUN_SERVER_PORT);
break;
}
if (!_hasCompletedInitialSTUNFailure) {
qDebug("Sending intial stun request to %s\n", stunSockAddr.getAddress().toString().toLocal8Bit().constData());
}
_nodeSocket.writeDatagram((char*) stunRequestPacket, sizeof(stunRequestPacket),
stunSockAddr.getAddress(), stunSockAddr.getPort());
_stunRequestsSinceSuccess++;
if (_stunRequestsSinceSuccess >= NUM_STUN_REQUESTS_BEFORE_FALLBACK) {
@ -800,7 +792,22 @@ void NodeList::killNode(Node* node, bool mustLockNode) {
}
}
void* removeSilentNodes(void *args) {
void NodeList::removeSilentNodes() {
NodeList* nodeList = NodeList::getInstance();
for(NodeList::iterator node = nodeList->begin(); node != nodeList->end(); ++node) {
node->lock();
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > NODE_SILENCE_THRESHOLD_USECS) {
// kill this node, don't lock - we already did it
nodeList->killNode(&(*node), false);
}
node->unlock();
}
}
void* removeSilentNodesAndSleep(void *args) {
NodeList* nodeList = (NodeList*) args;
uint64_t checkTimeUsecs = 0;
int sleepTime = 0;
@ -809,16 +816,7 @@ void* removeSilentNodes(void *args) {
checkTimeUsecs = usecTimestampNow();
for(NodeList::iterator node = nodeList->begin(); node != nodeList->end(); ++node) {
node->lock();
if ((usecTimestampNow() - node->getLastHeardMicrostamp()) > NODE_SILENCE_THRESHOLD_USECS) {
// kill this node, don't lock - we already did it
nodeList->killNode(&(*node), false);
}
node->unlock();
}
nodeList->removeSilentNodes();
sleepTime = NODE_SILENCE_THRESHOLD_USECS - (usecTimestampNow() - checkTimeUsecs);
@ -841,7 +839,7 @@ void* removeSilentNodes(void *args) {
void NodeList::startSilentNodeRemovalThread() {
if (!::silentNodeThreadStopFlag) {
pthread_create(&removeSilentNodesThread, NULL, removeSilentNodes, (void*) this);
pthread_create(&removeSilentNodesThread, NULL, removeSilentNodesAndSleep, (void*) this);
} else {
qDebug("Refusing to start silent node removal thread from previously failed join.\n");
}

View file

@ -60,7 +60,8 @@ public:
virtual void domainChanged(QString domain) = 0;
};
class NodeList {
class NodeList : public QObject {
Q_OBJECT
public:
static NodeList* createInstance(char ownerType, unsigned short int socketListenPort = 0);
static NodeList* getInstance();
@ -79,6 +80,8 @@ public:
const QHostAddress& getDomainIP() const { return _domainSockAddr.getAddress(); }
void setDomainIPToLocalhost() { _domainSockAddr.setAddress(QHostAddress(INADDR_LOOPBACK)); }
void setDomainSockAddr(const HifiSockAddr& domainSockAddr) { _domainSockAddr = domainSockAddr; }
unsigned short getDomainPort() const { return _domainSockAddr.getPort(); }
const QUuid& getOwnerUUID() const { return _ownerUUID; }
@ -98,7 +101,6 @@ public:
void setNodeTypesOfInterest(const char* nodeTypesOfInterest, int numNodeTypesOfInterest);
void sendDomainServerCheckIn();
int processDomainServerList(unsigned char *packetData, size_t dataBytes);
void setAssignmentServerSocket(const HifiSockAddr& serverSocket) { _assignmentServerSocket = serverSocket; }
@ -141,6 +143,9 @@ public:
void possiblyPingInactiveNodes();
const HifiSockAddr* getNodeActiveSocketOrPing(Node* node);
public slots:
void sendDomainServerCheckIn();
void removeSilentNodes();
private:
static NodeList* _sharedInstance;