some concurrency repairs for AudioMixerClientData streams

This commit is contained in:
Stephen Birarda 2016-02-09 10:29:02 -08:00
parent 73690fe4cb
commit 1ce1a73f92
3 changed files with 49 additions and 29 deletions

View file

@ -298,12 +298,10 @@ bool AudioMixer::prepareMixForListeningNode(Node* node) {
AudioMixerClientData* otherNodeClientData = (AudioMixerClientData*) otherNode->getLinkedData();
// enumerate the ARBs attached to the otherNode and add all that should be added to mix
const QHash<QUuid, PositionalAudioStream*>& otherNodeAudioStreams = otherNodeClientData->getAudioStreams();
QHash<QUuid, PositionalAudioStream*>::ConstIterator i;
for (i = otherNodeAudioStreams.constBegin(); i != otherNodeAudioStreams.constEnd(); i++) {
PositionalAudioStream* otherNodeStream = i.value();
QUuid streamUUID = i.key();
auto streamsCopy = otherNodeClientData->getAudioStreams();
for (auto& streamPair : streamsCopy) {
auto otherNodeStream = streamPair.second;
auto streamUUID = streamPair.first;
if (otherNodeStream->getType() == PositionalAudioStream::Microphone) {
streamUUID = otherNode->getUUID();

View file

@ -22,13 +22,15 @@
AudioMixerClientData::AudioMixerClientData() :
_audioStreams(),
_outgoingMixedAudioSequenceNumber(0),
_downstreamAudioStreamStats()
{
}
AvatarAudioStream* AudioMixerClientData::getAvatarAudioStream() const {
AvatarAudioStream* AudioMixerClientData::getAvatarAudioStream() {
QReadLocker readLocker { &_streamsLock };
auto it = _audioStreams.find(QUuid());
if (it != _audioStreams.end()) {
return dynamic_cast<AvatarAudioStream*>(it->second.get());
@ -52,7 +54,7 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
return message.getPosition();
} else {
PositionalAudioStream* matchingStream = NULL;
SharedStreamPointer matchingStream;
bool isMicStream = false;
@ -60,6 +62,8 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
|| packetType == PacketType::MicrophoneAudioNoEcho
|| packetType == PacketType::SilentAudioFrame) {
QWriteLocker writeLocker { &_streamsLock };
auto micStreamIt = _audioStreams.find(QUuid());
if (micStreamIt == _audioStreams.end()) {
// we don't have a mic stream yet, so add it
@ -80,7 +84,9 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
micStreamIt = emplaced.first;
}
matchingStream = micStreamIt->second.get();
matchingStream = micStreamIt->second;
writeLocker.unlock();
isMicStream = true;
} else if (packetType == PacketType::InjectAudio) {
@ -93,6 +99,8 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
bool isStereo;
message.readPrimitive(&isStereo);
QWriteLocker writeLock { &_streamsLock };
auto streamIt = _audioStreams.find(streamIdentifier);
if (streamIt == _audioStreams.end()) {
@ -105,7 +113,9 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
streamIt = emplaced.first;
}
matchingStream = streamIt->second.get();
matchingStream = streamIt->second;
writeLock.unlock();
}
// seek to the beginning of the packet so that the next reader is in the right spot
@ -126,9 +136,11 @@ int AudioMixerClientData::parseData(ReceivedMessage& message) {
}
void AudioMixerClientData::checkBuffersBeforeFrameSend() {
QReadLocker readLocker { &_streamsLock };
auto it = _audioStreams.cbegin();
while (it != _audioStreams.cend()) {
PositionalAudioStream* stream = it->second.get();
SharedStreamPointer stream = it->second;
if (stream->popFrames(1, true) > 0) {
stream->updateLastPopOutputLoudnessAndTrailingLoudness();
@ -146,6 +158,8 @@ void AudioMixerClientData::removeDeadInjectedStreams() {
// never even reaches its desired size, which means it will never start.
const int INJECTOR_CONSECUTIVE_NOT_MIXED_THRESHOLD = 1000;
QWriteLocker writeLocker { &_streamsLock };
auto it = _audioStreams.begin();
while (it != _audioStreams.end()) {
PositionalAudioStream* audioStream = it->second.get();
@ -180,9 +194,11 @@ void AudioMixerClientData::sendAudioStreamStatsPackets(const SharedNodePointer&
// it receives a packet with an appendFlag of 0. This prevents the buildup of dead audio stream stats in the client.
quint8 appendFlag = 0;
auto streamsCopy = getAudioStreams();
// pack and send stream stats packets until all audio streams' stats are sent
int numStreamStatsRemaining = _audioStreams.size();
auto it = _audioStreams.cbegin();
int numStreamStatsRemaining = streamsCopy.size();
auto it = streamsCopy.cbegin();
while (numStreamStatsRemaining > 0) {
auto statsPacket = NLPacket::create(PacketType::AudioStreamStats);
@ -216,7 +232,7 @@ void AudioMixerClientData::sendAudioStreamStatsPackets(const SharedNodePointer&
}
}
QJsonObject AudioMixerClientData::getAudioStreamStats() const {
QJsonObject AudioMixerClientData::getAudioStreamStats() {
QJsonObject result;
QJsonObject downstreamStats;
@ -266,15 +282,15 @@ QJsonObject AudioMixerClientData::getAudioStreamStats() const {
result["upstream"] = "mic unknown";
}
QHash<QUuid, PositionalAudioStream*>::ConstIterator i;
QJsonArray injectorArray;
for(auto& injectorPair : _audioStreams) {
auto streamsCopy = getAudioStreams();
for (auto& injectorPair : streamsCopy) {
if (injectorPair.second->getType() == PositionalAudioStream::Injector) {
QJsonObject upstreamStats;
AudioStreamStats streamStats = i.value()->getAudioStreamStats();
AudioStreamStats streamStats = injectorPair.second->getAudioStreamStats();
upstreamStats["inj.desired"] = streamStats._desiredJitterBufferFrames;
upstreamStats["desired_calc"] = i.value()->getCalculatedJitterBufferFrames();
upstreamStats["desired_calc"] = injectorPair.second->getCalculatedJitterBufferFrames();
upstreamStats["available_avg_10s"] = streamStats._framesAvailableAverage;
upstreamStats["available"] = (double) streamStats._framesAvailable;
upstreamStats["starves"] = (double) streamStats._starveCount;
@ -299,10 +315,12 @@ QJsonObject AudioMixerClientData::getAudioStreamStats() const {
return result;
}
void AudioMixerClientData::printUpstreamDownstreamStats() const {
void AudioMixerClientData::printUpstreamDownstreamStats() {
auto streamsCopy = getAudioStreams();
// print the upstream (mic stream) stats if the mic stream exists
auto it = _audioStreams.find(QUuid());
if (it != _audioStreams.end()) {
auto it = streamsCopy.find(QUuid());
if (it != streamsCopy.end()) {
printf("Upstream:\n");
printAudioStreamStats(it->second->getAudioStreamStats());
}

View file

@ -21,13 +21,16 @@
#include "AvatarAudioStream.h"
class AudioMixerClientData : public NodeData {
Q_OBJECT
public:
AudioMixerClientData();
using AudioStreamMap = std::unordered_map<QUuid, std::unique_ptr<PositionalAudioStream>>;
const AudioStreamMap& getAudioStreams() const { return _audioStreams; }
AvatarAudioStream* getAvatarAudioStream() const;
using SharedStreamPointer = std::shared_ptr<PositionalAudioStream>;
using AudioStreamMap = std::unordered_map<QUuid, SharedStreamPointer>;
// locks the mutex to make a copy
AudioStreamMap getAudioStreams() { QReadLocker readLock { &_streamsLock }; return _audioStreams; }
AvatarAudioStream* getAvatarAudioStream();
int parseData(ReceivedMessage& message);
@ -35,14 +38,14 @@ public:
void removeDeadInjectedStreams();
QJsonObject getAudioStreamStats() const;
QJsonObject getAudioStreamStats();
void sendAudioStreamStatsPackets(const SharedNodePointer& destinationNode);
void incrementOutgoingMixedAudioSequenceNumber() { _outgoingMixedAudioSequenceNumber++; }
quint16 getOutgoingSequenceNumber() const { return _outgoingMixedAudioSequenceNumber; }
void printUpstreamDownstreamStats() const;
void printUpstreamDownstreamStats();
signals:
void injectorStreamFinished(const QUuid& streamIdentifier);
@ -51,7 +54,8 @@ private:
void printAudioStreamStats(const AudioStreamStats& streamStats) const;
private:
AudioStreamMap _audioStreams; // mic stream stored under key of null UUID
QReadWriteLock _streamsLock;
AudioStreamMap _audioStreams; // microphone stream from avatar is stored under key of null UUID
quint16 _outgoingMixedAudioSequenceNumber;