tweak the AudioInjectorManager for injector threading

This commit is contained in:
Stephen Birarda 2015-11-16 18:53:09 -08:00
parent aa77c4894c
commit 80115d38e9
4 changed files with 234 additions and 169 deletions

View file

@ -64,9 +64,10 @@ void AudioInjector::finish() {
}
}
void AudioInjector::injectAudio() {
void AudioInjector::setupInjection() {
if (!_hasStarted) {
_hasStarted = true;
// check if we need to offset the sound by some number of seconds
if (_options.secondOffset > 0.0f) {
@ -81,12 +82,9 @@ void AudioInjector::injectAudio() {
if (_options.localOnly) {
injectLocally();
} else {
qDebug() << "Calling inject to mixer from" << QThread::currentThread();
injectToMixer();
}
} else {
qCDebug(audio) << "AudioInjector::injectAudio called but already started.";
qCDebug(audio) << "AudioInjector::setupInjection called but already started.";
}
}
@ -109,7 +107,10 @@ void AudioInjector::restart() {
qDebug() << "Calling inject audio again to restart an injector";
// call inject audio to start injection over again
injectAudio();
setupInjection();
// emit our restarted signal, this allows the AudioInjectorManager to start considering us again
emit restartedWhileFinished();
}
}
@ -148,140 +149,148 @@ void AudioInjector::injectLocally() {
}
const uchar MAX_INJECTOR_VOLUME = 0xFF;
static const uint64_t NEXT_FRAME_DELTA_ERROR_OR_FINISHED = 0;
static const uint64_t NEXT_FRAME_DELTA_IMMEDIATELY = 1;
void AudioInjector::injectToMixer() {
if (_currentSendOffset < 0 ||
_currentSendOffset >= _audioData.size()) {
_currentSendOffset = 0;
uint64_t AudioInjector::injectNextFrame() {
if (_state == AudioInjector::State::Finished) {
qDebug() << "AudioInjector::injectNextFrame called but AudioInjector has finished and was not restarted. Returning.";
return NEXT_FRAME_DELTA_ERROR_OR_FINISHED;
}
auto nodeList = DependencyManager::get<NodeList>();
// make sure we actually have samples downloaded to inject
if (_audioData.size()) {
auto audioPacket = NLPacket::create(PacketType::InjectAudio);
// setup the packet for injected audio
QDataStream audioPacketStream(audioPacket.get());
// pack some placeholder sequence number for now
audioPacketStream << (quint16) 0;
// pack stream identifier (a generated UUID)
audioPacketStream << QUuid::createUuid();
// pack the stereo/mono type of the stream
audioPacketStream << _options.stereo;
// pack the flag for loopback
uchar loopbackFlag = (uchar) true;
audioPacketStream << loopbackFlag;
// pack the position for injected audio
int positionOptionOffset = audioPacket->pos();
audioPacketStream.writeRawData(reinterpret_cast<const char*>(&_options.position),
sizeof(_options.position));
// pack our orientation for injected audio
audioPacketStream.writeRawData(reinterpret_cast<const char*>(&_options.orientation),
sizeof(_options.orientation));
// pack zero for radius
float radius = 0;
audioPacketStream << radius;
// pack 255 for attenuation byte
int volumeOptionOffset = audioPacket->pos();
quint8 volume = MAX_INJECTOR_VOLUME * _options.volume;
audioPacketStream << volume;
audioPacketStream << _options.ignorePenumbra;
int audioDataOffset = audioPacket->pos();
QElapsedTimer timer;
timer.start();
int nextFrame = 0;
bool shouldLoop = _options.loop;
// loop to send off our audio in NETWORK_BUFFER_LENGTH_SAMPLES_PER_CHANNEL byte chunks
quint16 outgoingInjectedAudioSequenceNumber = 0;
while (_currentSendOffset < _audioData.size() && !_shouldStop) {
int bytesToCopy = std::min((_options.stereo ? 2 : 1) * AudioConstants::NETWORK_FRAME_BYTES_PER_CHANNEL,
_audioData.size() - _currentSendOffset);
// Measure the loudness of this frame
_loudness = 0.0f;
for (int i = 0; i < bytesToCopy; i += sizeof(int16_t)) {
_loudness += abs(*reinterpret_cast<int16_t*>(_audioData.data() + _currentSendOffset + i)) /
(AudioConstants::MAX_SAMPLE_VALUE / 2.0f);
}
_loudness /= (float)(bytesToCopy / sizeof(int16_t));
// if we haven't setup the packet to send then do so now
static int positionOptionOffset = -1;
static int volumeOptionOffset = -1;
static int audioDataOffset = -1;
static quint16 outgoingInjectedAudioSequenceNumber = 0;
static int nextFrame = 0;
static QElapsedTimer frameTimer;
if (!_currentPacket) {
if (_currentSendOffset < 0 ||
_currentSendOffset >= _audioData.size()) {
_currentSendOffset = 0;
}
// make sure we actually have samples downloaded to inject
if (_audioData.size()) {
audioPacket->seek(0);
nextFrame = 0;
frameTimer.restart();
// pack the sequence number
audioPacket->writePrimitive(outgoingInjectedAudioSequenceNumber);
_currentPacket = NLPacket::create(PacketType::InjectAudio);
audioPacket->seek(positionOptionOffset);
audioPacket->writePrimitive(_options.position);
audioPacket->writePrimitive(_options.orientation);
volume = MAX_INJECTOR_VOLUME * _options.volume;
audioPacket->seek(volumeOptionOffset);
audioPacket->writePrimitive(volume);
audioPacket->seek(audioDataOffset);
// copy the next NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL bytes to the packet
audioPacket->write(_audioData.data() + _currentSendOffset, bytesToCopy);
// set the correct size used for this packet
audioPacket->setPayloadSize(audioPacket->pos());
// grab our audio mixer from the NodeList, if it exists
SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer);
// setup the packet for injected audio
QDataStream audioPacketStream(_currentPacket.get());
if (audioMixer) {
// send off this audio packet
nodeList->sendUnreliablePacket(*audioPacket, *audioMixer);
outgoingInjectedAudioSequenceNumber++;
}
_currentSendOffset += bytesToCopy;
// send two packets before the first sleep so the mixer can start playback right away
if (_currentSendOffset != bytesToCopy && _currentSendOffset < _audioData.size()) {
if (_shouldStop) {
break;
}
// not the first packet and not done
// sleep for the appropriate time
int usecToSleep = (++nextFrame * AudioConstants::NETWORK_FRAME_USECS) - timer.nsecsElapsed() / 1000;
qDebug() << "AudioInjector" << this << "will sleep on thread" << QThread::currentThread() << "for" << usecToSleep;
if (usecToSleep > 0) {
usleep(usecToSleep);
}
}
if (shouldLoop && _currentSendOffset >= _audioData.size()) {
_currentSendOffset = 0;
}
// pack some placeholder sequence number for now
audioPacketStream << (quint16) 0;
// pack stream identifier (a generated UUID)
audioPacketStream << QUuid::createUuid();
// pack the stereo/mono type of the stream
audioPacketStream << _options.stereo;
// pack the flag for loopback
uchar loopbackFlag = (uchar) true;
audioPacketStream << loopbackFlag;
// pack the position for injected audio
positionOptionOffset = _currentPacket->pos();
audioPacketStream.writeRawData(reinterpret_cast<const char*>(&_options.position),
sizeof(_options.position));
// pack our orientation for injected audio
audioPacketStream.writeRawData(reinterpret_cast<const char*>(&_options.orientation),
sizeof(_options.orientation));
// pack zero for radius
float radius = 0;
audioPacketStream << radius;
// pack 255 for attenuation byte
volumeOptionOffset = _currentPacket->pos();
quint8 volume = MAX_INJECTOR_VOLUME;
audioPacketStream << volume;
audioPacketStream << _options.ignorePenumbra;
audioDataOffset = _currentPacket->pos();
} else {
// no samples to inject, return immediately
qDebug() << "AudioInjector::injectNextFrame() called with no samples to inject. Returning.";
return NEXT_FRAME_DELTA_ERROR_OR_FINISHED;
}
}
finish();
int bytesToCopy = std::min((_options.stereo ? 2 : 1) * AudioConstants::NETWORK_FRAME_BYTES_PER_CHANNEL,
_audioData.size() - _currentSendOffset);
// Measure the loudness of this frame
_loudness = 0.0f;
for (int i = 0; i < bytesToCopy; i += sizeof(int16_t)) {
_loudness += abs(*reinterpret_cast<int16_t*>(_audioData.data() + _currentSendOffset + i)) /
(AudioConstants::MAX_SAMPLE_VALUE / 2.0f);
}
_loudness /= (float)(bytesToCopy / sizeof(int16_t));
_currentPacket->seek(0);
// pack the sequence number
_currentPacket->writePrimitive(outgoingInjectedAudioSequenceNumber);
_currentPacket->seek(positionOptionOffset);
_currentPacket->writePrimitive(_options.position);
_currentPacket->writePrimitive(_options.orientation);
quint8 volume = MAX_INJECTOR_VOLUME * _options.volume;
_currentPacket->seek(volumeOptionOffset);
_currentPacket->writePrimitive(volume);
_currentPacket->seek(audioDataOffset);
// copy the next NETWORK_BUFFER_LENGTH_BYTES_PER_CHANNEL bytes to the packet
_currentPacket->write(_audioData.data() + _currentSendOffset, bytesToCopy);
// set the correct size used for this packet
_currentPacket->setPayloadSize(_currentPacket->pos());
// grab our audio mixer from the NodeList, if it exists
auto nodeList = DependencyManager::get<NodeList>();
SharedNodePointer audioMixer = nodeList->soloNodeOfType(NodeType::AudioMixer);
if (audioMixer) {
// send off this audio packet
nodeList->sendUnreliablePacket(*_currentPacket, *audioMixer);
outgoingInjectedAudioSequenceNumber++;
}
_currentSendOffset += bytesToCopy;
if (_currentSendOffset >= _audioData.size()) {
// we're at the end of the audio data to send
if (_options.loop) {
// we were asked to loop, set our send offset to 0
_currentSendOffset = 0;
} else {
// we weren't to loop, say that we're done now
qDebug() << "AudioInjector::injectNextFrame has sent all data and was not asked to loop - calling finish().";
finish();
return NEXT_FRAME_DELTA_ERROR_OR_FINISHED;
}
}
if (_currentSendOffset == bytesToCopy) {
// ask AudioInjectorManager to call us right away again to
// immediately send the first two frames so the mixer can start using the audio right away
return NEXT_FRAME_DELTA_IMMEDIATELY;
} else {
return (++nextFrame * AudioConstants::NETWORK_FRAME_USECS) - frameTimer.nsecsElapsed() / 1000;
}
}
void AudioInjector::stop() {
@ -360,11 +369,11 @@ AudioInjector* AudioInjector::playSound(const QByteArray& buffer, const AudioInj
// grab the AudioInjectorManager
auto injectorManager = DependencyManager::get<AudioInjectorManager>();
// setup parameters required for injection
injector->setupInjection();
// attempt to thread the new injector
if (injectorManager->threadInjector(injector)) {
// call inject audio on the correct thread
QMetaObject::invokeMethod(injector, "injectAudio", Qt::QueuedConnection);
return injector;
} else {
// we failed to thread the new injector (we are at the max number of injector threads)

View file

@ -26,6 +26,8 @@
#include "Sound.h"
class AbstractAudioInterface;
class AudioInjectorManager;
class NLPacket;
// In order to make scripting cleaner for the AudioInjector, the script now holds on to the AudioInjector object
// until it dies.
@ -59,7 +61,6 @@ public:
static AudioInjector* playSound(const QString& soundUrl, const float volume, const float stretchFactor, const glm::vec3 position);
public slots:
void injectAudio();
void restart();
void stop();
@ -74,9 +75,11 @@ public slots:
signals:
void finished();
void restartedWhileFinished();
private:
void injectToMixer();
void setupInjection();
uint64_t injectNextFrame();
void injectLocally();
void finish();
@ -88,8 +91,11 @@ private:
bool _shouldStop = false;
float _loudness = 0.0f;
int _currentSendOffset = 0;
AbstractAudioInterface* _localAudioInterface = NULL;
AudioInjectorLocalBuffer* _localBuffer = NULL;
std::unique_ptr<NLPacket> _currentPacket { nullptr };
AbstractAudioInterface* _localAudioInterface { nullptr };
AudioInjectorLocalBuffer* _localBuffer { nullptr };
friend class AudioInjectorManager;
};

View file

@ -13,17 +13,28 @@
#include <QtCore/QCoreApplication>
#include <SharedUtil.h>
#include "AudioConstants.h"
#include "AudioInjector.h"
AudioInjectorManager::~AudioInjectorManager() {
_shouldStop = true;
std::unique_lock<std::mutex> lock(_injectorsMutex);
// make sure any still living injectors are stopped and deleted
for (auto injector : _injectors) {
injector->stopAndDeleteLater();
while (!_injectors.empty()) {
// grab the injector at the front
auto& timePointerPair = _injectors.front();
// ask it to stop and be deleted
timePointerPair.second->stopAndDeleteLater();
_injectors.pop();
}
// quit and wait on the injector thread, if we ever created it
// quit and wait on the manager thread, if we ever created it
if (_thread) {
_thread->quit();
_thread->wait();
@ -43,7 +54,46 @@ void AudioInjectorManager::createThread() {
void AudioInjectorManager::run() {
while (!_shouldStop) {
// process events in case we have been told to stop or our injectors have been told to stop
// wait until the next injector is ready, or until we get a new injector given to us
std::unique_lock<std::mutex> lock(_injectorsMutex);
if (_injectors.size() > 0) {
// when does the next injector need to send a frame?
// do we get to wait or should we just go for it now?
auto timeInjectorPair = _injectors.front();
auto nextTimestamp = timeInjectorPair.first;
int64_t difference = int64_t(nextTimestamp - usecTimestampNow());
if (difference > 0) {
_injectorReady.wait_for(lock, std::chrono::microseconds(difference));
}
// loop through the injectors in the map and send whatever frames need to go out
auto front = _injectors.front();
while (_injectors.size() > 0 && front.first <= usecTimestampNow()) {
// either way we're popping this injector off - get a copy first
auto injector = front.second;
_injectors.pop();
if (!injector.isNull()) {
// this is an injector that's ready to go, have it send a frame now
auto nextCallDelta = injector->injectNextFrame();
if (nextCallDelta > 0 && !injector->isFinished()) {
// re-enqueue the injector with the correct timing
_injectors.push({ usecTimestampNow() + nextCallDelta, injector });
}
}
front = _injectors.front();
}
} else {
// we have no current injectors, wait until we get at least one before we do anything
_injectorReady.wait(lock);
}
QCoreApplication::processEvents();
}
}
@ -54,23 +104,21 @@ bool AudioInjectorManager::threadInjector(AudioInjector* injector) {
// guard the injectors vector with a mutex
std::unique_lock<std::mutex> lock(_injectorsMutex);
// check if we'll be able to thread this injector (do we have < MAX concurrent injectors)
// check if we'll be able to thread this injector (do we have < max concurrent injectors)
if (_injectors.size() < MAX_INJECTORS_PER_THREAD) {
if (!_thread) {
createThread();
}
auto it = nextInjectorIterator();
qDebug() << "Inserting injector at" << it - _injectors.begin();
// store a QPointer to this injector
_injectors.insert(it, QPointer<AudioInjector>(injector));
qDebug() << "Moving injector to thread" << _thread;
// move the injector to the QThread
injector->moveToThread(_thread);
// handle a restart once the injector has finished
connect(injector, &AudioInjector::restartedWhileFinished, this, &AudioInjectorManager::restartFinishedInjector);
// store a QPointer to this injector
addInjectorToQueue(injector);
return true;
} else {
// unable to thread this injector, at the max
@ -80,17 +128,15 @@ bool AudioInjectorManager::threadInjector(AudioInjector* injector) {
}
}
AudioInjectorVector::iterator AudioInjectorManager::nextInjectorIterator() {
// find the next usable iterator for an injector
auto it = _injectors.begin();
while (it != _injectors.end()) {
if (it->isNull()) {
return it;
} else {
++it;
}
}
return it;
void AudioInjectorManager::restartFinishedInjector() {
auto injector = qobject_cast<AudioInjector*>(sender());
addInjectorToQueue(injector);
}
void AudioInjectorManager::addInjectorToQueue(AudioInjector* injector) {
// add the injector to the queue with a send timestamp of now
_injectors.emplace(usecTimestampNow(), InjectorQPointer { injector });
// notify our wait condition so we can inject two frames for this injector immediately
_injectorReady.notify_one();
}

View file

@ -14,7 +14,7 @@
#ifndef hifi_AudioInjectorManager_h
#define hifi_AudioInjectorManager_h
#include <list>
#include <queue>
#include <mutex>
#include <QtCore/QPointer>
@ -23,7 +23,9 @@
#include <DependencyManager.h>
class AudioInjector;
using AudioInjectorVector = std::vector<QPointer<AudioInjector>>;
using InjectorQPointer = QPointer<AudioInjector>;
using TimeInjectorPointerPair = std::pair<uint64_t, InjectorQPointer>;
using InjectorQueue = std::queue<TimeInjectorPointerPair>;
class AudioInjectorManager : public QObject, public Dependency {
Q_OBJECT
@ -34,18 +36,20 @@ private slots:
void run();
private:
bool threadInjector(AudioInjector* injector);
void restartFinishedInjector();
void addInjectorToQueue(AudioInjector* injector);
AudioInjectorManager() {};
AudioInjectorManager(const AudioInjectorManager&) = delete;
AudioInjectorManager& operator=(const AudioInjectorManager&) = delete;
void createThread();
AudioInjectorVector::iterator nextInjectorIterator();
QThread* _thread { nullptr };
bool _shouldStop { false };
AudioInjectorVector _injectors;
InjectorQueue _injectors;
std::mutex _injectorsMutex;
std::condition_variable _injectorReady;
friend class AudioInjector;
};