rework audio mixer so it can exit cleanly

This commit is contained in:
Seth Alves 2015-04-28 18:38:41 -07:00
parent a234c02d7d
commit 02fd9987c7
10 changed files with 155 additions and 81 deletions

View file

@ -114,7 +114,16 @@ void AssignmentClient::stopAssignmentClient() {
qDebug() << "Exiting."; qDebug() << "Exiting.";
_requestTimer.stop(); _requestTimer.stop();
_statsTimerACM.stop(); _statsTimerACM.stop();
QCoreApplication::quit(); if (_currentAssignment) {
_currentAssignment->aboutToQuit();
// _currentAssignment->aboutToFinish();
_currentAssignment->thread()->wait();
}
}
void AssignmentClient::aboutToQuit() {
stopAssignmentClient();
} }
@ -197,6 +206,7 @@ void AssignmentClient::readPendingDatagrams() {
// start the deployed assignment // start the deployed assignment
AssignmentThread* workerThread = new AssignmentThread(_currentAssignment, this); AssignmentThread* workerThread = new AssignmentThread(_currentAssignment, this);
workerThread->setObjectName("worker");
connect(workerThread, &QThread::started, _currentAssignment.data(), &ThreadedAssignment::run); connect(workerThread, &QThread::started, _currentAssignment.data(), &ThreadedAssignment::run);
connect(_currentAssignment.data(), &ThreadedAssignment::finished, workerThread, &QThread::quit); connect(_currentAssignment.data(), &ThreadedAssignment::finished, workerThread, &QThread::quit);

View file

@ -34,6 +34,9 @@ private slots:
void sendStatsPacketToACM(); void sendStatsPacketToACM();
void stopAssignmentClient(); void stopAssignmentClient();
public slots:
void aboutToQuit();
private: private:
void setUpStatsToMonitor(int ppid); void setUpStatsToMonitor(int ppid);
Assignment _requestAssignment; Assignment _requestAssignment;

View file

@ -10,6 +10,7 @@
// //
#include <QCommandLineParser> #include <QCommandLineParser>
#include <QThread>
#include <LogHandler.h> #include <LogHandler.h>
#include <SharedUtil.h> #include <SharedUtil.h>
@ -180,14 +181,19 @@ AssignmentClientApp::AssignmentClientApp(int argc, char* argv[]) :
} }
} }
QThread::currentThread()->setObjectName("main thread");
if (numForks || minForks || maxForks) { if (numForks || minForks || maxForks) {
AssignmentClientMonitor monitor(numForks, minForks, maxForks, requestAssignmentType, assignmentPool, AssignmentClientMonitor monitor(numForks, minForks, maxForks, requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort); walletUUID, assignmentServerHostname, assignmentServerPort);
connect(this, &QCoreApplication::aboutToQuit, &monitor, &AssignmentClientMonitor::aboutToQuit);
exec(); exec();
} else { } else {
AssignmentClient client(ppid, requestAssignmentType, assignmentPool, AssignmentClient client(ppid, requestAssignmentType, assignmentPool,
walletUUID, assignmentServerHostname, assignmentServerPort); walletUUID, assignmentServerHostname, assignmentServerPort);
connect(this, &QCoreApplication::aboutToQuit, &client, &AssignmentClient::aboutToQuit);
exec(); exec();
} }
} }

View file

