move buffer popping to packet processing

This commit is contained in:
Stephen Birarda 2018-08-28 16:58:04 -07:00
parent 7d8b15ed75
commit 6750d4a370
5 changed files with 10 additions and 42 deletions

View file

@ -285,10 +285,9 @@ void AudioMixer::sendStatsPacket() {
addTiming(_ticTiming, "tic"); addTiming(_ticTiming, "tic");
addTiming(_sleepTiming, "sleep"); addTiming(_sleepTiming, "sleep");
addTiming(_frameTiming, "frame"); addTiming(_frameTiming, "frame");
addTiming(_prepareTiming, "prepare"); addTiming(_packetsTiming, "packets");
addTiming(_mixTiming, "mix"); addTiming(_mixTiming, "mix");
addTiming(_eventsTiming, "events"); addTiming(_eventsTiming, "events");
addTiming(_packetsTiming, "packets");
#ifdef HIFI_AUDIO_MIXER_DEBUG #ifdef HIFI_AUDIO_MIXER_DEBUG
timingStats["ns_per_mix"] = (_stats.totalMixes > 0) ? (float)(_stats.mixTime / _stats.totalMixes) : 0; timingStats["ns_per_mix"] = (_stats.totalMixes > 0) ? (float)(_stats.mixTime / _stats.totalMixes) : 0;
@ -421,14 +420,6 @@ void AudioMixer::start() {
} }
nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) { nodeList->nestedEach([&](NodeList::const_iterator cbegin, NodeList::const_iterator cend) {
// prepare frames; pop off any new audio from their streams
{
auto prepareTimer = _prepareTiming.timer();
for_each(cbegin, cend, [&](const SharedNodePointer& node) {
_stats.sumStreams += prepareFrame(node, frame);
});
}
// mix across slave threads // mix across slave threads
{ {
auto mixTimer = _mixTiming.timer(); auto mixTimer = _mixTiming.timer();
@ -520,15 +511,6 @@ void AudioMixer::throttle(chrono::microseconds duration, int frame) {
} }
} }
int AudioMixer::prepareFrame(const SharedNodePointer& node, unsigned int frame) {
AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
if (data == nullptr) {
return 0;
}
return data->checkBuffersBeforeFrameSend();
}
void AudioMixer::clearDomainSettings() { void AudioMixer::clearDomainSettings() {
_numStaticJitterFrames = DISABLE_STATIC_JITTER_FRAMES; _numStaticJitterFrames = DISABLE_STATIC_JITTER_FRAMES;
_attenuationPerDoublingInDistance = DEFAULT_ATTENUATION_PER_DOUBLING_IN_DISTANCE; _attenuationPerDoublingInDistance = DEFAULT_ATTENUATION_PER_DOUBLING_IN_DISTANCE;

View file

@ -87,10 +87,6 @@ private:
std::chrono::microseconds timeFrame(p_high_resolution_clock::time_point& timestamp); std::chrono::microseconds timeFrame(p_high_resolution_clock::time_point& timestamp);
void throttle(std::chrono::microseconds frameDuration, int frame); void throttle(std::chrono::microseconds frameDuration, int frame);
// pop a frame from any streams on the node
// returns the number of available streams
int prepareFrame(const SharedNodePointer& node, unsigned int frame);
AudioMixerClientData* getOrCreateClientData(Node* node); AudioMixerClientData* getOrCreateClientData(Node* node);
QString percentageForMixStats(int counter); QString percentageForMixStats(int counter);

View file

@ -57,7 +57,7 @@ void AudioMixerClientData::queuePacket(QSharedPointer<ReceivedMessage> message,
_packetQueue.push(message); _packetQueue.push(message);
} }
void AudioMixerClientData::processPackets(ConcurrentAddedStreams& addedStreams) { int AudioMixerClientData::processPackets(ConcurrentAddedStreams& addedStreams) {
SharedNodePointer node = _packetQueue.node; SharedNodePointer node = _packetQueue.node;
assert(_packetQueue.empty() || node); assert(_packetQueue.empty() || node);
_packetQueue.node.clear(); _packetQueue.node.clear();
@ -105,6 +105,10 @@ void AudioMixerClientData::processPackets(ConcurrentAddedStreams& addedStreams)
_packetQueue.pop(); _packetQueue.pop();
} }
assert(_packetQueue.empty()); assert(_packetQueue.empty());
// now that we have processed all packets for this frame
// we can prepare the sources from this client to be ready for mixing
return checkBuffersBeforeFrameSend();
} }
bool isReplicatedPacket(PacketType packetType) { bool isReplicatedPacket(PacketType packetType) {
@ -292,8 +296,6 @@ void AudioMixerClientData::parseRadiusIgnoreRequest(QSharedPointer<ReceivedMessa
} }
AvatarAudioStream* AudioMixerClientData::getAvatarAudioStream() { AvatarAudioStream* AudioMixerClientData::getAvatarAudioStream() {
QReadLocker readLocker { &_streamsLock };
auto it = std::find_if(_audioStreams.begin(), _audioStreams.end(), [](const SharedStreamPointer& stream){ auto it = std::find_if(_audioStreams.begin(), _audioStreams.end(), [](const SharedStreamPointer& stream){
return stream->getStreamIdentifier().isNull(); return stream->getStreamIdentifier().isNull();
}); });
@ -307,8 +309,6 @@ AvatarAudioStream* AudioMixerClientData::getAvatarAudioStream() {
} }
void AudioMixerClientData::removeAgentAvatarAudioStream() { void AudioMixerClientData::removeAgentAvatarAudioStream() {
QWriteLocker writeLocker { &_streamsLock };
auto it = std::remove_if(_audioStreams.begin(), _audioStreams.end(), [](const SharedStreamPointer& stream){ auto it = std::remove_if(_audioStreams.begin(), _audioStreams.end(), [](const SharedStreamPointer& stream){
return stream->getStreamIdentifier().isNull(); return stream->getStreamIdentifier().isNull();
}); });
@ -394,8 +394,6 @@ void AudioMixerClientData::processStreamPacket(ReceivedMessage& message, Concurr
|| packetType == PacketType::MicrophoneAudioNoEcho || packetType == PacketType::MicrophoneAudioNoEcho
|| packetType == PacketType::SilentAudioFrame) { || packetType == PacketType::SilentAudioFrame) {
QWriteLocker writeLocker { &_streamsLock };
auto micStreamIt = std::find_if(_audioStreams.begin(), _audioStreams.end(), [](const SharedStreamPointer& stream){ auto micStreamIt = std::find_if(_audioStreams.begin(), _audioStreams.end(), [](const SharedStreamPointer& stream){
return stream->getStreamIdentifier().isNull(); return stream->getStreamIdentifier().isNull();
}); });
@ -442,8 +440,6 @@ void AudioMixerClientData::processStreamPacket(ReceivedMessage& message, Concurr
} else { } else {
matchingStream = *micStreamIt; matchingStream = *micStreamIt;
} }
writeLocker.unlock();
} else if (packetType == PacketType::InjectAudio) { } else if (packetType == PacketType::InjectAudio) {
// this is injected audio // this is injected audio
@ -453,8 +449,6 @@ void AudioMixerClientData::processStreamPacket(ReceivedMessage& message, Concurr
QUuid streamIdentifier = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID)); QUuid streamIdentifier = QUuid::fromRfc4122(message.readWithoutCopy(NUM_BYTES_RFC4122_UUID));
QWriteLocker writeLock { &_streamsLock };
auto streamIt = std::find_if(_audioStreams.begin(), _audioStreams.end(), [&streamIdentifier](const SharedStreamPointer& stream) { auto streamIt = std::find_if(_audioStreams.begin(), _audioStreams.end(), [&streamIdentifier](const SharedStreamPointer& stream) {
return stream->getStreamIdentifier() == streamIdentifier; return stream->getStreamIdentifier() == streamIdentifier;
}); });
@ -478,8 +472,6 @@ void AudioMixerClientData::processStreamPacket(ReceivedMessage& message, Concurr
} else { } else {
matchingStream = *streamIt; matchingStream = *streamIt;
} }
writeLock.unlock();
} }
// seek to the beginning of the packet so that the next reader is in the right spot // seek to the beginning of the packet so that the next reader is in the right spot
@ -501,8 +493,6 @@ void AudioMixerClientData::processStreamPacket(ReceivedMessage& message, Concurr
} }
int AudioMixerClientData::checkBuffersBeforeFrameSend() { int AudioMixerClientData::checkBuffersBeforeFrameSend() {
QWriteLocker writeLocker { &_streamsLock };
auto it = _audioStreams.begin(); auto it = _audioStreams.begin();
while (it != _audioStreams.end()) { while (it != _audioStreams.end()) {
SharedStreamPointer stream = *it; SharedStreamPointer stream = *it;

View file

@ -50,7 +50,7 @@ public:
using AudioStreamVector = std::vector<SharedStreamPointer>; using AudioStreamVector = std::vector<SharedStreamPointer>;
void queuePacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer node); void queuePacket(QSharedPointer<ReceivedMessage> packet, SharedNodePointer node);
void processPackets(ConcurrentAddedStreams& addedStreams); int processPackets(ConcurrentAddedStreams& addedStreams); // returns the number of available streams this frame
AudioStreamVector& getAudioStreams() { return _audioStreams; } AudioStreamVector& getAudioStreams() { return _audioStreams; }
AvatarAudioStream* getAvatarAudioStream(); AvatarAudioStream* getAvatarAudioStream();
@ -165,8 +165,7 @@ private:
}; };
PacketQueue _packetQueue; PacketQueue _packetQueue;
QReadWriteLock _streamsLock; AudioStreamVector _audioStreams; // microphone stream from avatar has a null stream ID
AudioStreamVector _audioStreams; // microphone stream from avatar is stored under key of null UUID
void optionallyReplicatePacket(ReceivedMessage& packet, const Node& node); void optionallyReplicatePacket(ReceivedMessage& packet, const Node& node);

View file

@ -55,7 +55,8 @@ inline float computeAzimuth(const AvatarAudioStream& listeningNodeStream, const
void AudioMixerSlave::processPackets(const SharedNodePointer& node) { void AudioMixerSlave::processPackets(const SharedNodePointer& node) {
AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData(); AudioMixerClientData* data = (AudioMixerClientData*)node->getLinkedData();
if (data) { if (data) {
data->processPackets(_sharedData.addedStreams); // process packets and collect the number of streams available for this frame
stats.sumStreams += data->processPackets(_sharedData.addedStreams);
} }
} }