@ -80,6 +80,12 @@ void AssignmentClientMonitor::stopChildProcesses() {
}); });
} }
void AssignmentClientMonitor::aboutToQuit() {
stopChildProcesses();
}
void AssignmentClientMonitor::spawnChildClient() { void AssignmentClientMonitor::spawnChildClient() {
QProcess *assignmentClient = new QProcess(this); QProcess *assignmentClient = new QProcess(this);

View file

@ -38,6 +38,9 @@ private slots:
void readPendingDatagrams(); void readPendingDatagrams();
void checkSpares(); void checkSpares();
public slots:
void aboutToQuit();
private: private:
void spawnChildClient(); void spawnChildClient();
QTimer _checkSparesTimer; // every few seconds see if it need fewer or more spare children QTimer _checkSparesTimer; // every few seconds see if it need fewer or more spare children

View file

@ -588,7 +588,6 @@ void AudioMixer::sendStatsPacket() {
_sumMixes = 0; _sumMixes = 0;
_numStatFrames = 0; _numStatFrames = 0;
// NOTE: These stats can be too large to fit in an MTU, so we break it up into multiple packts... // NOTE: These stats can be too large to fit in an MTU, so we break it up into multiple packts...
QJsonObject statsObject2; QJsonObject statsObject2;
@ -712,78 +711,90 @@ void AudioMixer::run() {
// check the settings object to see if we have anything we can parse out // check the settings object to see if we have anything we can parse out
parseSettingsObject(settingsObject); parseSettingsObject(settingsObject);
int nextFrame = 0; _nextFrame = 0;
QElapsedTimer timer; _timer.start();
timer.start();
char clientMixBuffer[MAX_PACKET_SIZE]; _idleTimer = new QTimer();
connect(_idleTimer, SIGNAL(timeout()), this, SLOT(insideLoop()));
int usecToSleep = AudioConstants::NETWORK_FRAME_USECS; _idleTimer->start(0);
}
const int TRAILING_AVERAGE_FRAMES = 100;
int framesSinceCutoffEvent = TRAILING_AVERAGE_FRAMES;
while (!_isFinished) {
const float STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.10f; void AudioMixer::insideLoop() {
const float BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.20f; if (_isFinished) {
qDebug() << "AudioMixer::insideLoop stoping _idleTimer";
_idleTimer->stop();
delete _idleTimer;
_idleTimer = nullptr;
QThread *thisThread = QThread::currentThread();
thisThread->quit();
return;
}
auto nodeList = DependencyManager::get<NodeList>();
const float STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.10f;
const float BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD = 0.20f;
const float RATIO_BACK_OFF = 0.02f; const float RATIO_BACK_OFF = 0.02f;
const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES; const float CURRENT_FRAME_RATIO = 1.0f / TRAILING_AVERAGE_FRAMES;
const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO; const float PREVIOUS_FRAMES_RATIO = 1.0f - CURRENT_FRAME_RATIO;
if (usecToSleep < 0) { if (_usecToSleep < 0) {
usecToSleep = 0; _usecToSleep = 0;
} }
_trailingSleepRatio = (PREVIOUS_FRAMES_RATIO * _trailingSleepRatio) _trailingSleepRatio = (PREVIOUS_FRAMES_RATIO * _trailingSleepRatio)
+ (usecToSleep * CURRENT_FRAME_RATIO / (float) AudioConstants::NETWORK_FRAME_USECS); + (_usecToSleep * CURRENT_FRAME_RATIO / (float) AudioConstants::NETWORK_FRAME_USECS);
float lastCutoffRatio = _performanceThrottlingRatio; float lastCutoffRatio = _performanceThrottlingRatio;
bool hasRatioChanged = false; bool hasRatioChanged = false;
if (framesSinceCutoffEvent >= TRAILING_AVERAGE_FRAMES) { if (_framesSinceCutoffEvent >= TRAILING_AVERAGE_FRAMES) {
if (_trailingSleepRatio <= STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD) { if (_trailingSleepRatio <= STRUGGLE_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD) {
// we're struggling - change our min required loudness to reduce some load // we're struggling - change our min required loudness to reduce some load
_performanceThrottlingRatio = _performanceThrottlingRatio + (0.5f * (1.0f - _performanceThrottlingRatio)); _performanceThrottlingRatio = _performanceThrottlingRatio + (0.5f * (1.0f - _performanceThrottlingRatio));
qDebug() << "Mixer is struggling, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was" qDebug() << "Mixer is struggling, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
<< lastCutoffRatio << "and is now" << _performanceThrottlingRatio; << lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true; hasRatioChanged = true;
} else if (_trailingSleepRatio >= BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD && _performanceThrottlingRatio != 0) { } else if (_trailingSleepRatio >= BACK_OFF_TRIGGER_SLEEP_PERCENTAGE_THRESHOLD && _performanceThrottlingRatio != 0) {
// we've recovered and can back off the required loudness // we've recovered and can back off the required loudness
_performanceThrottlingRatio = _performanceThrottlingRatio - RATIO_BACK_OFF; _performanceThrottlingRatio = _performanceThrottlingRatio - RATIO_BACK_OFF;
if (_performanceThrottlingRatio < 0) { if (_performanceThrottlingRatio < 0) {
_performanceThrottlingRatio = 0; _performanceThrottlingRatio = 0;
}
qDebug() << "Mixer is recovering, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
<< lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true;
} }
if (hasRatioChanged) {
// set out min audability threshold from the new ratio
_minAudibilityThreshold = LOUDNESS_TO_DISTANCE_RATIO / (2.0f * (1.0f - _performanceThrottlingRatio));
qDebug() << "Minimum audability required to be mixed is now" << _minAudibilityThreshold;
framesSinceCutoffEvent = 0; qDebug() << "Mixer is recovering, sleeping" << _trailingSleepRatio * 100 << "% of frame time. Old cutoff was"
} << lastCutoffRatio << "and is now" << _performanceThrottlingRatio;
hasRatioChanged = true;
} }
if (!hasRatioChanged) {
++framesSinceCutoffEvent;
}
quint64 now = usecTimestampNow();
if (now - _lastPerSecondCallbackTime > USECS_PER_SECOND) {
perSecondActions();
_lastPerSecondCallbackTime = now;
}
nodeList->eachNode([&](const SharedNodePointer& node) {
if (hasRatioChanged) {
// set out min audability threshold from the new ratio
_minAudibilityThreshold = LOUDNESS_TO_DISTANCE_RATIO / (2.0f * (1.0f - _performanceThrottlingRatio));
qDebug() << "Minimum audability required to be mixed is now" << _minAudibilityThreshold;
_framesSinceCutoffEvent = 0;
}
}
if (!hasRatioChanged) {
++_framesSinceCutoffEvent;
}
quint64 now = usecTimestampNow();
if (now - _lastPerSecondCallbackTime > USECS_PER_SECOND) {
perSecondActions();
_lastPerSecondCallbackTime = now;
}
nodeList->eachNode([&](const SharedNodePointer& node) {
if (node->getLinkedData()) { if (node->getLinkedData()) {
AudioMixerClientData* nodeData = (AudioMixerClientData*)node->getLinkedData(); AudioMixerClientData* nodeData = (AudioMixerClientData*)node->getLinkedData();
@ -807,8 +818,8 @@ void AudioMixer::run() {
char* mixDataAt; char* mixDataAt;
if (streamsMixed > 0) { if (streamsMixed > 0) {
// pack header // pack header
int numBytesMixPacketHeader = populatePacketHeader(clientMixBuffer, PacketTypeMixedAudio); int numBytesMixPacketHeader = populatePacketHeader(_clientMixBuffer, PacketTypeMixedAudio);
mixDataAt = clientMixBuffer + numBytesMixPacketHeader; mixDataAt = _clientMixBuffer + numBytesMixPacketHeader;
// pack sequence number // pack sequence number
quint16 sequence = nodeData->getOutgoingSequenceNumber(); quint16 sequence = nodeData->getOutgoingSequenceNumber();
@ -820,8 +831,8 @@ void AudioMixer::run() {
mixDataAt += AudioConstants::NETWORK_FRAME_BYTES_STEREO; mixDataAt += AudioConstants::NETWORK_FRAME_BYTES_STEREO;
} else { } else {
// pack header // pack header
int numBytesPacketHeader = populatePacketHeader(clientMixBuffer, PacketTypeSilentAudioFrame); int numBytesPacketHeader = populatePacketHeader(_clientMixBuffer, PacketTypeSilentAudioFrame);
mixDataAt = clientMixBuffer + numBytesPacketHeader; mixDataAt = _clientMixBuffer + numBytesPacketHeader;
// pack sequence number // pack sequence number
quint16 sequence = nodeData->getOutgoingSequenceNumber(); quint16 sequence = nodeData->getOutgoingSequenceNumber();
@ -833,12 +844,12 @@ void AudioMixer::run() {
memcpy(mixDataAt, &numSilentSamples, sizeof(quint16)); memcpy(mixDataAt, &numSilentSamples, sizeof(quint16));
mixDataAt += sizeof(quint16); mixDataAt += sizeof(quint16);
} }
// Send audio environment // Send audio environment
sendAudioEnvironmentPacket(node); sendAudioEnvironmentPacket(node);
// send mixed audio packet // send mixed audio packet
nodeList->writeDatagram(clientMixBuffer, mixDataAt - clientMixBuffer, node); nodeList->writeDatagram(_clientMixBuffer, mixDataAt - _clientMixBuffer, node);
nodeData->incrementOutgoingMixedAudioSequenceNumber(); nodeData->incrementOutgoingMixedAudioSequenceNumber();
// send an audio stream stats packet if it's time // send an audio stream stats packet if it's time
@ -852,22 +863,22 @@ void AudioMixer::run() {
} }
}); });
++_numStatFrames; ++_numStatFrames;
QCoreApplication::processEvents(); QCoreApplication::processEvents();
if (_isFinished) { if (_isFinished) {
break; return;
} }
usecToSleep = (++nextFrame * AudioConstants::NETWORK_FRAME_USECS) - timer.nsecsElapsed() / 1000; // ns to us _usecToSleep = (++_nextFrame * AudioConstants::NETWORK_FRAME_USECS) - _timer.nsecsElapsed() / 1000; // ns to us
if (usecToSleep > 0) { if (_usecToSleep > 0) {
usleep(usecToSleep); usleep(_usecToSleep);
}
} }
} }
void AudioMixer::perSecondActions() { void AudioMixer::perSecondActions() {
_sendAudioStreamStats = true; _sendAudioStreamStats = true;

View file

@ -32,6 +32,7 @@ public:
public slots: public slots:
/// threaded run of assignment /// threaded run of assignment
void run(); void run();
void insideLoop();
void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle void readPendingDatagrams() { }; // this will not be called since our datagram processing thread will handle
void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr); void readPendingDatagram(const QByteArray& receivedPacket, const HifiSockAddr& senderSockAddr);
@ -110,6 +111,15 @@ private:
MovingMinMaxAvg<quint64> _timeSpentPerHashMatchCallStats; // update with usecs spent inside each packetVersionAndHashMatch call MovingMinMaxAvg<quint64> _timeSpentPerHashMatchCallStats; // update with usecs spent inside each packetVersionAndHashMatch call
MovingMinMaxAvg<int> _readPendingCallsPerSecondStats; // update with # of readPendingDatagrams calls in the last second MovingMinMaxAvg<int> _readPendingCallsPerSecondStats; // update with # of readPendingDatagrams calls in the last second
// loop variables
QTimer* _idleTimer = nullptr;
int _nextFrame = 0;
QElapsedTimer _timer;
char _clientMixBuffer[MAX_PACKET_SIZE];
int _usecToSleep = AudioConstants::NETWORK_FRAME_USECS;
const int TRAILING_AVERAGE_FRAMES = 100;
int _framesSinceCutoffEvent = TRAILING_AVERAGE_FRAMES;
}; };
#endif // hifi_AudioMixer_h #endif // hifi_AudioMixer_h

View file

@ -221,6 +221,10 @@ protected:
LimitedNodeList(unsigned short socketListenPort = 0, unsigned short dtlsListenPort = 0); LimitedNodeList(unsigned short socketListenPort = 0, unsigned short dtlsListenPort = 0);
LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton LimitedNodeList(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
void operator=(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton void operator=(LimitedNodeList const&); // Don't implement, needed to avoid copies of singleton
~LimitedNodeList() {
qDebug() << "XXXXXXXXXXXXXXXXXXXX ~LimitedNodeList called";
}
qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr, qint64 writeDatagram(const QByteArray& datagram, const HifiSockAddr& destinationSockAddr,
const QUuid& connectionSecret); const QUuid& connectionSecret);

View file

@ -30,6 +30,17 @@ void ThreadedAssignment::setFinished(bool isFinished) {
_isFinished = isFinished; _isFinished = isFinished;
if (_isFinished) { if (_isFinished) {
if (_domainServerTimer) {
_domainServerTimer->stop();
delete _domainServerTimer;
_domainServerTimer = nullptr;
}
if (_statsTimer) {
_statsTimer->stop();
delete _statsTimer;
_statsTimer = nullptr;
}
aboutToFinish(); aboutToFinish();
auto nodeList = DependencyManager::get<NodeList>(); auto nodeList = DependencyManager::get<NodeList>();
@ -63,15 +74,15 @@ void ThreadedAssignment::commonInit(const QString& targetName, NodeType_t nodeTy
// this is a temp fix for Qt 5.3 - rebinding the node socket gives us readyRead for the socket on this thread // this is a temp fix for Qt 5.3 - rebinding the node socket gives us readyRead for the socket on this thread
nodeList->rebindNodeSocket(); nodeList->rebindNodeSocket();
QTimer* domainServerTimer = new QTimer(this); _domainServerTimer = new QTimer(this);
connect(domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit())); connect(_domainServerTimer, SIGNAL(timeout()), this, SLOT(checkInWithDomainServerOrExit()));
domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS); _domainServerTimer->start(DOMAIN_SERVER_CHECK_IN_MSECS);
if (shouldSendStats) { if (shouldSendStats) {
// send a stats packet every 1 second // send a stats packet every 1 second
QTimer* statsTimer = new QTimer(this); _statsTimer = new QTimer(this);
connect(statsTimer, &QTimer::timeout, this, &ThreadedAssignment::sendStatsPacket); connect(_statsTimer, &QTimer::timeout, this, &ThreadedAssignment::sendStatsPacket);
statsTimer->start(1000); _statsTimer->start(1000);
} }
} }

View file

@ -28,8 +28,16 @@ public:
public slots: public slots:
/// threaded run of assignment /// threaded run of assignment
virtual void run() = 0; virtual void run() = 0;
Q_INVOKABLE void stop() { setFinished(true); }
virtual void readPendingDatagrams() = 0; virtual void readPendingDatagrams() = 0;
virtual void sendStatsPacket(); virtual void sendStatsPacket();
public slots:
virtual void aboutToQuit() {
// emit finished();
QMetaObject::invokeMethod(this, "stop");
}
signals: signals:
void finished(); void finished();
@ -38,6 +46,8 @@ protected:
void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true); void commonInit(const QString& targetName, NodeType_t nodeType, bool shouldSendStats = true);
bool _isFinished; bool _isFinished;
QThread* _datagramProcessingThread; QThread* _datagramProcessingThread;
QTimer* _domainServerTimer = nullptr;
QTimer* _statsTimer = nullptr;
private slots: private slots:
void checkInWithDomainServerOrExit(); void checkInWithDomainServerOrExit();